Re: [akka-user] Akka HTTP client overflow strategy

2016-05-27 Thread Samuel Tardieu
Wouldn't a circuit breaker allow you to drop new queries early if the
service takes too long to answer so that they don't accumulate?

http://doc.akka.io/docs/akka/2.4.6/common/circuitbreaker.html

  Sam

Le mer. 25 mai 2016 à 11:08, Loïc Descotte  a
écrit :

> HI,
>
> I would like to use Akka Http client to call a webservice in a non
> blocking way.
> The app that will use this client must never block and should never crash,
> even if the webservice that it will call becomes very slow (for example if
> the traffic is too high).
> When the client registers a callback, I guess it's memorized somewhere, so
> what is happening if a lot of queries and callbacks are kept in memory
> because the webservice takes too much time to respond?
> I guess it can cause some memory issues, so is it possible to cancel and
> drop some queries when this happens?
>
> Thanks :)
>
> --
> >> Read the docs: http://akka.io/docs/
> >> Check the FAQ:
> http://doc.akka.io/docs/akka/current/additional/faq.html
> >> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to akka-user+unsubscr...@googlegroups.com.
> To post to this group, send email to akka-user@googlegroups.com.
> Visit this group at https://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] How to stop an Actor using akka-http's actor-per-request example without getting an Abrupt Termination?

2016-03-15 Thread Samuel Tardieu
I think this is a bug: https://github.com/akka/akka/issues/20032

Le mar. 15 mars 2016 à 03:57, Sam Smoot <ssm...@gmail.com> a écrit :

> That fixed it! I can't thank you enough!
>
>
> On Monday, March 14, 2016 at 5:43:05 PM UTC-5, Samuel Tardieu wrote:
>
>> Could it be that since you define an implicit Materializer in your actor
>> which is tied to the implicit ActorRefFactory (which would be context
>> inside the actor), it is the one used for singleRequest which requires
>> an implicit Materializer? Can you try defining your Materializer as
>> ActorMaterializer()(context.system) and see if it changes the outcome?
>> ​
>>
>>
>> Le lun. 14 mars 2016 à 18:36, Sam Smoot <ssm...@gmail.com> a écrit :
>>
> So if you look at the example in the docs:
>>> http://doc.akka.io/docs/akka/2.4.2/scala/http/client-side/request-level.html#Using_the_Future-Based_API_in_Actors
>>>
>>> That works fine. Except it's incomplete since it's obviously a
>>> single-use Actor (the request is fired off in preStart) and it doesn't show
>>> how to clean it up.
>>>
>>> I'm trying to implement this in my project, but sending context.self !
>>> PoisonPill to my Actor after consuming the response will reliably cause an
>>> AbruptTerminationException.
>>>
>>> I've whipped up a gist here that hopefully demonstrates the issue (with
>>> lots of comments): https://gist.github.com/sam/7731f883a62b329c6592
>>>
>>> Any help? I got a suggestion it might have something to do with the
>>> connection pool, but I'm not sure how to handle that. Should I pass the
>>> pool into my actor's constructor? Where do I get the pool? How do I pass it
>>> to singleRequest, which appears to take a ConnectionPoolSettings object?
>>>
>>> Thanks, -Sam
>>>
>>> --
>>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>>> >>>>>>>>>> Check the FAQ:
>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>> >>>>>>>>>> Search the archives:
>>> https://groups.google.com/group/akka-user
>>> ---
>>> You received this message because you are subscribed to the Google
>>> Groups "Akka User List" group.
>>>
>> To unsubscribe from this group and stop receiving emails from it, send an
>>> email to akka-user+...@googlegroups.com.
>>> To post to this group, send email to akka...@googlegroups.com.
>>
>>
>>> Visit this group at https://groups.google.com/group/akka-user.
>>> For more options, visit https://groups.google.com/d/optout.
>>>
>> --
> >>>>>>>>>> Read the docs: http://akka.io/docs/
> >>>>>>>>>> Check the FAQ:
> http://doc.akka.io/docs/akka/current/additional/faq.html
> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to akka-user+unsubscr...@googlegroups.com.
> To post to this group, send email to akka-user@googlegroups.com.
> Visit this group at https://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>

