본문 바로가기
Spring

[Spring] SpringWebFlux에서의 효과적 Error처리와 백프레셔(BackPressure)

by 곰민 2023. 4. 1.
728x90

Reactive Programming에서 백프레셔(Backpressure) 처리 전략을 이해하고, Spring WebFlux를 활용하여 구현하는 방법을 알아보도록 합니다.
Project Reactor의 예제 코드를 활용하여 Publisher 와 Consumer 측에서의 백프레셔(Backpressure) 전략을 확인해 봅니다.

 

 

traditional try-catch


traditional 한 명령형 프로그래밍에서 오류 처리는 일반적으로 try-catch 블록을 통해 처리됩니다.

 

try {
	// 예외를 발생시킬 수 있는 작업 수행
    int result = performOperation();
    System.out.println("Result: " + result);
} catch (ArithmeticException e) {
	System.out.println("산술 오류가 발생했습니다: " + e.getMessage());
} catch (Exception e) {
	System.out.println("예기치 않은 오류가 발생했습니다: " + e.getMessage());
}

 

실행 흐름이 synchronous 하고 blocking 되기 때문에 명령형 프로그래밍에서는 backpressure가 그다지 중요하지 않습니다.
Producer와 Consumer가 직접 소통하고 Consumer는 일반적으로 Producer의 속도를 따라갈 수 있습니다.

 

Reactive Project Backpressure


반면, 반응형 시스템에서는 오류 처리와 백프레셔 관리가 안정적이고 반응성 있는 애플리케이션을 보장하는 데 중요합니다.

 

반응형 스트림 처리는 여러 연산자를 연결하는 과정이 포함되어 있으며, 스트림의 어떤 부분에서든 오류가 발생하면 전체 스트림이 실패할 수 있습니다.

 

이에 따라 반응형 시스템에서는 오류 처리를 효과적으로 처리하는 것이 중요합니다.

 

Spring WebFlux에서는 Project Reactor와 RxJava와 같은 반응형 프로그래밍 라이브러리를 사용하여 오류를 효과적으로 처리하기 위한 여러 전략과 연산자를 제공합니다.

 

이러한 연산자는 다양한 오류 처리 전략을 제공하며, 안정적이고 반응성 있는 애플리케이션을 구현하는 데 도움이 됩니다.

 

Error Handling Operator


onErrorReturn

 

onErrorReturn 연산자를 사용하면 stream에서 오류가 발생할 시 대체 값을 제공할 수 있습니다.

stream을 terminates 하고 fallback 값을 반환합니다.

 

Flux<String> flux = Flux.just("A", "B", "C", "D")
    .map(s -> {
        if (s.equals("C")) {
            throw new RuntimeException("An error occurred!");
        }
        return s.toLowerCase();
    })
    .onErrorReturn("error");

flux.subscribe(System.out::println);

 

onErrorResume

 

onErrorReturn과 유사하게 onErrorResume 연산자를 사용하면 오류가 발생할 때 fallback stream을 제공할 수 있습니다.

error는 fallback stream 내에 추가로 전파되거나 해결될 수 있습니다.

 

Flux<String> flux = Flux.just("A", "B", "C", "D")
    .map(s -> {
        if (s.equals("C")) {
            throw new RuntimeException("An error occurred!");
        }
        return s.toLowerCase();
    })
    .onErrorResume(e -> Flux.just("error"));

flux.subscribe(System.out::println);

 

retry

 

retry 연산자를 사용하면 error 발생시 stream을 다시 시도할 수 있습니다.

retry 횟수를 지정하고 retry 사이에 delay도 넣을 수 있습니다.

 

Flux<String> flux = Flux.just("A", "B", "C", "D")
    .map(s -> {
        if (s.equals("C")) {
            throw new RuntimeException("An error occurred!");
        }
        return s.toLowerCase();
    })
    .retry(3);

flux.subscribe(System.out::println);

 

Reactive Programming에서의 BackPressure


Reactive Programming의 non-blocking 특성으로 인해 서버는 전체 스트림을 한 번에 보내지 않습니다.

가능한 한 빨리 데이터가 제공되면 동시에 데이터를 전송할 수 있습니다.

따라서 클라이언트는 이벤트를 수신하고 처리하는 데 기다리는 시간이 적어집니다.

그러나 이 는 극복해야 할 문제가 있습니다.

즉 data publisher는 subscriber가 처리할 수 없을 정도의 속도로 데이터를 보내 receiver를 압도해 버릴 수 있습니다.

 

