Hi Alexey,

On Thu, Jan 22, 2015 at 12:31 PM, Alexey Romanchuk <
alexey.romanc...@gmail.com> wrote:

> It is exactly what I have tried to achieve! You guys did amazing work with
> all akka streams. Thanks! :)
>

You might also be interested in this ticket:
https://github.com/akka/akka/issues/16610

It is about a batcher element, a custom DetachedStage that
aggregates elements while both of the following hold:
 - the downstream is backpressuring
 - a certain batch limit is not yet reached

There are currently the following alternatives:
 - grouped(N), but it always waits for N elements (except when the stream
terminates)
 - groupedWithin(T), which always batches elements in the time window T, but
has no cap, and always waits for that time even if downstream is ready to
consume
 - conflate, which always batches elements while downstream is
backpressuring, but has no capacity limit (it never backpressures upstream)
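
As a rough illustration of the grouped(N) behaviour in the first bullet, the
Scala standard library's Iterator.grouped chunks a finite sequence the same
way: full groups of N, plus one shorter group when the input ends. This is
only an analogy on plain collections, not stream code; GroupedSketch and
chunked are hypothetical names.

```scala
// Library-free analogy for grouped(N): full groups of n elements,
// with a shorter final group only when the input terminates.
// GroupedSketch and chunked are hypothetical names for illustration.
object GroupedSketch {
  def chunked[A](elems: Seq[A], n: Int): List[List[A]] =
    elems.iterator.grouped(n).map(_.toList).toList
}
```

For seven elements and N = 3 this yields two full groups and a final group
holding the single leftover element.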

The ticket I linked basically asks for a conflate variant that can
backpressure the upstream once the batch size limit is reached (similar to
what buffer does).
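
To make the intended semantics concrete, here is a minimal, library-free
sketch in plain Scala. It is not the DetachedStage from the ticket and not an
Akka API; Batcher, offer and poll are hypothetical names. offer models an
upstream push and returns false once the batch limit is reached, which is the
point where a real stage would backpressure upstream. poll models downstream
demand and hands over whatever has been aggregated so far.

```scala
// Hypothetical model of the batcher semantics, without any stream library:
// aggregate while downstream is not asking, refuse input once the limit is hit.
final class Batcher[A](limit: Int) {
  require(limit > 0, "limit must be positive")

  // Elements aggregated since the last downstream pull.
  private var pending = Vector.empty[A]

  // Upstream push. Returns false when the batch is full, which stands in
  // for backpressuring the upstream.
  def offer(elem: A): Boolean =
    if (pending.size >= limit) false
    else {
      pending = pending :+ elem
      true
    }

  // Downstream demand. Emits the current batch (possibly smaller than
  // the limit) and starts a new one; returns None if nothing is pending.
  def poll(): Option[Vector[A]] =
    if (pending.isEmpty) None
    else {
      val batch = pending
      pending = Vector.empty
      Some(batch)
    }
}
```

In this model, a limit of Int.MaxValue behaves like the unbounded conflate
case (offer practically never refuses), while a small limit makes the fourth
offer on a Batcher with limit 3 fail until poll has been called.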

-Endre


>
> On Thursday, January 22, 2015 at 14:43:52 UTC+6, drewhk wrote:
>>
>> Hi Alexey,
>>
>>
>>
>> On Thu, Jan 22, 2015 at 4:15 AM, Alexey Romanchuk <alexey.r...@gmail.com>
>> wrote:
>>
>>> Hey!
>>>
>>> I have a stream that processes incoming messages, assembles a big message
>>> "pack" and sends it to another system over the network. Incoming messages
>>> are relatively small, and I use big buffers to improve throughput for all
>>> stages related to these small messages. At the very end of the stream I
>>> have a "message collector" which aggregates messages and periodically
>>> produces message "packs" which must be sent over the network.
>>>
>>> The system looks like this:
>>>
>>> (Input) -> (Complex flow for small messages) -> (Aggregator) -> (Network
>>> sender)
>>>
>>> I increased the buffer size to 128 to improve throughput, but the problem
>>> is in the network sender. The network sender can resend data several times
>>> in case of network failure. I want to force a buffer of only 1 element
>>> before the network sender so that backpressure reaches the input
>>> effectively. If that buffer has the same size as the others, the stream
>>> can be flooded with a lot of huge aggregator packs.
>>>
>>> I know that there is a "buffer" flow combinator, but it looks like it works
>>> only if I want to increase the buffer size before some stage, and does not
>>> work for decreasing the buffer size.
>>>
>>
>> You can control the internal buffer size for stages, as explained here:
>> http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-M2/scala/stream-rate.html#Internal_buffers_and_their_effect
>>
>>
>>    val flow =
>>      Flow[Int]
>>        .section(OperationAttributes.inputBuffer(initial = 1, max = 1)) { sectionFlow =>
>>          // the buffer size of this map is 1
>>          sectionFlow.map(_ * 2)
>>        }
>>        .map(_ / 2) // the buffer size of this map is the default
>>
>>
>> -Endre
>>
>>
>>>
>>> Could you advise the right way to achieve this kind of backpressure?
>>>
>>> Thanks!
>>>
>>> --
>>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>>> >>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
>>> ---
>>> You received this message because you are subscribed to the Google
>>> Groups "Akka User List" group.
>>> To unsubscribe from this group and stop receiving emails from it, send
>>> an email to akka-user+...@googlegroups.com.
>>> To post to this group, send email to akka...@googlegroups.com.
>>> Visit this group at http://groups.google.com/group/akka-user.
>>> For more options, visit https://groups.google.com/d/optout.
>>>
>>