-- 
>>>>>>>>>>  Read the docs: http://akka.io/docs/
>>>>>>>>>>  Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] How to stop an Actor using akka-http's actor-per-request example without getting an Abrupt Termination?

2016-03-14 Thread Samuel Tardieu
Could it be that since you define an implicit Materializer in your actor
which is tied to the implicit ActorRefFactory (which would be context
inside the actor), it is the one used for singleRequest which requires an
implicit Materializer? Can you try defining your Materializer as
ActorMaterializer()(context.system) and see if it changes the outcome?
​


Le lun. 14 mars 2016 à 18:36, Sam Smoot  a écrit :

> So if you look at the example in the docs:
> http://doc.akka.io/docs/akka/2.4.2/scala/http/client-side/request-level.html#Using_the_Future-Based_API_in_Actors
>
> That works fine. Except it's incomplete since it's obviously a single-use
> Actor (the request is fired off in preStart) and it doesn't show how to
> clean it up.
>
> I'm trying to implement this in my project, but sending context.self !
> PoisonPill to my Actor after consuming the response will reliably cause an
> AbruptTerminationException.
>
> I've whipped up a gist here that hopefully demonstrates the issue (with
> lots of comments): https://gist.github.com/sam/7731f883a62b329c6592
>
> Any help? I got a suggestion it might have something to do with the
> connection pool, but I'm not sure how to handle that. Should I pass the
> pool into my actor's constructor? Where do I get the pool? How do I pass it
> to singleRequest, which appears to take a ConnectionPoolSettings object?
>
> Thanks, -Sam
>
> --
> >> Read the docs: http://akka.io/docs/
> >> Check the FAQ:
> http://doc.akka.io/docs/akka/current/additional/faq.html
> >> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to akka-user+unsubscr...@googlegroups.com.
> To post to this group, send email to akka-user@googlegroups.com.
> Visit this group at https://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] How to stop an Actor using akka-http's actor-per-request example without getting an Abrupt Termination?

2016-03-14 Thread Samuel Tardieu
As a side node: self is already defined as context.self in Actor, you can
use that directly.
​

Le lun. 14 mars 2016 à 18:36, Sam Smoot  a écrit :

> So if you look at the example in the docs:
> http://doc.akka.io/docs/akka/2.4.2/scala/http/client-side/request-level.html#Using_the_Future-Based_API_in_Actors
>
> That works fine. Except it's incomplete since it's obviously a single-use
> Actor (the request is fired off in preStart) and it doesn't show how to
> clean it up.
>
> I'm trying to implement this in my project, but sending context.self !
> PoisonPill to my Actor after consuming the response will reliably cause an
> AbruptTerminationException.
>
> I've whipped up a gist here that hopefully demonstrates the issue (with
> lots of comments): https://gist.github.com/sam/7731f883a62b329c6592
>
> Any help? I got a suggestion it might have something to do with the
> connection pool, but I'm not sure how to handle that. Should I pass the
> pool into my actor's constructor? Where do I get the pool? How do I pass it
> to singleRequest, which appears to take a ConnectionPoolSettings object?
>
> Thanks, -Sam
>
> --
> >> Read the docs: http://akka.io/docs/
> >> Check the FAQ:
> http://doc.akka.io/docs/akka/current/additional/faq.html
> >> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to akka-user+unsubscr...@googlegroups.com.
> To post to this group, send email to akka-user@googlegroups.com.
> Visit this group at https://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] Re: Using ActorPublisher without ActorSubscriber

2016-03-10 Thread Samuel Tardieu
Simão,