위 예시와 같이 아래처럼 진행이 된다면.
Publisher는 초당 10000 개의 이벤트를 Consumer에 보냅니다.

Consumer는 이를 처리하고 결과를 GUI에 전송합니다.
Consumer는 초당 7500 개의 이벤트만 처리할 수 있습니다.

그만.. 보내

 

 

이런 구조에서서는 Consumer가 이벤트를 처리하지 못하기 때문에 시스템이 중단됩니다.

 

Backpressure를 사용해서 Systemic Failures를 예방하기 위한 세 가지 전략


 

시스템적 실패를 방지하기 위해서는 크게 세 가지가 존재합니다.

 

Plan 1

Data Stream을 제어하기

 

Plan 2

추가적으로 들어오는 데이터의 양을 Buffering 하는 것

 

Plan 3

추가로 들어오는 이벤트를 삭제하고 Tracking 하지 않는 것.

 

Consumer 측에서 확인하는 세 가지 전략


Project Reactor에 대한 점은 아래 포스팅을 참조해주시면 됩니다.

[Spring] Project Reactor EventLoop와 Flux와 Mono. (tistory.com)

 

[Spring] Project Reactor EventLoop와 Flux와 Mono.

Project Reactor는 JVM(Java Virtual Machine)에서 비동기 및 반응형 애플리케이션을 구축하기 위해 Reactive Streams를 활용하는 라이브러리입니다. 주요 publisher인 Mono와 Flux와 함께 다양한 연산자들을 제공하

colevelup.tistory.com

 

Project Reactor는 Flux functionalities를 내부적으로 사용하여 이벤트 발생자가 생성하는 이벤트를 제어하는 메커니즘을 적용합니다.

 

Controlling the Data Stream(Plan 1)

말 그대로 producer로 가서 event를 보내는 속도를 낮춥니다.

producer의 event 보내는 속도를 낮추는 방법입니다.

 

public class ThrottlingPublisherExample {
    public static void main(String[] args) {
        Flux<Long> originalProducer = Flux.interval(Duration.ofNanos(1000000000 / 10000))
                                          .doOnNext(item -> System.out.println("Original Emitted: " + item));

        Flux<Long> throttledProducer = originalProducer.sample(Duration.ofNanos(1000000000 / 7500))
                                            .doOnNext(item -> System.out.println("Throttled Emitted: " + item));

        throttledProducer.subscribe();
    }
}

 

Sample (Plan 1)

Publisher에서 Consumer에게 데이터가 전송되는 속도를 제어하는 데 도움이 됩니다.

Sample operator는 주기적으로 Publisher로부터 최신 항목을 내보냅니다.

즉, 모든 항목이 Consumer 에게 전달되는 것이 아니라 sampling 시점의 가장 최근 항목만 소비자에게 전달됩니다.

 

public class SampleStrategyExample {
    public static void main(String[] args) {
        Flux<Long> producer = Flux.interval(Duration.ofMillis(10)); // Emitting data every 10 milliseconds

        producer
            .sample(Duration.ofMillis(100)) // Sampling data every 100 milliseconds
            .subscribe(value -> {
                System.out.println("Received: " + value);
            });
    }
}

Publisher는 10밀리 초마다 데이터를 전송합니다.
그러나 Sample Operator는 100밀리 초마다 최신 항목을 전송하는 데 사용됩니다.
결과적으로 Consumer는 더 느린 속도로 데이터를 수신하여 과부하를 방지할 수 있습니다.

 

BackpressureBuffer(Plan 2)

데이터 스트림을 제어할 수 없는 경우 여분의 데이터를 버퍼링 하는 것도 또 다른 옵션입니다.
이 접근 방식을 사용하면 consumer는 이벤트를 처리할 수 있을 때까지 나머지 이벤트를 임시로 저장합니다.

그러나 무제한 버퍼는 메모리 문제를 일으킬 수 있습니다.

project reactor는 이 전략을 구현하기 위한 버퍼 연산자를 제공합니다.

 

Flux<Integer> producer = Flux.range(1, 100);

