본문 바로가기

생각정리

[Android] RxJava에 대해서

728x90

 

RxJava를 시작하기 앞서, Reactive Programming이 무엇인지에 대해 알아보려 한다.

Reactive Programming이란 데이터 흐름과 전달에 관한 프로그래밍 패러다임이다.

우리는 주로 알고리즘 문제와 같이 절차를 명시하여 순서대로 실행되는 Imperative Programming(명령형 프로그래밍)을 한다. 반면 Reactive Programming이란 데이터의 흐름을 먼저 정의하고 데이터가 변경되었을 때 연관된 작업이 실행된다. 프로그래머가 어떠한 기능을 직접 정해서 실행하는 것이 아닌, 시스템에 이벤트가 발생했을 때 알아서 처리되는 것이다.(선언형 프로그래밍) 

 

예시) 

//명령형 

for(int number : numbers){
  if(number >6){
  	sum += number
  }
}


//선언형 

val sum = numbers.stream()
                 .filter(number -> number>6)
                 .sum()



기존의 프로그래밍 방식을 Pull 방식, Reactive 프로그래밍 방식을 Push 방식이라고도 한다.
Pull 방식은 데이터를 사용하는 곳(Consumer)에서 데이터를 직접 가져와서 사용한다면,
Push 방식은 데이터의 변화가 발생한 곳에서 새로운 데이터를 Consumer에게 전달한다.

 

예시) 

Push : RTC , 소켓을 사용한 채팅, Push 메세지 

Pull : 클라이언트 요청 & 서버 응답 방식의 프로그래밍 , Java같은 절차형 언어

 

따라서 Reactive 프로그래밍은 주변 환경과 끊임없이 상호작용을 한다. 다만 프로그램이 주도하는 것이 아니라 환경이 변하면 이벤트를 받아 동작함으로써 상호작용한다. == 비동기적 상호작용.

 

  • ReactiveX는 관찰가능한 절차를 통해 비동기, 이벤트 기반 프로그램을 구성하기 위한 라이브러리이다.
  • Observer Pattern을 확장하며, Sequence를 조합할 수 있는 연산자를 지원한다.
  • low-level Thread, 동기화, Thread 안전성, non-blocking I/O에 관한 우려를 줄인다.

 

ReactiveX의 이러한 방식을 구성하게 해주는 핵심 요소가 바로 Observable Operator(연산자)

Observable : ReactiveX의 핵심 요소이자 데이터 흐름에 맞게 Consumer에게 알림을 보내는 Class이다.

 

Observable은 데이터 흐름에 맞게 알림을 보내 Observer가 데이터를 사용할 수 있도록 한다. 즉, Observable을 이용해 데이터를 회수하고 변환하는 메커니즘을 정의하고, Observer는 이를 구독해 데이터가 준비되면 이에 반응한다.

 

이 패턴을 Oberver Pattern이라고 하며, Reactive Programming은 이 Oberver Pattern에 기반을 둔다.


Observable은 Collections(List, ArrayList, …)를 사용할 때와 같은 방식으로 비동기 이벤트 스트림을 처리할 수 있다. 다만 Collections의 Iterable이 Push 방식이라면, Observable은 Iterable의 Pull 버전이다.
Iterable은 Consumer(데이터를 소비하는 곳)가 값을 Pull한 후 값이 도착할 때까지 기다리며 Thread를 차단한다면, Observable은 Thread를 차단하지 않고 값이 사용가능하면 Consumer에게 값을 Push한다.

정리하자면,

  1. Observable이 데이터 스트림을 처리하고, 완료되면 데이터를 발행(emit)한다.
  2. 데이터를 발행할 때마다 구독하고 있는 모든 Observer가 알림을 받는다.
  3. Observer는 수신한 데이터를 가지고 어떠한 일을 한다.

 

이러한 원리를 가지고 RxJava가 동작을 한다고 이해하면 좋을 것 이다. 

 

RxJava는 다양한 함수(Operator)를 제공 한다. 예를 들면 아래와 같다. 

  • just() : 가장 간단한 Observable 생성 방식이다. (생성 연산자라고도 한다)
  • create() : Observable을 생성하지만 just()와는 다르게 프로그래머가 onNext, onComplete, onError를 호출해야함
  • map() : RxJava의 연산자이다. 데이터를 원하는 형태로 바꿀 수 있다.
  • subscribe() : Observable은 구독(subscribe)을 해야 데이터가 발행된다. 따라서 Observable을 구독하여 데이터를 발행 후, 수신한 데이터를 원하는 방식으로 사용(System.out::println)한다.
  • doOnDispose() , doOnSubscribe() : 구독해지했을 때와 구독 하였을 때 쓰는 함수.  
  • subscribeOn() : 네트워킹 작업이나 DB 트랜잭션, 파일 시스템 환경 등 블로킹이 발생할 수 있는 곳에서
    비동기적으로 작업을 처리하기 위해 사용되는 스케줄러이다. 쓰레드 풀을 사용하여 새로운 쓰레드가 필요할 때마다 쓰레드를 계속 생성하되,이전에 생성했던 쓰레드가 존재한다면 이를 재사용한다.                                           이것을 설정해 주지않으면 Main 쓰레드에서만 쓰레드 동작을한다. 
  • observeOn() :연산자를 활용하여 스케줄러를 지정해준다면,Observable 데이터 스트림에서 발행한 데이터를 가로채서 지정한 스케줄러에서 이를 구독한다. 따라서 위와 같은 결과가 나오게 된다.

subscribeOn(Schedulers.io()) , observeOn(AndroidSchedulers.mainThread()) (거의 국룰) 

이유: Android 에서는일반적으로 안드로이드에서 RxJava 를 사용 목적에 맞게 가장 많이 사용하는 부분은 백엔드 서버와 네트워킹 동작을 하거나,DB 쿼리 동작을 수행하는 부분이다. 따라서 이에 가장 적합한 IO 스케줄러를 subscribeOn() 을 통해 지정해줌으로써 IO 스케줄러 상으로 결과 데이터를 발행할 수 있도록 한다.

그리고 안드로이드에선 보통 위와 같은 비동기 동작의 결과물을 메인 쓰레드 (UI 쓰레드) 에서 UI 를 갱신하는 등 활용하기 때문에 이를 AndroidSchedulers.mainThread() 로 지정한다.

 

그 외 fromXXX()

  • 여러 데이터를 다뤄야 하는 경우 사용한다. 정의된 메소드의 종류는 다음과 같으며 특정 타입의 데이터를 Observable로 바꿔주는 메소드. fromArray() ,fromIterable() ,fromSingle(), fromMaybe(), fromCompletable()  등이있다.

 

등등이 있다. 

 

하지만 최근에는 RX 보다 Coroutine을 많이 사용하는 추세이기 때문엔 다음 포스팅에서는 코루틴에 대해서 공부를 해봐야겠다.