you might be trying to push too many values downstream, I think you should
replace your Math.max by Math.min for a start. Are you sure your
subscription isn’t canceled rapidly because of this force-feeding? Also,
have you tried logging your Request branch in receive and display
totalDemand then?


  Sam
​

Le jeu. 10 mars 2016 à 08:53, Simão Mata  a écrit :

> Here is the code for my actor:
> https://gist.github.com/simao/3003ee256751cfb7b2f7
>
> If i put a log message on `deliverBuf` then I see that this method gets
> called for each event, but totalDemand is always 0.
>
> Thank you for your help.
>
> Simao
>
>
>
>
> On Wednesday, March 9, 2016 at 10:41:13 PM UTC+1, Rafał Krzewski wrote:
>>
>> Your actor is supposed to receive ActorPublisherMessage.Request message
>> after the stream is materialized. At this point totalDemand should be >
>> 0 and you are allowed to call onNext
>>
>> Can you show the code of your publisher actor?
>>
>> Cheers,
>> Rafał
>>
>> W dniu środa, 9 marca 2016 13:48:27 UTC+1 użytkownik Simão Mata napisał:
>>>
>>> Hello,
>>>
>>> I think I misunderstand the usage of ActorPublisher. I read the
>>> documentation (
>>> http://doc.akka.io/docs/akka-stream-and-http-experimental/2.0.3/scala/stream-integrations.html#ActorPublisher)
>>> but I cannot seem to understand how to use it.
>>>
>>> I am creating an ActorPublisher actor and creating a Source like this:
>>>
>>> val source = Source.actorPublisher(publisherProps)
>>>
>>> I then connect this source to a sink and run it:
>>> source.runWith(Sink.ignore). But debugging the actor I can see that
>>> totalDemand is always 0, so the actor never calls `onNext`. So what should
>>> update the demand on the actor? Do I always have to connect an
>>> ActorPublisher to an ActorSubscriber so that demand in ActorPublisher is
>>> updated properly?
>>>
>>> Thank you.
>>>
>> --
> >> Read the docs: http://akka.io/docs/
> >> Check the FAQ:
> http://doc.akka.io/docs/akka/current/additional/faq.html
> >> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to akka-user+unsubscr...@googlegroups.com.
> To post to this group, send email to akka-user@googlegroups.com.
> Visit this group at https://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] Am I misunderstanding throttle?

2016-03-07 Thread Samuel Tardieu
If you throttle at a rate of 1 element every 2 seconds, you have likely
suffered the problem described in https://github.com/akka/akka/issues/19862
(summary: you can't throttle slower than 1 element per 1.074 second or a
division by zero will occur — and the first element will be delayed also).
Looking at the issue, it looks like a fix is being prepared and should be
available in Akka 2.4.3.

  Sam

Le ven. 4 mars 2016 à 06:50,  a écrit :

> Could this be a aka-streams bug? Should I open a git bug?
> In-between I am using my own  inferior throttle so I have a workaround
>
>
> final static class Tick {}
> public static  Flow createFlow(final long throttleSecs) {
> Flow sourceflow =  Flow.create();
>
> Flow, BoxedUnit> flow = Flow.fromGraph(
>  GraphDSL.create(builder -> {
> final FlowShape source = builder.add(sourceflow);
> Source tickSource = 
> Source.tick(FiniteDuration.apply(0, "millis"), 
> FiniteDuration.apply(throttleSecs, "millis"), new Tick());
> final FanInShape2> zipper = 
> builder.add(Zip.create());
> SourceShape tickSourceShape = builder.add(tickSource);
>
> builder.from(source).toInlet(zipper.in0());
> builder.from(tickSourceShape).toInlet(zipper.in1());
> return FlowShape.of(source.in(), zipper.out());
>  }));
>return flow.map(Pair::first);
>
>
> Am Mittwoch, 2. März 2016 15:12:00 UTC+1 schrieb john@gmail.com:
>>
>>
>> Hi Endre,
>> many thanks for wanting to help me!!
>>
>> If you run the code you will see that only "success : Pipeline 2"  gets
>> outputted to System.out.
>>
>> If I remove the Throttle everything works as expected.
>>
>> //Source pipeline = source.via(throttle).mapConcat(t -> 
>> t);
>> Source pipeline = source.mapConcat(t -> t);
>>
>>
>> Am I doing anything wrong or stupid?
>>
>> here is the code again:
>>
>> package  test
>>
>>
>>
>> import akka.actor.ActorSystem;
>> import akka.dispatch.Futures;
>> import akka.japi.function.Function;
>> import akka.stream.ActorMaterializer;
>> import akka.stream.ActorMaterializerSettings;
>> import akka.stream.Materializer;
>> import akka.stream.Supervision;
>> import akka.stream.javadsl.Flow;
>> import akka.stream.javadsl.Sink;
>> import akka.stream.javadsl.Source;
>> import scala.Option;
>> import scala.Tuple2;
>> import scala.concurrent.Promise;
>> import scala.concurrent.duration.FiniteDuration;
>> import scala.runtime.BoxedUnit;
>>
>> import java.util.ArrayList;
>> import java.util.Collection;
>>
>>
>> public class MailThrottleTest {
>>static ActorSystem system = ActorSystem.create("TestThrotteling");
>>
>>public static void main(String[] args) throws Exception{
>>   MailThrottleTest mailThrottleTest = new MailThrottleTest();
>>   Source  pipe1= 
>> mailThrottleTest.createPipeLine("Pipeline 1");
>>   Source  pipe2= 
>> mailThrottleTest.createPipeLine("Pipeline 2");
>>
>>
>>   final Function decider = exc -> {
>>  return Supervision.restart();
>>   };
>>   final Materializer mat1 = 
>> ActorMaterializer.create(ActorMaterializerSettings.create(system).withSupervisionStrategy(decider),
>>  system);
>>   final Materializer mat2 = 
>> ActorMaterializer.create(ActorMaterializerSettings.create(system).withSupervisionStrategy(decider),
>>  system);
>>   pipe1.to(Sink.foreach(object -> {
>>  System.out.println("Got Object pipe 1 : "+object);
>>   })).run(mat1);
>>
>>   pipe2.to(Sink.foreach(object -> {
>>  System.out.println("Got Object pipe 2 : "+object);
>>   })).run(mat2);
>>   system.awaitTermination();
>>}
>>
>>public  Source  createPipeLine(final String name) 
>> throws Exception{
>>
>>
>>   final Promise>> promise = 
>> Futures.promise();
>>   Source source = 
>> Source.unfoldAsync(null, p -> {
>>  System.out.println("success : "+name);
>>  return promise.future();
>>   });
>>
>>   Flow throttle =
>> Flow.create().throttle(1, 
>> FiniteDuration.apply(2000, "millis"),1,
>>   new akka.stream.ThrottleMode.Shaping$());
>>
>>   Source pipeline = source.via(throttle).mapConcat(t 
>> -> t);
>>
>>   promise.success(Option.apply(new Tuple2(null, new ArrayList(;
>>   return pipeline;
>>
>>}
>>
>>
>> }
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> Am Mittwoch, 2. März 2016 13:43:21 UTC+1 schrieb drewhk:
>>>
>>> Hi John,
>>>
>>> Can you prepare a small reproducer? It might be a bug, but we can only
>>> be sure if we see some code that exhibits the behavior.
>>>
>>> -Endre
>>>
>>> On Wed, Mar 2, 2016 at 1:40 PM, 

Re: [akka-user] Long-running HTTP requests not marking the TCP slot as busy

2015-05-27 Thread Samuel Tardieu
I will file an issue.

The problem would exist and even be amplified by long-polling GET requests,
as they would be seen as idempotent. Moreover, in Akka it looks like
disabling pipelining (setting it to 1) might not be enough if the
connection is seen as idle, at least it didn't work for me. Maybe we should
have a way of marking a request as potentially long running, which would
make the slot busy as long as the connection has not terminated, and
non-idempotent methods could set this flag by default.

2015-05-27 19:58 GMT+02:00 Roland Kuhn goo...@rkuhn.info:

 Hi Samuel,

 what you describe sounds like a bug, and I think I know how it arises as
 well. Would you please file an issue with this log and explanation? Thanks!

 Independently, it seems that you are pointing out an overall problematic
 relation between pipelining and long-running HTTP/1.1 requests: would this
 very same problem not exist for long-polling GET requests? If a user wants
 to schedule a long-running request of this kind, it seems to me that manual
 care would need to be taken to disable pipelining on the connection that is
 being used. How is this usually handled?

 Regards,

 Roland

 27 maj 2015 kl. 18:59 skrev Samuel Tardieu s...@rfc1149.net:

 Hi.

 Using Akka HTTP  Streams 1.0-RC3 in Scala, I have the impression that a
 connection can be used to pipeline another request even if a non-idempotent
 request is being executed (vs. a non-idempotent request being completed). I
 had the same problem in 1.0-RC2 but took no time to investigate.

 The context: I’m using an in-house library to access CouchDB databases
 over HTTP named “canape”. I create a database and wait for the result, I
 then open a long-running connection to the “_changes” stream which returns
 live database changes and send them to a “fold” sink in the background, and
 I immediately create 5 documents named “docid“ and wait for 100
 milliseconds after every document creation.

 In the example below, we have a single TCP stream dumped with wireshark.
 Note how the request to “_changes” (which represents a live stream of
 changes in CouchDB) returns a chunked response, and the chunked response is
 obviously not over when the PUT to create the second document (“docid2”)
 is sent over the same connection. Even though I took care of using POST
 to ensure that Akka does see the “_changes” request as non-idempotent.

 Note that the first document (“docid1”) has been created on another TCP
 connection (and thus not shown here), probably because it has been sent
 right after the request to the “_changes” stream, which means that at this
 time Akka probably considered the non-idempotent request to be running.
 However, the second document is being created on the busy original TCP
 connection, as if as soon as the headers were sent back the connection is
 considered idle again, although its entity is still being transmitted and
 may be for a long time. Since the PUT for “docid2” is still blocked on
 this connection, we can also see that “docid3”, “docid4” and “docid5” have
 been properly created using another TCP connection.

 Right now, I work around this by creating a new host connection pool for
 every long-running connection, but it is a waste or resources and causes
 some code duplication. The problem shown below happens when I try to use a
 common pool for all requests (which should work fine).

 Any idea of what might be wrong here?

  PUT /canape-test-db-d45ad182-e2ec-4387-b5a1-a51d016c0312/ HTTP/1.1
  Host: localhost:5984
  User-Agent: canape for Scala
  Accept: application/json
  Content-Length: 0
 
  HTTP/1.1 201 Created
  Server: CouchDB/1.6.1 (Erlang OTP/17)
  Location: 
 http://localhost:5984/canape-test-db-d45ad182-e2ec-4387-b5a1-a51d016c0312
  Date: Wed, 27 May 2015 16:43:24 GMT
  Content-Type: application/json
  Content-Length: 12
  Cache-Control: must-revalidate
 
  {ok:true}
  POST 
  /canape-test-db-d45ad182-e2ec-4387-b5a1-a51d016c0312/_changes?feed=continuous
   HTTP/1.1
  Host: localhost:5984
  User-Agent: canape for Scala
  Accept: application/json
  Content-Type: application/json
  Content-Length: 2
 
  {}
  HTTP/1.1 200 OK
  Transfer-Encoding: chunked
  Server: CouchDB/1.6.1 (Erlang OTP/17)
  Date: Wed, 27 May 2015 16:43:25 GMT
  Content-Type: application/json
  Cache-Control: must-revalidate
 
  51
  
 {seq:1,id:docid1,changes:[{rev:1-967a00dff5e02add41819138abb3284d}]}
 
  PUT /canape-test-db-d45ad182-e2ec-4387-b5a1-a51d016c0312/docid2 HTTP/1.1
  Host: localhost:5984
  User-Agent: canape for Scala
  Accept: application/json
  Content-Type: application/json
  Content-Length: 2
 
  {}
  51
  
 {seq:2,id:docid3,changes:[{rev:1-967a00dff5e02add41819138abb3284d}]}
 
  51
  
 {seq:3,id:docid4,changes:[{rev:1-967a00dff5e02add41819138abb3284d}]}
 
  51
  
 {seq:4,id:docid5,changes:[{rev:1-967a00dff5e02add41819138abb3284d}]}

 (hangs there, as the connection is still busy sending the long-running
 _changes stream, the creation of “docid2” will timeout

[akka-user] Long-running HTTP requests not marking the TCP slot as busy

2015-05-27 Thread Samuel Tardieu
Hi.

Using Akka HTTP  Streams 1.0-RC3 in Scala, I have the impression that a
connection can be used to pipeline another request even if a non-idempotent
request is being executed (vs. a non-idempotent request being completed). I
had the same problem in 1.0-RC2 but took no time to investigate.

The context: I’m using an in-house library to access CouchDB databases over
HTTP named “canape”. I create a database and wait for the result, I then
open a long-running connection to the “_changes” stream which returns live
database changes and send them to a “fold” sink in the background, and I
immediately create 5 documents named “docid“ and wait for 100 milliseconds
after every document creation.

In the example below, we have a single TCP stream dumped with wireshark.
Note how the request to “_changes” (which represents a live stream of
changes in CouchDB) returns a chunked response, and the chunked response is
obviously not over when the PUT to create the second document (“docid2”) is
sent over the same connection. Even though I took care of using POST to
ensure that Akka does see the “_changes” request as non-idempotent.

Note that the first document (“docid1”) has been created on another TCP
connection (and thus not shown here), probably because it has been sent
right after the request to the “_changes” stream, which means that at this
time Akka probably considered the non-idempotent request to be running.
However, the second document is being created on the busy original TCP
connection, as if as soon as the headers were sent back the connection is
considered idle again, although its entity is still being transmitted and
may be for a long time. Since the PUT for “docid2” is still blocked on this
connection, we can also see that “docid3”, “docid4” and “docid5” have been
properly created using another TCP connection.

Right now, I work around this by creating a new host connection pool for
every long-running connection, but it is a waste or resources and causes
some code duplication. The problem shown below happens when I try to use a
common pool for all requests (which should work fine).

Any idea of what might be wrong here?

 PUT /canape-test-db-d45ad182-e2ec-4387-b5a1-a51d016c0312/ HTTP/1.1
 Host: localhost:5984
 User-Agent: canape for Scala
 Accept: application/json
 Content-Length: 0

 HTTP/1.1 201 Created
 Server: CouchDB/1.6.1 (Erlang OTP/17)
 Location: 
http://localhost:5984/canape-test-db-d45ad182-e2ec-4387-b5a1-a51d016c0312
 Date: Wed, 27 May 2015 16:43:24 GMT
 Content-Type: application/json
 Content-Length: 12
 Cache-Control: must-revalidate

 {ok:true}
 POST 
 /canape-test-db-d45ad182-e2ec-4387-b5a1-a51d016c0312/_changes?feed=continuous 
 HTTP/1.1
 Host: localhost:5984
 User-Agent: canape for Scala
 Accept: application/json
 Content-Type: application/json
 Content-Length: 2

 {}
 HTTP/1.1 200 OK
 Transfer-Encoding: chunked
 Server: CouchDB/1.6.1 (Erlang OTP/17)
 Date: Wed, 27 May 2015 16:43:25 GMT
 Content-Type: application/json
 Cache-Control: must-revalidate

 51
 
{seq:1,id:docid1,changes:[{rev:1-967a00dff5e02add41819138abb3284d}]}

 PUT /canape-test-db-d45ad182-e2ec-4387-b5a1-a51d016c0312/docid2 HTTP/1.1
 Host: localhost:5984
 User-Agent: canape for Scala
 Accept: application/json
 Content-Type: application/json
 Content-Length: 2

 {}
 51
 
{seq:2,id:docid3,changes:[{rev:1-967a00dff5e02add41819138abb3284d}]}

 51
 
{seq:3,id:docid4,changes:[{rev:1-967a00dff5e02add41819138abb3284d}]}

 51
 
{seq:4,id:docid5,changes:[{rev:1-967a00dff5e02add41819138abb3284d}]}

(hangs there, as the connection is still busy sending the long-running
_changes stream, the creation of “docid2” will timeout)
​

-- 
  Read the docs: http://akka.io/docs/
  Check the FAQ: 
 http://doc.akka.io/docs/akka/current/additional/faq.html
  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups Akka 
User List group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] Akka HTTP (Scala) 1.0-RC[12]: Processor actor terminated abruptly

2015-04-30 Thread Samuel Tardieu
Hi.

With akka http 1.0-RC2 (was similar in 1.0-RC1), one of my program signals
an intermittent error that I do not understand.

The context: one HTTP GET request is sent out, the JSON response is
properly received, decoded and acted upon, and then the system terminates
with (system being my ActorSystem):

Await.ready(Http().shutdownAllConnectionPools(), 5.seconds)
system.shutdown()

The error only happens from time to time (I’d say once every three runs). I
have activated akka.actor.debug.{receive,autoreceive,lifecycle,fsm}.

Any idea of what could be the cause?

INFO] [04/30/2015 22:36:01.916] [run-main-0] [ActorSystem(default)]
Initiating orderly shutdown of all active host connections pools...
[DEBUG] [04/30/2015 22:36:01.921]
[default-akka.actor.default-dispatcher-8]
[akka://default/system/deadLetterListener] stopped
[DEBUG] [04/30/2015 22:36:01.922]
[default-akka.actor.default-dispatcher-5]
[akka://default/user/$a/flow-2-3-publisherSource-processor-mapConcat]
stopped
[DEBUG] [04/30/2015 22:36:01.922]
[default-akka.actor.default-dispatcher-6]
[akka://default/user/$a/flow-2-9-publisherSource-PoolConductor.retryMerge-flexiMerge-PoolConductor.retryMerge-flexiMerge]
stopped
[DEBUG] [04/30/2015 22:36:01.922]
[default-akka.actor.default-dispatcher-5]
[akka://default/user/$a/flow-2-2-publisherSource-processor-PoolSlot.SlotEventSplit-flexiRoute]
stopped
[DEBUG] [04/30/2015 22:36:01.923]
[default-akka.actor.default-dispatcher-12]
[akka://default/user/$a/flow-2-4-publisherSource-Merge] stopped
[DEBUG] [04/30/2015 22:36:01.923]
[default-akka.actor.default-dispatcher-14]
[akka://default/user/$a/flow-2-1-publisherSource-Merge] stopped
[DEBUG] [04/30/2015 22:36:01.923]
[default-akka.actor.default-dispatcher-10]
[akka://default/user/$a/flow-2-11-publisherSource-PoolConductor.retryMerge-flexiMerge-PoolConductor.RetrySplit-flexiRoute]
stopped
[DEBUG] [04/30/2015 22:36:01.923]
[default-akka.actor.default-dispatcher-12]
[akka://default/user/$a/flow-2-12-publisherSource-processor-PoolSlot.SlotEventSplit-flexiRoute]
stopped
[DEBUG] [04/30/2015 22:36:01.923]
[default-akka.actor.default-dispatcher-14]
[akka://default/user/$a/flow-2-14-publisherSource-processor-PoolSlot.SlotEventSplit-flexiRoute]
stopped
[DEBUG] [04/30/2015 22:36:01.923]
[default-akka.actor.default-dispatcher-10]
[akka://default/user/$a/flow-2-5-publisherSource-processor-PoolSlot.SlotEventSplit-flexiRoute]
stopped
[DEBUG] [04/30/2015 22:36:01.924]
[default-akka.actor.default-dispatcher-12]
[akka://default/user/$a/flow-2-13-publisherSource-processor-mapConcat]
stopped
[DEBUG] [04/30/2015 22:36:01.924]
[default-akka.actor.default-dispatcher-14]
[akka://default/user/$a/flow-2-15-publisherSource-processor-mapConcat]
stopped
[DEBUG] [04/30/2015 22:36:01.924]
[default-akka.actor.default-dispatcher-10]
[akka://default/user/$a/flow-2-6-publisherSource-processor-mapConcat]
stopped
[DEBUG] [04/30/2015 22:36:01.924]
[default-akka.actor.default-dispatcher-3] [akka://default/user]
stopping
[DEBUG] [04/30/2015 22:36:01.924]
[default-akka.actor.default-dispatcher-4]
[akka://default/user/SlotProcessor-0] stopped
[DEBUG] [04/30/2015 22:36:01.924]
[default-akka.actor.default-dispatcher-14]
[akka://default/user/SlotProcessor-3] stopped
[DEBUG] [04/30/2015 22:36:01.926]
[default-akka.actor.default-dispatcher-10]
[akka://default/user/SlotProcessor-1] stopped
[DEBUG] [04/30/2015 22:36:01.926]
[default-akka.actor.default-dispatcher-12]
[akka://default/user/SlotProcessor-2] stopped
[DEBUG] [04/30/2015 22:36:01.926]
[default-akka.actor.default-dispatcher-12]
[akka://default/user/$a/flow-3-16-actorPublisherSource-actorSubscriberSink]
stopped
[DEBUG] [04/30/2015 22:36:01.926]
[default-akka.actor.default-dispatcher-13]
[akka://default/user/$a/flow-2-8-publisherSource-PoolConductor.retryMerge-flexiMerge-PoolConductor.Route-flexiRoute]
stopped
[DEBUG] [04/30/2015 22:36:01.926]
[default-akka.actor.default-dispatcher-11]
[akka://default/user/$a/flow-2-7-publisherSource-PoolConductor.retryMerge-flexiMerge-mapConcat]
stopped
[DEBUG] [04/30/2015 22:36:01.928]
[default-akka.actor.default-dispatcher-8]
[akka://default/user/PoolInterfaceActor-0] stopped
[DEBUG] [04/30/2015 22:36:01.928]
[default-akka.actor.default-dispatcher-9]
[akka://default/user/$a/flow-2-10-publisherSource-PoolConductor.retryMerge-flexiMerge-PoolConductor.SlotSelector-flexiMerge]
stopped
[DEBUG] [04/30/2015 22:36:01.928]
[default-akka.actor.default-dispatcher-14]
[akka://default/user/$a/flow-3-0-actorPublisherSource-actorPublisherSource]
stopped
[DEBUG] [04/30/2015 22:36:01.928]
[default-akka.actor.default-dispatcher-9]
[akka://default/user/$a/flow-3-1-actorPublisherSource-Broadcast-map]
stopped
[DEBUG] [04/30/2015 22:36:01.928]
[default-akka.actor.default-dispatcher-8]
[akka://default/user/$a/flow-3-10-actorPublisherSource-Broadcast-stageFactory-stageFactory-splitWhen]
stopped
[DEBUG] [04/30/2015 22:36:01.929]
[default-akka.actor.default-dispatcher-5]