분산 서버 환경에서 스케줄링 작업 중복 실행 제어하기

반응형

필자의 프로젝트는 총 2개의 EC2 인스턴스가 존재하고 각 서버에서는 총 2개의 스케줄링(배치) 작업이 중복 실행된다.

 

아래는 모바일 Push 알림 전송을 위해 사용하는 FCM에서 topic 기능을 사용하는데

이 topic 개수 제한이 2,000개여서 주기적으로 topic을 해제할 필요가 있었다.

따라서 약속 시간이 지난 모임들을 새벽 4시 마다 스케줄링하여 약속 논리 삭제와 topic 제거를 배치 처리하고 있다.

아래의 update의 경우에는 중복 처리되어도 결과에 영향을 주진 않지만

모든 서버가 동일한 작업에 대해 불필요하게 DB 커넥션을 사용해 동일한 쿼리를 DB에 요청하고 FCM과 통신할 필요는 없다.

또한 아래 작업의 시간이 길어진다면 모든 서버들의 초기화 시간이 길어질 수 있다는 문제가 있다.

@Transactional
@Scheduled(cron = "0 0 4 * * *", zone = "Asia/Seoul")
public void scheduleOverdueMeetings() {
    meetingRepository.updateAllByNotOverdueMeetings();
    List<Meeting> meetings = meetingRepository.findAllByUpdatedTodayAndOverdue();
    log.info("약속 시간이 지난 약속들 overdue = true로 update 쿼리 실행");
    notificationService.unSubscribeTopic(meetings);
}

 

아래의 경우에는 서버 배포시에 애플리케이션이 종료되면서

TaskScheduler에 알림 전송이 예약된 작업들이 전송되지 않음을 방지하기 위해

알림마다 상태로 관리하여 전송되지 않은 알림들을 애플리케이션 실행 시 다시 TaskScheduler에 예약하는 작업이다.

이 경우에는 모든 서버에서 작업을 TaskScheduler에 예약하기 때문에 동일한 알림이 서버의 개수만큼 전송되는 문제가 있다.

@Transactional
@EventListener(ApplicationReadyEvent.class)
public void schedulePendingNotification() {
    List<Notification> notifications = notificationRepository.findAllByTypeAndStatus(
            NotificationType.DEPARTURE_REMINDER,
            NotificationStatus.PENDING
    );
    notifications.forEach(this::scheduleNotification);
    log.info("애플리케이션 시작 - PENDING 상태 출발 알림 {}개 스케줄링", notifications.size());
}

 

이러한 중복 스케줄링 문제를 해결하기 위해 필자는 Redisson을 사용한 분산락을 적용했다.

Redisson 분산락에 대한 내용은 아래 포스팅에서 다룬다.

2024.11.20 - [◼ Spring] - 분산락을 적용해 동시성 문제 해결하기

 

ShedLock 라이브러리를 사용해 다중 인스턴스에서 스케줄링 작업을 제어할 수도 있다.

ShedLock의 경우에는 네임드 락으로 인스턴스를 제어하는데 

필자는 기존에 Redis 인프라가 도입되어 ShedLock락 보다는 DB 부하를 줄이는 Redisson 분산락을 사용했다.

Redisson 분산락도 외부 인프라를 사용하는 것이고 이 또한 가용성을 보장하기 위해선 관리가 복잡할 수 있다.

각자의 프로젝트 환경에 맞추어 선택하도록 하자.


Redisson 클라이언트를 사용한 분산락으로 리더 선출하기

스케줄링 로직에 공통으로 리더 선출 로직을 적용하기 위해 AOP를 적용할 것이다.

이를 위해 다음과 같이 클래스를 생성했다.

포인트 컷 조건이 될 LeaderOnly 어노테이션
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface LeaderOnly {

    @NotNull
    String key();

    long waitTime() default 1L;

    long leaseTime() default 3L;

    TimeUnit timeUnit() default TimeUnit.SECONDS;
}
리더 선출을 관리할 LeaderManager

분산락만으로는 중복 작업 실행 문제가 해결되지 않는다.

