Hi Matt,
I'm only responding in order to avoid what may possibly be a missed
opportunity, and I don't want you or anyone else who may see the term
"Observable" and mistake it for a simple publish/subscribe pattern
that has been seen before. On the contrary, this library is extremely
powerful, easy on the eyes and with some subtle usage, can exert
extreme power and flexibility in its usage.
For example:
Say you have a service where you receive a message for the retrieval
of a document on the web. In servicing that request you need to make
an asynchronous query to back-end services, and then say that service
needs to aggregate several formats from a db, and other streaming
sources, so you nest yet another blocking request to receive those
items in a synchronized fashion and maybe it doesn't end there? Maybe
there is an additional stop along the way where another nested
asynchronous call needs to be made? Now if one of those connections
experiences undue latency or the service is unavailable for some
reason, it is extremely laborious to try and percolate that exception
back up to the original caller. RxJava handles the bubbling of
exceptions from deeply nested calls back to the original call site.
That's just one example of its use.
Then there are the 100 or so operators that can be performed on each
Observable. Each function returns an Observable that "emits" one or
more items or "streams" of data. And each Observable can be "operated
upon" mathematically or combined/reduced/transformed etc. And yes, if
your Observables are composed in a "Functional" manner, meaning they
don't mutate or convey state, you are isolated from a lot of
concurrency issues that you would otherwise have to handle.
I'm not saying that a reckless developer can't still get themselves in
trouble - that is true with any breakthrough library. I'm just saying
this is quite an exceptional offering in the Java language. Other
languages might have something similar - but this is also being used
in the C# camp too (and Ruby, Python, Scala, Clojure, C++, Swift,
JRuby, Kotlin etc.) - Entitled ReactiveX as seen here:
http://reactivex.io/languages.html
This is also different from the Java 8 Streams offering because it can
do both Pull (like Java 8 Streams) and Push oriented (its major use
case) query patterns. As a matter of fact, I'm sure the authors of the
Java 8 release got a lot of their ideas from this library!
Take a look at some of its operators:
Operators By Category
Creating Observables
Operators that originate new Observables.
* *|Create|*
<http://reactivex.io/documentation/operators/create.html> — create
an Observable from scratch by calling observer methods
programmatically
* *|Defer|*
<http://reactivex.io/documentation/operators/defer.html> — do not
create the Observable until the observer subscribes, and create a
fresh Observable for each observer
* *|Empty|/|Never|/|Throw|*
<http://reactivex.io/documentation/operators/empty-never-throw.html> —
create Observables that have very precise and limited behavior
* *|From|* <http://reactivex.io/documentation/operators/from.html> —
convert some other object or data structure into an Observable
* *|Interval|*
<http://reactivex.io/documentation/operators/interval.html> —
create an Observable that emits a sequence of integers spaced by a
particular time interval
* *|Just|* <http://reactivex.io/documentation/operators/just.html> —
convert an object or a set of objects into an Observable that
emits that or those objects
* *|Range|*
<http://reactivex.io/documentation/operators/range.html> — create
an Observable that emits a range of sequential integers
* *|Repeat|*
<http://reactivex.io/documentation/operators/repeat.html> — create
an Observable that emits a particular item or sequence of items
repeatedly
* *|Start|*
<http://reactivex.io/documentation/operators/start.html> — create
an Observable that emits the return value of a function
* *|Timer|*
<http://reactivex.io/documentation/operators/timer.html> — create
an Observable that emits a single item after a given delay
Transforming Observables
Operators that transform items that are emitted by an Observable.
* *|Buffer|*
<http://reactivex.io/documentation/operators/buffer.html> —
periodically gather items from an Observable into bundles and emit
these bundles rather than emitting the items one at a time
* *|FlatMap|*
<http://reactivex.io/documentation/operators/flatmap.html> —
transform the items emitted by an Observable into Observables,
then flatten the emissions from those into a single Observable
* *|GroupBy|*
<http://reactivex.io/documentation/operators/groupby.html> —
divide an Observable into a set of Observables that each emit a
different group of items from the original Observable, organized
by key
* *|Map|* <http://reactivex.io/documentation/operators/map.html> —
transform the items emitted by an Observable by applying a
function to each item
* *|Scan|* <http://reactivex.io/documentation/operators/scan.html> —
apply a function to each item emitted by an Observable,
sequentially, and emit each successive value
* *|Window|*
<http://reactivex.io/documentation/operators/window.html> —
periodically subdivide items from an Observable into Observable
windows and emit these windows rather than emitting the items one
at a time
Filtering Observables
Operators that selectively emit items from a source Observable.
* *|Debounce|*
<http://reactivex.io/documentation/operators/debounce.html> — only
emit an item from an Observable if a particular timespan has
passed without it emitting another item
* *|Distinct|*
<http://reactivex.io/documentation/operators/distinct.html> —
suppress duplicate items emitted by an Observable
* *|ElementAt|*
<http://reactivex.io/documentation/operators/elementat.html> —
emit only item /n/ emitted by an Observable
* *|Filter|*
<http://reactivex.io/documentation/operators/filter.html> — emit
only those items from an Observable that pass a predicate test
* *|First|*
<http://reactivex.io/documentation/operators/first.html> — emit
only the first item, or the first item that meets a condition,
from an Observable
* *|IgnoreElements|*
<http://reactivex.io/documentation/operators/ignoreelements.html> — do
not emit any items from an Observable but mirror its termination
notification
* *|Last|* <http://reactivex.io/documentation/operators/last.html> —
emit only the last item emitted by an Observable
* *|Sample|*
<http://reactivex.io/documentation/operators/sample.html> — emit
the most recent item emitted by an Observable within periodic time
intervals
* *|Skip|* <http://reactivex.io/documentation/operators/skip.html> —
suppress the first /n/ items emitted by an Observable
* *|SkipLast|*
<http://reactivex.io/documentation/operators/skiplast.html> —
suppress the last /n/ items emitted by an Observable
* *|Take|* <http://reactivex.io/documentation/operators/take.html> —
emit only the first /n/ items emitted by an Observable
* *|TakeLast|*
<http://reactivex.io/documentation/operators/takelast.html> — emit
only the last /n/ items emitted by an Observable
Combining Observables
Operators that work with multiple source Observables to create a
single Observable
* *|And|/|Then|/|When|*
<http://reactivex.io/documentation/operators/and-then-when.html> —
combine sets of items emitted by two or more Observables by means
of |Pattern|and |Plan| intermediaries
* *|CombineLatest|*
<http://reactivex.io/documentation/operators/combinelatest.html> —
when an item is emitted by either of two Observables, combine the
latest item emitted by each Observable via a specified function
and emit items based on the results of this function
* *|Join|* <http://reactivex.io/documentation/operators/join.html> —
combine items emitted by two Observables whenever an item from one
Observable is emitted during a time window defined according to an
item emitted by the other Observable
* *|Merge|*
<http://reactivex.io/documentation/operators/merge.html> — combine
multiple Observables into one by merging their emissions
* *|StartWith|*
<http://reactivex.io/documentation/operators/startwith.html> —
emit a specified sequence of items before beginning to emit the
items from the source Observable
* *|Switch|*
<http://reactivex.io/documentation/operators/switch.html> —
convert an Observable that emits Observables into a single
Observable that emits the items emitted by the
most-recently-emitted of those Observables
* *|Zip|* <http://reactivex.io/documentation/operators/zip.html> —
combine the emissions of multiple Observables together via a
specified function and emit single items for each combination
based on the results of this function
Error Handling Operators
Operators that help to recover from error notifications from an Observable
* *|Catch|*
<http://reactivex.io/documentation/operators/catch.html> — recover
from an |onError| notification by continuing the sequence without
error
* *|Retry|*
<http://reactivex.io/documentation/operators/retry.html> — if a
source Observable sends an |onError| notification, resubscribe to
it in the hopes that it will complete without error
Observable Utility Operators
A toolbox of useful Operators for working with Observables
* *|Delay|*
<http://reactivex.io/documentation/operators/delay.html> — shift
the emissions from an Observable forward in time by a particular
amount
* *|Do|* <http://reactivex.io/documentation/operators/do.html> —
register an action to take upon a variety of Observable lifecycle
events
* *|Materialize|/|Dematerialize|*
<http://reactivex.io/documentation/operators/materialize-dematerialize.html> —
represent both the items emitted and the notifications sent as
emitted items, or reverse this process
* *|ObserveOn|*
<http://reactivex.io/documentation/operators/observeon.html> —
specify the scheduler on which an observer will observe this
Observable
* *|Serialize|*
<http://reactivex.io/documentation/operators/serialize.html> —
force an Observable to make serialized calls and to be well-behaved
* *|Subscribe|*
<http://reactivex.io/documentation/operators/subscribe.html> —
operate upon the emissions and notifications from an Observable
* *|SubscribeOn|*
<http://reactivex.io/documentation/operators/subscribeon.html> —
specify the scheduler an Observable should use when it is
subscribed to
* *|TimeInterval|*
<http://reactivex.io/documentation/operators/timeinterval.html> —
convert an Observable that emits items into one that emits
indications of the amount of time elapsed between those emissions
* *|Timeout|*
<http://reactivex.io/documentation/operators/timeout.html> —
mirror the source Observable, but issue an error notification if a
particular period of time elapses without any emitted items
* *|Timestamp|*
<http://reactivex.io/documentation/operators/timestamp.html> —
attach a timestamp to each item emitted by an Observable
* *|Using|*
<http://reactivex.io/documentation/operators/using.html> — create
a disposable resource that has the same lifespan as the Observable
Conditional and Boolean Operators
Operators that evaluate one or more Observables or items emitted by
Observables
* *|All|* <http://reactivex.io/documentation/operators/all.html> —
determine whether all items emitted by an Observable meet some
criteria
* *|Amb|* <http://reactivex.io/documentation/operators/amb.html> —
given two or more source Observables, emit all of the items from
only the first of these Observables to emit an item
* *|Contains|*
<http://reactivex.io/documentation/operators/contains.html> —
determine whether an Observable emits a particular item or not
* *|DefaultIfEmpty|*
<http://reactivex.io/documentation/operators/defaultifempty.html> — emit
items from the source Observable, or a default item if the source
Observable emits nothing
* *|SequenceEqual|*
<http://reactivex.io/documentation/operators/sequenceequal.html> —
determine whether two Observables emit the same sequence of items
* *|SkipUntil|*
<http://reactivex.io/documentation/operators/skipuntil.html> —
discard items emitted by an Observable until a second Observable
emits an item
* *|SkipWhile|*
<http://reactivex.io/documentation/operators/skipwhile.html> —
discard items emitted by an Observable until a specified condition
becomes false
* *|TakeUntil|*
<http://reactivex.io/documentation/operators/takeuntil.html> —
discard items emitted by an Observable after a second Observable
emits an item or terminates
* *|TakeWhile|*
<http://reactivex.io/documentation/operators/takewhile.html> —
discard items emitted by an Observable after a specified condition
becomes false
Mathematical and Aggregate Operators
Operators that operate on the entire sequence of items emitted by an
Observable
* *|Average|*
<http://reactivex.io/documentation/operators/average.html> —
calculates the average of numbers emitted by an Observable and
emits this average
* *|Concat|*
<http://reactivex.io/documentation/operators/concat.html> — emit
the emissions from two or more Observables without interleaving them
* *|Count|*
<http://reactivex.io/documentation/operators/count.html> — count
the number of items emitted by the source Observable and emit only
this value
* *|Max|* <http://reactivex.io/documentation/operators/max.html> —
determine, and emit, the maximum-valued item emitted by an Observable
* *|Min|* <http://reactivex.io/documentation/operators/min.html> —
determine, and emit, the minimum-valued item emitted by an Observable
* *|Reduce|*
<http://reactivex.io/documentation/operators/reduce.html> — apply
a function to each item emitted by an Observable, sequentially,
and emit the final value
* *|Sum|* <http://reactivex.io/documentation/operators/sum.html> —
calculate the sum of numbers emitted by an Observable and emit
this sum
Backpressure Operators
* *backpressure operators*
<http://reactivex.io/documentation/operators/backpressure.html> —
strategies for coping with Observables that produce items more
rapidly than their observers consume them
Connectable Observable Operators
Specialty Observables that have more precisely-controlled subscription
dynamics
* *|Connect|*
<http://reactivex.io/documentation/operators/connect.html> —
instruct a connectable Observable to begin emitting items to its
subscribers
* *|Publish|*
<http://reactivex.io/documentation/operators/publish.html> —
convert an ordinary Observable into a connectable Observable
* *|RefCount|*
<http://reactivex.io/documentation/operators/refcount.html> — make
a Connectable Observable behave like an ordinary Observable
* *|Replay|*
<http://reactivex.io/documentation/operators/replay.html> — ensure
that all observers see the same sequence of emitted items, even if
they subscribe after the Observable has begun emitting items
On Sun, Aug 16, 2015 at 8:29 PM, Matthew Lohbihler
<[email protected] <mailto:[email protected]>> wrote:
Hi David,
It's nice to see that this pattern has been formalized, but this
isn't really anything terribly new. I've been using observables to
do things like rollups and other data stream analysis for years
now, just not with this particular package.
Also, note that Futures/Promises don't really compete with
Observables. Futures are good for chaining arbitrary asynchronous
events, Observables for processing streams of homogenous objects.
(But is it true that the Java Future impl is not very useful. I
had to develop my own for what i thought was a very
straightforward use case.)
Finally, there is nothing inherently thread safe about Observables
(not that anyone said they were... just saying...). When using the
same Observable instance in a multi-threaded context, you still
have to manage synchonization on your own if the context doesn't
somehow do it for you.
Regards,
m@
On 8/15/2015 11:56 PM, cogmission (David Ray) wrote:
...and Their Use In HTM.java's Network API (NAPI)
Hi Everybody,
I wanted to share a one-page short but very well explained
tutorial on composing asynchronous workflows using RxJava.
Due to the availability of "Big Data", Multicore Processors,
Cloud Computing and emerging concepts of Streaming Workflows;
tools like "Hadoop", "Spark" etc. have come to the forefront as a
means of optimizing concurrency and parallel architectures.
Likewise, on a smaller scale, mastering the tools of Streaming
Data utilization is essential in order to be ready when the
future comes visiting upon us.
Enter RxJava...
As you may or may not know, HTM.java's Network API was
constructed using RxJava Observables which are a way of composing
units of work in such a way where they can be used asynchronously
or non-asynchronously without blocking. This also allows you to
seamlessly "tie in" the NAPI in your own applications using
chains of Observable items.
Unlike Java "Futures" which only return a single item, RxJava
Observables (which also can use Futures) can return a "streaming"
flow of items in a non-blocking fashion; this allows one to fully
utilize any Threads in use by letting them be continuously active
throughout their life cycle. Better yet, it is very lightweight
has no outside dependencies and can be used with a multitude of
languages!!
Netflix, uses RxJava and is its original sponsor and creator. It
is a very exciting new concept that is sweeping the Java
developer ranks, and this is one of the best tutorials on the
subject I have found.
http://docs.couchbase.com/developer/java-2.0/observables.html
Enjoy!
David
--
/With kind regards,/
David Ray
Java Solutions Architect
*Cortical.io <http://cortical.io/>*
Sponsor of: HTM.java <https://github.com/numenta/htm.java>
[email protected] <mailto:[email protected]>
http://cortical.io
--
/With kind regards,/
David Ray
Java Solutions Architect
*Cortical.io <http://cortical.io/>*
Sponsor of: HTM.java <https://github.com/numenta/htm.java>
[email protected] <mailto:[email protected]>
http://cortical.io <http://cortical.io/>