For those seeing this for the first time, look in the quoted email
history for the email from me showing the outline of RxJava operators...
On Mon, Aug 17, 2015 at 7:20 AM, cogmission (David Ray)
<[email protected] <mailto:[email protected]>> wrote:
Omg! I just found this!
On each of the above operator links (seen in red if you're using
a Rich Text or HTML email client), there is a link on the landing
page where you can go to their awesome marble diagrams (which they
have for each operator)...
Except, this one is interactive! So you can move around the
marbles and see the effect in their output (see how the
transformation functions change!)
Check it out! http://rxmarbles.com/#zip
Cheers,
David
On Sun, Aug 16, 2015 at 10:04 PM, cogmission (David Ray)
<[email protected] <mailto:[email protected]>>
wrote:
Hi Matt,
I'm only responding in order to avoid what may be a missed
opportunity. I don't want you, or anyone else who sees the term
"Observable", to mistake it for the simple publish/subscribe
pattern that has been seen before. On the contrary, this library
is extremely powerful, easy on the eyes and, with some subtle
usage, very flexible.
For example:
Say you have a service that receives a message requesting the
retrieval of a document on the web. To service that request you
need to make an asynchronous query to back-end services. Say
that service in turn needs to aggregate several formats from a
db and other streaming sources, so you nest yet another blocking
request to receive those items in a synchronized fashion. And
maybe it doesn't end there: maybe there is an additional stop
along the way where another nested asynchronous call needs to be
made. Now if one of those connections experiences undue latency,
or the service is unavailable for some reason, it is extremely
laborious to percolate that exception back up to the original
caller by hand. RxJava handles the bubbling of exceptions from
deeply nested calls back to the original call site.
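To make that concrete without pulling in any dependency, here is a minimal sketch that uses the JDK's own CompletableFuture as a stand-in. It is not RxJava's API, and the names fetchFromDb, render and handle are made up for illustration. The point is only that a failure two hops down surfaces in one place at the caller.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class NestedAsyncErrors {
    // Hypothetical back-end lookup: fails when asked for the id "missing".
    static CompletableFuture<String> fetchFromDb(String id) {
        return CompletableFuture.supplyAsync(() -> {
            if (id.equals("missing")) {
                throw new IllegalStateException("db unavailable for " + id);
            }
            return "record:" + id;
        });
    }

    // Hypothetical second asynchronous hop that depends on the first result.
    static CompletableFuture<String> render(String record) {
        return CompletableFuture.supplyAsync(() -> "<doc>" + record + "</doc>");
    }

    // Compose both hops; a failure in either one arrives in exceptionally().
    static String handle(String id) {
        return fetchFromDb(id)
                .thenCompose(NestedAsyncErrors::render)
                .exceptionally(t -> "error: " + rootCause(t).getMessage())
                .join();
    }

    // Failures from async stages arrive wrapped in CompletionException.
    static Throwable rootCause(Throwable t) {
        while (t instanceof CompletionException && t.getCause() != null) {
            t = t.getCause();
        }
        return t;
    }

    public static void main(String[] args) {
        System.out.println(handle("42"));      // <doc>record:42</doc>
        System.out.println(handle("missing")); // error: db unavailable for missing
    }
}
```

With RxJava the same shape would end in the subscriber's onError callback instead of exceptionally().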
That's just one example of its use.
Then there are the 100 or so operators that can be performed
on each Observable. Each function returns an Observable that
"emits" one or more items or "streams" of data. And each
Observable can be "operated upon" mathematically or
combined/reduced/transformed etc. And yes, if your Observables
are composed in a "Functional" manner, meaning they don't
mutate or convey state, you are isolated from a lot of
concurrency issues that you would otherwise have to handle.
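As a rough illustration of what "each function returns an Observable" means, here is a toy push-based source in plain Java. Source, of, map and filter are hypothetical names written for this sketch; they only mimic the idea and are not RxJava's classes. Each operator returns a new source and leaves the original untouched, which is what keeps the chain free of shared mutable state.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

class PushSketch {
    /** Hypothetical minimal push source: it calls the given callback once per item. */
    interface Source<T> {
        void subscribe(Consumer<T> onNext);

        // Build a source that pushes the items of a list, in order.
        static <T> Source<T> of(List<T> items) {
            return onNext -> items.forEach(onNext);
        }

        // Returns a NEW source whose items are f(item); this source is unchanged.
        default <R> Source<R> map(Function<T, R> f) {
            return onNext -> subscribe(item -> onNext.accept(f.apply(item)));
        }

        // Returns a NEW source that only passes on items matching the predicate.
        default Source<T> filter(Predicate<T> keep) {
            return onNext -> subscribe(item -> {
                if (keep.test(item)) {
                    onNext.accept(item);
                }
            });
        }
    }

    // Chain two operators and collect whatever gets pushed to the subscriber.
    static List<Integer> evenSquares(List<Integer> input) {
        List<Integer> out = new ArrayList<>();
        Source.of(input)
                .filter(n -> n % 2 == 0)
                .map(n -> n * n)
                .subscribe(out::add);
        return out;
    }

    public static void main(String[] args) {
        System.out.println(evenSquares(Arrays.asList(1, 2, 3, 4))); // [4, 16]
    }
}
```

Nothing runs until subscribe is called, and the lambdas passed to filter and map touch no outside state, so the same chain could be subscribed to again with the same result.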
I'm not saying that a reckless developer can't still get
themselves in trouble - that is true with any breakthrough
library. I'm just saying this is quite an exceptional offering
in the Java language. Other languages might have something
similar, but this same API is also available in the C# camp
(and for Ruby, Python, Scala, Clojure, C++, Swift, JRuby,
Kotlin etc.) under the name ReactiveX, as seen here:
http://reactivex.io/languages.html
This is also different from the Java 8 Streams offering
because it can do both Pull (like Java 8 Streams) and Push
oriented (its major use case) query patterns. As a matter of
fact, I'm sure the authors of the Java 8 release got a lot of
their ideas from this library!
Take a look at some of its operators:
Operators By Category
Creating Observables
Operators that originate new Observables.
* *|Create|*
<http://reactivex.io/documentation/operators/create.html> — create
an Observable from scratch by calling observer methods
programmatically
* *|Defer|*
<http://reactivex.io/documentation/operators/defer.html> —
do not create the Observable until the observer
subscribes, and create a fresh Observable for each observer
* *|Empty|/|Never|/|Throw|*
<http://reactivex.io/documentation/operators/empty-never-throw.html> —
create Observables that have very precise and limited behavior
* *|From|*
<http://reactivex.io/documentation/operators/from.html> —
convert some other object or data structure into an Observable
* *|Interval|*
<http://reactivex.io/documentation/operators/interval.html> —
create an Observable that emits a sequence of integers
spaced by a particular time interval
* *|Just|*
<http://reactivex.io/documentation/operators/just.html> —
convert an object or a set of objects into an Observable
that emits that or those objects
* *|Range|*
<http://reactivex.io/documentation/operators/range.html> —
create an Observable that emits a range of sequential integers
* *|Repeat|*
<http://reactivex.io/documentation/operators/repeat.html> — create
an Observable that emits a particular item or sequence of
items repeatedly
* *|Start|*
<http://reactivex.io/documentation/operators/start.html> —
create an Observable that emits the return value of a function
* *|Timer|*
<http://reactivex.io/documentation/operators/timer.html> —
create an Observable that emits a single item after a
given delay
Transforming Observables
Operators that transform items that are emitted by an Observable.
* *|Buffer|*
<http://reactivex.io/documentation/operators/buffer.html> —
periodically gather items from an Observable into bundles and
emit these bundles rather than emitting the items one at a time
* *|FlatMap|*
<http://reactivex.io/documentation/operators/flatmap.html> —
transform the items emitted by an Observable into
Observables, then flatten the emissions from those into a
single Observable
* *|GroupBy|*
<http://reactivex.io/documentation/operators/groupby.html> —
divide an Observable into a set of Observables that each
emit a different group of items from the original
Observable, organized by key
* *|Map|*
<http://reactivex.io/documentation/operators/map.html> —
transform the items emitted by an Observable by applying a
function to each item
* *|Scan|*
<http://reactivex.io/documentation/operators/scan.html> —
apply a function to each item emitted by an Observable,
sequentially, and emit each successive value
* *|Window|*
<http://reactivex.io/documentation/operators/window.html> —
periodically subdivide items from an Observable into Observable
windows and emit these windows rather than emitting the items
one at a time
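Of the transforming operators above, Scan is the one people most often confuse with Reduce. The sketch below shows the Scan behaviour described in the list over a plain Java list that stands in for a stream which has already finished emitting. ScanSketch and scan are made-up names, not RxJava calls.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;

class ScanSketch {
    // Emits every intermediate accumulated value, not just the final one.
    // With no seed, the first item is passed through unchanged.
    static <T> List<T> scan(List<T> items, BinaryOperator<T> accumulate) {
        List<T> out = new ArrayList<>();
        T running = null;
        boolean first = true;
        for (T item : items) {
            running = first ? item : accumulate.apply(running, item);
            first = false;
            out.add(running);
        }
        return out;
    }

    public static void main(String[] args) {
        // Running totals of 1, 2, 3, 4.
        System.out.println(scan(Arrays.asList(1, 2, 3, 4), Integer::sum)); // [1, 3, 6, 10]
    }
}
```

Reduce, by contrast, would emit only the last of those values (10 here).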
Filtering Observables
Operators that selectively emit items from a source Observable.
* *|Debounce|*
<http://reactivex.io/documentation/operators/debounce.html> —
only emit an item from an Observable if a particular
timespan has passed without it emitting another item
* *|Distinct|*
<http://reactivex.io/documentation/operators/distinct.html> —
suppress duplicate items emitted by an Observable
* *|ElementAt|*
<http://reactivex.io/documentation/operators/elementat.html> —
emit only item /n/ emitted by an Observable
* *|Filter|*
<http://reactivex.io/documentation/operators/filter.html> — emit
only those items from an Observable that pass a predicate test
* *|First|*
<http://reactivex.io/documentation/operators/first.html> —
emit only the first item, or the first item that meets a
condition, from an Observable
* *|IgnoreElements|*
<http://reactivex.io/documentation/operators/ignoreelements.html> —
do not emit any items from an Observable but mirror its
termination notification
* *|Last|*
<http://reactivex.io/documentation/operators/last.html> —
emit only the last item emitted by an Observable
* *|Sample|*
<http://reactivex.io/documentation/operators/sample.html> — emit
the most recent item emitted by an Observable within
periodic time intervals
* *|Skip|*
<http://reactivex.io/documentation/operators/skip.html> —
suppress the first /n/ items emitted by an Observable
* *|SkipLast|*
<http://reactivex.io/documentation/operators/skiplast.html> —
suppress the last /n/ items emitted by an Observable
* *|Take|*
<http://reactivex.io/documentation/operators/take.html> —
emit only the first /n/ items emitted by an Observable
* *|TakeLast|*
<http://reactivex.io/documentation/operators/takelast.html> —
emit only the last /n/ items emitted by an Observable
Combining Observables
Operators that work with multiple source Observables to create
a single Observable
* *|And|/|Then|/|When|*
<http://reactivex.io/documentation/operators/and-then-when.html> —
combine sets of items emitted by two or more Observables
by means of |Pattern| and |Plan| intermediaries
* *|CombineLatest|*
<http://reactivex.io/documentation/operators/combinelatest.html> —
when an item is emitted by either of two Observables,
combine the latest item emitted by each Observable via a
specified function and emit items based on the results of
this function
* *|Join|*
<http://reactivex.io/documentation/operators/join.html> —
combine items emitted by two Observables whenever an item
from one Observable is emitted during a time window
defined according to an item emitted by the other Observable
* *|Merge|*
<http://reactivex.io/documentation/operators/merge.html> —
combine multiple Observables into one by merging their
emissions
* *|StartWith|*
<http://reactivex.io/documentation/operators/startwith.html> —
emit a specified sequence of items before beginning to
emit the items from the source Observable
* *|Switch|*
<http://reactivex.io/documentation/operators/switch.html> — convert
an Observable that emits Observables into a single
Observable that emits the items emitted by the
most-recently-emitted of those Observables
* *|Zip|*
<http://reactivex.io/documentation/operators/zip.html> —
combine the emissions of multiple Observables together via
a specified function and emit single items for each
combination based on the results of this function
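The pairing rule behind Zip is easy to sketch in plain Java. The version below uses two lists as stand-ins for two streams that have already completed, so it ignores timing entirely. ZipSketch and zip are hypothetical names for this illustration, not RxJava's API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;

class ZipSketch {
    // Pair the i-th item of each input and combine the pair with the function.
    // The output is as long as the shorter input: an unpaired item is never emitted.
    static <A, B, R> List<R> zip(List<A> left, List<B> right, BiFunction<A, B, R> combine) {
        int n = Math.min(left.size(), right.size());
        List<R> out = new ArrayList<>(n);
        for (int i = 0; i < n; i++) {
            out.add(combine.apply(left.get(i), right.get(i)));
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> zipped = zip(Arrays.asList(1, 2, 3), Arrays.asList("a", "b"), (n, s) -> n + s);
        System.out.println(zipped); // [1a, 2b]
    }
}
```

A real Zip operator does this pairing as items arrive, holding back an item from one source until its partner from the other source shows up.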
Error Handling Operators
Operators that help to recover from error notifications from
an Observable
* *|Catch|*
<http://reactivex.io/documentation/operators/catch.html> —
recover from an |onError| notification by continuing the
sequence without error
* *|Retry|*
<http://reactivex.io/documentation/operators/retry.html> —
if a source Observable sends an |onError| notification,
resubscribe to it in the hopes that it will complete
without error
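The idea behind Retry can be sketched with nothing but a loop. This is a simplified stand-in in plain Java, where "resubscribing" is just calling a Supplier again. RetrySketch, retry and flakyDemo are made-up names, and the retry cap is an assumption of this sketch.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class RetrySketch {
    // Call the source again after a failure, up to maxRetries extra attempts.
    // If every attempt fails, the last failure is rethrown to the caller.
    static <T> T retry(Supplier<T> source, int maxRetries) {
        if (maxRetries < 0) {
            throw new IllegalArgumentException("maxRetries must be >= 0");
        }
        RuntimeException last = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                return source.get();
            } catch (RuntimeException e) {
                last = e; // remember the failure and try again
            }
        }
        throw last;
    }

    // Hypothetical flaky source: fails a fixed number of times, then succeeds.
    static String flakyDemo(int failuresBeforeSuccess, int maxRetries) {
        AtomicInteger calls = new AtomicInteger();
        return retry(() -> {
            if (calls.incrementAndGet() <= failuresBeforeSuccess) {
                throw new IllegalStateException("attempt " + calls.get() + " failed");
            }
            return "ok after " + calls.get() + " calls";
        }, maxRetries);
    }

    public static void main(String[] args) {
        System.out.println(flakyDemo(2, 3)); // ok after 3 calls
    }
}
```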
Observable Utility Operators
A toolbox of useful Operators for working with Observables
* *|Delay|*
<http://reactivex.io/documentation/operators/delay.html> —
shift the emissions from an Observable forward in time by
a particular amount
* *|Do|*
<http://reactivex.io/documentation/operators/do.html> —
register an action to take upon a variety of Observable
lifecycle events
* *|Materialize|/|Dematerialize|*
<http://reactivex.io/documentation/operators/materialize-dematerialize.html> —
represent both the items emitted and the notifications
sent as emitted items, or reverse this process
* *|ObserveOn|*
<http://reactivex.io/documentation/operators/observeon.html> —
specify the scheduler on which an observer will observe
this Observable
* *|Serialize|*
<http://reactivex.io/documentation/operators/serialize.html> —
force an Observable to make serialized calls and to be
well-behaved
* *|Subscribe|*
<http://reactivex.io/documentation/operators/subscribe.html> —
operate upon the emissions and notifications from an
Observable
* *|SubscribeOn|*
<http://reactivex.io/documentation/operators/subscribeon.html> —
specify the scheduler an Observable should use when it is
subscribed to
* *|TimeInterval|*
<http://reactivex.io/documentation/operators/timeinterval.html> —
convert an Observable that emits items into one that emits
indications of the amount of time elapsed between those
emissions
* *|Timeout|*
<http://reactivex.io/documentation/operators/timeout.html> —
mirror the source Observable, but issue an error
notification if a particular period of time elapses
without any emitted items
* *|Timestamp|*
<http://reactivex.io/documentation/operators/timestamp.html> —
attach a timestamp to each item emitted by an Observable
* *|Using|*
<http://reactivex.io/documentation/operators/using.html> —
create a disposable resource that has the same lifespan as
the Observable
Conditional and Boolean Operators
Operators that evaluate one or more Observables or items
emitted by Observables
* *|All|*
<http://reactivex.io/documentation/operators/all.html> —
determine whether all items emitted by an Observable meet
some criteria
* *|Amb|*
<http://reactivex.io/documentation/operators/amb.html> —
given two or more source Observables, emit all of the
items from only the first of these Observables to emit an item
* *|Contains|*
<http://reactivex.io/documentation/operators/contains.html> —
determine whether an Observable emits a particular item or not
* *|DefaultIfEmpty|*
<http://reactivex.io/documentation/operators/defaultifempty.html> —
emit items from the source Observable, or a default item
if the source Observable emits nothing
* *|SequenceEqual|*
<http://reactivex.io/documentation/operators/sequenceequal.html> —
determine whether two Observables emit the same sequence
of items
* *|SkipUntil|*
<http://reactivex.io/documentation/operators/skipuntil.html> —
discard items emitted by an Observable until a second
Observable emits an item
* *|SkipWhile|*
<http://reactivex.io/documentation/operators/skipwhile.html> —
discard items emitted by an Observable until a specified
condition becomes false
* *|TakeUntil|*
<http://reactivex.io/documentation/operators/takeuntil.html> —
discard items emitted by an Observable after a second
Observable emits an item or terminates
* *|TakeWhile|*
<http://reactivex.io/documentation/operators/takewhile.html> —
discard items emitted by an Observable after a specified
condition becomes false
Mathematical and Aggregate Operators
Operators that operate on the entire sequence of items emitted
by an Observable
* *|Average|*
<http://reactivex.io/documentation/operators/average.html> —
calculates the average of numbers emitted by an Observable
and emits this average
* *|Concat|*
<http://reactivex.io/documentation/operators/concat.html> — emit
the emissions from two or more Observables without
interleaving them
* *|Count|*
<http://reactivex.io/documentation/operators/count.html> —
count the number of items emitted by the source Observable
and emit only this value
* *|Max|*
<http://reactivex.io/documentation/operators/max.html> —
determine, and emit, the maximum-valued item emitted by an
Observable
* *|Min|*
<http://reactivex.io/documentation/operators/min.html> —
determine, and emit, the minimum-valued item emitted by an
Observable
* *|Reduce|*
<http://reactivex.io/documentation/operators/reduce.html> — apply
a function to each item emitted by an Observable,
sequentially, and emit the final value
* *|Sum|*
<http://reactivex.io/documentation/operators/sum.html> —
calculate the sum of numbers emitted by an Observable and
emit this sum
Backpressure Operators
* *backpressure operators*
<http://reactivex.io/documentation/operators/backpressure.html> —
strategies for coping with Observables that produce items
more rapidly than their observers consume them
Connectable Observable Operators
Specialty Observables that have more precisely-controlled
subscription dynamics
* *|Connect|*
<http://reactivex.io/documentation/operators/connect.html> —
instruct a connectable Observable to begin emitting items
to its subscribers
* *|Publish|*
<http://reactivex.io/documentation/operators/publish.html> —
convert an ordinary Observable into a connectable Observable
* *|RefCount|*
<http://reactivex.io/documentation/operators/refcount.html> —
make a Connectable Observable behave like an ordinary
Observable
* *|Replay|*
<http://reactivex.io/documentation/operators/replay.html> — ensure
that all observers see the same sequence of emitted items,
even if they subscribe after the Observable has begun
emitting items
On Sun, Aug 16, 2015 at 8:29 PM, Matthew Lohbihler
<[email protected] <mailto:[email protected]>>
wrote:
Hi David,
It's nice to see that this pattern has been formalized,
but this isn't really anything terribly new. I've been
using observables to do things like rollups and other data
stream analysis for years now, just not with this
particular package.
Also, note that Futures/Promises don't really compete with
Observables. Futures are good for chaining arbitrary
asynchronous events, Observables for processing streams of
homogeneous objects. (But it is true that the Java Future
impl is not very useful. I had to develop my own for what
I thought was a very straightforward use case.)
Finally, there is nothing inherently thread safe about
Observables (not that anyone said they were... just
saying...). When using the same Observable instance in a
multi-threaded context, you still have to manage
synchronization on your own if the context doesn't somehow
do it for you.
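A small plain-Java sketch of that last point, with no RxJava involved: several producer threads push into one shared sink, and the sink has to guard its own state. SharedSinkSketch and its methods are hypothetical names used only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

class SharedSinkSketch {
    private final List<Integer> received = new ArrayList<>();

    // synchronized: only one producer thread at a time may touch the list.
    // Without it, concurrent add() calls on an ArrayList can lose items or throw.
    synchronized void onNext(int item) {
        received.add(item);
    }

    synchronized int count() {
        return received.size();
    }

    // Start several threads that all push into the same sink, then wait for them.
    static int runProducers(int producers, int itemsEach) {
        SharedSinkSketch sink = new SharedSinkSketch();
        List<Thread> threads = new ArrayList<>();
        for (int p = 0; p < producers; p++) {
            Thread t = new Thread(() -> {
                for (int i = 0; i < itemsEach; i++) {
                    sink.onNext(i);
                }
            });
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return sink.count();
    }

    public static void main(String[] args) {
        System.out.println(runProducers(4, 10_000)); // 40000
    }
}
```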
Regards,
m@
On 8/15/2015 11:56 PM, cogmission (David Ray) wrote:
...and Their Use In HTM.java's Network API (NAPI)
Hi Everybody,
I wanted to share a one-page short but very well
explained tutorial on composing asynchronous workflows
using RxJava.
With the availability of "Big Data", multicore processors,
cloud computing and emerging concepts of streaming workflows,
tools like "Hadoop", "Spark" etc. have come to the forefront
as a means of exploiting concurrent and parallel
architectures. Likewise, on a smaller scale, mastering the
tools for working with streaming data is essential in order
to be ready when the future arrives.
Enter RxJava...
As you may or may not know, HTM.java's Network API was
constructed using RxJava Observables, which are a way of
composing units of work so that they can be used either
synchronously or asynchronously without blocking. This also
allows you to seamlessly "tie in" the NAPI to your own
applications using chains of Observables.
Unlike Java "Futures", which only return a single item,
RxJava Observables (which can also be created from Futures)
can return a "streaming" flow of items in a non-blocking
fashion; this allows one to fully utilize any Threads in
use by keeping them active throughout their life cycle.
Better yet, it is very lightweight, has no outside
dependencies and can be used with a multitude of
languages!!
Netflix uses RxJava and is its original sponsor and
creator. It is a very exciting new concept that is
sweeping the Java developer ranks, and this is one of the
best tutorials on the subject I have found.
http://docs.couchbase.com/developer/java-2.0/observables.html
Enjoy!
David
--
/With kind regards,/
David Ray
Java Solutions Architect
*Cortical.io <http://cortical.io/>*
Sponsor of: HTM.java <https://github.com/numenta/htm.java>
[email protected] <mailto:[email protected]>
http://cortical.io