본문 바로가기
Programming Language/Java

[Java] Stream (Stream이 밀려온다)

by 곰민 2023. 1. 9.

Java8 에서 추가된 스트림(Stream)에 대해서 스트림(stream), 스트림 파이프라인(stream pipe line), 스트림 연산자(stream operator), 스트림 병렬 처리 (parall stream)등을 특징들과 함게 살펴보도록 하겠습니다.

 

java8 stream, stream API

 

Stream API


Stream API는 순차적 or 병렬적으로 다량의 데이터 처리 작업을 돕고자 Java8에 추가되었습니다.
이 Stream API가 제공하는 추상 개념 중 핵심적인 것은 두 가지입니다.

 

1. 스트림(Stream)

데이터의 유한 혹은 무한한 일련의 시퀀스를 의미합니다.

Stream으로 넘어온 데이터를 Stream으로 이어받아서 유한하게 처리하거나 무제한으로 처리가 가능합니다.

 

2. 스트림 파이프라인(Stream Pipe Line)

원소들로 수행하는 연산 단계를 표현하는 개념입니다.

기본값 타입으로 int, long, double 세 가지를 지원하며 스트림의 원소들은 어디로부터든 올 수 있습니다.

대표적으로 컬렉션, 배열, 파일, 정규 표현식 패턴 매처(Matcher), 난수 생성기, 또다른 스트림 등이 있습니다.

 

Stream & Stream Pipe Line 특징


Stream API

출처 : https://www.logicbig.com/tutorials/core-java-tutorial/java-util-stream/stream-api-intro.html

Source Stream에서 시작해
최종 연산(terminal operation)으로 끝나며
그 사이에 하나 이상의 중간 연산(Intermediate Operation)가 있을 수 있습니다.

 🚀 중간연산(Intermediate Operation)


중간연산은 Stream을 전달받아서 다른 Stream으로 변환하고 Stream을 리턴한다.

 

ex) filter, map, limit, skip, sorted …

변환 전 스트림 원소 타입과 변환 후 스트림 원소 타입이 같을 수도 있고 다를 수도 있습니다.

 

 

자주 사용되는 filtermap에 대한 예시를 간단하게 확인하겠습니다.

 

FIlter

filter는 주어진 조건에 맞는 요소만으로 구성된 새로운 스트림을 반환합니다. 

 

다중 filter 예제

public void whenUsingMultipleFilters_dataShouldBeFiltered() {
		List<Student> studentList = new ArrayList<>();
		studentList.add(new Student(14 , 50, "PHYSICS"));
		List<Student> filteredStream = studentList.stream()
			.filter(s -> s.getAge() > 13)
			.filter(s -> s.getScore() > 30)
			.filter(not(s -> Objects.equals(s.getSubject(), "PHYSICS")))
			.toList();
}
//not은 Predicate static method not을 static import 한것

 

단일 filter

@Test
public void whenUsingSingleComplexFilter_dataShouldBeFiltered() {
    List<Student> studentList = new ArrayList<>();
    studentList.add(new Student(14 , 50, "PHYSICS"));
    List<Student> filteredStream = students.stream()
          .filter(s -> s.getScore() > 50 
            && s.getAge() > 13 
            && s.getSubject() == "PHYSICS")
          .toList();
}

 

filter 내부는 이전 함수형 인터페이스 포스팅에서 확인했었던 Predicate로 이루어져 있습니다.

stream api filter

 

Map

Map은 Stream Source에 주어진 함수를 적용한 결과로 구성된 Stream을 반환합니다.

 

//people 객체에서 이름만 추출하여 String .List로 반환하는 예시
List<String> nameList = people.stream() 
			.map(p -> p.getName())
            .collect(Collectors.toList());

 

들어올 때는 People 객체 타입이었지만 최종 연산을 거치고 반환 시에는 List<String>으로 반환됩니다

즉 Map은 내부에서 주어진 함수에 따라서 들어오는 타입과 나가는 타입이 같을 수도 있고 같지 않고 달라질 수 도 있습니다.

 

🚀 최종 연산(Terminal Operation)


