system.reactive - RX misunderstood behavior


I have the repro code below, which demonstrates a problem I hit in a more complex flow:

    using System;
    using System.Reactive.Disposables;
    using System.Reactive.Linq;

    class Program
    {
        static void Main(string[] args)
        {
            // Built-in observable; use r.Publish().RefCount() below to compare.
            var r = Observable.Range(1, 10).Finally(() => Console.WriteLine("disposed"));

            // Custom observable: pushes 1..10 and completes synchronously on subscribe.
            var x = Observable.Create<int>(o =>
            {
                for (int i = 1; i < 11; i++)
                {
                    o.OnNext(i);
                }

                o.OnCompleted();

                return Disposable.Create(() => Console.WriteLine("disposed"));
            });

            var src = x.Publish().RefCount();

            var a = src.Where(i => i % 2 == 0).Do(i => Console.WriteLine("pair:" + i));
            var b = src.Where(i => i % 2 != 0).Do(i => Console.WriteLine("even:" + i));

            var c = Observable.Merge(a, b);

            using (c.Subscribe(i => Console.WriteLine("final " + i), () => Console.WriteLine("complete")))
            {
                Console.ReadKey();
            }
        }
    }

Running this snippet with r as src (var src = r.Publish().RefCount()) produces all the numbers from 1 to 10. Switching src to x (as in the example) produces only the pairs, that is, only the output of the first observable to subscribe, unless I change Publish() to Replay().

Why? What is the difference between r and x?

Thanks.

Although I do not have the patience to sort through the Rx.NET source code to find the implementation detail that causes this exact behavior, I can provide the following insight:

The difference in behavior you are seeing is caused by a race condition. The racers in this case are the subscriptions to a and b, which happen as a result of your subscription to the observable returned by Observable.Merge. You subscribe to c, which in turn subscribes to a and b. Both a and b are defined in terms of a Publish and RefCount of either x or r, depending on which case you choose.

Here's what's happening.

src = x (the custom Observable.Create observable)

In this case, you are using your custom observable. When subscribed to, your custom observable immediately and synchronously begins to OnNext the numbers 1 through 10, and then calls OnCompleted. Interestingly enough, that subscription is made by your Publish().RefCount() observable when it is subscribed to for the first time. It is subscribed to for the first time by a, because a is the first parameter to Merge. So, before Merge has even subscribed to b, the source subscription has already completed. Merge then subscribes to b, which is built on the same RefCount observable. That observable has already completed, so Merge looks for the next observable to merge. Since there are no more observables to merge, and because all of the existing observables have completed, the merged observable completes.

The values OnNext'd through your custom observable have traveled through the "pairs" observable, but not through the "evens" observable. Therefore, you end up with the following:

    // "pairs" (has this been named incorrectly?)
    [2, 4, 6, 8, 10]
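To see the effect in isolation, here is a minimal sketch that drops Merge and subscribes twice by hand. It is not taken from the question: `LateSubscriberDemo` and `Run` are names made up for illustration, the range is shortened to 1..3, and it assumes the System.Reactive package is referenced.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;

static class LateSubscriberDemo
{
    // Hypothetical helper: subscribes two observers one after the other
    // and records which values each of them receives.
    public static List<string> Run()
    {
        var log = new List<string>();

        // Emits 1..3 and completes synchronously, inside the Subscribe call.
        var x = Observable.Create<int>(o =>
        {
            for (int i = 1; i <= 3; i++)
            {
                o.OnNext(i);
            }

            o.OnCompleted();
            return Disposable.Empty;
        });

        var src = x.Publish().RefCount();

        // First subscriber: RefCount connects to x, which runs to completion right here.
        src.Subscribe(i => log.Add("first:" + i));

        // Second subscriber: the shared subject has already completed,
        // so there are no values left for it to observe.
        src.Subscribe(i => log.Add("second:" + i));

        return log;
    }

    static void Main()
    {
        foreach (var line in Run())
        {
            Console.WriteLine(line);
        }
    }
}
```

Only the "first:" entries should show up in the log. Swapping Publish() for Replay() would let the late subscriber receive the buffered values, which matches what the question reports.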

src = r (the built-in Observable.Range observable)

In this case, you are using the built-in Range method to create the observable. When subscribed to, this Range observable does something that ends up yielding the numbers 1 through 10. Interesting. We haven't a clue what's actually happening in that method, or when it's happening. We can, however, make some observations about it. If we look at what happens in the custom-observable case (above), we can see that only the first subscription takes effect because the observable is yielding immediately and synchronously. Therefore, we can determine that the Range observable must not be yielding in the same manner, but instead allows the application's control flow to execute the subscription to b before any values are yielded. The difference between your custom observable and this Range observable is most likely that the Range observable schedules its yields to happen on the CurrentThread scheduler.
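One way to probe that guess is to pass the scheduler to Range explicitly. The sketch below is an experiment under that assumption, not a statement about Rx.NET internals: `RangeSchedulerDemo` and `Run` are hypothetical names, and the code assumes the System.Reactive package. If the scheduler explanation holds, I would expect the CurrentThread run to deliver all ten numbers and the Immediate run to deliver only the even ones, just like the custom observable.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

static class RangeSchedulerDemo
{
    // Hypothetical helper: rebuilds the question's pipeline on top of Range,
    // using the given scheduler, and returns what the merged observable delivers.
    public static List<int> Run(IScheduler scheduler)
    {
        var received = new List<int>();

        var src = Observable.Range(1, 10, scheduler).Publish().RefCount();

        var a = src.Where(i => i % 2 == 0);
        var b = src.Where(i => i % 2 != 0);

        // Merge subscribes to a first, then to b.
        using (Observable.Merge(a, b).Subscribe(i => received.Add(i)))
        {
        }

        return received;
    }

    static void Main()
    {
        // CurrentThread queues the emissions behind the subscription work already in progress.
        Console.WriteLine("CurrentThread: " + string.Join(", ", Run(Scheduler.CurrentThread)));

        // Immediate emits inside the Subscribe call, before b has been subscribed.
        Console.WriteLine("Immediate: " + string.Join(", ", Run(Scheduler.Immediate)));
    }
}
```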

How to avoid this kind of race condition:

    var src = x.Publish(); // not ref count

    var a = src.Where(...);
    var b = src.Where(...);

    var c = Observable.Merge(a, b);

    var subscription = c.Subscribe(i => Console.WriteLine("final " + i), () => Console.WriteLine("complete"));

    // Don't dispose of the subscription. The observable creates an auto-disposing
    // subscription, which calls Dispose once `OnCompleted` or `OnError` is called.

    src.Connect(); // Connect to the underlying observable, *after* Merge has subscribed to both a and b.

Notice that the solution to fixing the subscription to this composition of observables was not to change how the source observable works, but to make sure your subscription logic doesn't allow any race conditions to exist. This is important, because trying to fix this problem in the observable simply changes its behavior; it does not fix the race. Had we changed the source and then switched it out later, the subscription logic would still be buggy.
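For completeness, here is the same idea as a self-contained sketch, with the question's predicates filled in and the synchronous custom observable kept as the source. `ConnectAfterSubscribeDemo` and `Run` are names invented for this example, and the code assumes the System.Reactive package.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;

static class ConnectAfterSubscribeDemo
{
    // Hypothetical helper: wires up both branches first, then connects the source.
    public static List<int> Run()
    {
        var received = new List<int>();

        // Same shape as the question's custom observable: synchronous 1..10, then completion.
        var x = Observable.Create<int>(o =>
        {
            for (int i = 1; i < 11; i++)
            {
                o.OnNext(i);
            }

            o.OnCompleted();
            return Disposable.Empty;
        });

        var src = x.Publish(); // connectable: nothing flows until Connect is called

        var a = src.Where(i => i % 2 == 0);
        var b = src.Where(i => i % 2 != 0);

        // Both a and b are subscribed here, but the source has not started yet.
        Observable.Merge(a, b).Subscribe(i => received.Add(i));

        // Only now does x run, so every value reaches both branches.
        src.Connect();

        return received;
    }

    static void Main()
    {
        Console.WriteLine(string.Join(", ", Run()));
    }
}
```

Because the source is still synchronous here, the sketch also shows that the fix does not depend on how or when the source decides to emit.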

