[ https://issues.apache.org/jira/browse/CASSANDRA-4718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13635120#comment-13635120 ]
Piotr Kołaczkowski commented on CASSANDRA-4718:
-----------------------------------------------

Another thing to consider might be using a high-performance actor library, e.g. Akka. I did a quick microbenchmark to see what the latency of passing a single message through several stages is, in 3 variants:

1. Sync: one thread pool per stage, where a coordinator thread moves the message from one ExecutorService to the next after each stage finishes processing
2. Async: one thread pool per stage, where every stage pushes its result directly and asynchronously into the next stage
3. Akka: one Akka actor per stage, where every stage pushes its result directly and asynchronously into the next stage

The clear winner is Akka:

{noformat}
2 stages: 
Sync:    38717 ns
Async:   36159 ns
Akka:    12969 ns

4 stages: 
Sync:    65793 ns
Async:   49964 ns
Akka:    18516 ns

8 stages: 
Sync:   162256 ns
Async:  100009 ns
Akka:     9237 ns

16 stages: 
Sync:   296951 ns
Async:  183588 ns
Akka:    13574 ns

32 stages: 
Sync:   572605 ns
Async:  361959 ns
Akka:    23344 ns
{noformat}

Code of the benchmark:

{noformat}
package pl.pk.messaging

import java.util.concurrent.{CountDownLatch, Executors}
import akka.actor.{Props, ActorSystem, Actor, ActorRef}

class Message {
  var counter = 0
  val latch = new CountDownLatch(1)
}

abstract class MultistageThreadPoolProcessor(stageCount: Int) {

  val stages = for (i <- 1 to stageCount) yield Executors.newCachedThreadPool()

  def shutdown() {
    stages.foreach(_.shutdown())
  }
}

/** Synchronously processes a message through the stages.
  * The message is passed stage-to-stage by the coordinator thread. */
class SyncThreadPoolProcessor(stageCount: Int) extends MultistageThreadPoolProcessor(stageCount) {

  def process() {
    val message = new Message
    val task = new Runnable() {
      def run() {
        message.counter += 1
      }
    }
    for (executor <- stages)
      executor.submit(task).get()
  }
}

/** Asynchronously processes a message through the stages.
  * Every stage, after finishing its processing of the message,
  * passes the message directly to the next stage, without bothering the coordinator thread. */
class AsyncThreadPoolProcessor(stageCount: Int) extends MultistageThreadPoolProcessor(stageCount) {

  def process() {
    val message = new Message
    val task = new Runnable() {
      def run() {
        message.counter += 1
        if (message.counter >= stages.size)
          message.latch.countDown()
        else
          stages(message.counter).submit(this)
      }
    }
    stages(0).submit(task)
    message.latch.await()
  }
}

/** Similar to AsyncThreadPoolProcessor, but it uses an Akka actor system instead of thread pools and queues.
  * Every stage, after finishing its processing of the message,
  * passes the message directly to the next stage, without bothering the coordinator thread. */
class AkkaProcessor(stageCount: Int) {

  val system = ActorSystem()

  val stages: IndexedSeq[ActorRef] =
    for (i <- 1 to stageCount) yield system.actorOf(Props(new Actor {
      def receive = {
        case m: Message =>
          m.counter += 1
          if (m.counter >= stages.size)
            m.latch.countDown()
          else
            stages(m.counter) ! m
      }
    }))

  def process() {
    val message = new Message
    stages(0) ! message
    message.latch.await()
  }

  def shutdown() {
    system.shutdown()
  }
}

object MessagingBenchmark extends App {

  def measureLatency(count: Int, f: () => Any): Double = {
    val start = System.nanoTime()
    for (i <- 1 to count)
      f()
    val end = System.nanoTime()
    (end - start).toDouble / count
  }

  val messageCount = 200000

  for (stageCount <- List(2, 4, 8, 16, 32)) {
    printf("\n%d stages: \n", stageCount)
    val syncProcessor = new SyncThreadPoolProcessor(stageCount)
    val asyncProcessor = new AsyncThreadPoolProcessor(stageCount)
    val akkaProcessor = new AkkaProcessor(stageCount)
    printf("Sync:  %8.0f ns\n", measureLatency(messageCount, syncProcessor.process))
    printf("Async: %8.0f ns\n", measureLatency(messageCount, asyncProcessor.process))
    printf("Akka:  %8.0f ns\n", measureLatency(messageCount, akkaProcessor.process))
    syncProcessor.shutdown()
    asyncProcessor.shutdown()
    akkaProcessor.shutdown()
  }
}
{noformat}

> More-efficient ExecutorService for improved throughput
> ------------------------------------------------------
>
>                 Key: CASSANDRA-4718
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-4718
>             Project: Cassandra
>          Issue Type: Improvement
>            Reporter: Jonathan Ellis
>            Priority: Minor
>         Attachments: baq vs trunk.png, PerThreadQueue.java
>
>
> Currently all our execution stages dequeue tasks one at a time. This can result in contention between producers and consumers (although we do our best to minimize this by using LinkedBlockingQueue).
> One approach to mitigating this would be to make consumer threads do more work in "bulk" instead of just one task per dequeue. (Producer threads tend to be single-task oriented by nature, so I don't see an equivalent opportunity there.)
> BlockingQueue has a drainTo(collection, int) method that would be perfect for this. However, no ExecutorService in the jdk supports using drainTo, nor could I google one.
> What I would like to do here is create just such a beast and wire it into (at least) the write and read stages. (Other possible candidates for such an optimization, such as the CommitLog and OutboundTCPConnection, are not ExecutorService-based and will need to be one-offs.)
> AbstractExecutorService may be useful. The implementations of ICommitLogExecutorService may also be useful. (Despite the name these are not actual ExecutorServices, although they share the most important properties of one.)
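For illustration, here is a minimal sketch of the kind of drain-based consumer loop the description asks for: the worker blocks for the first task, then pulls whatever else is already queued with a single BlockingQueue.drainTo call and runs the whole batch. The class and constant names (BulkDrainWorker, MAX_DRAIN) and the batch size are invented for this sketch and do not come from any attached patch.

{noformat}
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class BulkDrainWorker implements Runnable
{
    // Arbitrary batch limit for this sketch; a real implementation would tune it.
    private static final int MAX_DRAIN = 64;

    private final BlockingQueue<Runnable> queue;

    public BulkDrainWorker(BlockingQueue<Runnable> queue)
    {
        this.queue = queue;
    }

    public void run()
    {
        List<Runnable> batch = new ArrayList<Runnable>(MAX_DRAIN + 1);
        while (!Thread.currentThread().isInterrupted())
        {
            try
            {
                // Block for the first task, then grab any additional queued tasks
                // in one drainTo call instead of contending on the queue per task.
                batch.add(queue.take());
                queue.drainTo(batch, MAX_DRAIN);
                for (Runnable task : batch)
                    task.run();
                batch.clear();
            }
            catch (InterruptedException e)
            {
                Thread.currentThread().interrupt();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException
    {
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
        Thread worker = new Thread(new BulkDrainWorker(queue));
        worker.start();
        for (int i = 0; i < 10; i++)
        {
            final int n = i;
            queue.add(new Runnable() { public void run() { System.out.println("task " + n); } });
        }
        Thread.sleep(100); // give the worker time to drain the queue, then stop it
        worker.interrupt();
        worker.join();
    }
}
{noformat}

Blocking on take() keeps the worker responsive when the queue is nearly empty, while the follow-up drainTo amortizes producer/consumer contention when the queue is backed up, which is the "bulk" dequeue effect the description is after. Wrapping such a loop behind AbstractExecutorService, as the description suggests, is what would let it be wired into the read and write stages.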