해당 작업을 실행할 리더를 선출해야한다.

필자는 분산락을 사용하여 리더를 선출하고 리더만 작업을 실행할 수 있도록 했다.

 

만약 위에서 분산락 포스팅을 읽고 왔다면 왜 트랜잭션 완료 후에 명시적인 락 해제가 없는지 궁금할 수 있다.

상황에 따라 다르다고 생각하는데 스케줄링의 경우에는

명시적으로 락을 해제하게 되면 작업이 밀리초 안에 끝나게 됐을 때 밀리초 차이로 동일 시간대에 중복 작업이 실행될 수 있다.

또 Lock을 획득한 한 서버만이 작업을 실행하기 때문에 데이터 정합성 문제가 발생하지 않을 것이기 때문이다.

@Slf4j
@Component
@RequiredArgsConstructor
public class RedissonLeaderManager {

    private final RedissonClient redissonClient;
    private final String serverInstanceId = UUID.randomUUID().toString();

    public <T> T executeIfLeader(Supplier<T> supplier, LeaderOnly leaderOnly) {
        if (!isLeader(leaderOnly)) {
            log.info("서버 인스턴스 {}는 리더가 아니므로 작업을 실행하지 않습니다.", serverInstanceId);
            return null;
        }
        return supplier.get();
    }

    public boolean isLeader(LeaderOnly leaderOnly) {
        RLock lock = redissonClient.getLock(leaderOnly.key());
        if (lock.isHeldByCurrentThread()) {
            log.info("현재 인스턴스가 락 보유 중: {}", leaderOnly.key());
            return true;
        }
        return tryUpdateLeader(lock, leaderOnly);
    }

    private boolean tryUpdateLeader(RLock lock, LeaderOnly leaderOnly) {
        try {
            boolean acquired = lock.tryLock();
            if (acquired) {
                log.info("서버 인스턴스 {}가 리더로 선출되었습니다.", serverInstanceId);
                return true;
            }
            return false;
        } catch (Exception exception) {
            log.error("{} 리더 선출 과정에서 오류 발생", leaderOnly.key(), exception);
            throw new OdyServerErrorException("서버에 장애가 발생했습니다.");
        }
    }

    public void releaseLeadership(String lockName) {
        RLock lock = redissonClient.getLock(lockName);
        if (lock.isHeldByCurrentThread()) {
            try {
                lock.unlock();
                log.info("서버 인스턴스 {}가 리더 역할을 해제했습니다.", serverInstanceId);
            } catch (Exception exception) {
                log.error("리더십 해제 중 오류 발생", exception);
                throw new OdyServerErrorException("서버에 장애가 발생했습니다.");
            }
        }
    }
}

예외를 잡아서 커스텀 예외로 다시 던지는 과정이 있는데, 특정 로직에서 항상 체크 예외를 던진다.

따라서 예외 발생 시 트랜잭션이 롤백이 안되는 것을 방지하기 위해 "RuntimeException"을 상속한 커스텀 예외를 던지도록 했다.

 

어드바이스와 포인트컷을 정의한 LeaderOnlyAop 에스펙트
@Slf4j
@Aspect
@Component
@RequiredArgsConstructor
public class LeaderOnlyAop {

    private final RedissonLeaderManager redissonLeaderManager;

    @Around("@annotation(leaderOnly)")
    public Object executeIfLeader(ProceedingJoinPoint joinPoint, LeaderOnly leaderOnly) {

        return redissonLeaderManager.executeIfLeader(() -> {
            try {
                return joinPoint.proceed();
            } catch (OdyException exception) {
                throw exception;
            } catch (Throwable throwable) {
                log.error("리더 작업 처리중 에러 발생 : ", throwable);
                throw new OdyServerErrorException("서버에 장애가 발생했습니다.");
            }
        }, leaderOnly);
    }
}

예외를 2번 catch하고 있다.

첫번째 catch는 실제 실행하는 메서드에서 던지는 예외를 변환없이 그대로 던지도록 의도했고

