No worries. I hadn't mistaken it for publish/subscribe. It is not that, and it is useful. But it's not new.

On 8/16/2015 11:04 PM, cogmission (David Ray) wrote:


  Hi Matt,

I'm only responding in order to avoid what may possibly be a missed opportunity, and I don't want you or anyone else who may see the term "Observable" and mistake it for a simple publish/subscribe pattern that has been seen before. On the contrary, this library is extremely powerful, easy on the eyes and with some subtle usage, can exert extreme power and flexibility in its usage.

For example:

Say you have a service where you receive a message for the retrieval of a document on the web. In servicing that request you need to make an asynchronous query to back-end services, and then say that service needs to aggregate several formats from a db, and other streaming sources, so you nest yet another blocking request to receive those items in a synchronized fashion and maybe it doesn't end there? Maybe there is an additional stop along the way where another nested asynchronous call needs to be made? Now if one of those connections experiences undue latency or the service is unavailable for some reason, it is extremely laborious to try and percolate that exception back up to the original caller. RxJava handles the bubbling of exceptions from deeply nested calls back to the original call site.

That's just one example of its use.

Then there are the 100 or so operators that can be performed on each Observable. Each function returns an Observable that "emits" one or more items or "streams" of data. And each Observable can be "operated upon" mathematically or combined/reduced/transformed etc. And yes, if your Observables are composed in a "Functional" manner, meaning they don't mutate or convey state, you are isolated from a lot of concurrency issues that you would otherwise have to handle.

I'm not saying that a reckless developer can't still get themselves in trouble - that is true with any breakthrough library. I'm just saying this is quite an exceptional offering in the Java language. Other languages might have something similar - but this is also being used in the C# camp too (and Ruby, Python, Scala, Clojure, C++, Swift, JRuby, Kotlin etc.) - Entitled ReactiveX as seen here: http://reactivex.io/languages.html

This is also different from the Java 8 Streams offering because it can do both Pull (like Java 8 Streams) and Push oriented (its major use case) query patterns. As a matter of fact, I'm sure the authors of the Java 8 release got a lot of their ideas from this library!

Take a look at some of its operators:


  Operators By Category


    Creating Observables

Operators that originate new Observables.

  * *|Create|*
    <http://reactivex.io/documentation/operators/create.html> — create
    an Observable from scratch by calling observer methods
    programmatically
  * *|Defer|*
    <http://reactivex.io/documentation/operators/defer.html> — do not
    create the Observable until the observer subscribes, and create a
    fresh Observable for each observer
  * *|Empty|/|Never|/|Throw|*
    <http://reactivex.io/documentation/operators/empty-never-throw.html> —
    create Observables that have very precise and limited behavior
  * *|From|* <http://reactivex.io/documentation/operators/from.html> —
    convert some other object or data structure into an Observable
  * *|Interval|*
    <http://reactivex.io/documentation/operators/interval.html> —
    create an Observable that emits a sequence of integers spaced by a
    particular time interval
  * *|Just|* <http://reactivex.io/documentation/operators/just.html> —
    convert an object or a set of objects into an Observable that
    emits that or those objects
  * *|Range|*
    <http://reactivex.io/documentation/operators/range.html> — create
    an Observable that emits a range of sequential integers
  * *|Repeat|*
    <http://reactivex.io/documentation/operators/repeat.html> — create
    an Observable that emits a particular item or sequence of items
    repeatedly
  * *|Start|*
    <http://reactivex.io/documentation/operators/start.html> — create
    an Observable that emits the return value of a function
  * *|Timer|*
    <http://reactivex.io/documentation/operators/timer.html> — create
    an Observable that emits a single item after a given delay


    Transforming Observables