Stream을 리턴하지 않으며 연산 결과가 Stream이 아니므로 Stream Pipe Line에서 한 번만 가능합니다.

  • ex)collect, allMatch, count, forEach, min, max …

 

foreach 예시

List<Integer> list = Arrays.asList(5,1,1,2,3,4,5); //Source Stream 
Stream<Integer> intStream = list.stream(); 
intStream.distinct().sorted().forEach(System.out::print); 
//중간 연산 distinct()중복제거, sorted()정렬 //최종 연산 foreach() 내부 메서드 레퍼런스 전부 출력 
//12345

 

🚀 Stream Pipe Line 연산 과정


  1. 스트림 생성 (데이터 소스 → 데이터의 연속적인 흐름)
  2. 중간 연산(0 ~ n번)
  3. 최종 연산(1번)

 

🚀 Fluent API


스트림은 .매서드.매서드로 이어서 사용 가능한 매서드 연쇄를 지원하는 fluent API 입니다.

위 예시와 같이 Stream Pipe Line 하나를 구성하는 모든 호출을 연결하여 단 하나의 표현식으로 완성할 수 있습니다.

 

스트림으로 처리하는 데이터는 오직 한 번만 처리된다.


Stream Pipe Line을 한번 쭉 지나가면서 한 번만 처리됩니다.

 

🚀 지연 평가(lazy evaluation)


스트림 파이프라인의 중계형 연산자는 지연 평가(lazy evaluation)가 됩니다.

map() 과 같은 중계형 연산자는 collect() 과 같은 terminal operator가 오기 전까지는 실행을 하지 않습니다.

 

List<String>names = new ArrayList<>(); 
names.add("gom"); 
Stream<String> stringStream = names.stream().map(String::toUppperCase); 
//중간연산 소문자 대문자로 변경 //최종 연산 x 
names.forEach(System.out::println); 
//gom 
//여전히 소문자로 남아있습니다.

 

위 예제를 보면 최종 연산(terminal operation)을 하지 않았고 중계형 연산자가 Stream Source를 실질적으로 처리하지 않았기 때문에 여전히 stringStream은 소문자로 출력 됩니다. 

 

List<Integer> list = Arrays.asList(5,1,1,2,3,4,5); 
Stream<Integer> intStream = list.stream(); 
intStream.distinct().sorted().forEach(System.out::print); 
//중간 연산 중복제거, 정렬 //최종 연산 메서드 레퍼런스 전부 출력 
//12345

위 예시와 같이 최종 연산이 있는 경우 중간 연산이 전부 적용되어서 잘 출력 됩니다.

즉 evaluation은 terminal operation 시점에 lazy 하게 이루어 집니다.

 

🚀 Stream의 무제한 처리와 Short-circuiting Operations


Stream 으로 넘어온 데이터를 Stream으로 이어받아서 계속 무제한으로 처리가 가능하다고 했었습니다.

IntStream.iterate(0, i -> i + 2).forEach(System.out::println); 
//중간 연산 i+2 // 최종 연산 Foreach 
//계속 찍힙니다.

 

계속 끝이 없이 찍힙니다.

이러한 무제한 처리를 Short-circuiting Operations를 통해서 제한할 수 있습니다.

 

 

Short-circuiting Operations란?

Short-circuiting은 결과가 결정되자마자 expression에 대한 평가가 멈추는 것을 의미합니다.

 

f(a == b || c = d || e == f){ 
    //Do something 
}

 condition 문에 대한 결과가 a == b 가 true면 이미 결정되었기 때문에 c == d 와 e == f는 evaluated되지 않습니다.

 

Stream Shoort-circuiting Operations

 

출처 : https://www.logicbig.com/tutorials/core-java-tutorial/java-util-stream/short-circuiting.html

 

Stream 에서 limit()을 활용하여 제한해 보도록 하겠습니다

IntStream.iterate(0, i -> i + 2).limit(100).forEach(System.out::println);
//중간 연산 i+2, limit() 100개까지만 제한 // 최종 연산 Foreach
//100개만 찍힙니다

