Re: [akka-user] how do i correctly use the Flow.batch?

2016-05-19 Thread Endre Varga
Putting an .async there also puts a buffer there. See
http://doc.akka.io/docs/akka/2.4.5/scala/stream/stream-rate.html for
details.
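
A minimal sketch of what shrinking that buffer could look like, assuming the
Akka Streams 2.4.x scaladsl API and the Attributes.inputBuffer setting
described on the page linked above. The names BatchWithSmallBuffer, slowStage
and run are hypothetical, the 200 ms sleep only stands in for the slow
consumer, and the source is capped at 100 elements so the run terminates. The
seed here keeps the first element of each batch; the seed in the test code of
this thread (it => new IntHolder(0)) starts every batch at zero, which is why
a full batch of 5 prints IntHolder(4) and a one-element batch prints
IntHolder(0).

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Attributes}
import akka.stream.scaladsl.{Flow, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object BatchWithSmallBuffer {
  // Slow stage in its own async island. Its input buffer is reduced to one
  // element so that waiting elements accumulate in `batch` instead of in the
  // buffer at the async boundary. Thread.sleep blocks a dispatcher thread and
  // is used here only to simulate a slow consumer.
  val slowStage: Flow[Int, Int, NotUsed] =
    Flow[Int]
      .map { sum => Thread.sleep(200); sum }
      .async
      .addAttributes(Attributes.inputBuffer(initial = 1, max = 1))

  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("example")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try {
      val batches = Source.repeat(1)
        .take(100)
        // seed with the first element, then add each following element
        .batch(20, (first: Int) => first)(_ + _)
        .via(slowStage)
        .runWith(Sink.seq)
      Await.result(batches, 1.minute)
    } finally system.terminate()
  }
}
```

Calling BatchWithSmallBuffer.run() returns the batch sums in order. How large
the first few batches are depends on timing, so no particular sequence is
promised; each sum is at most 20 and together they add up to 100.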

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] how do i correctly use the Flow.batch?

2016-05-18 Thread Yang Yang
Hi, I changed it to use async:

implicit lazy val system = ActorSystem("example")
implicit val mat = ActorMaterializer()

val source = Source.repeat(1)
val temp = source.async.batch(5, it => new IntHolder(0))((all, other) => {
  all.a += other; all
})
val end = temp.async.runForeach(it => {println(it); Thread.sleep(2000)})

Thread.sleep(5)



But the output is still:

IntHolder(0)
IntHolder(0)
IntHolder(0)
IntHolder(0)
IntHolder(0)
IntHolder(0)
IntHolder(0)
IntHolder(0)
IntHolder(0)
IntHolder(0)
IntHolder(4)



Only very few of them are "combined", which is surely not what I want.

Is there any configuration I need to do?



Re: [akka-user] how do i correctly use the Flow.batch?

2016-05-18 Thread Konrad Malawski
Streams are fused by default right now, so the entire pipeline is sleeping
once you call Thread.sleep.
You should try sprinkling an .async call around the fast (or slow) stages.
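
A small sketch of where such an .async call goes, assuming the Akka Streams
2.4.x scaladsl API. The object name AsyncBoundarySketch and the two map
stages are made up for illustration. It only shows that an async boundary
changes how the stages are executed, not what they compute.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object AsyncBoundarySketch {
  def run(): (Seq[Int], Seq[Int]) = {
    implicit val system: ActorSystem = ActorSystem("async-sketch")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try {
      // Fused (the default): all stages run inside a single actor, so a
      // blocking call in any stage stalls the whole pipeline.
      val fused = Source(1 to 5).map(_ * 2).map(_ + 1).runWith(Sink.seq)

      // With .async, the stages before the boundary run in their own actor
      // and can keep working while the stages after it are busy.
      val split = Source(1 to 5).map(_ * 2).async.map(_ + 1).runWith(Sink.seq)

      (Await.result(fused, 10.seconds), Await.result(split, 10.seconds))
    } finally system.terminate()
  }
}
```

Both pipelines yield the same elements in the same order; only the second one
lets the upstream part run concurrently with the downstream part.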

-- 
Konrad `ktoso` Malawski
Akka @ Lightbend



[akka-user] how do i correctly use the Flow.batch?

2016-05-18 Thread Yang Yang
Hi, I have a fast upstream and a slow downstream,

so I want to "combine" the incoming items into an ArrayBuffer and then
consume the ArrayBuffer as a batch.

Flow.batch seems like it can help me here,

but I wrote a test and the result is not what I expect:

  case class IntHolder(var a: Int = 0)

  test("test simple batch") {
    implicit val mat = ActorMaterializer()
    val broadCastModule = new BroadCastModule[String](system)
    val source = Source.repeat(1)
    // val source = Source.actorRef[Int](1000, OverflowStrategy.dropHead)
    val temp = source.batch(20, it => new IntHolder(0))((all, other) => { all.a += other; all })

    val end = temp.runForeach(it => { println(it); Thread.sleep(2000) })
    Thread.sleep(5)
  }


Most of the output is:

IntHolder(1)
IntHolder(1)
IntHolder(1)
IntHolder(1)
IntHolder(1)

That is my code. The problem is that most of the IntHolder values contain 1.

But I call Thread.sleep(2000), so I think the consumer is slow enough.

I expected most of the printed values to be IntHolder(20): because the
consumer is slow, the producer should batch most of the items.

What is wrong? Thanks.
