본문으로 건너뛰기
AIPida

LangGraph 실전: PostgresSaver로 중단·재개되는 HITL 승인 워크플로 만들기

interrupt()로 멈추고, Command(resume=)로 이어가고, PostgreSQL에 상태를 영속화하는 패턴

중급AI 에이전트·

1. 핵심 직답: 어떻게 만드나

사람의 승인을 기다리는 에이전트는 두 가지 LangGraph 기능을 조합하면 된다. 첫째, 그래프에 체크포인터(checkpointer) 를 붙여 실행 상태를 외부 저장소에 저장한다. 둘째, 승인이 필요한 노드 안에서 interrupt() 를 호출해 실행을 멈추고, 나중에 Command(resume=...) 로 다시 깨운다.

프로덕션에서 프로세스가 재시작돼도 살아남으려면 인메모리가 아니라 PostgresSaver(또는 비동기 AsyncPostgresSaver) 를 써서 상태를 PostgreSQL에 영속화한다. 세 줄로 요약하면:

  • interrupt(payload) → 그래프가 멈추고, 출력에 __interrupt__ 키로 사람에게 보여줄 값이 실린다.
  • 사람이 결정하면 같은 thread_idCommand(resume=결정값) 을 흘려보낸다 → interrupt() 의 반환값이 그 결정값이 되어 노드가 이어진다.
  • 상태는 thread_id 단위로 PostgreSQL에 저장되므로, 승인까지 몇 시간·며칠이 걸려도, 서버가 재배포돼도 정확히 그 지점에서 재개된다.

아래에서 동작 원리, 실제 코드 2종(동기 PostgresSaver, 비동기 + FastAPI 승인 게이트), 트레이드오프, 그리고 가장 자주 데이는 함정(노드는 재개 시 처음부터 다시 실행된다) 까지 다룬다.

2. 문제 정의: 왜 그냥 멈추면 안 되나

에이전트가 환불 처리, 외부 메일 발송, DB 대량 수정 같은 되돌리기 어려운 행동(side effect) 을 하기 직전에 사람의 승인을 받아야 하는 상황은 흔하다. 순진하게 구현하면 두 가지가 깨진다.

  1. 상태 휘발. 승인 요청을 보낸 뒤 사람이 답하기까지 몇 시간이 걸릴 수 있다. 그 사이 워커 프로세스가 재배포되거나 크래시되면, 메모리에만 있던 "어디까지 진행했는지"가 통째로 사라진다.
  2. 재개 지점 분실. 승인이 와도 "이 승인이 어느 실행의, 어느 노드의 무슨 요청에 대한 답인지"를 매핑할 방법이 없으면 처음부터 다시 돌려야 한다. LLM 호출 비용과 지연이 그대로 중복된다.

LangGraph는 이 둘을 체크포인터 + thread_id + interrupt 로 정면 해결한다. 체크포인터는 각 슈퍼스텝(superstep)이 끝날 때마다 그래프 상태의 스냅샷(checkpoint)을 저장하고, thread_id 는 "하나의 실행 흐름"을 식별한다. interrupt() 는 그 스냅샷 위에서 깔끔하게 멈출 수 있게 해준다. 즉, 멈춤(pause)이 곧 영속화된 상태이므로 무한정 기다려도 안전하다.

전제 조건: interrupt() 는 체크포인터가 있어야만 동작한다. 상태를 어딘가 저장해야 멈췄다가 이어갈 수 있기 때문이다. 체크포인터 없이 interrupt() 를 호출하면 재개 자체가 불가능하다.

3. 메커니즘: 체크포인터·interrupt·Command가 맞물리는 방식

3.1 체크포인터의 read-execute-write 사이클

체크포인터는 BaseCheckpointSaver 인터페이스를 구현한 어댑터다. 핵심 메서드는 put(체크포인트 저장), get_tuple(조회), list(목록), put_writes(보류 중 쓰기 저장), delete_thread(스레드 삭제)이며, 비동기 버전(aput, aget_tuple, alist 등)도 있다. 그래프를 컴파일할 때 builder.compile(checkpointer=...) 로 주입한다.

실행 시 사이클은 단순하다. 주어진 thread_id 의 최신 체크포인트를 읽고 → 다음 스텝을 실행하고 → 새 상태를 새 체크포인트로 쓴다. 그래서 같은 thread_id 로 다시 호출하면 "중단된 곳"에서 이어진다.

