Hi,
One thing to remember when using grouped is that it will _always_ wait for
100 elements (or whatever number is configured), except on stream close, so
if your input stream is an interactive source then some of the batches
might be delayed for a long time (until the 100 elements arrive).
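The flush-on-close behaviour is easy to see with plain Scala collections, whose `grouped` follows the same contract (full batches until the input ends, then one final partial batch). If the delay matters for an interactive source, Akka Streams also has a `groupedWithin(n, timeout)` stage that emits whichever comes first, a full batch or the timeout. A minimal sketch of the batching contract itself:

```scala
// Iterator.grouped mirrors the stream behaviour described above: it emits
// only full batches of 100 while input remains, and flushes the remainder
// as a single partial batch once the input ends (the "stream close" case).
val batches = (1 to 250).iterator.grouped(100).toList

// batches.map(_.size) is List(100, 100, 50): the trailing 50 elements are
// only emitted because the input ended, not because a batch filled up.
```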
I adde
>> def insertValues(rnd: String): Flow[Int, Boolean] = {
>>   Flow[Int].mapAsyncUnordered(k => redis.set(k + rnd, message))
>> }
>> val maxSeq = 500
>> val seqSource = Source( () => (1 to maxSeq).iterator )
>>
>> val streamWithSeqSource =
>> seqSource.via(insertValues(random1)).runWith
Endre, thank you again.
I think you are correct. It looks like the primary limitation is not being
able to batch more operations into one network call (TCP).
I increased the message size (10 times) and I'm now able to send more bytes
per second. At some point I'll hit the network limit.
The
On Tue, Dec 23, 2014 at 4:02 AM, Soumya Simanta
wrote:
> Another akka-streams back pressure related question in context of the
> following piece of code.
>
> def insertValues(rnd: String): Flow[Int, Boolean] = {
>   Flow[Int].mapAsyncUnordered(k => redis.set(k + rnd, message))
> }
> val maxSeq = 500
Hi,
On Tue, Dec 23, 2014 at 12:40 AM, Soumya Simanta
wrote:
> Endre,
>
> Thank you for taking the time to explain everything. It was really helpful
> not only in understanding the streams basics but also to create a
> better/faster version of what I'm trying to do.
> Before I go any further I wa
Another akka-streams back pressure related question in context of the
following piece of code.
def insertValues(rnd: String): Flow[Int, Boolean] = {
  Flow[Int].mapAsyncUnordered(k => redis.set(k + rnd, message))
}
val maxSeq = 500
val seqSource = Source(() => (1 to maxSeq).iterator)
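For what it's worth, the effect of bounding the number of outstanding calls can be sketched without Akka at all, using plain Scala Futures. `redisSet` below is a hypothetical stand-in for `redis.set`, not Rediscala's API; the point is only the shape: each batch must complete before the next is submitted, so at most `batchSize` calls are ever in flight, which is roughly the guarantee a backpressured `mapAsync` stage gives you.

```scala
import scala.concurrent._
import scala.concurrent.duration._
import ExecutionContext.Implicits.global

// Hypothetical stand-in for redis.set: always "succeeds".
def redisSet(key: String): Future[Boolean] = Future { true }

// Submit keys in batches of `batchSize`; each batch must finish before the
// next one starts, so no more than `batchSize` calls are outstanding at once.
def boundedInsert(keys: Seq[Int], batchSize: Int): Future[Seq[Boolean]] =
  keys.grouped(batchSize).foldLeft(Future.successful(Seq.empty[Boolean])) {
    (acc, batch) =>
      acc.flatMap { done =>
        Future.sequence(batch.map(k => redisSet(k.toString))).map(done ++ _)
      }
  }
```

Running `boundedInsert(1 to 500, 50)` would issue the same 500 writes as the snippet above, but in ten sequential waves of 50 instead of all at once.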
Endre,
Thank you for taking the time to explain everything. It was really helpful
not only in understanding the streams basics but also to create a
better/faster version of what I'm trying to do.
Before I go any further I want to say that I love Akka streams and it is
going to be a useful API
Hi Soumya
First of all, the performance of Akka IO (the original actor-based one)
might be slow or fast, but it does not degrade if writes are properly
backpressured. Also, it does not use Futures at all, so I guess this is an
artifact of how you drive it.
Now your first reactive-streams approach
Looks like my akka-streams code was not doing back pressure. Not sure how I
can change it to handle back pressure.
Then I changed my code to the following. I borrowed the code from one of
the Akka Streams activator examples (WritePrimes). I added a buffer in
between, which also helped significantly.
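Why a bounded buffer helps can be illustrated with a plain `ArrayBlockingQueue` (this shows only the underlying idea, not the Akka Streams `buffer` stage itself): the producer blocks whenever the buffer is full, which is back pressure, while the buffer absorbs short bursts so the two sides don't run in lock-step.

```scala
import java.util.concurrent.ArrayBlockingQueue

val buf = new ArrayBlockingQueue[Int](16) // bounded: producer blocks when full
var consumed = 0

// A deliberately slow consumer draining the buffer on its own thread.
val consumer = new Thread(new Runnable {
  def run(): Unit = {
    var v = buf.take()
    while (v != -1) {         // -1 is a poison pill ending the stream
      consumed += 1
      Thread.sleep(1)         // simulate slow downstream work
      v = buf.take()
    }
  }
})
consumer.start()

// Fast producer: put() blocks whenever 16 elements are already queued,
// so memory stays bounded no matter how fast the producer is.
(1 to 100).foreach(n => buf.put(n))
buf.put(-1)
consumer.join()               // join makes `consumed` safely visible here
```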
Here is my attempt to create a version with back pressure with Reactive
Streams. Not sure if it is completely correct. Can someone please verify
that the code below is correct?
Even with this version I don't see any change in throughput, and the network
IO graph looks very similar to what I had
Endre, thank you for responding. Following is what the author of Rediscala
has to say.
"Yes, I noticed it during my tests; at some point the scale is exponential
(bad).
I suspected the thread scheduler to be the limitation, or the way
Future.sequence works.
If you can isolate a test that
Hi,
My personal guess is that since you don't obey any backpressure when you
start flooding the redis client with requests, you end up with a lot of
queued messages and probably high GC pressure. You can easily test this by
looking at the memory profile of your test.
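That guess can be checked in miniature with plain Scala Futures (`fakeSet` below is a hypothetical stand-in, not Rediscala's API): counting how many requests have been created but not yet completed shows the difference between flooding and batched submission. Flooding creates essentially all requests up front, so they sit queued inside the client; awaiting each batch keeps the pending count bounded.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent._
import scala.concurrent.duration._
import ExecutionContext.Implicits.global

val pending = new AtomicInteger(0) // requests created but not yet completed
var peak = 0                       // highest pending count observed
val lock = new Object
def recordPeak(now: Int): Unit = lock.synchronized { if (now > peak) peak = now }

// Fake client call: counts itself as pending until its Future completes.
def fakeSet(k: Int): Future[Boolean] = {
  recordPeak(pending.incrementAndGet())
  Future { Thread.sleep(1); pending.decrementAndGet(); true }
}

// Flooding: all 1000 requests exist before most of them can complete,
// so they pile up as queued work (and garbage) inside the client.
Await.result(Future.sequence((1 to 1000).map(fakeSet)), 60.seconds)
val floodPeak = lock.synchronized(peak)

// Batched: each wave of 50 is awaited before the next is created,
// so no more than 50 requests are ever pending at a time.
lock.synchronized { peak = 0 }
(1 to 1000).grouped(50).foreach { batch =>
  Await.result(Future.sequence(batch.map(fakeSet)), 60.seconds)
}
val batchedPeak = lock.synchronized(peak)
```

Watching `floodPeak` climb toward the full request count while `batchedPeak` stays at the batch size is the same signal a memory profile of the real test would show.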
On Sat, Dec 20, 2014 at 6:55
I'm asking this question here because Rediscala uses Akka IO. So maybe
someone here could provide some insight into what's going on.
I'm trying to run a simple test by putting 1 million key-value pairs into
Redis (running on the same machine). For 100,000 keys it is really fast.
However, the pe