[ 
https://issues.apache.org/jira/browse/CASSANDRA-10528?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

T Jake Luciani reassigned CASSANDRA-10528:
------------------------------------------

    Assignee: T Jake Luciani

> Proposal: Integrate RxJava
> --------------------------
>
>                 Key: CASSANDRA-10528
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-10528
>             Project: Cassandra
>          Issue Type: Improvement
>            Reporter: T Jake Luciani
>            Assignee: T Jake Luciani
>             Fix For: 3.x
>
>         Attachments: rxjava-stress.png
>
>
> The purpose of this ticket is to discuss the merits of integrating the 
> [RxJava|https://github.com/ReactiveX/RxJava] framework into C*.  Enabling us 
> to incrementally make the internals of C* async and move away from SEDA to a 
> more modern thread per core architecture. 
> Related tickets:
>    * CASSANDRA-8520
>    * CASSANDRA-8457
>    * CASSANDRA-5239
>    * CASSANDRA-7040
>    * CASSANDRA-5863
>    * CASSANDRA-6696
>    * CASSANDRA-7392
> My *primary* goals in raising this issue are to provide a way of:
>     *  *Incrementally* making the backend async
>     *  Avoiding code complexity/readability issues
>     *  Avoiding NIH where possible
>     *  Building on an extendable library
> My *non*-goals in raising this issue are:
>     
>    * Rewrite the entire database in one big bang
>    * Write our own async api/framework
>     
> -------------------------------------------------------------------------------------
> I've attempted to integrate RxJava a while back and found it not ready mainly 
> due to our lack of lambda support.  Now with Java 8 I've found it very 
> enjoyable and have not hit any performance issues. A gentle introduction to 
> RxJava is [here|http://blog.danlew.net/2014/09/15/grokking-rxjava-part-1/] as 
> well as their 
> [wiki|https://github.com/ReactiveX/RxJava/wiki/Additional-Reading].  The 
> primary concept of RX is the 
> [Obervable|http://reactivex.io/documentation/observable.html] which is 
> essentially a stream of stuff you can subscribe to and act on, chain, etc. 
> This is quite similar to [Java 8 streams 
> api|http://www.oracle.com/technetwork/articles/java/ma14-java-se-8-streams-2177646.html]
>  (or I should say streams api is similar to it).  The difference is java 8 
> streams can't be used for asynchronous events while RxJava can.
> Another improvement since I last tried integrating RxJava is the completion 
> of CASSANDRA-8099 which provides is a very iterable/incremental approach to 
> our storage engine.  *Iterators and Observables are well paired conceptually 
> so morphing our current Storage engine to be async is much simpler now.*
> In an effort to show how one can incrementally change our backend I've done a 
> quick POC with RxJava and replaced our non-paging read requests to become 
> non-blocking.
> https://github.com/apache/cassandra/compare/trunk...tjake:rxjava-3.0
> As you can probably see the code is straight-forward and sometimes quite nice!
> *Old*
> {code}
> private static PartitionIterator 
> fetchRows(List<SinglePartitionReadCommand<?>> commands, ConsistencyLevel 
> consistencyLevel)
>     throws UnavailableException, ReadFailureException, ReadTimeoutException
>     {
>         int cmdCount = commands.size();
>         SinglePartitionReadLifecycle[] reads = new 
> SinglePartitionReadLifecycle[cmdCount];
>         for (int i = 0; i < cmdCount; i++)
>             reads[i] = new SinglePartitionReadLifecycle(commands.get(i), 
> consistencyLevel);
>         for (int i = 0; i < cmdCount; i++)
>             reads[i].doInitialQueries();
>         for (int i = 0; i < cmdCount; i++)
>             reads[i].maybeTryAdditionalReplicas();
>         for (int i = 0; i < cmdCount; i++)
>             reads[i].awaitRes
> ultsAndRetryOnDigestMismatch();
>         for (int i = 0; i < cmdCount; i++)
>             if (!reads[i].isDone())
>                 reads[i].maybeAwaitFullDataRead();
>         List<PartitionIterator> results = new ArrayList<>(cmdCount);
>         for (int i = 0; i < cmdCount; i++)
>         {
>             assert reads[i].isDone();
>             results.add(reads[i].getResult());
>         }
>         return PartitionIterators.concat(results);
>     }
> {code}
>  *New*
> {code}
> private static Observable<PartitionIterator> 
> fetchRows(List<SinglePartitionReadCommand<?>> commands, ConsistencyLevel 
> consistencyLevel)
>     throws UnavailableException, ReadFailureException, ReadTimeoutException
>     {
>         return Observable.from(commands)
>                          .map(command -> new 
> SinglePartitionReadLifecycle(command, consistencyLevel))
>                          .flatMap(read -> read.getPartitionIterator())
>                          .toList()
>                          .map(results -> PartitionIterators.concat(results));
>     }
> {code}
> Since the read call is now non blocking (no more future.get()) we can remove 
> one thread pool hop from the native netty request pool which yields a 
> non-trivial improvement to read performance.
> !rxjava-stress.png|width=800px!
> http://cstar.datastax.com/tests/id/ae648c12-729a-11e5-8625-0256e416528f
> At the same time the current Iterator based api still works by calling 
> {{.toBlocking()}} on the observable. So for example the existing thrift read 
> call requires little modification
> On the async side we get the added benefits of RxJava:
>   * Customizable backpressure strategies (for dealing with streams that can't 
> be processed quickly enough)
>   * Cancelling of work due to timeouts is a 1 line change
>   * When a Subscriber disconnects from the stream they Observable stops as 
> well
>   * Batching/windowing of work can be added in one line
>   * Observers and Subscribers can do work across any thread at any stage of 
> the pipeline
>   * Observables can be [debugged|https://github.com/ReactiveX/RxJavaDebug] 
> and 
> [tested|http://reactivex.io/RxJava/javadoc/rx/observers/TestSubscriber.html]
> Another plus is the community surrounding RxJava specifically our good 
> friends at netflix have authored and used it extensively. Docs and examples 
> are good.
> In order to get the most out of this we will need to take this api further 
> into the code. MessagingService, Disk Access/Page, Cache, Thread per core... 
> but again I want to hammer home this will be able to be achieved 
> incrementally. 
> On the bad side this is:
>   *  Locking into a "framework"  
>   *  Will inevitably hit bugs / performance issues we need fixed upstream
>   * Some of the more advanced API uses look pretty mentally taxing/hard to 
> grasp
> Which brings us to the Alternatives, primarily being to just use 
> CompletableFutures.
> We certainly could but if you look at the code changes I had to make to make 
> the SP calls asynchronous I think you will realize you would need to pass
> all kinds of state around to get the read command callback to start the netty 
> write.  Vs observables which make that pipeline declarative. Also more 
> advanced things like backpressure and message passing between N:M producers 
> and consumers becomes complex.  This isn't to say we can't [use 
> both|http://www.nurkiewicz.com/2014/11/converting-between-completablefuture.html]
>  if Observables are overkill.
> I hope this ticket sparks some good discussion!
>       



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to