ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • 반응형 확장(RX)
    @ 17. 1 ~ 18/C# Rx(Reactive Extensions) 2017. 8. 20. 04:50

    이벤트 기반 비동기 패턴

    객체간의 통지를 구현하는 관찰자 디자인 패턴을 간단하게 구현하는 것.


    풀기반 방식



    푸시기반 방식

    생산자가 새로운 값에 관해 클라이언트에 통지하는 방식

    클라이언트가 아무런 일을 하지 않은 상태에서 다른 값을 기다리는 동안에 생산자에게 작업 분담을 허용한다.

    즉, 값의 순서를 생산하고 소비자에게 순서대로 각 항목을 통지


    닷넷 4.0부터 비동기 푸시 기반 컬렉션과 자체 클라이언트를 함께 대표하는

    IObservable<out T>와 IObserver<in T>인터페이스 정의를 포함하기 시작했다.

    이 두 인터페이스는 효율적으로 이벤트의 순서를 조합하고 관찰 가능한 컬렉션을 이용해 실제로 모든 다른 타입의 비동기 프로그램에 도움을 주기위해 RX라이브러리라는게 나온다.


    컬렉션을 관찰 가능한 비동기 컬렉션으로 변환

    Enumerable클래스로부터 관찰 가능한 컬렉션을 생성하고 비동기적으로 처리하는 방법을 살펴본다.

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    namespace testcode {
        class Program {
            static IEnumerable<int> EnumerableEventSequence()
            {
                for (int i = 0; i < 10++i) {
                    Thread.Sleep(TimeSpan.FromSeconds(0.5));
                    yield return i;
                }
            }
     
            static void Main(string[] args)
            {
                foreach (var i in EnumerableEventSequence()) {
                    Console.Write(i);
                }
     
                Console.WriteLine();
                Console.WriteLine("IEnumerable");
     
                IObservable<int> o = EnumerableEventSequence().ToObservable();
     
                using (IDisposable subscription = o.Subscribe(Console.Write)) {
                    Console.WriteLine();
                    Console.WriteLine("IObservable");
                }
     
                o = EnumerableEventSequence().ToObservable().SubscribeOn(TaskPoolScheduler.Default);
     
                using (IDisposable subscription = o.Subscribe(Console.Write)) {
                    Console.WriteLine();
                    Console.WriteLine("IObservable async");
                    Console.ReadLine();
                }
            }
        }
    }
     
    cs

    20라인 : Rx 라이브러리의 ToObservable()를 통해 열거 가능한 컬렉션을 관찰 가능한 컬렉션으로 변환한다.

    22라인 : 관찰 가능한 이 컬렉션의 갱신을 구독하며 컬렉션의 각 갱신에 실행될 수 있는 동작으로서 Console.Write메소드를 제공한다.

    각 반복이 끝날때 까지 기다리며 메인스레드를 사용하기 떄문이다.


    프로그램을 비동기로 만들기 위해 TPL 태스크 풀 스케쥴러가 제공하는 SubscribeOn메소드를 사용한다.

    이 스케줄러는 TPL 태스크 풀에 구독을 배치하며 메인스레드로부터 작업을 분담한다.

    32라인 : ReadLine이 없으면 비공기 이기 때문에 프로그램이 바로 그냥 종료된다.


    관찰 가능한 사용자 정의 컬렉션 순서 작성

    관찰 가능한 사용자 정의 순서를 얻고 소비하기 위해 IOservable<in T>와 IObserver<out T>인터페이스를 구현한다.

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    using System;
    using System.Collections.Generic;
    using System.Reactive;
    using System.Reactive.Linq;
    using System.Reactive.Concurrency;
    using System.Threading;
    using System.Reactive.Disposables;
    using System.Reactive.Subjects;
     
    namespace ConsoleApplication1 {
     
        public class Program {
     
            class CustomObserver : IObserver<int> {
                public void OnCompleted()
                {
                    Console.WriteLine("Completed");
                }
     
                public void OnError(Exception error)
                {
                    Console.WriteLine("Error {0}", error.Message);
                }
     
                public void OnNext(int value)
                {
                    Console.WriteLine("Next value{0}", value);
                }
            }
     
            class CustomSequence : IObservable<int> {
                private readonly IEnumerable<int> _numbers;
                public CustomSequence(IEnumerable<int> numbers)
                {
                    _numbers = numbers;
                }
     
                public IDisposable Subscribe(IObserver<int> observer)
                {
                    foreach(var number in _numbers) {
                        observer.OnNext(number);
                    }
     
                    observer.OnCompleted();
                    return Disposable.Empty;
                }
            }
     
            static void Main(string[] args)
            {
                var observer = new CustomObserver();
                var goodObservable = new CustomSequence(new [] { 12345 });
                var badObservable = new CustomSequence(null);
     
                using (IDisposable subscription = goodObservable.Subscribe(observer)) {
                }
     
                using (IDisposable subscription = goodObservable.SubscribeOn(TaskPoolScheduler.Default).Subscribe(observer)) {
                    Thread.Sleep(1000);
                }
     
                using (IDisposable subscription = badObservable.SubscribeOn(TaskPoolScheduler.Default).Subscribe(observer)) {
                    Thread.Sleep(1000);
                }
     
                Console.ReadLine();
            }
        }
    }
    cs


    Subject 타입 계열 사용

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    using System;
    using System.Collections.Generic;
    using System.Reactive;
    using System.Reactive.Linq;
    using System.Reactive.Concurrency;
    using System.Threading;
    using System.Reactive.Disposables;
    using System.Reactive.Subjects;
     
    namespace ConsoleApplication1 {
     
        public class Program {
     
            static IDisposable OutputToConsole<T> (IObservable<T> sequence)
            {
                return sequence.Subscribe(
                    obj => Console.WriteLine("{0}", obj)
                    , ex => Console.WriteLine("Error : {0}", ex.Message)
                    , () => Console.WriteLine("Completed")
                    );
            }
     
            static void Main(string[] args)
            {
                Console.WriteLine("Subject");
                //Subject은
                //IObservable과 IOserver 구현부를 둘다 나타낸다.
                //IOServer는 OnNext, OnERror, OnCompleted
                //IObservable 는 Subscribe
                var subject = new Subject<string>();
     
                //출력되지 않는 이유는 전송을 구독전에 했기 때문이다.
                subject.OnNext("A");
     
                using(var subscription = OutputToConsole(subject)) {
                    subject.OnNext("B");
                    subject.OnNext("C");
                    subject.OnNext("D");
                    subject.OnCompleted();
                    //아래의 문자열도 출력이 안됨.
                    //OnCompleted나 OnError 메소드를 Observable를 호출하면 이벤트 순서의 추가 변환을 중단하게 됨.
                    subject.OnNext("Will not be printed out");
                }
                
                //Subject보다는 유연하다
                Console.WriteLine("ReplaySubject");
                var replaySubject = new ReplaySubject<string>();
     
                replaySubject.OnNext("A");
                //브로드캐스팅 시작부터 모든 이벤트를 캐시할 수 있다. 그래서
                //나중에 구독해도 A가 출력된다.
                using(var subscription = OutputToConsole(replaySubject)) {
                    replaySubject.OnNext("B");
                    replaySubject.OnNext("C");
                    replaySubject.OnNext("D");
                    replaySubject.OnCompleted();
                }
     
                Console.WriteLine("Buffered ReplaySubject");
                //단순히 위에 버퍼 크기를 설정한 것.
                //2이니까 마지막의 BC만 들어감.
                var bufferedSubject = new ReplaySubject<string>(2);
     
                bufferedSubject.OnNext("A");
                bufferedSubject.OnNext("B");
                bufferedSubject.OnNext("C");
                using (var subscription = OutputToConsole(bufferedSubject)) {
                    bufferedSubject.OnNext("D");
                    bufferedSubject.OnCompleted();
                }
     
                //위 Replay와 비슷하나 시간을 기준으로 캐싱을 정할 수 있다.
                //오래된 이벤트를 버리고 특정 시간 이하로 일어난 이벤트만 캐시하도록 지정된다. 여기선 200millseconds
                //결국 c와 d만 보임
                Console.WriteLine("Time window ReplaySubject");
                var timeSubject = new ReplaySubject<string>(TimeSpan.FromMilliseconds(200));
                timeSubject.OnNext("A");
                Thread.Sleep(TimeSpan.FromMilliseconds(100));
                timeSubject.OnNext("B");
                Thread.Sleep(TimeSpan.FromMilliseconds(100));
                timeSubject.OnNext("C");
                Thread.Sleep(TimeSpan.FromMilliseconds(100));
     
                using (var subscription = OutputToConsole(timeSubject)) {
                    Thread.Sleep(TimeSpan.FromMilliseconds(100));
                    timeSubject.OnNext("D");
                    timeSubject.OnCompleted();
                }
     
                //태스크 병렬 라이브러리의 Task타입과 비슷하다. 단일 비동기 연산을 대표함.
                //여러 이벤트가 게시됐다면 이벤트 순차 완료를 기다리며 구독자에게 마지막 이벤트만 제공한다.
                //그래서D만 출력됨.
                Console.WriteLine("AsyncSubject");
                var asyncSubject = new AsyncSubject<string>();
     
                asyncSubject.OnNext("A");
                using (var subscription = OutputToConsole(asyncSubject)) {
                    asyncSubject.OnNext("B");
                    asyncSubject.OnNext("C");
                    asyncSubject.OnNext("D");
                    asyncSubject.OnCompleted();
                }
     
                //ReplaySubject와 같지만 오직 한 값만 캐시한다. 그리고 기본값을 지정할 수 있음.
                //여기서 기본값은 Default 출력인데.
                Console.WriteLine("Behabvior subject");
                var be = new BehaviorSubject<string>("Default");
     
                //아래의 주석이 풀리게 되면 디폴트 값이 나오지 않음.
                //be.OnNext("A");
                using (var subscription = OutputToConsole(be)) {
                    be.OnNext("B");
                    be.OnNext("C");
                    be.OnNext("D");
                    be.OnCompleted();
                }
     
                Console.ReadLine();
            }
        }
    }
    cs



    RX 비동기

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    using System;
    using System.Collections.Generic;
    using System.Reactive;
    using System.Reactive.Linq;
    using System.Reactive.Concurrency;
    using System.Threading;
    using System.Reactive.Disposables;
    using System.Reactive.Subjects;
    using System.Threading.Tasks;
    using System.Timers;
     
    using Timer = System.Timers.Timer;
     
    namespace testcode {
        class Program {
            static IDisposable OutputToConsole(IObservable<EventPattern<ElapsedEventArgs>> sequence)
            {
                return sequence.Subscribe(
                    obj => Console.WriteLine("{0}", obj.EventArgs.SignalTime)
                    , ex => Console.WriteLine("Error : {0}", ex.Message)
                    , () => Console.WriteLine("Completed")
                    );
            }
       
            static void Main(string[] args)
            {
                using(var timer = new Timer(1000)) {
                    var ot = Observable.FromEventPattern<ElapsedEventHandler, ElapsedEventArgs>(
                        h => timer.Elapsed += h
                        , h => timer.Elapsed -= h);
                    timer.Start();
     
                    using(var sub = OutputToConsole(ot)) {
                        Thread.Sleep(TimeSpan.FromSeconds(5));
                    }
     
                    Console.WriteLine("----------------");
                    timer.Stop();
                }
     
                Console.ReadLine();
            }
        }
    }
     
    cs

    이벤트 기반 비동기 패턴을 Observable클래스로 바로 변환한다. 타이머를 생성하고 5초간 타이머 이벤트를 소비한 후에 자원을 정리하기 위해 타이머를 처리한다.

Designed by Tistory.