본문 바로가기
Spring

[Spring] Project Reactor EventLoop와 Flux와 Mono.

by 곰민 2023. 3. 21.
728x90

Project Reactor는 JVM(Java Virtual Machine)에서 비동기 및 반응형 애플리케이션을 구축하기 위해 Reactive Streams를 활용하는 라이브러리입니다.
주요 publisher인 Mono와 Flux와 함께 다양한 연산자들을 제공하여 개발자들이 비동기 이벤트 시퀀스를 효율적으로 처리할 수 있도록 지원합니다.
이러한 기능 덕분에 Reactor는 Spring WebFlux와 같은 반응형 라이브러리 및 프레임워크의 기반으로 널리 사용되고 있습니다.

 

Reactor
Spring에 그지같이 따수운 반응형이 있다는 증거😩

 

 

Reactive Programing과 Reactive Stream에 대해서는 이전 포스팅을 참조해 주시면 감사하겠습니다.
[Spring] Reactive Programming 개요

 

[Spring] Reactive Programming 개요

Spring Webflux를 학습하기 이전에 Reactive Programming에 대해서 명령형 프로그래맹, 선언적 프로그래밍 그리고 Data Stream 과 주요 특징들, Reactive Stream 과 주요 요소들에 대해서 자세하게 알아보도록 하

colevelup.tistory.com

 

 

Reactor 라이브러리


Project Reactor의 주요 목표는 다음과 같습니다:

-   백프레셔를 처리하고 리소스를 관리하는 간단하고 효율적인 프로그래밍 모델을 제공합니다.
-   고성능, 확장 가능하고 복원력 있는 애플리케이션 개발을 가능하게 합니다.
-   데이터 스트림을 쉽게 조작하고 결합할 수 있는 풍부한 연산자 세트를 제공합니다.

 


Reactor 프로젝트의 이벤트 루프 생성 및 메커니즘


반응형 프로그래밍은 프로그램의 흐름을 일련의 동기식 연산에서 비동기식 이벤트 스트림으로 전환하는 것을 강조합니다.

이것은 동기성을 중심으로 한 전략으로, 데이터를 가져오는 중이더라도 데이터베이스에 대한 읽기 호출이 호출 스레드를 차단하지 않습니다.

대신, 호출은 publisher로 반환되어 subscriber가 이벤트를 처리하거나, 직접 이벤트를 생성할 수 있도록 합니다.

반응형 프로그래밍은 어떤 스레드가 이벤트를 생성하고 소비해야 하는지에 대한 강조보다는, 비동기 이벤트 스트림으로 프로그램을 구조화하는 데 중점을 둡니다.

이를 통해 publisher와 subscriber가 같은 스레드에 속하지 않아도 되므로, 사용 가능한 스레드를 더 효율적으로 활용할 수 있습니다.

 

출처:https://www.baeldung.com/spring-webflux-concurrency

 

Reactor 프로젝트에서는 이벤트 루프가 Netty를 기본 네트워킹 및 동시성 프레임워크로 사용하는 Reactor Netty 라이브러리에 의해 관리됩니다.

이벤트 루프는 들어오는 이벤트를 처리하고 non-blocking 및 비동기 방식으로 작업을 스케줄링하여 높은 동시성과 성능을 제공합니다.

 

 

Event Loop 주요 메커니즘

 

출처:https://www.baeldung.com/spring-webflux-concurrency

EventloopGroup

Reactor Netty는 사용 가능한 CPU 코어 수에 따라 이벤트 루프 그룹을 생성합니다

기본 이벤트 루프 그룹은 NIO 기반 전송을 위한 NioEventLoopGroup이며, 다른 전송을 위한 epoll 또는 kqueue와 같은 다른 구현이 있습니다.
이벤트 루프는 이벤트 큐의 이벤트를 순차적으로 처리하고 플랫폼에 콜백을 등록한 후 즉시 반환합니다.
플랫폼은 데이터베이스 호출이나 외부 서비스 호출과 같은 작업의 완료를 트리거할 수 있습니다.