두번째 catch는 proceed가 Throwable 체크 예외를 던지기 때문에 롤백을 위해 언체크 예외로 변환했다.


기존 스케줄링 메서드에 AOP 적용하기

적용은 간단하다 아래와 같이 Lock 키 이름을 정해주고 Lock 획득 대기 시간과 만료 시간을 설정해주면 된다.

설정하지 않는다면 어노테이션의 기본 값이 적용된다.

@Transactional
@LeaderOnly(key = "SCHEDULE_OVERDUE_MEETING", waitTime = 1, leaseTime = 3)
@Scheduled(cron = "0 0 4 * * *", zone = "Asia/Seoul")
public void scheduleOverdueMeetings() {
    meetingRepository.updateAllByNotOverdueMeetings();
    List<Meeting> meetings = meetingRepository.findAllByUpdatedTodayAndOverdue();
    log.info("약속 시간이 지난 약속들 overdue = true로 update 쿼리 실행");
    notificationService.unSubscribeTopic(meetings);
}
@Transactional
@LeaderOnly(key = "SCHEDULE_PENDING_MEETING", waitTime = 1L, leaseTime = 3L)
@EventListener(ApplicationReadyEvent.class)
public void schedulePendingNotification() {
    List<Notification> notifications = notificationRepository.findAllByTypeAndStatus(
            NotificationType.DEPARTURE_REMINDER,
            NotificationStatus.PENDING
    );
    notifications.forEach(this::scheduleNotification);
    log.info("애플리케이션 시작 - PENDING 상태 출발 알림 {}개 스케줄링", notifications.size());
}

 

대기 시간과 만료 시간을 1초, 3초로 설정한 이유는 다음과 같다.

현재 스케줄링들은 주기적인 스케줄링이 아닌 특정 시점에 한번만 실행된다.

따라서 리더가 선출되면 그 리더만 작업을 수행하고 팔로워들은 기다릴 필요가 없다.

0으로 하게 되면 즉시 시도하고 락이 있으면 실패하여 여러 인스턴스가 동시에 락을 시도했을 때

가장 빠르게 응답한 인스턴스가 작업을 수행하고 나머지는 기다리지 않으므로 효율적일 수 있다.

하지만 Redis의 일시적인 문제가 발생한다면 0초일 경우에 모든 인스턴스가 작업을 수행하지 못할 수 있는 최악의 상황을 고려해

이런 상황을 최소화하기 위해 대기 시간을 1초로 짧게 설정했다.

 

만료 시간은 작업이 끝나면 락을 해제하면 되므로 해당 작업 실행 시간에 맞춰 넉넉하게 주면 될 것 같다.

그렇다고 너무 타이트하게 잡으면 작업이 완료되기 전에 락이 해제되어

서버간 시간 차이가 클 경우 다른 인스턴스가 중복으로 작업을 수행할 수 있음으로 주의해야한다.

예측하기 어렵다면 leaseTime을 명시하지 않고 Redisson의 watchdog 매커니즘을 활용하는 것도 방법이다.

 

주의할점

주의할점은 1초 주기의 아주 빠른 스케줄링을 사용할 때 발생할 수 있다.

// 애플리케이션 1
2025-03-20 18:19:28.009 [INFO] [scheduling-1] [] [c.o.c.redis.RedissonLeaderManager] - 현재 인스턴스가 락 보유 중: SCHEDULE_OVERDUE_MEETING
2025-03-20 18:19:28.012 [INFO] [scheduling-1] [] [c.ody.meeting.service.MeetingService] - 약속 시간이 지난 약속들 overdue = true로 update 쿼리 실행

// 애플리케이션 2
2025-03-20 18:19:28.025 [INFO] [scheduling-1] [] [c.o.c.redis.RedissonLeaderManager] - 서버 인스턴스 6b130a69-d23c-430f-9ddd-a9263387a6ff가 리더로 선출되었습니다.
2025-03-20 18:19:28.029 [INFO] [scheduling-1] [] [c.ody.meeting.service.MeetingService] - 약속 시간이 지난 약속들 overdue = true로 update 쿼리 실행

