[akka-user] Re: akka 2.4.10 - connection reset by peer when consuming stream with parallelism == 1
Yes, Konrad. You nailed it. I can get it to work with parallelism set to 1 and sleep interval tuned down from 50 to 25. This is something I cannot work around from the client side, right? This idle timeout is something set on the server side. I am worried that back pressure is causing my http connection to become idle. On Friday, September 30, 2016 at 12:00:09 PM UTC-3, Eric Torti wrote: > > Hey guys, > > I am new to akka streams so I may be missing something big here. Using > akka-stream, akka-http-core, akka-typed-experimental all 2.4.10 - scala > 2.11.8. > > I am experiencing the `connection reset by peer` error on a stream that I > have checked with `curl` to remain open far longer than the time it takes > akka to crash. > > The stream is produced from json lines like so: > > ``` > > def stream(): Source[Development, Any] = { > > def chunkConsumer(res: HttpResponse) = { > res.entity.dataBytes > .via(Framing.delimiter(ByteString("\r\n"), 1048576, allowTruncation = > true)) > .map[Development](bs => { parse(bs.utf8String).extract[Development] }) > } > > val req = HttpRequest(method = HttpMethods.GET, uri = > s"http://$endpoint:$port/developments/stream";) > val res: Source[Development, Any] = > Source.single(req).via(client).flatMapConcat(chunkConsumer) > > res > } > > ``` > > And consumed like so: > > ``` > > def slowAsyncTransformation(d: Development) = { > Future { Thread sleep 50; d } > } > > val parallelism = 2 > > DevelopmentsAPI > .stream() > .mapAsync(parallelism)(slowAsyncTransformation(_)) > .runFold(0)((acc, d) => { > System.out.println("processing development " + d.id) > acc + 1 > }) > .onComplete { > case Success(s) => { > System.out.println(s"processed $s developments") > system.terminate() > Await.result(system.whenTerminated, 5 seconds) > System.exit(0) > } > case Failure(ex) => { > System.err.println(s"Could not finish processing developments: $ex") > system.terminate() > Await.result(system.whenTerminated, 5 seconds) > System.exit(1) > } > } > > ``` > > > I can consistently get this to work from end to end in 180 seconds if I have > `parallelism` set to 2. > > > But if I set `parallelism` to 1, it crashes in 35 seconds with `Could not > finish processing developments: akka.stream.StreamTcpException: The > connection closed with error: Connection reset by peer` > > I cannot make sense of why is the degree of parallelism being 1 would > cause the peer to reset the connection. > > Any help will be greatly appreciated. > > Thanks, > > Eric > > -- >>>>>>>>>> Read the docs: http://akka.io/docs/ >>>>>>>>>> Check the FAQ: >>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups "Akka User List" group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at https://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] akka 2.4.10 - connection reset by peer when consuming stream with parallelism == 1
Thank you for your answer, Konrad. I am guessing this shouldn't be the issue, because I have consumed the same stream very slowly using curl --limit-rate 75k. And it took me over 5 minutes to go from end to end. And it worked ell. But I am not sure if akka http handles the socket the same way as curl does. So you might be right. I will try to emulate that setting parallelism to 1 and tuning the sleep interval. On Friday, September 30, 2016 at 12:00:53 PM UTC-3, Konrad Malawski wrote: > > That means the other side has closed the connection. It could be an idle > timeout. Are you sure data is consistently flowing, and not being idle for > minutes? > > On 30 Sep 2016 16:59, "Eric Torti" > > wrote: > >> Hey guys, >> >> I am new to akka streams so I may be missing something big here. Using >> akka-stream, akka-http-core, akka-typed-experimental all 2.4.10 - scala >> 2.11.8. >> >> I am experiencing the `connection reset by peer` error on a stream that I >> have checked with `curl` to remain open far longer than the time it takes >> akka to crash. >> >> The stream is produced from json lines like so: >> >> ``` >> >> def stream(): Source[Development, Any] = { >> >> def chunkConsumer(res: HttpResponse) = { >> res.entity.dataBytes >> .via(Framing.delimiter(ByteString("\r\n"), 1048576, allowTruncation = >> true)) >> .map[Development](bs => { parse(bs.utf8String).extract[Development] }) >> } >> >> val req = HttpRequest(method = HttpMethods.GET, uri = >> s"http://$endpoint:$port/developments/stream";) >> val res: Source[Development, Any] = >> Source.single(req).via(client).flatMapConcat(chunkConsumer) >> >> res >> } >> >> ``` >> >> And consumed like so: >> >> ``` >> >> def slowAsyncTransformation(d: Development) = { >> Future { Thread sleep 50; d } >> } >> >> val parallelism = 2 >> >> DevelopmentsAPI >> .stream() >> .mapAsync(parallelism)(slowAsyncTransformation(_)) >> .runFold(0)((acc, d) => { >> System.out.println("processing development " + d.id) >> acc + 1 >> }) >> .onComplete { >> case Success(s) => { >> System.out.println(s"processed $s developments") >> system.terminate() >> Await.result(system.whenTerminated, 5 seconds) >> System.exit(0) >> } >> case Failure(ex) => { >> System.err.println(s"Could not finish processing developments: $ex") >> system.terminate() >> Await.result(system.whenTerminated, 5 seconds) >> System.exit(1) >> } >> } >> >> ``` >> >> >> I can consistently get this to work from end to end in 180 seconds if I have >> `parallelism` set to 2. >> >> >> But if I set `parallelism` to 1, it crashes in 35 seconds with `Could not >> finish processing developments: akka.stream.StreamTcpException: The >> connection closed with error: Connection reset by peer` >> >> I cannot make sense of why is the degree of parallelism being 1 would >> cause the peer to reset the connection. >> >> Any help will be greatly appreciated. >> >> Thanks, >> >> Eric >> >> -- >> >>>>>>>>>> Read the docs: http://akka.io/docs/ >> >>>>>>>>>> Check the FAQ: >> http://doc.akka.io/docs/akka/current/additional/faq.html >> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user >> --- >> You received this message because you are subscribed to the Google Groups >> "Akka User List" group. >> To unsubscribe from this group and stop receiving emails from it, send an >> email to akka-user+...@googlegroups.com . >> To post to this group, send email to akka...@googlegroups.com >> . >> Visit this group at https://groups.google.com/group/akka-user. >> For more options, visit https://groups.google.com/d/optout. >> > -- >>>>>>>>>> Read the docs: http://akka.io/docs/ >>>>>>>>>> Check the FAQ: >>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups "Akka User List" group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at https://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
[akka-user] akka 2.4.10 - connection reset by peer when consuming stream with parallelism == 1
Hey guys, I am new to akka streams so I may be missing something big here. Using akka-stream, akka-http-core, akka-typed-experimental all 2.4.10 - scala 2.11.8. I am experiencing the `connection reset by peer` error on a stream that I have checked with `curl` to remain open far longer than the time it takes akka to crash. The stream is produced from json lines like so: ``` def stream(): Source[Development, Any] = { def chunkConsumer(res: HttpResponse) = { res.entity.dataBytes .via(Framing.delimiter(ByteString("\r\n"), 1048576, allowTruncation = true)) .map[Development](bs => { parse(bs.utf8String).extract[Development] }) } val req = HttpRequest(method = HttpMethods.GET, uri = s"http://$endpoint:$port/developments/stream";) val res: Source[Development, Any] = Source.single(req).via(client).flatMapConcat(chunkConsumer) res } ``` And consumed like so: ``` def slowAsyncTransformation(d: Development) = { Future { Thread sleep 50; d } } val parallelism = 2 DevelopmentsAPI .stream() .mapAsync(parallelism)(slowAsyncTransformation(_)) .runFold(0)((acc, d) => { System.out.println("processing development " + d.id) acc + 1 }) .onComplete { case Success(s) => { System.out.println(s"processed $s developments") system.terminate() Await.result(system.whenTerminated, 5 seconds) System.exit(0) } case Failure(ex) => { System.err.println(s"Could not finish processing developments: $ex") system.terminate() Await.result(system.whenTerminated, 5 seconds) System.exit(1) } } ``` I can consistently get this to work from end to end in 180 seconds if I have `parallelism` set to 2. But if I set `parallelism` to 1, it crashes in 35 seconds with `Could not finish processing developments: akka.stream.StreamTcpException: The connection closed with error: Connection reset by peer` I cannot make sense of why is the degree of parallelism being 1 would cause the peer to reset the connection. Any help will be greatly appreciated. Thanks, Eric -- >> Read the docs: http://akka.io/docs/ >> Check the FAQ: >> http://doc.akka.io/docs/akka/current/additional/faq.html >> Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups "Akka User List" group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at https://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.