이벤트 루프는 작업 완료 알림에 따라 콜백을 트리거하고 결과를 원래 호출자에게 다시 보낼 수 있습니다.

 

EventQueue

각 이벤트 루프에는 실행을 위해 보류 중인 작업을 저장하는 연결된 작업 큐가 있습니다. 

새 작업이 제출되면 해당 이벤트 루프의 작업 큐에 큐에 대기합니다. 

작업에는 들어오는 데이터 처리, 네트워크에 데이터 쓰기 또는 사용자 정의 콜백 실행 등이 포함됩니다.

 

스케줄링 정책

Reactor Netty는 Netty의 스케줄링 정책을 사용하여 태스크 실행을 관리합니다. 

기본적으로 작업은 사용 가능한 이벤트 루프 중에서 라운드 로빈 방식으로 실행됩니다. 


스레드 할당

Reactor Netty의 이벤트 루프 모델은 단일 스레드이므로 각 이벤트 루프가 단일 스레드에서 작업 처리를 담당합니다. 

이를 통해 컨텍스트 전환의 오버헤드를 최소화하고 작업이 non-blocking 방식으로 실행되도록 보장합니다.

 

Tomcat NIO vs Netty NIO

[Spring] Tomcat VS Netty Connector - NIO(non-blocking I/O) or BIO(Blocking I/O) (tistory.com)

 

[Spring] Tomcat VS Netty Connector - NIO(non-blocking I/O) or BIO(Blocking I/O)

Tomcat과 Netty의 차이점을 클라이언트의 요청과 응답을 처리하는 Server Connect의 측면에서 우선 적으로 살펴보도록 하겠습니다. BIO (Blocking I/O) 및 NIO (Non-blocking I/O) 커넥터의 작동 원리에 대해서 알

colevelup.tistory.com

Tomcat과 Netty에서의 NIO connection은 위 포스팅을 확인해 주시면 됩니다.

 

 

그렇다면 EventLoop내에서의 Data 처리는 누가 담당할까요? 🤔🤔

 

Mono와 Flux publishers 📚


Project Reactor는 Mono와 Flux 두 가지 주요 publisher를 제공합니다
Mono와 Flux는 reactiveStream 에서의 publisher로 event loop 내에서 데이터를 전달, 변환, 필터링, 다양한 연산을 수행할 수 있으며 error 또는 complete와 같은 종료 신호를 통해 stream의 완료를 알릴 수 있습니다.
스케줄러는 여러 스레드 모듈을 갖을 수 있으며 이를 통해 event loop에서 작동할 Mono와 Flux를 제어합니다.


Mono

 

Mono
출처:Mono (reactor-core 3.5.4) (projectreactor.io)

 

Mono0개 또는 1개의 항목을 생성하는 publisher입니다.

이는 값을 반환하거나 오류를 반환할 수 있는 단일 비동기 연산을 나타냅니다.

Mono는 비동기 작업에서 단일 결과 또는 전혀 결과가 없는 경우에 유용합니다.

Mono<String> mono = Mono.just("Hello, Reactor!");
mono.subscribe(System.out::println);


Flux

 

flux
출처:Flux (reactor-core 3.5.4) (projectreactor.io)

 

Flux0개 이상의 항목을 생성할 수 있는 publisher입니다.

이는 비동기적으로 처리할 수 있는 데이터 스트림을 나타냅니다.

Flux는 비동기 작업에서 여러 항목을 처리해야 하는 경우에 유용합니다.

Flux<String> flux = Flux.just("Hello", "Reactor", "World!"); 
flux.subscribe(System.out::println);

 

Flux와 Mono를 생성하는 Factory Method들


Flux와 Mono를 생성하는 주된 Factory Method들 몇 가지를 살펴보도록 하겠습니다.
Factory Method 명을 클릭하시면 link를 타고 document로 이동합니다.

 

