Programming/python

Python asyncio.as_completed를 활용한 Markdown 문서 병렬 청킹

moxie2ks 2026. 3. 13. 18:01
728x90
반응형

개요

여러 파일을 순차적으로 처리하면 처리 시간이 파일 수에 비례해 증가한다. asyncio를 활용하면 I/O 대기 구간을 겹쳐 실행함으로써 전체 처리 시간을 단축할 수 있다. 본 글에서는 async defasyncio.as_completed를 사용해 여러 Markdown 규정 문서를 병렬로 읽고 헤더 기반으로 청킹 하는 패턴을 분석한다. 핵심 라이브러리는 LangChain의 MarkdownHeaderTextSplitter이며, 비동기 흐름의 설계 방식과 각 구성 요소의 역할을 중심으로 설명한다.

설명

전체 구조

코드는 크게 세 가지 역할로 구분된다.

1. 청킹 로직 (_split, chunk_markdown)

_split은 순수한 동기 함수로, MarkdownHeaderTextSplitter를 초기화하고 텍스트를 헤더 레벨(#, ##, ###)에 따라 분리한다. strip_headers=False로 설정하면 각 청크 내용에 헤더 텍스트가 그대로 포함된다.

chunk_markdownasync def로 선언된 코루틴이다. 파일 읽기와 동기 함수 호출을 asyncio.to_thread로 감싸 이벤트 루프를 블로킹하지 않도록 처리한다.

async def chunk_markdown(name: str, path: Path) -> tuple[str, list]:
    text = path.read_text(encoding="utf-8")
    chunks = await asyncio.to_thread(_split, text)
    return name, chunks

2. 병렬 실행 (asyncio.as_completed)

main 함수에서는 각 소스에 대한 코루틴 객체를 리스트로 생성한 뒤, asyncio.as_completed에 전달한다. 이 함수는 완료된 태스크부터 순서에 관계없이 결과를 반환하는 이터레이터를 제공한다.

tasks = [chunk_markdown(name, path) for name, path in SOURCES.items()]
for coro in asyncio.as_completed(tasks):
    name, chunks = await coro

3. 결과 출력 (print_chunks)

각 청크의 메타데이터(title, chapter, article)와 본문 내용을 구조적으로 출력한다. 이 함수는 순수 동기 함수로, 비동기 흐름과 분리되어 있다.

헤더 기반 청킹

HEADERS_TO_SPLIT_ON은 헤더 레벨과 메타데이터 키를 매핑한 리스트다.

HEADERS_TO_SPLIT_ON = [
    ("#", "title"),
    ("##", "chapter"),
    ("###", "article"),
]

이 설정에 따라 MarkdownHeaderTextSplitter는 헤더를 기준으로 문서를 분할하고, 각 청크의 metadata에 해당 헤더 텍스트를 저장한다. 이를 통해 청크만으로도 해당 내용이 어느 제목, 장, 조에 속하는지 파악할 수 있다.

특징

asyncio.as_completed vs asyncio.gather

두 함수는 모두 여러 코루틴을 병렬 실행하지만 동작 방식이 다르다.

항목 asyncio.gather asyncio.as_completed
결과 순서 입력 순서 보장 완료 순서 기준
결과 수신 시점 모든 태스크 완료 후 태스크별 완료 즉시
부분 실패 처리 return_exceptions=True 필요 루프 내 개별 처리 가능
스트리밍 처리 불가 가능

파일 크기나 처리 시간이 태스크마다 다를 때, asyncio.as_completed는 빠른 태스크의 결과를 즉시 소비할 수 있어 응답성이 높다.

asyncio.to_thread의 역할

MarkdownHeaderTextSplitter.split_text는 동기 함수다. 이를 직접 await할 수 없으므로, asyncio.to_thread를 사용해 별도의 스레드 풀에서 실행한다. 이 방식은 CPU 바운드 작업이나 동기 I/O 작업이 이벤트 루프를 블로킹하는 문제를 방지한다.

chunks = await asyncio.to_thread(_split, text)

메타데이터 보존의 이점

청크마다 title, chapter, article 메타데이터가 붙어 있어, 이후 Vector Store 저장 시 필터링 조건으로 활용할 수 있다. 예를 들어, 특정 장(chapter)에 해당하는 청크만 검색하거나, 조(article) 단위로 맥락을 구성하는 GraphRAG 응용에도 적합하다.

예제

실행 결과 예시

인사규정 Markdown 파일이 다음과 같은 구조라고 가정한다.

# 인사규정

## 제1장 총칙

### 제1조 (목적)
이 규정은 직원의 인사에 관한 사항을 정함을 목적으로 한다.

### 제2조 (적용 범위)
이 규정은 전 임직원에게 적용된다.

실행 결과는 다음과 같다.

====================================================================================================
[인사규정]  총 청크 수: 3

── CHUNK #1 ────────────────────────────────────────────────────────────────────────────
title   : 인사규정
chapter : 제1장 총칙
article : 제1조 (목적)
content :
### 제1조 (목적)
이 규정은 직원의 인사에 관한 사항을 정함을 목적으로 한다.

── CHUNK #2 ────────────────────────────────────────────────────────────────────────────
title   : 인사규정
chapter : 제1장 총칙
article : 제2조 (적용 범위)
content :
### 제2조 (적용 범위)
이 규정은 전 임직원에게 적용된다.

확장 패턴: 청킹 결과를 Vector Store에 저장

chunk_markdown의 반환값을 Vector Store 저장 파이프라인에 바로 연결할 수 있다.

async def chunk_and_store(name: str, path: Path, vectorstore) -> None:
    _, chunks = await chunk_markdown(name, path)
    docs = [
        Document(page_content=c.page_content, metadata={**c.metadata, "source": name})
        for c in chunks
    ]
    await asyncio.to_thread(vectorstore.add_documents, docs)

결론

async defasyncio.as_completed의 조합은 다수의 파일을 대상으로 하는 청킹 파이프라인에서 처리 순서에 의존하지 않고 완료된 작업부터 즉시 소비할 수 있는 구조를 제공한다. asyncio.to_thread를 통해 동기 라이브러리도 이벤트 루프 차단 없이 통합할 수 있으며, MarkdownHeaderTextSplitter의 메타데이터 보존 방식은 이후 RAG 파이프라인에서의 필터링과 컨텍스트 구성에 실질적인 이점을 제공한다. 단순한 병렬 처리 수준을 넘어, 구조적 문서 처리와 비동기 설계를 결합하는 기본 패턴으로 활용할 수 있다.

참고문헌

728x90
반응형