3.2 interrupt()가 멈추는 방식

노드 안에서 interrupt(value) 를 처음 호출하면 내부적으로 GraphInterrupt 예외가 발생해 그래프가 멈추고, value 가 호출자에게 전달된다. invoke() 로 실행했다면 결과 dict에 __interrupt__ 키로, stream() 이라면 스트림 이벤트로 노출된다. 각 Interrupt 객체에는 사람에게 보여줄 value 와 고유 id 가 들어 있다.

3.3 Command(resume=)로 깨우는 방식

재개는 오직 Command 프리미티브로만 한다. 같은 thread_id 의 config로 graph.invoke(Command(resume=결정값), config) 를 호출하면, Command(resume=...) 에 넣은 값이 곧 멈춰 있던 interrupt() 호출의 반환값이 된다. 노드는 그 반환값을 받아 나머지 로직을 실행한다.

구성요소import역할
PostgresSaver / AsyncPostgresSaverlanggraph.checkpoint.postgres / .aio상태를 PostgreSQL에 영속화
interruptlanggraph.types노드 실행을 멈추고 값을 노출
Commandlanggraph.typesresume= 로 멈춘 지점 재개
thread_id (config)config={"configurable": {"thread_id": ...}}어떤 실행을 이어갈지 식별

4. 코드 ①: PostgresSaver 영속화 + 승인 게이트 (동기)

환불 에이전트의 최소 골격이다. 환불을 실행하기 전 interrupt() 로 멈추고, 승인이 오면 이어간다.

# pip install langgraph langgraph-checkpoint-postgres
from typing import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.postgres import PostgresSaver
from langgraph.types import interrupt, Command

DB_URI = "postgresql://user:pass@localhost:5432/agent?sslmode=disable"

class RefundState(TypedDict):
    order_id: str
    amount: int
    decision: str

def request_approval(state: RefundState):
    # 멈춤. 이 payload가 __interrupt__로 사람에게 노출된다.
    decision = interrupt({
        "action": "refund",
        "order_id": state["order_id"],
        "amount": state["amount"],
        "prompt": "이 환불을 승인하시겠습니까? (approve / reject)",
    })
    # Command(resume=...)로 넘긴 값이 여기로 돌아온다.
    return {"decision": decision}

def execute_refund(state: RefundState):
    if state["decision"] != "approve":
        return {"decision": "rejected"}
    # 실제 환불 API 호출 — 멱등키(order_id)로 중복 방지 (5절 참고)
    # refund_api.create(idempotency_key=state["order_id"], amount=state["amount"])
    return {"decision": "refunded"}

builder = StateGraph(RefundState)
builder.add_node("request_approval", request_approval)
builder.add_node("execute_refund", execute_refund)
builder.add_edge(START, "request_approval")
builder.add_edge("request_approval", "execute_refund")
builder.add_edge("execute_refund", END)

# 최초 1회: 체크포인트 테이블 생성
with PostgresSaver.from_conn_string(DB_URI) as checkpointer:
    checkpointer.setup()  # 첫 사용 시 필수
    graph = builder.compile(checkpointer=checkpointer)

    config = {"configurable": {"thread_id": "refund-7781"}}

    # 1) 실행 → 승인 노드에서 멈춤
    result = graph.invoke({"order_id": "ord-7781", "amount": 39000}, config)
    print(result["__interrupt__"])  # 사람에게 보여줄 승인 요청

    # ... 여기서 프로세스가 죽어도 상태는 PostgreSQL에 남아 있다 ...

    # 2) 사람이 승인 → 같은 thread_id로 재개
    final = graph.invoke(Command(resume="approve"), config)
    print(final["decision"])  # refunded

.setup() 은 최초 1회만 호출하면 된다(필요한 테이블을 만든다). 운영에서는 배포 마이그레이션 단계에서 한 번 실행하고, 요청 경로에서는 호출하지 않는 편이 깔끔하다.

5. 코드 ②: 비동기 AsyncPostgresSaver + FastAPI 승인 게이트

실제 서비스는 "실행 요청 → 멈춤 → (다른 HTTP 요청으로) 승인 → 재개"가 서로 다른 요청·다른 시점에 일어난다. 비동기 풀과 FastAPI로 분리한 예다.

# pip install langgraph-checkpoint-postgres psycopg[binary,pool]
from psycopg_pool import AsyncConnectionPool
from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
from langgraph.types import Command
from fastapi import FastAPI