Flux

 

just(T... data)

받은 element를 emit 하는 Flux를 생성한 뒤 종료.

 

Flux<String> fruitsFlux = Flux.just("Apple", "Orange", "Banana", "Grape");

 

fromIterable(Iterable<? extends T> it) (flux only)

iterable에 포함된 item을 emit 하는 flux를 생성합니다.

 

List<String> fruitsList = Arrays.asList("Apple", "Orange", "Banana", "Grape");
Flux<String> fruitsFlux = Flux.fromIterable(fruitsList);

 

range(int start, int count) (flux only)

받은 start 값에서 시작하여 총 카운트 값을 갖는 정수 시퀀스를 방출하는 flux를 생성.

 

Flux<Integer> numbersFlux = Flux.range(1, 5);

 

empty()

아이템을 방출하지 않고 즉시 완료되는 flux를 생성합니다.

 

Flux<String> emptyFlux = Flux.empty();

 

error(Throwable error)

error signal을 즉시 emit 하는 flux를 생성합니다.

 

Flux<String> errorFlux = Flux.error(new RuntimeException("Error occurred!"));

 

defer(Supplier<? extends Publisher<T>> supplier)

subscription이 발생할 때마다 제공된 Supplier 함수를 호출하여 새로운 Flux 인스턴스를 생성합니다.

Flux<Long> currentTimeFlux = Flux.defer(() -> Flux.just(System.currentTimeMillis()));

 

Mono

 

just(T data)

받은 element를 emit 하는 Mono를 생성한 뒤 종료.

 

Mono<String> fruitMono = Mono.just("Apple");

 

empty()

item을 emit 하지 않고 즉시 complete 되는 Mono를 생성합니다.

 

Mono<String> emptyMono = Mono.empty();

 

error(Throwable error)

error signal을 즉시 emit 하는 Mono를 만듭니다.

 

Mono<String> errorMono = Mono.error(new RuntimeException("Error occurred!"));

 

 

fromCallable(Callable<? extends T> supplier) (Mono only)

제공된 Callable을 사용하여 해당 값을 생성하는 Mono를 생성합니다.

 

Mono<String> callableMono = Mono.fromCallable(() -> "Apple");

 

fromCompletionStage(CompletionStage<? extends T> completionStage) (Mono only)

주어진 CompletionStage의 결과를 내보내는 Mono를 생성합니다.
이 메서드는 CompletableFuture와 같은 비동기 작업 결과를 Mono로 변환할 때 사용됩니다.

 

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Apple");
Mono<String> completionStageMono = Mono.fromCompletionStage(future);

 

defer(Supplier<? extends Mono<? extends T>> supplier)

subscription이 발생할 때마다 제공된 Supplier 함수를 호출하여 새로운 Mono 인스턴스를 생성합니다.

 

Mono<Long> currentTimeMono = Mono.defer(() -> Mono.just(System.currentTimeMillis()));

 

 

flux&mono
오 뭐가 많긴하네 그래서 언제 뭘쓸까

 

언제 어떤 Factory method를 사용하는 것이 좋을까? 🤔🤔

  1. just: 이미 생성된 값을 반환하는 경우에 유용.
  2. fromIterable (Flux only): 기존 컬렉션을 데이터 스트림으로 변환할 때 유용.
  3. range (Flux only): 숫자 범위를 데이터 스트림으로 표현할 때 유용.
  4. empty: 아무 결과가 없는 작업을 표현할 때 유용.
  5. error: 예외 처리 및 오류 전파를 표현할 때 사용.
  6. defer: subscription에 실행될 lazy 된 작업을 표현하는 Flux 또는 Mono를 생성합니다.
    lazy처리를 원하거나 각 subscriber에게 새로운 publisher 인스턴스를 제공하려고 할 때 유용.
  7. fromCallable (Mono only): 주어진 Callable의 결과를 내보내는 Mono를 생성.
    lazy 한 실행이 필요한 task를 표현할 때 사용.
  8. fromCompletionStage (Mono only): CompletableFuture와 같은 비동기 작업 결과를 Mono로 변환할 때 사용.

 

