Hi,

One thing to remember when using grouped is that it _always_ waits for
100 elements (or whatever size is configured) before emitting a group. The
only exception is stream close, when it emits whatever it has collected. So
if your input is an interactive source, some batches may be delayed for a
long time, until the 100th element arrives.
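
As an analogy only (this is the Scala standard library, not Akka code),
Iterator.grouped follows the same size-only rule: it emits full groups, and
a partial group only appears because the input ends. The object name below
is made up for illustration.

```scala
object GroupedAnalogy {
  // 250 elements grouped by 100: two full groups, then the remainder.
  // The remainder is emitted only because the iterator is exhausted,
  // which corresponds to "stream close" in the streaming case.
  val sizes: List[Int] =
    (1 to 250).iterator.grouped(100).map(_.size).toList
}
```

With a source that never closes and only ever delivers 50 more elements,
that last group would simply never be emitted.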

I added a ticket to write a simple batcher element recipe in the new
cookbook: https://github.com/akka/akka/issues/16610
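
To make the idea concrete, here is a rough sketch of the kind of policy
such a batcher might apply: close a batch when it reaches a maximum size,
or when its first element has waited too long. It is plain Scala over
timestamped elements, not an Akka stage. The names batches, maxSize and
maxWaitMillis are hypothetical, and a real element would need a timer so it
can flush without waiting for the next input.

```scala
// Hypothetical helper, not part of Akka. It only decides which elements
// end up in which batch, given the time each element arrived.
object BatchPolicySketch {
  def batches[A](
      events: Seq[(Long, A)], // (arrival time in millis, element), in order
      maxSize: Int,
      maxWaitMillis: Long
  ): Vector[Vector[A]] = {
    require(maxSize > 0, "maxSize must be positive")
    val done = Vector.newBuilder[Vector[A]]
    var current = Vector.empty[A]
    var openedAt = 0L
    for ((t, a) <- events) {
      // Close the open batch first if its oldest element waited too long.
      if (current.nonEmpty && t - openedAt >= maxWaitMillis) {
        done += current
        current = Vector.empty
      }
      if (current.isEmpty) openedAt = t
      current :+= a
      // Close the batch as soon as it is full.
      if (current.size == maxSize) {
        done += current
        current = Vector.empty
      }
    }
    // End of input: flush whatever is left, like grouped does on close.
    if (current.nonEmpty) done += current
    done.result()
  }
}
```

For example, with maxSize = 2 and maxWaitMillis = 100, elements arriving at
0, 10, 20, 500 and 510 ms fall into three batches: the first two together,
the third alone (it timed out), and the last two together.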

-Endre



On Wed, Dec 24, 2014 at 12:27 AM, Soumya Simanta <soumya.sima...@gmail.com>
wrote:

>
>
>
>>> def insertValues(rnd: String): Flow[Int, Boolean] = {
>>>     Flow[Int].mapAsyncUnordered(k => redis.set(k + rnd, message))
>>>   }
>>> val maxSeq = 5000000
>>> val seqSource = Source( () => (1 to maxSeq).iterator )
>>>
>>> val streamWithSeqSource =
>>>   seqSource.via(insertValues(random1)).runWith(blackhole)
>>>
>>> My understanding is that the next request is sent to Redis from the
>>> client only after a "single" Future is completed. Is this correct?
>>>
>>
>> No, the number of allowed uncompleted Futures is defined by the buffer
>> size. If the Futures were not run in parallel, there would be no need
>> for both an ordered and an unordered version of the same operation.
>>
>>
>
> Understood. So a map version will always be slower.
>
>
>
>
>>> Is there a way I can batch a bunch of set requests and wait for them to
>>> complete before I send a new batch?
>>>
>>
>> If there were a version of set that accepts a Seq[] of writes, let's
>> say "batchSet", then you could use:
>>
>> seqSource.grouped(100).mapAsyncUnordered(ks => redis.batchSet(...))
>>
>> Here grouped collects the stream's elements into groups of at most 100,
>> resulting in a stream of sequences. You need API support for that from the
>> Redis client though.
>>
>
> Yeah, I tried that. Here is the code and the network IO. Throughput is
> better, of course at the cost of latency. I haven't figured out a way to
> measure latency yet. Once I have a reliable way of doing so, I can work
> out what the difference in latency is.
>
>   seqSource.grouped(100).mapAsyncUnordered { grp => {
>     val tran = redis.transaction()
>     for (i <- grp) yield {
>       tran.set(i + random2, message)
>     }
>     tran.exec()
>   }
>   }.runWith(blackhole)
>
>
> <https://lh6.googleusercontent.com/-uheE7cqhSgQ/VJn6arTCdxI/AAAAAAAAvaY/hNZgyozl1JE/s1600/5million_1k_messages_grouped100.png>
>
>  --
> >>>>>>>>>> Read the docs: http://akka.io/docs/
> >>>>>>>>>> Check the FAQ:
> http://doc.akka.io/docs/akka/current/additional/faq.html
> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to akka-user+unsubscr...@googlegroups.com.
> To post to this group, send email to akka-user@googlegroups.com.
> Visit this group at http://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>



-- 
Akka Team
Typesafe - The software stack for applications that scale
Blog: letitcrash.com
Twitter: @akkateam