DB_URI = "postgresql://user:pass@localhost:5432/agent"
app = FastAPI()
pool: AsyncConnectionPool | None = None
graph = None  # builder.compile(checkpointer=...) 로 채움

@app.on_event("startup")
async def startup():
    global pool, graph
    # 수동 연결 시 두 옵션이 필수다 (아래 함정 박스 참고)
    pool = AsyncConnectionPool(
        conninfo=DB_URI,
        max_size=20,
        kwargs={"autocommit": True, "row_factory": "dict_row"},
    )
    checkpointer = AsyncPostgresSaver(pool)
    await checkpointer.setup()       # 최초 1회만 의미 있음
    graph = builder.compile(checkpointer=checkpointer)

@app.post("/refunds/{order_id}/start")
async def start(order_id: str, amount: int):
    config = {"configurable": {"thread_id": f"refund-{order_id}"}}
    result = await graph.ainvoke(
        {"order_id": order_id, "amount": amount},
        config,
        durability="sync",  # 승인 대기 동안 절대 유실 금지 (6절)
    )
    return {"status": "awaiting_approval", "interrupt": result["__interrupt__"]}

@app.post("/refunds/{order_id}/decide")
async def decide(order_id: str, decision: str):  # "approve" | "reject"
    config = {"configurable": {"thread_id": f"refund-{order_id}"}}
    final = await graph.ainvoke(Command(resume=decision), config)
    return {"decision": final["decision"]}

함정: 수동 연결에는 autocommit=Truerow_factory=dict_row 가 필수다. 직접 연결/풀을 만들어 PostgresSaver/AsyncPostgresSaver 에 넘길 때 이 둘이 빠지면 깨진다. autocommit=True 가 없으면 .setup() 이 테이블 생성을 커밋하지 못하고, row_factory=dict_row 가 없으면 내부에서 row["컬럼"] 딕셔너리 접근을 하므로 런타임 에러가 난다. from_conn_string() 컨텍스트 매니저를 쓰면 이 옵션이 자동 적용되지만, 수동 풀은 본인이 챙겨야 한다.

상태 직접 조회·수정

승인 UI에 "현재 무엇을 기다리는지"를 보여주거나, 사람이 값을 교정하게 하려면 그래프의 상태 API를 쓴다.

snapshot = graph.get_state(config)      # 현재 상태 + 다음 실행 노드(next)
print(snapshot.values, snapshot.next)
graph.update_state(config, {"amount": 35000})  # 재개 전에 값 교정

6. 트레이드오프: durability 모드 선택

체크포인터를 붙이면 자동으로 durable execution이 켜진다. 다만 "얼마나 자주, 언제 디스크에 쓸 것인가"는 durability 파라미터로 조절하며, 성능 ↔ 안전성의 트레이드오프다. invoke/ainvoke/stream 호출 시 durability= 로 지정한다.

모드언제 쓰나장점위험
"exit"실행이 끝나거나 에러·인터럽트로 빠져나갈 때만 저장긴 그래프에서 최고 성능중간 상태 미저장 → 스텝 중간 크래시는 복구 불가
"async"다음 스텝을 실행하면서 비동기로 저장성능·내구성 균형실행 도중 크래시 시 마지막 체크포인트 누락 가능성(소)
"sync"다음 스텝 시작 전에 동기로 저장모든 체크포인트 보장, 최고 내구성약간의 성능 오버헤드

HITL 승인 워크플로의 선택. 승인 대기 구간은 "되돌리기 어려운 행동 직전"이므로 유실이 곧 사고다. 승인 요청을 만드는 실행 경로는 durability="sync" 로, 트래픽이 많고 유실 비용이 낮은 보조 그래프는 "async"/"exit" 로 차등 적용하는 것이 합리적이다. 참고로 interrupt 로 멈추는 순간은 어차피 그래프가 "빠져나가는" 시점이라 "exit" 에서도 그 지점은 저장되지만, 스텝 중간 크래시 복구를 원한다면 "sync"/"async" 가 필요하다.

7. 가장 큰 함정: 노드는 재개 시 처음부터 다시 실행된다

이것 하나만 기억해도 절반은 안전하다. Command(resume=...) 로 재개하면, 멈췄던 노드는 중간이 아니라 처음부터 다시 실행된다. interrupt() 호출 지점까지의 모든 코드가 한 번 더 돌고, 그제서야 interrupt() 가 resume 값을 반환한다. 즉 interrupt() 앞에 둔 LLM 호출·API 요청·DB 쓰기는 재개할 때마다 다시 실행된다.