limit()를 활용하여 100개까지만 제한하였고 이후의 것들은
evaluated 되지 않기 때문에 무제한 stream을 제한해서 사용할 수 있습니다.

 

🚀 Stream 은 손쉽게 병렬 처리를 할 수 있다


for (String bookName : bookNames){
    if(bookname.startWith("Harry potter")) {
        System.out.println(bookName.toUpperCase());ß
    }
}

 

반복을 돌면서 특정 작업을 하는 for 문의 경우 parallel이 아닌 serial로 동작하게 됩니다.

 

List<Integer> listOfNumbers = Arrays.asList(1, 2, 3, 4);
listOfNumbers.stream().forEach(number ->
    System.out.println(number + " " + Thread.currentThread().getName())
);

//main
//main
//main
//main

 

 

기본적으로 병렬로 명시하지 않는 한 모든 stream은 순차적으로 작동하며
단일 쓰레드를 사용하여 stream pipe line을 처리합니다.

 

List<String> collect = bookNames.parallelStream().map((s) -> {
    System.out.println(s + " " + Thread.currentThread().getName());
    return s.toUpperCase();}).collect(Collectors.toList());
collect.forEach(System.out::println);
//bookNames + Thread name 출력

 

Stream 의 경우 parallelStream() 을 활용하여 손쉽게 병렬 처리를 할 수 있습니다.

parallelStream을 사용하면 별도의 코어에서 병렬로 코드를 실행할 수 있으며 최종 결과는 각 개별 결과의 조합입니다.

parallelStream은 Java7에서 추가된 fork-join framework로 스레드 간에 소스 데이터를 분할하고
작업 완료시 콜백 처리를 합니다.

단 실행 순서는 통제할 수 없으며 프로그램을 실행할 때마다 변경될 수 있습니다.

 

특정 연산이 순차 연산과 병렬 연산에서 결과 값이 다를 수도 있다


정수의 합을 병렬로 처리하는 예시 (시작 합계에 5 추가로 더함.)

List<Integer> listOfNumbers = Arrays.asList(1, 2, 3, 4); 
int parallelSum = listOfNumbers.parallelStream().reduce(5, Integer::sum); 
System.out.println("parallelSum = " + parallelSum); 
int serialSum = listOfNumbers.stream().reduce(5, Integer::sum); 
System.out.println("serialSum = " + serialSum); 

//parallelSum = 30 
//serialSum = 15

 

 

parallelStream 의 연산 결과값은 30입니다.

 

 

2개의 쓰레드인 경우 위 와 같이 각각의 쓰레드에 시작시 5를 더하고 시작하기 때문에 결과값이 30이 나온것을 확인할 수 있습니다.
즉 common fork-join pool에서 사용하는 스레드에 수에 따라서 값은 달라질 수 있습니다.

common fork-join pool은 보통 프로세서 코어수 -1입니다.

jvm 설정값을 수정하여 전역 설정으로 parallelstream의 스레드 수를 정해둘 수 있지만 권장하지 않습니다.

 

값이 동일하게 나오려면 parallel stream 외부로 빼야 합니다

List<Integer> listOfNumbers = Arrays.asList(1, 2, 3, 4); 
int parallelSum = listOfNumbers.parallelStream().reduce(Integer::sum) + 5; 
System.out.println("parallelSum = " + parallelSum);

 

 

 어떤 연산에 parallelStream()을 활용할지 잘 생각 해봐야 합니다.

처리할 데이터의 양이 많은 경우 parallelStream()을 활용한 병렬 처리의 성능 효과를 누릴 수 있습니다.
하지만 parallelStream()의 사용은 주의해야 하며 순차처리를 잘 사용하다가 성능 이슈가 생기는 대용량 처리에 있어서 serial 처리와 parallell 처리의 성능 측정을 통한 더 나은 방식을 찾는 것이 중요하다고 생각합니다.

참조


Stream (Java Platform SE 8 )

Short-circuit evaluation - Wikipedia

더 자바, Java 8 - 인프런 | 강의

What is short circuiting and how is it used when programming in Java?

baeldung multiple-filter

baeldung parallel-stream

반응형

댓글