Operators that transform items that are emitted by an Observable.

  * *|Buffer|*
    <http://reactivex.io/documentation/operators/buffer.html> —
    periodically gather items from an Observable into bundles and emit
    these bundles rather than emitting the items one at a time
  * *|FlatMap|*
    <http://reactivex.io/documentation/operators/flatmap.html> —
    transform the items emitted by an Observable into Observables,
    then flatten the emissions from those into a single Observable
  * *|GroupBy|*
    <http://reactivex.io/documentation/operators/groupby.html> —
    divide an Observable into a set of Observables that each emit a
    different group of items from the original Observable, organized
    by key
  * *|Map|* <http://reactivex.io/documentation/operators/map.html> —
    transform the items emitted by an Observable by applying a
    function to each item
  * *|Scan|* <http://reactivex.io/documentation/operators/scan.html> —
    apply a function to each item emitted by an Observable,
    sequentially, and emit each successive value
  * *|Window|*
    <http://reactivex.io/documentation/operators/window.html> —
    periodically subdivide items from an Observable into Observable
    windows and emit these windows rather than emitting the items one
    at a time


    Filtering Observables

Operators that selectively emit items from a source Observable.

  * *|Debounce|*
    <http://reactivex.io/documentation/operators/debounce.html> — only
    emit an item from an Observable if a particular timespan has
    passed without it emitting another item
  * *|Distinct|*
    <http://reactivex.io/documentation/operators/distinct.html> —
    suppress duplicate items emitted by an Observable
  * *|ElementAt|*
    <http://reactivex.io/documentation/operators/elementat.html> —
    emit only item /n/ emitted by an Observable
  * *|Filter|*
    <http://reactivex.io/documentation/operators/filter.html> — emit
    only those items from an Observable that pass a predicate test
  * *|First|*
    <http://reactivex.io/documentation/operators/first.html> — emit
    only the first item, or the first item that meets a condition,
    from an Observable
  * *|IgnoreElements|*
    <http://reactivex.io/documentation/operators/ignoreelements.html> — do
    not emit any items from an Observable but mirror its termination
    notification
  * *|Last|* <http://reactivex.io/documentation/operators/last.html> —
    emit only the last item emitted by an Observable
  * *|Sample|*
    <http://reactivex.io/documentation/operators/sample.html> — emit
    the most recent item emitted by an Observable within periodic time
    intervals
  * *|Skip|* <http://reactivex.io/documentation/operators/skip.html> —
    suppress the first /n/ items emitted by an Observable
  * *|SkipLast|*
    <http://reactivex.io/documentation/operators/skiplast.html> —
    suppress the last /n/ items emitted by an Observable
  * *|Take|* <http://reactivex.io/documentation/operators/take.html> —
    emit only the first /n/ items emitted by an Observable
  * *|TakeLast|*
    <http://reactivex.io/documentation/operators/takelast.html> — emit
    only the last /n/ items emitted by an Observable


    Combining Observables

Operators that work with multiple source Observables to create a single Observable

  * *|And|/|Then|/|When|*
    <http://reactivex.io/documentation/operators/and-then-when.html> —
    combine sets of items emitted by two or more Observables by means
    of |Pattern|and |Plan| intermediaries
  * *|CombineLatest|*
    <http://reactivex.io/documentation/operators/combinelatest.html> —
    when an item is emitted by either of two Observables, combine the
    latest item emitted by each Observable via a specified function
    and emit items based on the results of this function
  * *|Join|* <http://reactivex.io/documentation/operators/join.html> —
    combine items emitted by two Observables whenever an item from one
    Observable is emitted during a time window defined according to an
    item emitted by the other Observable
  * *|Merge|*
    <http://reactivex.io/documentation/operators/merge.html> — combine
    multiple Observables into one by merging their emissions
  * *|StartWith|*
    <http://reactivex.io/documentation/operators/startwith.html> —
    emit a specified sequence of items before beginning to emit the
    items from the source Observable
  * *|Switch|*
    <http://reactivex.io/documentation/operators/switch.html> —
    convert an Observable that emits Observables into a single
    Observable that emits the items emitted by the
    most-recently-emitted of those Observables
  * *|Zip|* <http://reactivex.io/documentation/operators/zip.html> —
    combine the emissions of multiple Observables together via a
    specified function and emit single items for each combination
    based on the results of this function


    Error Handling Operators

Operators that help to recover from error notifications from an Observable

  * *|Catch|*
    <http://reactivex.io/documentation/operators/catch.html> — recover
    from an |onError| notification by continuing the sequence without
    error
  * *|Retry|*
    <http://reactivex.io/documentation/operators/retry.html> — if a
    source Observable sends an |onError| notification, resubscribe to
    it in the hopes that it will complete without error


    Observable Utility Operators