producer
    .onBackpressureBuffer(10)
    .subscribe(value -> {
        try {
            Thread.sleep(100); // Simulating a slow consumer
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Received: " + value);
    });

 

위 예제 코드는 버퍼 크기가 10인 버퍼 연산자를 사용합니다. 
consumer는 항목을 일괄 처리하므로 백압을 관리하는 데 도움이 될 수 있습니다.

 

BackpressureDrop(Plan 3)

Plan1, Plan2 가 적합하지 않은 경우 추가 이벤트를 삭제하는 것이 대안이 될 수 있습니다. 

사실 제일 이상적이지 않지만 시스템이 다운되는 것을 막을 수는 있습니다.

 

public class DroppingExample {
    public static void main(String[] args) {
        Flux<Integer> producer = Flux.range(1, 100);

        producer.onBackpressureDrop()
                .subscribe(item -> {
                    System.out.println("Received: " + item);
                    // Simulate slow consumer
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                });
    }
}

 

onBackpressureLatest(Plan 3)

 Consumer가 처리할 수 없는 과도한 이벤트를 삭제하고 가장 최근의 이벤트만 유지합니다. 
이를 통해 Consumer가 데이터에 압도당하는 것을 방지하는 동시에 최신 정보를 수신할 수 있습니다.

 

public class OnBackpressureLatestExample {
    public static void main(String[] args) throws InterruptedException {
        Flux<Long> producer = Flux.interval(Duration.ofMillis(10))
                                  .onBackpressureLatest()
                                  .doOnNext(item -> System.out.println("Produced: " + item));

        producer.publishOn(Schedulers.parallel())
                .doOnNext(item -> {
                    try {
                        // Simulate slow consumer by delaying the processing
                        Thread.sleep(50);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("Consumed: " + item);
                })
                .subscribe();

        // Keep the main thread running
        Thread.sleep(5000);
    }
}

 

FlatMap

Publisher의 항목을 내부 Publisher로 변환한 다음 그 출력을 Merge 하여 데이터가 소비되는 속도를 제어하는 데 도움이 됩니다.
FlatMap 연산자는 동시성 수준을 제어하도록 구성할 수 있어 백프레셔를 효과적으로 관리할 수 있습니다.

 

public class FlatMapStrategyExample {
    public static void main(String[] args) {
        Flux<Integer> producer = Flux.range(1, 100);

        producer
            .flatMap(value -> processValue(value), 4) // Limiting concurrency to 4
            .subscribe(value -> {
                System.out.println("Processed: " + value);
            });
    }

    private static Flux<String> processValue(Integer value) {
        // Simulating a slow processing operation
        return Flux.just(value.toString())
            .delayElements(Duration.ofMillis(100));
    }
}

 

이 예제에서는 flatMap 연산자를 사용하여 동시성 수준을 4로 제한했습니다. 
즉, Consumer가 동시에 처리할 수 있는 항목은 4개뿐입니다. 
결과적으로 Consumer는 백프레셔를 효과적으로 관리할 수 있습니다.

 

Publisher의 Event를 제어하는 전략


Plan A

구독자가 요청할 때만 새로운 이벤트를 전송합니다. 
요청에 따라서 이벤트를 가져오는 pull 전략입니다.

 

PlanB

클라이언트 측에서 수신할 수 있는 이벤트의 수를 제한합니다. 
발행자는 한 번에 최대 수의 항목만 클라이언트에게 전송할 수 있는 제한된 push 전략으로 작동합니다.

 

PlanC

소비자가 더 이상 이벤트를 처리할 수 없을 때 데이터 스트리밍을 취소합니다. 
이 경우 수신기는 언제든지 전송을 중단하고 나중에 다시 스트림을 Subscription 할 수 있습니다.

 

Spring Webflux에서는?


백프레셔는 Spring WebFlux에도 Project Reactor가 담당합니다.

WebFlux는 TCP 흐름 제어를 사용하여 백프레셔를 바이트로 제어합니다.

그러나 Consumer가 받을 수 있는 logical element를 핸들링하지 않습니다.

 

하단의 상호 작용 흐름을 살펴보겠습니다

1. WebFlux 프레임워크는 이벤트를 바이트로 변환하여 TCP를 통해 전송 / 수신합니다.

2. Consumer는 다음 logical element를 요청하기 전에 오랜 작업을 시작할 수 있습니다.

3. Receiver가 이벤트를 처리하는 동안 WebFlux는 요청이 없으므로 확인 없이 바이트를 queue에 넣습니다.

4. TCP 프로토콜의 특성으로 인해 새 이벤트가 있는 경우 publisher는 네트워크로 이벤트를 계속 보냅니다.

 

결론적으로, 위의 다이어그램은 consumer와 publisher 간 logical elements에 대한 수요가 다를 수 있다는 것을 보여줍니다.

Spring WebFlux는 전체 시스템으로 상호 작용하는 서비스 간 백프레셔를 이상적으로 관리하지 않습니다.

consumer와 publisher 각각에 대해 독립적으로 처리하고 있지만 두 서비스 간의 logical demand를 고려하지 않고 처리합니다.

따라서 Spring WebFlux는 우리가 예상하는 대로 백프레셔를 처리하지 않습니다.

 

 Implementing Backpressure Mechanism with Spring WebFlux


Request(Plan A)

Consumer가 처리할 수 있는 이벤트를 제어하는 것입니다.
따라서 Publisher는 receiver가 새 이벤트를 요청할 때까지 기다립니다.
Client 가 Flux를 구독하고 수요에 따라 이벤트를 처리합니다.

 

@Test
public void whenRequestingChunks10_thenMessagesAreReceived() {
Flux request = Flux.range(1, 50);

request.subscribe(
  System.out::println,
  err -> err.printStackTrace(),
  () -> System.out.println("All 50 items have been successfully processed!!!"),
  subscription -> {
      for (int i = 0; i < 5; i++) {
          System.out.println("Requesting the next 10 elements!!!");
          subscription.request(10);
      }
  }
);

StepVerifier.create(request)
  .expectSubscription()
  .thenRequest(10)
  .expectNext(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
  .thenRequest(10)
  .expectNext(11, 12, 13, 14, 15, 16, 17, 18, 19, 20)
  .thenRequest(10)
  .expectNext(21, 22, 23, 24, 25, 26, 27 , 28, 29 ,30)
  .thenRequest(10)
  .expectNext(31, 32, 33, 34, 35, 36, 37 , 38, 39 ,40)
  .thenRequest(10)
  .expectNext(41, 42, 43, 44, 45, 46, 47 , 48, 49 ,50)
  .verifyComplete();

}

 

이 접근 방식으로는, receiver가 받아들일 수 있는 이벤트를 처리할 때까지 event emitter가 event를 내보내는지 않습니다.

즉, 클라이언트는 필요한 이벤트를 처리하는 데 필요한 제어권을 가집니다.

위 코드로 보면 thenRequest(n)이 호출될 때만 다음 n개의 항목이 예상되게 됩니다.

 

Limit(Plan B)

Project Reactor에서 limitRange() 연산자를 사용하는 것입니다.

이 연산자는 한 번에 prefetch 할 아이템 수를 설정할 수 있습니다.

흥미로운 점은 subscriber가 처리할 이벤트를 더 요청해도 limit가 적용된다는 것입니다.

event emitter는 이벤트를 chunk로 분할하여 각 요청에서 limit 이상을 소비하지 않도록 합니다

 

@Test
public void whenLimitRateSet_thenSplitIntoChunks() throws InterruptedException {
    Flux<Integer> limit = Flux.range(1, 25);

    limit.limitRate(10);
    limit.subscribe(
      value -> System.out.println(value),
      err -> err.printStackTrace(),
      () -> System.out.println("Finished!!"),
      subscription -> subscription.request(15)
    );

    StepVerifier.create(limit)
      .expectSubscription()
      .thenRequest(15)
      .expectNext(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
      .expectNext(11, 12, 13, 14, 15)
      .thenRequest(10)
      .expectNext(16, 17, 18, 19, 20, 21, 22, 23, 24, 25)
      .verifyComplete();
}

 

Cancel(Plan C)

Project Reactor는 Subscriber를 구현하거나 BaseSubscriber를 extend 할 수 있는 기능을 제공합니다.

따라서 이전에 언급한 클래스를 재정의하여 Subscription을 취소하는 것을 구현하는 방법을 살펴보겠습니다.

 

@Test
public void whenCancel_thenSubscriptionFinished() {
    Flux<Integer> cancel = Flux.range(1, 10).log();

    cancel.subscribe(new BaseSubscriber<Integer>() {
        @Override
        protected void hookOnNext(Integer value) {
            request(3);
            System.out.println(value);
            cancel();
        }
    });

    StepVerifier.create(cancel)
      .expectNext(1, 2, 3)
      .thenCancel()
      .verify();
}

 

 

결론적으로, 에러 핸들링과 백프레셔는 Reactive System에서 안정성과 반응성을 유지하는 데 중요한 개념이며
이러한 개념을 이해하고 애플리케이션에서 효과적으로 구현하면 확장 가능하고 견고한 시스템을 구축할 수 있습니다.

 

 

참조

Backpressure Mechanism in Spring WebFlux | Baeldung

728x90

댓글