Endre, 

Thank you for taking the time to explain everything. It was really helpful 
not only in understanding the streams basics but also to create a 
better/faster version of what I'm trying to do. 
Before I go any further I want to say that I love Akka streams and it is 
going to be a useful API for a lot of my future work. Thanks to the Akka 
team. 

I tweaked both the dispatchers settings as well as the type of dispatcher 
used by default dispatcher. The program still ends up taking a good deal of 
my CPU (NOTE: The screenshot below is for FJP and not a ThreadpoolExecutor 
but I see similar usage with TPE). The memory footprint is always under 
control as excepted. I gave 12G of heap space to the JVM. 
The frequency of young generation GC depends on the MaterializerSettings 
buffer sizes. I've not tweaked the GC yet. Do you think that can make a 
difference ? 

BTW, does the a size of 64 mean that there will be 64 items in each buffer 
in the pipeline. I bumped it to 512 and saw an increase in throughput. 

Here is the configuration and screenshots of one of the better runs I had. 
I'm not sure if I'm limited by TCP or how Rediscala is using Akka IO at 
this point. 
Any further insights will be very useful and appreciated. In the mean time 
I'll continue to play around with different values. 

Thanks again !  

My machine config is 

*Darwin 14.0.0 Darwin Kernel Version 14.0.0: Fri Sep 19 00:26:44 PDT 2014; 
root:xnu-2782.1.97~2/RELEASE_X86_64 x86_64*

  Processor Name: Intel Core i7 

  Processor Speed: 2.6 GHz

  Number of Processors: 1

  Total Number of Cores: 4

  L2 Cache (per Core): 256 KB

  L3 Cache: 6 MB

  Memory: 16 GB

*application.conf *

rediscala {
    rediscala-client-worker-dispatcher {
    mailbox-type = "akka.dispatch.SingleConsumerOnlyUnboundedMailbox"
    throughput = 1000
  }
}

actor {
  default-dispatcher {

    type = "Dispatcher"
    executor = "fork-join-executor"
    default-executor {
      fallback = "fork-join-executor"
    }

    # This will be used if you have set "executor = "fork-join-executor""
    fork-join-executor {
      # Min number of threads to cap factor-based parallelism number to
      parallelism-min = 5

      # Max number of threads to cap factor-based parallelism number to
      parallelism-max = 5
    }
    throughput = 1000
  }
}

I'm using the following for the FlowMaterializer

 val settings = MaterializerSettings(system)
 implicit val materializer = 
FlowMaterializer(settings.copy(maxInputBufferSize = *512*, 
initialInputBufferSize = *512*))

<https://lh6.googleusercontent.com/-gLBJ7tgfRN4/VJipabIoLgI/AAAAAAAAvTw/9DdnDszQ55o/s1600/rediscala_network_IO_5Million_backpressure.png>

<https://lh4.googleusercontent.com/-USKroaRYgco/VJipgfoSIOI/AAAAAAAAvT4/nUU-y9BCRXs/s1600/rediscala_network_IO_5Million_backpressure_threads.png>

<https://lh6.googleusercontent.com/-w7_OA9C5f7k/VJipl0YW8-I/AAAAAAAAvUA/wpoDmD0F9xM/s1600/rediscala_network_IO_5Million_backpressure_cpu_memory.png>