만료 시간 3초 기준으로 애플리케이션 1의 락 leaseTime이 "19분 28초 012밀리초"라면

만료 직후 애플리케이션 2의 스케줄링이 시작되는 밀리초의 차이로 "19분 28초 025밀리초"에 락을 획득해

중복으로 작업을 수행할 수 있다.

 

이렇게 짧은 주기의 스케줄링이 필요하다면 

밀리초까지의 서버 시간 동기화를 위한 NTP 서버를 구축하거나 더 높은 정밀도를 원한다면 PTP 서버를 구축할 필요가 생길 수 있다.

만약 스케줄링의 작업이 많다면 스케줄링 전용 서버를 하나 구축하는 것도 하나의 방법이 될 수 있다.


리더 선출 테스트하기 

리더 선출 매커니즘이 잘 동작하는지 테스트해보자.

테스트를 하기 전에 Redis 연결이 필요하다.

필자는 테스트와 생명주기가 동일하게 실행되는 테스트 컨테이너를 사용했다.

2024.11.14 - [◼ Spring] - [Spring] TestContainers로 Redis 테스트하기

 

테스트용 LeaderOnly 어노테이션

우선 어노테이션을 파라미터 인자로 넘기기 때문에 아래와 같이 테스트용 어노테이션을 생성했다.

public record MockLeaderOnly(String key, long waitTime, long leaseTime, TimeUnit timeUnit) implements LeaderOnly {

    @Override
    public Class<? extends Annotation> annotationType() {
        return LeaderOnly.class;
    }
}

 

LeaderManager 테스트

LeaderManager이 의도한 데로 잘 동작하는지 확인하기 위해 다음과 같은 상황을 테스트하여 검증했다.

  1. 2개의 스레드가 동시에 리더가 되려는 상황
  2. 리더가 리더쉽을 해제할 경우 팔로우가 리더가 되는 상황
class RedissonLeaderManagerTest extends BaseServiceTest {

    @Autowired
    private RedissonClient redissonClient;

    @DisplayName("여러 인스턴스가 동시에 리더쉽을 얻으려고 하더라도 하나의 인스턴스만 리더가 된다.")
    @Test
    void electLeader() throws InterruptedException {
        MockLeaderOnly mockLeaderOnly = new MockLeaderOnly("ELECT_LEADER", 0, 3, TimeUnit.SECONDS);
        int threadCount = 2;
        ExecutorService executorService = Executors.newFixedThreadPool(threadCount);
        CountDownLatch countDownLatch = new CountDownLatch(threadCount);
        AtomicInteger leaderCount = new AtomicInteger(0);

        for (int i = 1; i <= threadCount; i++) {
            executorService.execute(() -> {
                try {
                    RedissonLeaderManager manager = new RedissonLeaderManager(redissonClient);
                    if (manager.isLeader(mockLeaderOnly)) {
                        leaderCount.incrementAndGet();
                    }
                } finally {
                    countDownLatch.countDown();
                }
            });
        }
        countDownLatch.await(3, TimeUnit.SECONDS);
        executorService.shutdown();
        executorService.awaitTermination(3, TimeUnit.SECONDS);

        assertThat(leaderCount.get()).isEqualTo(1);
    }

