본문 바로가기
개발

promise pool로 비동기 작업 처리량 높이기

by 박연호의 개발 블로그 2025. 4. 3.

Promise 관련된 내용을 찾아보다가 @supercharge/promise-pool 이라는 라이브러리를 발견하여, 기존 Promise.all와 비교하여 어떤 부분에서 장점이 있는지 코드를 분석해보겠습니다.

 

보통 서버에서 여러 비동기 작업을 한번에 처리하거나, 처리량을 높이기 위해 Promise.all을 사용합니다.

await Promise.all(["1초 걸리는 작업", "3초 걸리는 작업", "2초 걸리는 작업"])
await Promise.all(["4초 걸리는 작업", "2초 걸리는 작업", "5초 걸리는 작업"])

 

각각의 Promise.all의 실행되는 시간은 Promise.all에서 가장 처리시간이 오래 걸리는 작업이 됩니다. 위의 코드예시에서 첫번째 Promise.all은 3초, 두번째 Promise.all은 5초로 총 8초가 소요됩니다.

 

 총 6개의 Promise를 처리하는데 8초가 소요되었는데, 사실 첫번째 Promise.all에서 1초가 소요되는 작업은 이미 작업을 끝냈습니다. 단지 가장 오래 걸리는 작업인 3초 걸리는 작업이 마무리되길 기다리고 있을 뿐입니다. 이말은 즉, 더 많은 작업을 처리할 수 있지만 Promise.all의 함수 특성때문에 다른 작업을 처리하지 못하고 있는 것입니다.

 

첫번째 Promise.all에서 1초, 2초 걸리는 작업이 마무리되면 2번째 Promise.all에서 실행될 Promise 처리하면 8초보다 더 일찍 끝날 것 같습니다.

 

워터파크에 슬라이드 라인이 5개가 있다면, 5명이 모두 내려간 이후에 다시 5명을 한번에 내려보내는 것보단 먼저 내려간 라인은 다른 라인의 사람의 도착 여부와 상관없이 바로 사람을 태우는 것이 훨씬 더 많은 사람이 슬라이드를 즐길 수 있습니다.

 

슬라이드 라인과 마찬가지로 5개의 라인(promise pool)를 만들어 n번째 라인의 사람이 내려갔다면(Promise 처리를 했으면) 다른 라인을 기다리지 않고 n번째 라인에 사람을 태우는 방법이 @supercharge/promise-pool 의 아이디어 이며 동작하는 방식입니다.

 


Promise.all vs Promise pool 실행시간 비교

 

0.5 ~ 2초 사이의 랜덤한 시간만큼 sleep 하도록 하여 성능 비교

/**
 * 1000개의 배열에 500 ~ 2000사이의 값을 삽입
 */
const generateMs = () => {
  return Array.from(
    { length: 1000 },
    () => Math.floor(Math.random() * (2000 - 500 + 1)) + 500
  );
};

const sleep = (ms: number) => {
  return new Promise((res, rej) => {
    setTimeout(() => {
      res(true);
    }, ms);
  });
};

/**
 * 배열을 size만큼 slice
 * [1,2,3,4,5,6] -> [[1,2],[3,4],[5,6]]
 */
function chunkArray<T>(array: T[], size: number): T[][] {
  const result: T[][] = [];
  for (let i = 0; i < array.length; i += size) {
    result.push(array.slice(i, i + size));
  }
  return result;
}

const promiseAll = async () => {
  const randomMs = chunkArray(generateMs(), 100);
  console.time("promiseAll timer");

  for (const msArray of randomMs) {
    await Promise.all(msArray.map((v) => sleep(v)));
  }

  console.timeEnd("promiseAll timer");
};

const promisePool = async () => {
  console.time("promisePool timer");
  await PromisePool.for(generateMs()).withConcurrency(100).process(sleep);
  console.timeEnd("promisePool timer");
};

(async () => {
  await promiseAll();
  await promisePool();
})();

 

 

 

0.5초~2초 사이의 값으로 1000길이의 배열을 초기화 함. Promise함수는 0.5~2초 사이의 값을 받아서 setTimeout하는 함수로 작성.

 

Promise.all은 배열을 100개 단위로 잘라서 총 10번을 실행함

promise pool은 동시에 실행되는 Promise를 100개로 제한

 

결과는 Proise.all은 19s, promise pool은 13초로 promise pool을 사용했을 때 성능이 31% 개선된 것을 알 수 있습니다.


@supercharge/promise-pool 코드 뜯어보기

 

이제부터는 @supercharge/promise-pool이 어떤식으로 구현되어있는지 코드를 뜯어 보겠습니다.

 

코드를 이해하기 전에 선행학습으로 Promise.race 메서드는 가장 빨리 resolve한 Promise값을 반환합니다. 하지만, 중요한 점은 첫번째 이후의 Promise은 첫번째에만 못들었을 뿐이지 결국에는 resolve 된다는 것입니다.

 

const sleep = (ms: number) => {
  return new Promise((resolve, reject) => {
    setTimeout(() => {
      resolve(ms);
      console.log(ms);
    }, ms);
  });
};

(async () => {
  Promise.race([sleep(1000), sleep(2000), sleep(3000)]).then((v) => {
    console.log("결과 : ", v);
  });
})();

////
1000
결과 :  1000
2000
3000

 