Basic Reactor operators ⚙️


Reactor는 데이터 스트림을 조작하고 데이터를 결합하는 데 도움이 되는 다양한 연산자를 제공합니다.
일부 기본 연산자에 대해서 알아보도록 하겠습니다.

 

map


map operator는 publisher가 내보낸 각 항목에 변환 함수를 적용하고 변환된 항목을 내보냅니다.

Mono<Integer> mono = Mono.just("Hello")
    .map(String::length);
mono.subscribe(System.out::println); // Output: 5



filter

 

filter  operator는 publisher가 내보낸 항목을 조건에 따라 필터링합니다.
즉 조건에 맞는 항목만 내보냅니다.

Flux<Integer> evenNumbers = Flux.range(1, 10)
    .filter(number -> number % 2 == 0);
evenNumbers.subscribe(System.out::println); // Output: 2, 4, 6, 8, 10



flatMap

 

flatMap operator는 publisher가 내보낸 각 항목을 다른 publisher로 변환한 다음 이러한 publisher들의 data들을 하나의 출력 스트림으로 평탄화합니다.

Flux<String> flux = Flux.just("Hello", "Reactor")
    .flatMap(word -> Flux.fromArray(word.split("")));
flux.subscribe(System.out::println); 
// Output: H, e, l, l, o, R, e, a, c, t, o, r


zip

 

zip operator는 두 개의 publisher를 결합하여 두 publisher에서 항목 쌍을 순서대로 내보냅니다. 
결과 스트림은 한 publisher가 완료되면 완료됩니다.

Flux<String> words = Flux.just("Hello", "Reactor");
Flux<Integer> lengths = Flux.just(5, 7);

Flux<Tuple2<String, Integer>> zipped = Flux.zip(words, lengths);
zipped.subscribe(pair -> System.out.println(pair.getT1() + " has length " + pair.getT2())); 
// Output: Hello has length 5, Reactor has length 7


merge

 

merge operator는 여러 publisher를 하나의 스트림으로 결합하며, 모든 publisher에서 사용 가능한 항목을 내보냅니다.
출력 스트림은 모든 입력 publisher가 완료될 때 완료됩니다.

Flux<String> flux1 = Flux.just("A", "B", "C"); 
Flux<String> flux2 = Flux.just("1", "2", "3"); 
Flux<String> merged = Flux.merge(flux1, flux2); 
merged.subscribe(System.out::println); 
// Output: A, B, C, 1, 2, 3 (order may vary)


concat

 

concat operator는 여러 publisher를 하나의 스트림으로 연결하며, 첫 번째 publisher의 항목부터 차례로 내보냅니다.
output 스트림은 모든 입력 publisher가 완료될 때 완료됩니다.

 

Flux<String> flux1 = Flux.just("A", "B", "C");
Flux<String> flux2 = Flux.just("1", "2", "3");
Flux<String> concatenated = Flux.concat(flux1, flux2);
concatenated.subscribe(System.out::println); // Output: A, B, C, 1, 2, 3

 

Project Reactor는 JVM에서 비동기 및 반응형 애플리케이션을 구축하는 데 필수적인 도구입니다.
Mono와 Flux와 같은 주요 publisher들과 다양한 연산자들을 통해 개발자들은 비동기 이벤트 시퀀스를 효율적으로 처리할 수 있게 됩니다.

 

 

참조


Concurrency in Spring WebFlux | Baeldung

Flux (reactor-core 3.5.4) (projectreactor.io)

Mono (reactor-core 3.5.4) (projectreactor.io)

Getting started with Project Reactor - Reactive Programming | Jstobigdata

728x90

댓글