왜 그런가

LangGraph는 노드 단위로 체크포인트를 잡는다. 노드 내부의 임의 지점을 저장하는 게 아니라 "이 노드를 다시 돌려라"를 기록한다. 그래서 replay 시 노드 본문이 통째로 재실행되고, 그 안의 비결정적 연산은 그대로 반복된다.

회피법 3가지

# 나쁜 예: interrupt 앞에 부수효과 → 재개 때 메일 두 번 발송
def bad(state):
    send_email(state["to"])          # 재개 시 또 실행됨!
    ok = interrupt("승인?")
    return {"ok": ok}

# 좋은 예 1: interrupt를 노드 맨 앞에 두고, 부수효과는 그 뒤(다음 노드)로
def gate(state):
    decision = interrupt({"prompt": "승인?"})  # 부수효과 없음
    return {"decision": decision}
  1. interrupt() 를 노드 최상단에 배치. 멈춤 전에 부수효과를 두지 않으면 재실행돼도 무해하다.
  2. 부수효과는 멱등(idempotent)하게. 환불·결제·메일은 order_id 같은 멱등키를 외부 API에 넘기거나, 실행 전 "이미 처리됨?"을 DB로 재확인한다.
  3. 비결정적·부수효과 연산은 @task 로 감싼다. @task 로 표시한 작업은 결과가 체크포인트에 캐시돼 재개 시 재실행되지 않는다. 단, LangGraph Platform/API 서버에 배포할 때는 StateGraph 노드 안에서 @task 를 쓰지 말 것 — 노드 체크포인팅은 런타임 주입 체크포인터를 쓰는데 @task 는 컴파일타임 체크포인터를 찾아 None 이 되는 알려진 함정이 있다. 이 경우 작업을 별도 노드로 분리한다.

한국 실무 메모: 토스페이먼츠·KG이니시스 같은 PG로 결제·환불을 실행할 때 이 재실행 함정은 곧 이중 결제/이중 환불 사고가 된다. 우리 운영에서도 외부 머니패스 호출은 예외 없이 멱등키 + 사전 상태 재확인을 강제하는 가드를 둔다. interrupt() 뒤의 실행 노드에 PG 호출을 두고, 그 노드 진입 시 "이미 환불됨" 플래그를 먼저 검사하는 패턴이 안전하다.

8. 언제 쓰지 말아야 하나 (when NOT to)

체크포인터 + interrupt는 강력하지만 만능은 아니다. 아래는 다른 도구가 나은 경우다.

  • 단순 1턴 요청/응답. 멈춤·재개가 없는 stateless 호출에 PostgreSQL 체크포인터를 붙이면 DB 쓰기·테이블 관리 비용만 늘어난다. 이때는 체크포인터 없이 그냥 invoke 한다.
  • 수십 초 내 끝나는 동기 승인. 사용자가 화면 앞에서 바로 누르는 즉시 승인이라면 인메모리(InMemorySaver)나 단순 요청-응답으로 충분하다. 영속화는 "분 단위 이상 대기" 또는 "프로세스 재시작 가능성"이 있을 때 가치가 생긴다.
  • 엄격한 분산 트랜잭션·SLA 기반 장기 워크플로. LangGraph 체크포인트는 "상태 스냅샷"이지 완전한 워크플로 엔진(트랜잭셔널 보장, 타이머, 재시도 정책, 시그널)이 아니다. 여러 서비스에 걸친 보상 트랜잭션, 며칠짜리 타이머, 정교한 재시도가 필요하면 Temporal 같은 전용 durable execution 엔진과 결합하거나 위임하는 편이 맞다.
  • 읽기 전용·조회성 에이전트. 부수효과가 없고 비용도 작다면 멈출 이유 자체가 없다.

핵심 판단 기준: "멈춰 있는 동안 상태가 사라지면 사고인가?" 가 Yes일 때만 PostgresSaver 영속화 + interrupt 조합을 도입한다.

