Re: [akka-user] Am I misunderstanding throttle?

2016-03-07 Thread Samuel Tardieu
If you throttle at a rate of 1 element every 2 seconds, you have likely hit
the problem described in https://github.com/akka/akka/issues/19862
(summary: you can't throttle slower than 1 element per 1.074 seconds or a
division by zero will occur, and the first element will also be delayed).
According to the issue, a fix is being prepared and should be available in
Akka 2.4.3.

  Sam
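
The 1.074 second figure is very close to 2^30 nanoseconds (1.073741824 s). One plausible reading, and only an assumption here since the thread does not show the Akka source, is that the rate is held as an integer scaled by 2^30 per nanosecond, which rounds down to zero once the period exceeds 2^30 ns. The sketch below only illustrates that arithmetic; the class, the method names and the scale constant are hypothetical and are not Akka's API.

```java
public class ThrottleScaleSketch {
    // Hypothetical fixed-point scale, chosen because 2^30 ns is about 1.074 s.
    static final long SCALE = 1L << 30;

    // Elements per nanosecond as a scaled integer (assumed representation).
    static long scaledSpeed(int cost, long perNanos) {
        return (cost * SCALE) / perNanos;
    }

    // True when a later "tokens / speed" computation would divide by zero.
    static boolean wouldDivideByZero(int cost, long perNanos) {
        return scaledSpeed(cost, perNanos) == 0;
    }

    public static void main(String[] args) {
        long oneSecond = 1_000_000_000L;
        System.out.println(scaledSpeed(1, oneSecond));           // 1
        System.out.println(scaledSpeed(1, 2 * oneSecond));       // 0
        System.out.println(wouldDivideByZero(1, 2 * oneSecond)); // true
    }
}
```

Under this assumption, 1 element per second still yields a non-zero speed, while 1 element per 2 seconds truncates to zero, which would match the behaviour reported in this thread.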


Re: [akka-user] Am I misunderstanding throttle?

2016-03-04 Thread Endre Varga
Hi John,

I have not had time to look into your problem yet, since the team and I are
in a meeting this week. Please file a ticket and someone will pick it up next
week.

-Endre


Re: [akka-user] Am I misunderstanding throttle?

2016-03-03 Thread john . vieten
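Could this be an akka-streams bug? Should I open a GitHub issue?
In the meantime I am using my own, inferior throttle as a workaround:


// Needs akka.japi.Pair, akka.actor.Cancellable, akka.stream.* and
// akka.stream.javadsl.* in addition to the imports of the reproducer.
final static class Tick {}

// Note: despite its name, throttleSecs is passed to Source.tick as milliseconds.
public static <T> Flow<T, T, BoxedUnit> createFlow(final long throttleSecs) {
    Flow<T, T, BoxedUnit> sourceflow = Flow.create();

    Flow<T, Pair<T, Tick>, BoxedUnit> flow = Flow.fromGraph(
        GraphDSL.create(builder -> {
            final FlowShape<T, T> source = builder.add(sourceflow);
            // Emits one Tick immediately and then one per interval.
            Source<Tick, Cancellable> tickSource =
                Source.tick(FiniteDuration.apply(0, "millis"),
                    FiniteDuration.apply(throttleSecs, "millis"), new Tick());
            final FanInShape2<T, Tick, Pair<T, Tick>> zipper =
                builder.add(Zip.create());
            SourceShape<Tick> tickSourceShape = builder.add(tickSource);

            // Each element has to wait for a tick before Zip can emit the pair.
            builder.from(source).toInlet(zipper.in0());
            builder.from(tickSourceShape).toInlet(zipper.in1());
            return FlowShape.of(source.in(), zipper.out());
        }));
    // Drop the tick and pass on the original element.
    return flow.map(Pair::first);
}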
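
The workaround above pairs every element with one tick, so the tick interval caps the output rate. A minimal plain-Java model of that pacing follows. It is a sketch under simplifying assumptions: all elements are already waiting, no tick is dropped, and the first tick fires at time 0. The class and method names are made up for illustration and are not part of Akka.

```java
import java.util.ArrayList;
import java.util.List;

public class TickZipModel {
    // Emission time in milliseconds of each element when the i-th tick
    // fires at i * intervalMillis and every element is already available.
    static List<Long> emissionTimesMillis(int elementCount, long intervalMillis) {
        List<Long> times = new ArrayList<>();
        for (int i = 0; i < elementCount; i++) {
            times.add(i * intervalMillis);
        }
        return times;
    }

    public static void main(String[] args) {
        // Three waiting elements, one tick every 2000 ms.
        System.out.println(emissionTimesMillis(3, 2000)); // [0, 2000, 4000]
    }
}
```

Unlike a token-bucket throttle, this scheme has no burst allowance: an element can only leave when a tick is available to pair with it.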



Re: [akka-user] Am I misunderstanding throttle?

2016-03-02 Thread john . vieten

Hi Endre,
many thanks for wanting to help me!!

If you run the code you will see that only "success : Pipeline 2" is
printed to System.out.

If I remove the throttle, everything works as expected:

//Source<T, BoxedUnit> pipeline = source.via(throttle).mapConcat(t -> t);
Source<T, BoxedUnit> pipeline = source.mapConcat(t -> t);


Am I doing anything wrong or stupid?

Here is the code again:

package test;

import akka.actor.ActorSystem;
import akka.dispatch.Futures;
import akka.japi.function.Function;
import akka.stream.ActorMaterializer;
import akka.stream.ActorMaterializerSettings;
import akka.stream.Materializer;
import akka.stream.Supervision;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import scala.Option;
import scala.Tuple2;
import scala.concurrent.Promise;
import scala.concurrent.duration.FiniteDuration;
import scala.runtime.BoxedUnit;

import java.util.ArrayList;
import java.util.Collection;


public class MailThrottleTest {
    static ActorSystem system = ActorSystem.create("TestThrotteling");

    public static void main(String[] args) throws Exception {
        MailThrottleTest mailThrottleTest = new MailThrottleTest();
        Source<Object, BoxedUnit> pipe1 = mailThrottleTest.createPipeLine("Pipeline 1");
        Source<Object, BoxedUnit> pipe2 = mailThrottleTest.createPipeLine("Pipeline 2");

        // Restart the failing stage on any exception.
        final Function<Throwable, Supervision.Directive> decider = exc -> {
            return Supervision.restart();
        };
        // Each pipeline runs on its own materializer.
        final Materializer mat1 = ActorMaterializer.create(
            ActorMaterializerSettings.create(system).withSupervisionStrategy(decider), system);
        final Materializer mat2 = ActorMaterializer.create(
            ActorMaterializerSettings.create(system).withSupervisionStrategy(decider), system);

        pipe1.to(Sink.foreach(object -> {
            System.out.println("Got Object pipe 1 : " + object);
        })).run(mat1);

        pipe2.to(Sink.foreach(object -> {
            System.out.println("Got Object pipe 2 : " + object);
        })).run(mat2);
        system.awaitTermination();
    }

    public <T> Source<T, BoxedUnit> createPipeLine(final String name) throws Exception {
        // The unfold state is never used, so it is typed as Object and left null.
        final Promise<Option<Tuple2<Object, Collection<T>>>> promise = Futures.promise();
        Source<Collection<T>, BoxedUnit> source = Source.unfoldAsync(null, p -> {
            System.out.println("success : " + name);
            return promise.future();
        });

        // At most 1 element per 2000 ms with a burst of 1, delaying (shaping) instead of failing.
        Flow<Collection<T>, Collection<T>, BoxedUnit> throttle =
            Flow.<Collection<T>>create().throttle(1,
                FiniteDuration.apply(2000, "millis"), 1,
                new akka.stream.ThrottleMode.Shaping$());

        Source<T, BoxedUnit> pipeline = source.via(throttle).mapConcat(t -> t);

        // Complete the promise with an empty collection.
        promise.success(Option.apply(new Tuple2<>(null, new ArrayList<T>())));
        return pipeline;
    }
}


-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Re: [akka-user] Am I misunderstanding throttle?

2016-03-02 Thread Endre Varga
Hi John,

Can you prepare a small reproducer? It might be a bug, but we can only be
sure if we see some code that exhibits the behavior.

-Endre



[akka-user] Am I misunderstanding throttle?

2016-03-02 Thread john . vieten


I am starting several instances of an Akka Streams pipeline.

The pipeline contains a throttle stage (Flow.create.throttle(..)).
What happens is that only the first pipeline works; all the other pipelines
stall.
Any ideas?
Many Greetings
John