A toolbox of useful Operators for working with Observables

  * *|Delay|*
    <http://reactivex.io/documentation/operators/delay.html> — shift
    the emissions from an Observable forward in time by a particular
    amount
  * *|Do|* <http://reactivex.io/documentation/operators/do.html> —
    register an action to take upon a variety of Observable lifecycle
    events
  * *|Materialize|/|Dematerialize|*
    
<http://reactivex.io/documentation/operators/materialize-dematerialize.html> —
    represent both the items emitted and the notifications sent as
    emitted items, or reverse this process
  * *|ObserveOn|*
    <http://reactivex.io/documentation/operators/observeon.html> —
    specify the scheduler on which an observer will observe this
    Observable
  * *|Serialize|*
    <http://reactivex.io/documentation/operators/serialize.html> —
    force an Observable to make serialized calls and to be well-behaved
  * *|Subscribe|*
    <http://reactivex.io/documentation/operators/subscribe.html> —
    operate upon the emissions and notifications from an Observable
  * *|SubscribeOn|*
    <http://reactivex.io/documentation/operators/subscribeon.html> —
    specify the scheduler an Observable should use when it is
    subscribed to
  * *|TimeInterval|*
    <http://reactivex.io/documentation/operators/timeinterval.html> —
    convert an Observable that emits items into one that emits
    indications of the amount of time elapsed between those emissions
  * *|Timeout|*
    <http://reactivex.io/documentation/operators/timeout.html> —
    mirror the source Observable, but issue an error notification if a
    particular period of time elapses without any emitted items
  * *|Timestamp|*
    <http://reactivex.io/documentation/operators/timestamp.html> —
    attach a timestamp to each item emitted by an Observable
  * *|Using|*
    <http://reactivex.io/documentation/operators/using.html> — create
    a disposable resource that has the same lifespan as the Observable


    Conditional and Boolean Operators

Operators that evaluate one or more Observables or items emitted by Observables

  * *|All|* <http://reactivex.io/documentation/operators/all.html> —
    determine whether all items emitted by an Observable meet some
    criteria
  * *|Amb|* <http://reactivex.io/documentation/operators/amb.html> —
    given two or more source Observables, emit all of the items from
    only the first of these Observables to emit an item
  * *|Contains|*
    <http://reactivex.io/documentation/operators/contains.html> —
    determine whether an Observable emits a particular item or not
  * *|DefaultIfEmpty|*
    <http://reactivex.io/documentation/operators/defaultifempty.html> — emit
    items from the source Observable, or a default item if the source
    Observable emits nothing
  * *|SequenceEqual|*
    <http://reactivex.io/documentation/operators/sequenceequal.html> —
    determine whether two Observables emit the same sequence of items
  * *|SkipUntil|*
    <http://reactivex.io/documentation/operators/skipuntil.html> —
    discard items emitted by an Observable until a second Observable
    emits an item
  * *|SkipWhile|*
    <http://reactivex.io/documentation/operators/skipwhile.html> —
    discard items emitted by an Observable until a specified condition
    becomes false
  * *|TakeUntil|*
    <http://reactivex.io/documentation/operators/takeuntil.html> —
    discard items emitted by an Observable after a second Observable
    emits an item or terminates
  * *|TakeWhile|*
    <http://reactivex.io/documentation/operators/takewhile.html> —
    discard items emitted by an Observable after a specified condition
    becomes false


    Mathematical and Aggregate Operators

Operators that operate on the entire sequence of items emitted by an Observable

  * *|Average|*
    <http://reactivex.io/documentation/operators/average.html> —
    calculates the average of numbers emitted by an Observable and
    emits this average
  * *|Concat|*
    <http://reactivex.io/documentation/operators/concat.html> — emit
    the emissions from two or more Observables without interleaving them
  * *|Count|*
    <http://reactivex.io/documentation/operators/count.html> — count
    the number of items emitted by the source Observable and emit only
    this value
  * *|Max|* <http://reactivex.io/documentation/operators/max.html> —
    determine, and emit, the maximum-valued item emitted by an Observable
  * *|Min|* <http://reactivex.io/documentation/operators/min.html> —
    determine, and emit, the minimum-valued item emitted by an Observable
  * *|Reduce|*
    <http://reactivex.io/documentation/operators/reduce.html> — apply
    a function to each item emitted by an Observable, sequentially,
    and emit the final value
  * *|Sum|* <http://reactivex.io/documentation/operators/sum.html> —
    calculate the sum of numbers emitted by an Observable and emit
    this sum


    Backpressure Operators

  * *backpressure operators*
    <http://reactivex.io/documentation/operators/backpressure.html> —
    strategies for coping with Observables that produce items more
    rapidly than their observers consume them


    Connectable Observable Operators