    @DisplayName("리더가 리더쉽 해제 시 팔로워가 새로운 리더가 된다.")
    @Test
    void electNewLeader() throws InterruptedException {
        MockLeaderOnly mockLeaderOnly = new MockLeaderOnly("NEW_LEADER", 0, 3, TimeUnit.SECONDS);
        CountDownLatch leaderLatch = new CountDownLatch(1);
        CountDownLatch followerLatch = new CountDownLatch(1);
        CountDownLatch completionLatch = new CountDownLatch(1);
        AtomicBoolean isFollowerInitialLeader = new AtomicBoolean();
        AtomicBoolean isFollowerLastLeader = new AtomicBoolean();

        Thread leaderThread = new Thread(() -> {
            try {
                RedissonLeaderManager initialLeader = new RedissonLeaderManager(redissonClient);
                initialLeader.isLeader(mockLeaderOnly); // 최초 리더로 선출
                leaderLatch.countDown(); // 팔로워 스레드가 시작할 수 있도록 알림
                followerLatch.await(); // 팔로워 스레드가 리더십 확인을 마칠 때까지 대기
                initialLeader.releaseLeadership(mockLeaderOnly.key());
                completionLatch.countDown(); // 테스트가 완료됨을 알림
            } catch (Exception exception) {
                throw new RuntimeException(exception);
            }
        });

        Thread followerThread = new Thread(() -> {
            try {
                leaderLatch.await(); // 리더 스레드가 리더십을 획득할 때까지 대기
                RedissonLeaderManager follower = new RedissonLeaderManager(redissonClient);
                isFollowerInitialLeader.set(follower.isLeader(mockLeaderOnly));
                followerLatch.countDown(); // 리더 스레드에게 리더십 확인을 마쳤다고 알림
                completionLatch.await(); // 리더십 해제 및 테스트 완료를 기다림
                isFollowerLastLeader.set(follower.isLeader(mockLeaderOnly));
            } catch (Exception exception) {
                throw new RuntimeException(exception);
            }
        });

        // 스레드 실행
        leaderThread.start();
        followerThread.start();

        // 모든 스레드 완료 대기
        leaderThread.join();
        followerThread.join();

        assertThat(isFollowerInitialLeader.get()).isFalse();
        assertThat(isFollowerLastLeader.get()).isTrue();
    }
}

1번째 테스트의 경우에는 인스턴스 별로 고유한 UUID를 갖기 때문에 위와 같이 생성하였고

executorService로 2개의 스레드가 동시에 리더가 되려하는 상황을 테스트했다.

 

2번째 테스트의 경우 조금 복잡한데

Redisson 클라이언트를 사용해 Redis에 연결하는 스레드 별로 Lock을 획득하기 때문에

각각 별도의 스레드로 가정하여 테스트할 필요가 있었다.

하지만 동시성 테스트가 아닌 리더의 리더쉽 해제로 팔로워가 리더가 되는지에 대한 순차적인 과정을 확인하고 싶어

ExecutorService로 스레드 별 테스트를 하지 않고 각각의 스레드를 생성해 작업을 정의하여 테스트 했다.

 

LeaderOnly AOP를 적용한 메서드 테스트

만약 스케줄링되는 메서드가 정확히 한번 실행됐는지 확인하고 싶다면 아래 처럼 테스트할 수 있을 것이다.

... 생략

    @MockBean
    private MeetingRepository meetingRepository;
    
    @DisplayName("모임 논리 삭제 스케줄링 작업이 동시 실행 시 하나의 리더 인스턴스에서만 작업이 실행된다.")
    @Test
    void 스케줄링은_하나만() throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        CountDownLatch countDownLatch = new CountDownLatch(2);
        AtomicInteger executionCount = new AtomicInteger(0);

        Mockito.doAnswer(invocation -> {
            executionCount.incrementAndGet();
            return null;
        }).when(meetingRepository).updateAllByNotOverdueMeetings();

        for (int i = 1; i <= 2; i++) {
            executorService.execute(() -> {
                try {
                    meetingService.scheduleOverdueMeetings();
                } finally {
                    countDownLatch.countDown();
                }
            });
        }
        countDownLatch.await(3, TimeUnit.SECONDS);
        executorService.shutdown();
        executorService.awaitTermination(3, TimeUnit.SECONDS);

        assertThat(executionCount.get()).isEqualTo(1);
    }
    
... 생략

애플리케이션 로그로 AOP 적용 여부 확인하기

포트가 다른 2개의 애플리케이션을 실행하고 스케줄링 작업을 2초로 변경하여 로그를 출력했다.

락을 획득한 리더 애플리케이션만 스케줄링 작업을 실행하고 팔로워는 실행하지 않는다.

그리고 리더가 리더쉽을 잃으면 팔로워가 리더로 선출되는 것을 볼 수 있다.