Hi Wolfgang,

> Well, my real use case is a little bit more complex, and currently I do have
> some performance problems.

This would probably not be solved by a custom schedule at all. In fact,
allowing custom schedules would completely deprive us of the flexibility we
need internally to do optimizations later.

As for performance, we tried to be very clear in our M1 announcement that we
are not there yet. This is just an early milestone release, and our aim was to
have an API that is more or less stable.

That said, there are various things you can try. If your goal is throughput,
I recommend the following settings:

 - change the dispatcher running the stream to have a throughput setting of
   1000
 - increase the default queue size for the materializer to 128:

       val materializer = FlowMaterializer(
         MaterializerSettings(system)
           .withInputBuffer(initialSize = 128, maxSize = 128))

 - if you are adventurous and want to play around with completely internal,
   dangerous, unsupported stuff, then you can also try:

       import akka.stream.impl.{Optimizations, ActorBasedFlowMaterializer}

       val materializer = FlowMaterializer(
         MaterializerSettings(system)
           .withInputBuffer(initialSize = 128, maxSize = 128))
         .asInstanceOf[ActorBasedFlowMaterializer]
         .copy(optimizations = Optimizations.all)

I warn you though that the above code relies on internal API that we might
change, eliminate, or just break silently :)

With the above settings, more than 2.5M elements per second pass happily
through a chain of 10 maps on my 6-core machine.

Also, be aware that since the whole chain is backpressured, downstream
consumers that are not fast enough will slow down everything upstream of them.

-Endre

> I thought I could improve the performance by changing the relation between
> Command (Request messages) and Data messages. So far this relation is 1:2
> (at least this is what my tests are telling me), which is some overhead,
> especially as I connect to Flows via "Bridge".
>
> Regards
>
> Wolfgang
>
> On Monday, 22 December 2014 at 13:32:56 UTC+1, Akka Team wrote:
>>
>> Hi Wolfgang,
>>
>> On Mon, Dec 22, 2014 at 1:10 PM, Wolfgang Friedl <wolfgan...@hotmail.com>
>> wrote:
>>
>>> Hi Endre!
>>>
>>> Thanks for the clarification, it does make sense :)
>>> Is it possible to define the "batching strategy" for simple operations
>>> like map? Or do I have to write a dedicated "processor" for handling this?
>>>
>>
>> The question is: why do you want to do that? That is purely internal
>> stuff. Just like you don't want to inject your own congestion avoidance
>> algorithm into TCP (OK, except under very special circumstances), you should
>> not really care about Akka Streams' internal scheduling. For example, we have
>> one important optimization that is already there but not exposed to users,
>> which can fuse multiple maps (just one example) into one actor instead of
>> running each of them in a separate actor. Once fused, their execution becomes
>> synchronous and the batching strategy would no longer make sense anyway.
>>
>> What you might care about in certain situations are the internal implicit
>> buffer sizes, which you can configure to have a maximum value. The usual
>> rule here: if you are ever *semantically* affected by internal buffer sizes
>> (this happens for certain time-based zipping streams, for example), then
>> basically you want a buffer of 1. In every other case you want a buffer
>> large enough to give you your desired throughput. (The part of the doc
>> explaining this is being written right now:
>> https://github.com/akka/akka/pull/16596)
>>
>> -Endre
>>
>>>
>>> Wolfgang
>>>
>>> On Monday, 22 December 2014 at 12:30:15 UTC+1, Akka Team wrote:
>>>>
>>>> Hi Wolfgang,
>>>>
>>>> What you see is completely expected. The strategy you added only
>>>> defines the strategy for that particular consumer. Once you attach other
>>>> processing steps, like those "map"s, they apply their own batching strategy
>>>> independent of what your consumer does.
>>>> This is by design: backpressure scheduling strategies are not transitive
>>>> (and should never be, actually).
>>>>
>>>> You can imagine your stages as freely combinable elements:
>>>>
>>>> [ActorPublisher]
>>>> [--> (map buffer+schedule) --> map]
>>>> [--> (buffer+WatermarkStrategy) --> yourActor]
>>>>
>>>> When you assemble them like this:
>>>>
>>>> [ActorPublisher] [--> (buffer+WatermarkStrategy) --> yourActor]
>>>>
>>>> then of course your publisher will observe the schedule of your
>>>> choosing. But if you assemble:
>>>>
>>>> [ActorPublisher] [--> (map buffer+schedule) --> map] [-->
>>>> (buffer+WatermarkStrategy) --> yourActor]
>>>>
>>>> then your publisher will observe the schedule of the map directly,
>>>> while the map will observe the schedule of your consumer. The
>>>> backpressure protocol is always local between a producer-consumer pair.
>>>>
>>>> -Endre
>>>>
>>>> On Mon, Dec 22, 2014 at 11:59 AM, Wolfgang Friedl <
>>>> wolfgan...@hotmail.com> wrote:
>>>>
>>>>>
>>>>> Following sample:
>>>>>
>>>>> The "Producer" receives a different demand of elements depending on
>>>>> whether the flow has processing steps or not.
>>>>>
>>>>> (*Consumer uses the WatermarkStrategy(30)*)
>>>>>
>>>>>     var flow = Flow.empty[Int]
>>>>>     flow.runWith(PublisherSource(ActorPublisher[Int](intProducer)),
>>>>>       SubscriberSink(ActorSubscriber[Int](intConsumer)))
>>>>>
>>>>> --> Requested demand in the Producer: *30*
>>>>>
>>>>>     var flow = Flow.empty[Int]
>>>>>     flow = flow.map(_ + 1).map(_ * 2)
>>>>>     flow.runWith(PublisherSource(ActorPublisher[Int](intProducer)),
>>>>>       SubscriberSink(ActorSubscriber[Int](intConsumer)))
>>>>>
>>>>> --> Requested demand in the Producer: *4*
>>>>>
>>>>>
>>>>> (*Consumer uses the OneByOneStrategy*)
>>>>>
>>>>>     var flow = Flow.empty[Int]
>>>>>     flow = flow.map(_ + 1).map(_ * 2)
>>>>>     flow.runWith(PublisherSource(ActorPublisher[Int](intProducer)),
>>>>>       SubscriberSink(ActorSubscriber[Int](intConsumer)))
>>>>>
>>>>> --> Requested demand in the Producer: *4*
>>>>>
>>>>>     var flow = Flow.empty[Int]
>>>>>     flow.runWith(PublisherSource(ActorPublisher[Int](intProducer)),
>>>>>       SubscriberSink(ActorSubscriber[Int](intConsumer)))
>>>>>
>>>>> --> Requested demand in the Producer: *1*
>>>>>
>>>>>
>>>>> I would expect that the "Request of demand" sent to producers does not
>>>>> depend on the steps of the flow. Or am I wrong? Moreover, 4 seems somehow
>>>>> strange to me.
>>>>> Any help welcome.
>>>>>
>>>>>
>>>>> Regards
>>>>>
>>>>> Wolfgang
>>>>>
>>>>> --
>>>>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>> >>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
>>>>> ---
>>>>> You received this message because you are subscribed to the Google
>>>>> Groups "Akka User List" group.
>>>>> To unsubscribe from this group and stop receiving emails from it, send
>>>>> an email to akka-user+...@googlegroups.com.
>>>>> To post to this group, send email to akka...@googlegroups.com.
>>>>> Visit this group at http://groups.google.com/group/akka-user.
>>>>> For more options, visit https://groups.google.com/d/optout.
>>>>>
>>>>
>>>> --
>>>> Akka Team
>>>> Typesafe - The software stack for applications that scale
>>>> Blog: letitcrash.com
>>>> Twitter: @akkateam
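
The point made in this thread, that demand is only ever negotiated between directly connected stages, can be illustrated with a small toy model. This is a sketch in plain Scala and not Akka Streams code: `Stage`, `initialDemand`, and `observedDemand` are hypothetical names invented for the illustration, and the value 4 used for the map stages is simply the demand Wolfgang reported seeing, not a documented default.

```scala
// Toy model of local demand signalling. NOT Akka Streams code: Stage,
// initialDemand and observedDemand are hypothetical names for illustration.
object LocalDemandSketch {

  // A stage asks the stage directly upstream of it for `initialDemand` elements.
  final case class Stage(name: String, initialDemand: Int)

  // For every adjacent (upstream, downstream) pair, report the demand that the
  // upstream stage sees. It depends only on the direct downstream neighbour,
  // never on stages further down the chain.
  def observedDemand(chain: List[Stage]): List[(String, Int)] =
    chain.zip(chain.drop(1)).map { case (up, down) => up.name -> down.initialDemand }

  def main(args: Array[String]): Unit = {
    val publisher = Stage("publisher", 0) // a source requests nothing itself
    val consumer  = Stage("consumer", 30) // a watermark of 30, as in the thread
    // Assumption: 4 is the demand observed in the thread for map stages.
    val map1 = Stage("map1", 4)
    val map2 = Stage("map2", 4)

    // Publisher connected straight to the consumer.
    println(observedDemand(List(publisher, consumer)))
    // Two maps in between: the publisher now only sees the first map.
    println(observedDemand(List(publisher, map1, map2, consumer)))
  }
}
```

In the first chain the publisher sees the consumer's demand of 30. In the second it sees only the first map's demand of 4, while the consumer's 30 is visible to the second map alone, which matches the numbers reported in the original question.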