On Monday, December 22, 2014 3:56:30 AM UTC-5, Akka Team wrote:
>
> Hi Soumya
>
> First of all, the performance of Akka IO (the original actor based one) 
> might be slow or fast, but it does not degrade if writes are properly 
> backpressured. Also it does not use Futures at all, so I guess this is an 
> artifact of how you drive it.
>
> Now your first reactive-streams approach didn't work because the "map" 
> stage that created an actor already let in the next element as soon as the 
> actor was created. What you want is to let in the next element after the 
> elements has been written. In other words you just created an ever growing 
> number of actors without waiting for the write to complete.
>
> Your second solution is correct though because mapAsyncUnordered only lets 
> in the next elements when the passed future completes -- which in your case 
> corresponds to a finished write. 
>
> As for the CPU usage, without the actual profile it doesn't say too much. 
> For example if your ForkJoinPool (the default dispatcher) has not much work 
> to do, it will spend a lot of time in its work stealing cycle (scan()) 
> because there is no work to steal. This is purely an artifact of that pool 
> and has nothing to to with actual CPU usage of the application. You can try 
> with an Executor based pool if you want to test this.
>
> If you really want to play around to see how much throughput is possible, 
> you should try the following approaches step by step:
>
>  - Increase the throughput setting of the dispatcher that executes the 
> stream and redis client. You can try values from 100 to even 1000
>  - Increase the default buffer size of the stream materializer (you can 
> pass a MaterializerSettings object). You should try buffer sizes of 32, 64, 
> 128.
>
> Btw, streams are currently not optimized at all, so don't get overly high 
> expectations yet :)
>
> -Endre
>
> On Mon, Dec 22, 2014 at 4:53 AM, Soumya Simanta <soumya....@gmail.com 
> <javascript:>> wrote:
>
>> Looks like my akka-streams code was not doing back pressure. Not sure how 
>> I can change it handle back pressure. 
>>
>> Then I changed my code to the following. I borrowed the code from one of 
>> the Akka stream activator examples (WritePrimes). I added a buffer in 
>> between that also helped significantly. 
>>
>>
>>   val maxRandomNumberSize = 1000000
>> *  val randomSource = Source(() => 
>> Iterator.continually(ThreadLocalRandom.current().nextInt(maxRandomNumberSize)))*
>>
>>   def insertValues : Flow[Int,Boolean] = {
>>     Flow[Int].mapAsyncUnordered(k => redis.set(k + random, message))
>>   }
>>
>>   val blackhole = BlackholeSink
>>
>>   //val stream  = source.via(insertValues).runWith(blackhole) //No buffer
>>   val streamWithRandomSource = randomSource.*buffer(20000, 
>> OverflowStrategy.backpressure)*.via(insertValues).runWith(blackhole)
>>
>> Not the network IO looks much more uniform. It's a pleasure to see back 
>> pressure work (visually) :-)
>>
>>
>> <https://lh5.googleusercontent.com/-wpEVIqgvT3k/VJeVkEsl50I/AAAAAAAAvTc/ZGlyonJbof8/s1600/rediscala_network_io_1actor_backpressure.png>
>>
>>
>> I did see my CPU usage bump up in this version. Any reason why ? 
>>
>>
>> On Sunday, December 21, 2014 11:55:01 AM UTC-5, Soumya Simanta wrote:
>>>
>>> Here is my attempt to create a version with back pressure with Reactive 
>>> Stream. Not sure if it completely correct or not. Can someone please verify 
>>> if the code below is correct? 
>>> Even with this version I don't see any change is throughput and the 
>>> network IO graph looks very similar to what I had without using reactive 
>>> streams. 
>>>
>>> On the other hand if I use 100 Rediscala client actors the inserts of 
>>> much faster. I understand that now there are 100 queues (mailboxes) and 
>>> therefore its faster. But I still don't understand why the performance is 
>>> so bad for a single client after a certain threshold, even after using back 
>>> pressure (assuming I'm using Akka streams correctly).
>>>
>>>
>>> *Code with Akka streams and one Rediscala client. *
>>>
>>> import java.util.UUID
>>>
>>> import akka.actor.ActorSystem
>>> import akka.stream.FlowMaterializer
>>> import akka.stream.scaladsl.Source
>>> import akka.util.ByteString
>>> import redis.RedisClient
>>>
>>> object RedisStreamClient extends App {
>>>
>>>   val message = """How to explain ZeroMQ? Some of us start by saying all 
>>> the wonderful things it does. It's sockets on steroids. It's like mailboxes 
>>> with routing. It's fast! Others try to share their moment of enlightenment, 
>>> that zap-pow-kaboom satori paradigm-shift moment when it all became 
>>> obvious. Things just become simpler. Complexity goes away. It opens the 
>>> mind. Others try to explain by comparison. It's smaller, simpler, but still 
>>> looks familiar. Personally, I like to remember why we made ZeroMQ at all, 
>>> because that's most likely where you, the reader, still are today.How to 
>>> explain ZeroMQ? Some of us start by saying all the wonderful things it 
>>> does. It's sockets on steroids. It's like mailboxes with routing. It's 
>>> fast! Others try to share their moment of enlightenment, that 
>>> zap-pow-kaboom satori paradigm-shift moment when it all became obvious. 
>>> Things just become simpler. Complexity goes away. It opens the mind. Others 
>>> try to explain by comparison. It's smaller, simpler, but still looks 
>>> familiar. Personally, I like to remember why we made ZeroMQ at all, because 
>>> that's most likely where"""
>>>
>>>   implicit val system = ActorSystem("Sys")
>>>
>>>   implicit val materializer = FlowMaterializer()
>>>
>>>   val msgSize = message.getBytes.size
>>>
>>>   val redis = RedisClient()
>>>   implicit val ec = redis.executionContext
>>>   val random = UUID.randomUUID().toString
>>>
>>>   val source = Source( () => (1 to 1000000).iterator )
>>>   source.map{  x => x + 1 }.foreach( x => redis.set(random+x.toString, 
>>> ByteString(message) ) ).onComplete( _ => system.shutdown())
>>>
>>> }
>>>
>>>
>>> <https://lh4.googleusercontent.com/-5aG6CMqBLcA/VJb7J3qqdVI/AAAAAAAAvTE/kkdPVBT6AbQ/s1600/rediscala_network_IO.png>
>>>
>>>
>>>
>>> *Code for with 100 Rediscala clients.*
>>>
>>> import akka.actor.{ActorLogging, Props, Actor}
>>> import akka.util.ByteString
>>> import redisbenchmark.RedisBenchmarkActor.InsertValues
>>>
>>> import scala.concurrent.duration.Duration
>>> import scala.concurrent.{Await, Future}
>>> import redis.RedisClient
>>>
>>> import java.util.UUID
>>>
>>>
>>> object RedisLocalPerfMultipleActors {
>>>
>>>
>>>   def main(args: Array[String]) : Unit = {
>>>     implicit val akkaSystem = akka.actor.ActorSystem()
>>>     //create 100 RedisClient actors
>>>     val actors = 1 to 100
>>>     actors.map{ x => akkaSystem.actorOf(Props(new 
>>> RedisBenchmarkActor(10000)), "actor"+x) }.map{ actor => actor ! 
>>> InsertValues}
>>>
>>>     //TODO shutdown the actor system
>>> *    //Not sure how to wait for all Futures to complete before shutting 
>>> down the actor system*
>>>
>>>   }
>>>
>>> }
>>>
>>>
>>> class RedisBenchmarkActor(runs: Int) extends Actor with ActorLogging {
>>>
>>>   val redis = RedisClient()
>>>   //implicit val ec = redis.executionContext
>>>   log.info(s"Actor created with $runs ")
>>>
>>>   def receive = {
>>>
>>>     case InsertValues => {
>>>
>>>       log.info("Inserting values ")
>>>
>>>       val random = UUID.randomUUID().toString
>>>       val start = System.currentTimeMillis()
>>>       val result: Seq[Future[Boolean]] = for {i <- 1 to runs} yield {
>>>         //log.info("sending values ....")
>>>         redis.set(random + i.toString, ByteString(
>>> RedisBenchmarkActor.message))
>>>       }
>>>
>>>     }
>>>   }
>>>
>>> }
>>>
>>>
>>> object RedisBenchmarkActor {
>>>
>>>   object InsertValues
>>>
>>>   val message = """How to explain ZeroMQ? Some of us start by saying all 
>>> the wonderful things it does. It's sockets on steroids. It's like mailboxes 
>>> with routing. It's fast! Others try to share their moment of enlightenment, 
>>> that zap-pow-kaboom satori paradigm-shift moment when it all became 
>>> obvious. Things just become simpler. Complexity goes away. It opens the 
>>> mind. Others try to explain by comparison. It's smaller, simpler, but still 
>>> looks familiar. Personally, I like to remember why we made ZeroMQ at all, 
>>> because that's most likely where you, the reader, still are today.How to 
>>> explain ZeroMQ? Some of us start by saying all the wonderful things it 
>>> does. It's sockets on steroids. It's like mailboxes with routing. It's 
>>> fast! Others try to share their moment of enlightenment, that 
>>> zap-pow-kaboom satori paradigm-shift moment when it all became obvious. 
>>> Things just become simpler. Complexity goes away. It opens the mind. Others 
>>> try to explain by comparison. It's smaller, simpler, but still looks 
>>> familiar. Personally, I like to remember why we made ZeroMQ at all, because 
>>> that's most likely where"""
>>>
>>> }
>>>
>>>
>>> <https://lh6.googleusercontent.com/-3ymr3gm5E9U/VJb7RnuFuGI/AAAAAAAAvTM/qKDLpLMvlF4/s1600/rediscala_network_io_100actors.png>
>>>
>>>
>>> On Saturday, December 20, 2014 9:07:53 AM UTC-5, Soumya Simanta wrote:
>>>>
>>>> Endre, thank you for responding. Following is what the author of 
>>>> Rediscala has to say. 
>>>>
>>>> *"Yes i noticed it during my tests, at some point the scale is 
>>>> exponential (bad).*
>>>>
>>>>
>>>> *I suspected the thread scheduler to be the limitation.Or the way 
>>>> Future.sequence works.*
>>>>
>>>> *If you can isolate a test that scale linearly up to 1M of futures, I 
>>>> would be interested to see it. By replacing akka-io with another java.nio 
>>>> library (xnio) I was able to pass the 1M req (at the speed of around 500k 
>>>> req/s)" *
>>>>
>>>> https://github.com/etaty/rediscala/issues/67
>>>>
>>>> If replacing akka-io with java.nio resolves this then either akka-io is 
>>>> not used correctly in Rediscala OR it is a fundamental limitation of 
>>>> akka-io. 
>>>>
>>>> My other responses inline. 
>>>>
>>>>
>>>> On Saturday, December 20, 2014 6:35:22 AM UTC-5, Akka Team wrote:
>>>>>
>>>>> Hi,
>>>>>
>>>>> My personal guess is that since you don't obey any backpressure when 
>>>>> you start flooding the redis client with requests you end up with a lot 
>>>>> of 
>>>>> queued messages and probably high GC pressure. You can easily test this 
>>>>> by 
>>>>> looking at the memory profile of your test.
>>>>>
>>>>>
>>>> Yes, the memory pressure is indeed high. The young generation (Edge) 
>>>> space fills up very quickly and then a minor GC is kicked off. 
>>>> Can I use akka-streams to resolve and add backpressure here? Any 
>>>> pointers here will be greatly appreciated. 
>>>>  
>>>>
>>>>>
>>>>>
>>>>> On Sat, Dec 20, 2014 at 6:55 AM, Soumya Simanta <soumya....@gmail.com> 
>>>>> wrote:
>>>>>
>>>>>> val res: Future[List[Boolean]] = Future.sequence(result.toList) val 
>>>>>> end = System.currentTimeMillis() val diff = (end - start) println(s"for 
>>>>>> msgSize $msgSize and numOfRuns [$numberRuns] time is $diff ms ") 
>>>>>> akkaSystem.shutdown() 
>>>>>>
>>>>>> }
>>>>>>
>>>>> What does the above code intend to measure? Didn't you want to 
>>>>> actually wait on the "res" future?
>>>>>
>>>>> Yes, you are correct again. I should be waiting on res in order to get 
>>>> an estimate of overall latency. 
>>>>  
>>>>
>>>  -- 
>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>> >>>>>>>>>> Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
>> --- 
>> You received this message because you are subscribed to the Google Groups 
>> "Akka User List" group.
>> To unsubscribe from this group and stop receiving emails from it, send an 
>> email to akka-user+...@googlegroups.com <javascript:>.
>> To post to this group, send email to akka...@googlegroups.com 
>> <javascript:>.
>> Visit this group at http://groups.google.com/group/akka-user.
>> For more options, visit https://groups.google.com/d/optout.
>>
>
>
>
> -- 
> Akka Team
> Typesafe - The software stack for applications that scale
> Blog: letitcrash.com
> Twitter: @akkateam
>  

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to