[ https://issues.apache.org/jira/browse/CASSANDRA-10528?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14959149#comment-14959149 ]
T Jake Luciani commented on CASSANDRA-10528:
--------------------------------------------

You mean in the POC? Well, this benchmark was RF=1, so the driver was using TokenAwarePolicy (TAP) and MessagingService (MS) was not used. In general terms, though, we keep a thread per connection, so this relates to CASSANDRA-8457 linked above. My thought was to combine our native Netty epoll event loop with the MessagingService event loop to avoid having many more threads.

> Proposal: Integrate RxJava
> --------------------------
>
>                 Key: CASSANDRA-10528
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-10528
>             Project: Cassandra
>          Issue Type: Improvement
>            Reporter: T Jake Luciani
>             Fix For: 3.x
>
>         Attachments: rxjava-stress.png
>
>
> The purpose of this ticket is to discuss the merits of integrating the
> [RxJava|https://github.com/ReactiveX/RxJava] framework into C*, enabling us
> to incrementally make the internals of C* async and move away from SEDA to a
> more modern thread-per-core architecture.
> Related tickets:
> * CASSANDRA-8520
> * CASSANDRA-8457
> * CASSANDRA-5239
> * CASSANDRA-7040
> * CASSANDRA-5863
> * CASSANDRA-6696
> * CASSANDRA-7392
> My *primary* goals in raising this issue are to provide a way of:
> * *Incrementally* making the backend async
> * Avoiding code complexity/readability issues
> * Avoiding NIH where possible
> * Building on an extendable library
> My *non*-goals in raising this issue are:
>
> * Rewriting the entire database in one big bang
> * Writing our own async API/framework
>
> -------------------------------------------------------------------------------------
> I attempted to integrate RxJava a while back and found it not ready, mainly
> due to our lack of lambda support. Now with Java 8 I've found it very
> enjoyable and have not hit any performance issues. A gentle introduction to
> RxJava is [here|http://blog.danlew.net/2014/09/15/grokking-rxjava-part-1/], as
> well as their
> [wiki|https://github.com/ReactiveX/RxJava/wiki/Additional-Reading].
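To make the SEDA versus thread-per-core contrast concrete: in a SEDA design a request hops between per-stage thread pools, whereas in a thread-per-core design a fixed set of threads (one per core) each own a slice of the work, so the thread count does not grow with connections and a given key is always handled by the same thread. The following is a minimal sketch using only `java.util.concurrent`, assuming work is routed by key hash; `CoreRouter` is a hypothetical name, not Cassandra code, and a real design (for example a Netty event loop) would also multiplex I/O on those threads.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

// Hypothetical sketch of thread-per-core dispatch; not Cassandra's implementation.
final class CoreRouter implements AutoCloseable {
    // One single-threaded executor per "core": a fixed thread count, unlike thread-per-connection.
    private final List<ExecutorService> cores = new ArrayList<>();

    CoreRouter(int coreCount) {
        for (int i = 0; i < coreCount; i++) {
            cores.add(Executors.newSingleThreadExecutor());
        }
    }

    // Assumption: work is partitioned by key hash; floorMod keeps negative hashes in range.
    int coreFor(Object key) {
        return Math.floorMod(key.hashCode(), cores.size());
    }

    // Runs the task on the owning core and hands back a future instead of blocking the caller.
    <T> CompletableFuture<T> submit(Object key, Supplier<T> task) {
        return CompletableFuture.supplyAsync(task, cores.get(coreFor(key)));
    }

    @Override
    public void close() {
        cores.forEach(ExecutorService::shutdown);
    }

    public static void main(String[] args) {
        try (CoreRouter router = new CoreRouter(Runtime.getRuntime().availableProcessors())) {
            String owner = router.submit("partition-42", () -> Thread.currentThread().getName()).join();
            System.out.println("partition-42 handled on " + owner);
        }
    }
}
```

Because every task for a key lands on the same single-threaded executor, per-key state needs no locking; the trade-off is that a hot key cannot spread across cores.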
> The primary concept of Rx is the
> [Observable|http://reactivex.io/documentation/observable.html], which is
> essentially a stream of items you can subscribe to, act on, chain, etc.
> This is quite similar to the [Java 8 streams
> API|http://www.oracle.com/technetwork/articles/java/ma14-java-se-8-streams-2177646.html]
> (or I should say the streams API is similar to it). The difference is that Java 8
> streams can't be used for asynchronous events, while RxJava can.
> Another improvement since I last tried integrating RxJava is the completion
> of CASSANDRA-8099, which provides a very iterable/incremental approach to
> our storage engine. *Iterators and Observables are well paired conceptually,
> so morphing our current storage engine to be async is much simpler now.*
> In an effort to show how one can incrementally change our backend, I've done a
> quick POC with RxJava and made our non-paging read requests
> non-blocking.
> https://github.com/apache/cassandra/compare/trunk...tjake:rxjava-3.0
> As you can probably see, the code is straightforward and sometimes quite nice!
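The Iterator/Observable pairing mentioned above can be illustrated without RxJava: an `Iterator` is pulled (the consumer calls `next()`), while an Observable pushes each item to an observer. Below is a deliberately tiny, hypothetical push source; `SimpleObserver` and `PushSource` are illustrative names, not RxJava's API, though the callbacks mirror RxJava 1.x's `onNext`/`onError`/`onCompleted`. It adapts an existing `Iterator` and supports a lazy `map`, which is the kind of wrapping that iterator-based storage code lends itself to.

```java
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Push-side counterpart of Iterator: the producer calls these; the consumer implements them.
interface SimpleObserver<T> {
    void onNext(T item);
    void onError(Throwable error);
    void onCompleted();
}

// Hypothetical minimal push source; illustrative only, not RxJava's Observable.
final class PushSource<T> {
    // What to do when someone subscribes; nothing runs before that (operators are lazy).
    private final Consumer<SimpleObserver<T>> onSubscribe;

    private PushSource(Consumer<SimpleObserver<T>> onSubscribe) {
        this.onSubscribe = onSubscribe;
    }

    // Adapts a pull-based Iterator: drain it and push each element downstream.
    // The iterator is consumed, so a source built this way is meant to be subscribed once.
    static <T> PushSource<T> fromIterator(Iterator<T> it) {
        return new PushSource<T>(observer -> {
            try {
                while (it.hasNext()) {
                    observer.onNext(it.next());
                }
            } catch (RuntimeException e) {
                observer.onError(e); // failures become a terminal signal instead of a thrown exception
                return;
            }
            observer.onCompleted();
        });
    }

    // Returns a new source that applies fn to every item pushed by this one.
    <R> PushSource<R> map(Function<? super T, ? extends R> fn) {
        return new PushSource<R>(downstream -> onSubscribe.accept(new SimpleObserver<T>() {
            @Override public void onNext(T item) { downstream.onNext(fn.apply(item)); }
            @Override public void onError(Throwable error) { downstream.onError(error); }
            @Override public void onCompleted() { downstream.onCompleted(); }
        }));
    }

    void subscribe(SimpleObserver<T> observer) {
        onSubscribe.accept(observer);
    }

    public static void main(String[] args) {
        fromIterator(List.of("a", "b", "c").iterator())
                .map(s -> s.toUpperCase())
                .subscribe(new SimpleObserver<String>() {
                    @Override public void onNext(String item) { System.out.println("got " + item); }
                    @Override public void onError(Throwable error) { error.printStackTrace(); }
                    @Override public void onCompleted() { System.out.println("done"); }
                });
    }
}
```

This sketch is synchronous; what a library like RxJava adds on top is scheduling onto other threads, backpressure, and a large operator set.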
> *Old* > {code} > private static PartitionIterator > fetchRows(List<SinglePartitionReadCommand<?>> commands, ConsistencyLevel > consistencyLevel) > throws UnavailableException, ReadFailureException, ReadTimeoutException > { > int cmdCount = commands.size(); > SinglePartitionReadLifecycle[] reads = new > SinglePartitionReadLifecycle[cmdCount]; > for (int i = 0; i < cmdCount; i++) > reads[i] = new SinglePartitionReadLifecycle(commands.get(i), > consistencyLevel); > for (int i = 0; i < cmdCount; i++) > reads[i].doInitialQueries(); > for (int i = 0; i < cmdCount; i++) > reads[i].maybeTryAdditionalReplicas(); > for (int i = 0; i < cmdCount; i++) > reads[i].awaitRes > ultsAndRetryOnDigestMismatch(); > for (int i = 0; i < cmdCount; i++) > if (!reads[i].isDone()) > reads[i].maybeAwaitFullDataRead(); > List<PartitionIterator> results = new ArrayList<>(cmdCount); > for (int i = 0; i < cmdCount; i++) > { > assert reads[i].isDone(); > results.add(reads[i].getResult()); > } > return PartitionIterators.concat(results); > } > {code} > *New* > {code} > private static Observable<PartitionIterator> > fetchRows(List<SinglePartitionReadCommand<?>> commands, ConsistencyLevel > consistencyLevel) > throws UnavailableException, ReadFailureException, ReadTimeoutException > { > return Observable.from(commands) > .map(command -> new > SinglePartitionReadLifecycle(command, consistencyLevel)) > .flatMap(read -> read.getPartitionIterator()) > .toList() > .map(results -> PartitionIterators.concat(results)); > } > {code} > Since the read call is now non blocking (no more future.get()) we can remove > one thread pool hop from the native netty request pool which yields a > non-trivial improvement to read performance. > !rxjava-stress.png|width=800px! > http://cstar.datastax.com/tests/id/ae648c12-729a-11e5-8625-0256e416528f > At the same time the current Iterator based api still works by calling > {{.toBlocking()}} on the observable. 
> So, for example, the existing Thrift read call, which uses the Iterator-based
> API, requires little modification.
> On the async side we get the added benefits of RxJava:
> * Customizable backpressure strategies (for dealing with streams that can't
> be processed quickly enough)
> * Cancelling work due to timeouts is a one-line change
> * When a Subscriber disconnects from the stream, the Observable stops as
> well
> * Batching/windowing of work can be added in one line
> * Observers and Subscribers can do work on any thread at any stage of
> the pipeline
> * Observables can be [debugged|https://github.com/ReactiveX/RxJavaDebug]
> and
> [tested|http://reactivex.io/RxJava/javadoc/rx/observers/TestSubscriber.html]
> Another plus is the community surrounding RxJava; specifically, our good
> friends at Netflix have authored it and used it extensively. Docs and examples
> are good.
> In order to get the most out of this, we will need to take this API further
> into the code: MessagingService, Disk Access/Page, Cache, Thread per core...
> but again I want to hammer home that this can be achieved
> incrementally.
> On the bad side, this means:
> * Locking into a "framework"
> * We will inevitably hit bugs/performance issues we need fixed upstream
> * Some of the more advanced API uses look pretty mentally taxing/hard to
> grasp
> Which brings us to the alternatives, primarily just using
> CompletableFutures.
> We certainly could, but if you look at the code changes I had to make to make
> the StorageProxy (SP) calls asynchronous, I think you will realize you would need
> to pass all kinds of state around to get the read command callback to start the
> Netty write, whereas Observables make that pipeline declarative. Also, more
> advanced things like backpressure and message passing between N:M producers
> and consumers become complex. This isn't to say we can't [use
> both|http://www.nurkiewicz.com/2014/11/converting-between-completablefuture.html]
> if Observables are overkill.
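Backpressure, listed above both as a benefit and as the hard part of a CompletableFuture-only design, means the consumer signals how much it can take and the producer must not send more. The JDK's `java.util.concurrent.Flow` interfaces (Java 9+) express the same Reactive Streams demand contract through `Subscription.request(n)`, so they are used here as a stand-in; RxJava has its own operators for this (such as `onBackpressureBuffer` and `onBackpressureDrop`), which are not shown. `OneAtATimeSubscriber` and `BackpressureDemo` are hypothetical names, and the buffer size is an arbitrary illustrative choice.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical consumer that never has more than one item outstanding.
final class OneAtATimeSubscriber implements Flow.Subscriber<Integer> {
    final List<Integer> received = new CopyOnWriteArrayList<>();
    final CountDownLatch finished = new CountDownLatch(1);
    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription s) {
        subscription = s;
        s.request(1); // initial demand: exactly one item
    }

    @Override
    public void onNext(Integer item) {
        received.add(item);      // stand-in for slow per-item work
        subscription.request(1); // signal demand for the next item only after this one is done
    }

    @Override
    public void onError(Throwable error) {
        finished.countDown();
    }

    @Override
    public void onComplete() {
        finished.countDown();
    }
}

final class BackpressureDemo {
    static List<Integer> run(int count) {
        OneAtATimeSubscriber subscriber = new OneAtATimeSubscriber();
        // Bounded per-subscriber buffer of 8 items (illustrative size).
        try (SubmissionPublisher<Integer> publisher =
                     new SubmissionPublisher<>(ForkJoinPool.commonPool(), 8)) {
            publisher.subscribe(subscriber);
            for (int i = 0; i < count; i++) {
                publisher.submit(i); // blocks while the subscriber's buffer is full
            }
        } // close() signals onComplete once the buffered items have been delivered
        try {
            if (!subscriber.finished.await(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("subscriber did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return subscriber.received;
    }

    public static void main(String[] args) {
        System.out.println("received " + run(1_000).size() + " items");
    }
}
```

Here the producer-side strategy is to block: `submit` waits while the subscriber's bounded buffer is full, so a fast producer is throttled to the consumer's pace instead of queueing without limit. Dropping on overflow (for example `SubmissionPublisher.offer` with a drop handler) would be an alternative strategy.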
>
> I hope this ticket sparks some good discussion!

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)