9. 운영 팁: 프로덕션 체크리스트

  • .setup() 은 배포 단계에서 1회. 요청 경로에서 매번 호출하지 않는다. 마이그레이션 스텝이나 startup 훅에서 한 번만 실행한다.
  • 연결 풀 + autocommit=True + row_factory=dict_row. 비동기 서버는 AsyncConnectionPool 로 풀을 두고, 수동 연결에는 두 옵션을 반드시 넣는다(7절 박스 재확인). 풀 크기는 동시 처리량에 맞춘다.
  • thread_id 네이밍을 도메인 키와 묶기. refund-{order_id} 처럼 "하나의 비즈니스 흐름 = 하나의 thread_id"로 만들면 재개 매핑이 단순해지고 중복 실행도 식별하기 쉽다.
  • 승인 대기 경로는 durability="sync". 유실 비용이 큰 구간만 동기로, 나머지는 "async"/"exit" 로 차등.
  • get_state(config).next 로 대기 상태 가시화. 운영 대시보드에 "이 thread가 어느 노드에서, 무엇을 기다리는지"를 노출하면 미승인 적체를 추적할 수 있다.
  • 체크포인트 보관 정책. 완료된 thread는 delete_thread 로 정리하거나 보관 기간 정책을 둔다. 무한 누적은 테이블 비대화로 이어진다.
  • 재개 멱등성 회귀 테스트. "같은 Command(resume=...) 를 두 번 보내도 환불은 한 번만" 같은 시나리오를 e2e로 고정한다. 단위 테스트는 노드 재실행 함정을 못 잡는다.

10. 자주 묻는 질문 (FAQ)

interrupt()를 쓰려면 꼭 체크포인터가 필요한가요?

네. interrupt()는 그래프 상태를 영속화해 멈췄다가 이어가는 기능이라 체크포인터가 전제 조건입니다. 체크포인터 없이는 재개할 상태가 없어 동작하지 않습니다.

재개할 때 노드 전체가 다시 실행된다는 게 사실인가요?

사실입니다. Command(resume=...)로 재개하면 멈췄던 노드는 처음부터 재실행되고 interrupt() 호출 지점에서 resume 값을 반환합니다. 그래서 interrupt() 앞에 둔 LLM·API·DB 부수효과는 재개 때마다 반복됩니다. interrupt()를 노드 맨 앞에 두거나, 부수효과를 멱등하게 만들거나, @task로 감싸 해결합니다.

PostgresSaver와 AsyncPostgresSaver는 어떻게 고르나요?

동기 코드는 langgraph.checkpoint.postgres.PostgresSaver, asyncio 기반(FastAPI 등)은 langgraph.checkpoint.postgres.aio.AsyncPostgresSaver를 씁니다. 비동기 서버에서 동기 세이버를 쓰면 이벤트 루프를 막을 수 있으니 런타임에 맞추세요.

durability를 "sync"로만 두면 안 되나요?

동작은 합니다. 다만 모든 스텝마다 동기 쓰기가 발생해 처리량이 떨어집니다. 유실 비용이 큰 승인 대기 경로만 "sync", 나머지는 "async"/"exit"로 차등하는 편이 비용 효율적입니다.

수동 연결에서 autocommit/row_factory를 빼면 어떻게 되나요?

autocommit=True가 없으면 .setup()이 테이블 생성을 커밋하지 못해 다음 실행에서 테이블 없음 류 에러가 납니다. row_factory=dict_row가 없으면 내부의 딕셔너리 행 접근에서 런타임 에러가 납니다. from_conn_string()을 쓰면 자동 적용되지만 직접 만든 풀은 반드시 챙겨야 합니다.

승인을 기다리는 동안 사람이 값을 고치게 하려면요?

graph.get_state(config)로 현재 상태와 다음 노드(next)를 보여주고, graph.update_state(config, {...})로 값을 교정한 뒤 Command(resume=...)로 재개하면 됩니다. 교정된 상태 위에서 이어집니다.

같은 resume를 두 번 보내면 중복 처리되나요?

노드가 재실행되므로 부수효과가 멱등하지 않으면 중복될 수 있습니다. 외부 호출에 멱등키를 넘기고, 실행 노드 진입 시 이미 처리됨을 DB로 재확인하는 가드를 두세요.

언제 이 패턴 대신 Temporal 같은 워크플로 엔진을 써야 하나요?

여러 서비스에 걸친 보상 트랜잭션, 며칠짜리 타이머, 정교한 재시도 정책처럼 완전한 워크플로 오케스트레이션이 필요할 때입니다. LangGraph 체크포인트는 상태 스냅샷이지 분산 트랜잭션 엔진이 아니라서, 이 경우 전용 엔진과 결합하거나 위임하는 편이 맞습니다.