Adding an .async there also adds a buffer at that boundary. See
http://doc.akka.io/docs/akka/2.4.5/scala/stream/stream-rate.html for
details.
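To see the effect of that buffer more clearly, one option is to shrink the default input buffer on the materializer. This is only a minimal sketch: the object name SmallBufferSketch and the size of 1 are illustrative assumptions, not something from this thread.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ActorMaterializerSettings}

object SmallBufferSketch {
  // Illustrative value only; Akka expects a power of two here.
  val bufferSize: Int = 1

  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem("example")

    // Every async boundary materialized with this materializer
    // gets an input buffer of `bufferSize` elements.
    val settings = ActorMaterializerSettings(system)
      .withInputBuffer(initialSize = bufferSize, maxSize = bufferSize)
    implicit val mat = ActorMaterializer(settings)

    // ... build and run the stream here ...

    system.terminate()
  }
}
```

With a small buffer, fewer elements are prefetched across the boundary, so the batching stage should start aggregating sooner.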
On Thu, May 19, 2016 at 4:05 AM, Yang Yang wrote:
Hi, I changed it to use async:

implicit lazy val system = ActorSystem("example")
implicit val mat = ActorMaterializer()
val source = Source.repeat(1)
val temp = source.async.batch(5, it => new IntHolder(0))((all, other) => {
  all.a += other; all
})
val end = temp.async.runForeach(it => {println(it);
Streams are fused by default right now, so the entire pipeline sleeps once
you do that.
You should try sprinkling an .async call around the fast (or slow) stages.
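
As a rough sketch of that suggestion, assuming Akka Streams 2.4.x: the names BatchSketch, seed and append are made up for illustration, and the Thread.sleep only simulates a slow consumer.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source

object BatchSketch {
  // Start a new batch from the first element that arrives.
  def seed(first: Int): Vector[Int] = Vector(first)

  // Add a later element to the batch being built.
  def append(batch: Vector[Int], next: Int): Vector[Int] = batch :+ next

  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem("example")
    implicit val mat = ActorMaterializer()

    Source(1 to 100)                 // fast upstream
      .batch(5, seed)(append)        // aggregate up to 5 elements while downstream is busy
      .async                         // async boundary between batch and the slow stage
      .map { chunk =>
        Thread.sleep(100)            // simulated slow work (blocking, demo only)
        chunk
      }
      .runForeach(println)
      .onComplete(_ => system.terminate())(system.dispatcher)
  }
}
```

Without the .async, the fused pipeline runs as one unit, so the sleep stalls the upstream as well and batch rarely has anything to aggregate. The exact batch sizes printed depend on timing and on the buffer at the boundary.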
--
Konrad `ktoso` Malawski
Akka @ Lightbend
On 19 May 2016 at 03:18:45, Yang Yang (wjingyao2...@163.com) wrote:
Hi, I have a fast upstream and a slow downstream,
so I want to combine the incoming items into an ArrayBuffer and then
consume the ArrayBuffer as a batch.
Flow.batch looks like it can help here,
but I wrote a test and it does not behave the way I expected:
case class IntHolder(var