Specialty Observables that have more precisely-controlled subscription dynamics

  * *|Connect|*
    <http://reactivex.io/documentation/operators/connect.html> —
    instruct a connectable Observable to begin emitting items to its
    subscribers
  * *|Publish|*
    <http://reactivex.io/documentation/operators/publish.html> —
    convert an ordinary Observable into a connectable Observable
  * *|RefCount|*
    <http://reactivex.io/documentation/operators/refcount.html> — make
    a Connectable Observable behave like an ordinary Observable
  * *|Replay|*
    <http://reactivex.io/documentation/operators/replay.html> — ensure
    that all observers see the same sequence of emitted items, even if
    they subscribe after the Observable has begun emitting items


On Sun, Aug 16, 2015 at 8:29 PM, Matthew Lohbihler <[email protected] <mailto:[email protected]>> wrote:

    Hi David,

    It's nice to see that this pattern has been formalized, but this
    isn't really anything terribly new. I've been using observables to
    do things like rollups and other data stream analysis for years
    now, just not with this particular package.

    Also, note that Futures/Promises don't really compete with
    Observables. Futures are good for chaining arbitrary asynchronous
    events, Observables for processing streams of homogenous objects.
    (But is it true that the Java Future impl is not very useful. I
    had to develop my own for what i thought was a very
    straightforward use case.)

    Finally, there is nothing inherently thread safe about Observables
    (not that anyone said they were... just saying...). When using the
    same Observable instance in a multi-threaded context, you still
    have to manage synchonization on your own if the context doesn't
    somehow do it for you.

    Regards,
    m@


    On 8/15/2015 11:56 PM, cogmission (David Ray) wrote:
    ...and Their Use In HTM.java's Network API (NAPI)

    Hi Everybody,

    I wanted to share a one-page short but very well explained
    tutorial on composing asynchronous workflows using RxJava.

    Due to the availability of "Big Data", Multicore Processors,
    Cloud Computing and emerging concepts of Streaming Workflows;
    tools like "Hadoop", "Spark" etc. have come to the forefront as a
    means of optimizing concurrency and parallel architectures.
    Likewise, on a smaller scale, mastering the tools of Streaming
    Data utilization is essential in order to be ready when the
    future comes visiting upon us.

    Enter RxJava...

    As you may or may not know, HTM.java's Network API was
    constructed using RxJava Observables which are a way of composing
    units of work in such a way where they can be used asynchronously
    or non-asynchronously without blocking. This also allows you to
    seamlessly "tie in" the NAPI in your own applications using
    chains of Observable items.

    Unlike Java "Futures" which only return a single item, RxJava
    Observables (which also can use Futures) can return a "streaming"
    flow of items in a non-blocking fashion; this allows one to fully
    utilize any Threads in use by letting them be continuously active
    throughout their life cycle. Better yet, it is very lightweight
    has no outside dependencies and can be used with a multitude of
    languages!!

    Netflix, uses RxJava and is its original sponsor and creator. It
    is a very exciting new concept that is sweeping the Java
    developer ranks, and this is one of the best tutorials on the
    subject I have found.

    http://docs.couchbase.com/developer/java-2.0/observables.html

    Enjoy!

    David

-- /With kind regards,/
    David Ray
    Java Solutions Architect
    *Cortical.io <http://cortical.io/>*
    Sponsor of: HTM.java <https://github.com/numenta/htm.java>
    [email protected] <mailto:[email protected]>
    http://cortical.io




--
/With kind regards,/
David Ray
Java Solutions Architect
*Cortical.io <http://cortical.io/>*
Sponsor of: HTM.java <https://github.com/numenta/htm.java>
[email protected] <mailto:[email protected]>
http://cortical.io <http://cortical.io/>

Reply via email to