위 코드에서 결과값을 보면 2,3초 Promise는 결과값에는 출력되지 않았지만 결국에는 resolve함수가 실행됩니다. 이점을 생각하고 코드를 분석하면 더 이해가 잘됩니다.

 

promise pool의 시작점은 pool에서 실행할 promise 함수를 process 메서드에 전달하면서 시작합니다.

 

 

 

msArray : 500 ~ 2000사이의 값이 저장되어있는 배열

100 : 동시에 실행한 Promise의 개수(워터파크 슬라이드 라인 수)

sleep : 500 ~ 2000값을 받아 sleep하는 함수

 

 

process 메서드에서는 이런저런 초기값을 설정하고 start 메서드를 호출합니다. callback함수는 위에서 전달받은 sleep함수로, PromisePool에서 실제로 실행된 Promise 함수입니다.

 

start 함수에서는 process() 메서드를 실행합니다.

 

this.item()의 반환값은 앞서 초기화한 500~2000사이의 값입니다. 이 값을 루프 돌면서 어떤 처리를 한 후 마지막에 this.drained()를 호출하면서 종료됩니다. 이게 전부 입니다.

 

루프안에서 중요한 메서드는 this.startProcessing()인데, 500~2000사이의 값 하나와 index를 전달하고 있습니다.

 

startProcessing에서는 첫번째로 this.createTaksFor()를 호출하여 Promise를 반환받는데, 이 Promise는 앞서 PromisePool.process(sleep)에서 전달한 sleep 함수입니다. createTaksFor 호출 시 전달한 item(500~2000사이의 값)이 sleep의 함수로 들어가게 됩니다. 결국에 task값은 특정 시간동안 sleep하는 Promise 입니다.

 

이렇게 생성한 Promise는 this.task()에서 반환한 배열에 넣습니다. 이 배열의 역할을 "현재 실행중인 Promise 작업들의 목록" 입니다. 

Promise를 생성했으니 동시에 몇개를 관리할 지 판단하기 위해 배열에 저장해야 합니다.

 

여기서 가장 중요한 점은, Promise의 then, catch 메서드 내부에서 호출하는 removeActive입니다. 

 

removeActive에서는 "현재 실행중인 Promise 작업들의 목록"에서 현재 Promise를 제거하고 있습니다. 즉 Promise가 완료/오류가 발생했으면 결국엔 마무리 된것이기 때문에 다음 Promise에게 턴을 줘야하기 때문에 제거하고 있습니다. 워터파크에서 n번째 라인을 다 탔으면 자리를 비워주고 다음 사람이 탈 수 있게 턴을 넘겨줘야 합니다.

 

 

지금까지 500~2000사이의 값 루프를 돌면서 Promise를 만들고 배열에 넣고 있고, 해당 Promise가 완료되면 배열에 제거해주는 로직까지 알아봤습니다.  지금은 준비만 한 것 뿐이지, 실제로 Pool을 제어할 수 있어야 합니다. 

 

위 코드가 그 역할을 하며, 동시에 처리할 수 있는 Promise를 제어하면서 Promise의 작업을 완료/에러 처리하여 "현재 실행중인 Promise 작업들의 목록"에서 제거해줘야 합니다. 그래야 다른 Promise이 처리되기 때문이죠.

 

 

앞서 루프를 돌면서 Promise를 만들고 "현재 실행중인 Promise 작업들의 목록"에 계속 push하는데 계속 넣는것이 아니라 pool size만큼 push합니다. pool size는 앞서 PromisePool을 초기화할 때 withConcurrency로 초기화 했습니다.

 

pool size가 100개라면 98, 99, 100까지 채우고 101이 되는 순간 이제는 비워줘야 합니다. 

 

101이 되는순간 "현재 실행중인 Promise 작업들의 목록"을 출발시킵니다. race 메서드이기 때문에 가장 빨리 마무리되는 작업이 마무리되고 그 다음 반복문이 진행됩니다. 이후 반복문이 진행되면 앞서 진행한 Promise만들고,  "현재 실행중인 Promise 작업들의 목록"에 넣고 while문에서 대기하고 ...과정을 반복합니다.

 

여기서 가장 중요한 부분은, race메서드에서 가장 첫번째로 완료한 그리고 그 이후의 Promise 모두  then, catch내부에서  removeActive가 실행됩니다. 앞서 removeActive의 역할은 해당 Promise를 "현재 실행중인 Promise 작업들의 목록"에서 제거하는 것입니다. 이로써 목록에 자리가 생기고, 위의 while문의 조건절에 영향을 주어 "현재 실행중인 Promise 작업들의 목록"에 공석이 생깁니다.

 

for문을 돌면서 새로 생성한 Promise들은 이 공석에 들어가게 되고, "현재 실행중인 Promise 작업들의 목록"의 길이가 100을 넘어가게 되면 또 다시 race를 하게 됩니다. 이 과정을 반복하면서 동시에 실행할 수 있는 Promise가 팽팽하게 유지되면서 하나의 Promise가 마무리되면 다른 Promise가 곧바로 처리되게 됩니다.

 


Promise pool이라는 컨셉이 신기해서 코드를 보았는데, Promise.race와 then, catch 메서드를 사용하여 제어한 것이 흥미로웠습니다. Promise들의 처리 시간이 많이 차이나는 경우 사용하면 좋겠지만, 처리 시간이 비슷한 경우는 Promise.all을 사용해도 문제가 없겠다고 생각합니다.