[akka-user] Error handling with Source.queue BroadcastHub

2017-10-21 Thread subopt1
Hi, I'm using a Source.queue with BroadcastHub to implement a pattern where a web request can add an item to the queue, attach to the graph and get a result. The problem I'm stuck on is that I'm not sure how to handle errors without failing the graph. Example: Without the supervision strategy

[akka-user] How to get materialized values from a custom Sink

2017-02-08 Thread subopt1
Hi, This might be a silly question, but I could not find an answer in the docs. I have a custom Sink (GraphStageLogic[SinkShape[T]]) which publishes to an event bus and runs for a predetermined duration. This works; however, there is no materialized value other than NotUsed since SinkShape only s
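
One possible way to get a materialized value out of a custom sink is to extend GraphStageWithMaterializedValue instead of GraphStage. The sketch below is not taken from the thread: the class name PublishingSink and the publish function are hypothetical stand-ins for the event-bus call, and it assumes akka-stream is on the classpath. It materializes a Future[Done] that completes when upstream finishes.

```scala
import akka.Done
import akka.stream.{Attributes, Inlet, SinkShape}
import akka.stream.stage.{GraphStageLogic, GraphStageWithMaterializedValue, InHandler}
import scala.concurrent.{Future, Promise}

// Hypothetical sink: `publish` stands in for the event-bus call.
final class PublishingSink[T](publish: T => Unit)
    extends GraphStageWithMaterializedValue[SinkShape[T], Future[Done]] {

  val in: Inlet[T] = Inlet("PublishingSink.in")
  override val shape: SinkShape[T] = SinkShape(in)

  override def createLogicAndMaterializedValue(
      attrs: Attributes): (GraphStageLogic, Future[Done]) = {
    // One promise per materialization; its future is the materialized value.
    val done = Promise[Done]()

    val logic = new GraphStageLogic(shape) with InHandler {
      // A sink has no downstream, so it must request the first element itself.
      override def preStart(): Unit = pull(in)

      override def onPush(): Unit = {
        publish(grab(in))
        pull(in)
      }

      override def onUpstreamFinish(): Unit = {
        done.trySuccess(Done)
        completeStage()
      }

      override def onUpstreamFailure(ex: Throwable): Unit = {
        done.tryFailure(ex)
        failStage(ex)
      }

      // Covers the case where the stage stops for another reason
      // (for example, publish threw); a no-op if already completed.
      override def postStop(): Unit =
        done.tryFailure(new IllegalStateException("PublishingSink stopped early"))

      setHandler(in, this)
    }

    (logic, done.future)
  }
}
```

Running it with something like Source(1 to 3).runWith(Sink.fromGraph(new PublishingSink[Int](println))) should hand back the Future[Done] rather than NotUsed.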

[akka-user] Re: akka streams graph is not fully drained when Future[Done] completes

2017-01-20 Thread subopt1
To clarify, this is not a bug in akka. The issue is there are inflight futures that must be accounted for. To summarize you need to keep track of your inflight operations (futures) and override onUpstreamFinish in the InHandler, for example: override def onUpstreamFinish(): Unit = { if (infli
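
A minimal sketch of the idea described above, not the code from the thread: the stage counts in-flight futures and delays completion in onUpstreamFinish until the count reaches zero. The names AsyncCallStage, inflight and upstreamDone are illustrative assumptions, and this version only allows one call in flight at a time.

```scala
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import scala.concurrent.Future
import scala.util.{Failure, Success, Try}

// Hypothetical flow stage: applies an async function to each element and
// does not complete until every started Future has been delivered downstream.
final class AsyncCallStage[A, B](f: A => Future[B]) extends GraphStage[FlowShape[A, B]] {
  val in: Inlet[A] = Inlet("AsyncCallStage.in")
  val out: Outlet[B] = Outlet("AsyncCallStage.out")
  override val shape: FlowShape[A, B] = FlowShape(in, out)

  override def createLogic(attrs: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler with OutHandler {
      private var inflight = 0          // futures started but not yet delivered
      private var upstreamDone = false  // upstream finished while work was pending

      // Brings the Future result back onto the stage's own execution context.
      private val onResult = getAsyncCallback[Try[B]] {
        case Success(b) =>
          inflight -= 1
          push(out, b)
          if (upstreamDone && inflight == 0) completeStage()
        case Failure(e) =>
          failStage(e)
      }

      override def onPush(): Unit = {
        inflight += 1
        f(grab(in)).onComplete(onResult.invoke)(materializer.executionContext)
      }

      override def onPull(): Unit =
        if (!isClosed(in) && !hasBeenPulled(in)) pull(in)

      // The default implementation completes the stage immediately, which
      // would drop a pending result; defer completion while work is in flight.
      override def onUpstreamFinish(): Unit = {
        upstreamDone = true
        if (inflight == 0) completeStage()
      }

      setHandlers(in, out, this)
    }
}
```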

[akka-user] Chunked Response handling with Akka Http?

2017-01-16 Thread subopt1
I'd like to be able to use akka-http with chunked responses. I'd like to know if I can do the same thing in akka-http as with the playframework. The following is from playframework documentation. def index = Action { val source = Source.apply(List("kiki", "foo", "bar")) Ok.chunked(source)
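
For comparison with the Play snippet, here is a rough akka-http sketch, assuming akka-http and akka-stream are on the classpath. Building an HttpEntity from a Source[ByteString, _] yields an HttpEntity.Chunked, which can then be passed to complete. The object name ChunkedExample and the route path are illustrative only.

```scala
import akka.NotUsed
import akka.http.scaladsl.model.{ContentTypes, HttpEntity}
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import akka.stream.scaladsl.Source
import akka.util.ByteString

object ChunkedExample {
  // Same three elements as the Play example, encoded as bytes.
  val source: Source[ByteString, NotUsed] =
    Source(List("kiki", "foo", "bar")).map(s => ByteString(s))

  // An entity backed by a Source of unknown length is a chunked entity.
  val entity: HttpEntity.Chunked =
    HttpEntity(ContentTypes.`text/plain(UTF-8)`, source)

  // GET / streams the elements to the client as they are emitted.
  val route: Route =
    pathSingleSlash {
      get {
        complete(entity)
      }
    }
}
```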

Re: [akka-user] akka streams graph is not fully drained when Future[Done] completes

2016-12-16 Thread subopt1
Not exactly sure what you were looking for. I'm relatively new to akka streams. This is the minimal amount of code that reproduces the problem, involving: a custom source, a flow with AsyncCallback, and parallelizing. The only deps are akka and joda time. On Friday, December 16, 2016 at 3:31:37 PM UT

Re: [akka-user] akka streams graph is not fully drained when Future[Done] completes

2016-12-16 Thread subopt1
Here's the code to reproduce. The issue only seems to occur with my custom Source and using a callback in a Flow and while parallelizing. If this code is run with .via(parallelizeFlow(parallelize = 1, asyncFlow)) it drops the last minute, but when run with .via(asyncFlow) it does not. So with

[akka-user] akka streams graph is not fully drained when Future[Done] completes

2016-12-16 Thread subopt1
Hi, I'm seeing an issue where the graph completes while there is still data in one of the flows. The last element emitted by the source enters a custom GraphStageLogic flow, where it is sent to a function that returns a Future. That Future has a callback which invokes getAsyncCallback and then

[akka-user] Optimizing akka streams with parallelize

2016-12-15 Thread subopt1
Hi, I've been looking for some guidance on optimizing akka streams for throughput but have not found much info so far. For example, if I have a non-blocking flow, would parallelizing by number of cpu cores make sense? So far what I've observed after several runs is a decrease in processing tim

[akka-user] Varying result when running akka flow in parallel

2016-12-14 Thread subopt1
Hi, I'm working with a flow that downloads data, parses json and adds ids to a set (dedupe). It's working just fine; however, when I modify the flow to run in parallel, I get different results. Here's my graph: val graph: RunnableGraph[Future[HashSet[Long]]] = Source.fromGraph(new MinuteSource

Re: [akka-user] How to deal with Futures in Sources/Flows

2016-12-09 Thread subopt1
I thought I needed to use a GraphStageLogic because I need to do the following: request the url for a page of data and push it; if the response has a next token, grab another page and push it; continue until there is no next token; then complete the stream. I wasn't sure how to accomplish that logic in a Source. I figured out a s
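
The paging steps listed above can also be expressed without a custom stage. The sketch below uses Source.unfoldAsync as one possible approach; it is not necessarily the solution the poster arrived at. Page, PagedSource and fetchPage are hypothetical names, with fetchPage standing in for the real API call.

```scala
import akka.NotUsed
import akka.stream.scaladsl.Source
import scala.concurrent.{ExecutionContext, Future}

// Hypothetical response shape: one page of items plus an optional next token.
final case class Page(items: Seq[String], nextToken: Option[String])

object PagedSource {
  // Unfold state:
  //   Some(None)        -> fetch the first page (no token yet)
  //   Some(Some(token)) -> fetch the page after `token`
  //   None              -> no more pages, complete the stream
  def apply(fetchPage: Option[String] => Future[Page])(
      implicit ec: ExecutionContext): Source[Seq[String], NotUsed] =
    Source.unfoldAsync[Option[Option[String]], Seq[String]](Some(None)) {
      case None =>
        Future.successful(None)
      case Some(token) =>
        fetchPage(token).map { page =>
          val next: Option[Option[String]] = page.nextToken.map(t => Some(t))
          Some((next, page.items))
        }
    }
}
```

Because unfoldAsync only requests the next page when downstream asks for more, nothing blocks on a Future and no Await is needed.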

[akka-user] Re: How to deal with Futures in Sources/Flows

2016-12-08 Thread subopt1
Well, that didn't work at all. onPull gets called continuously before any Futures have been resolved. At this point the only way I know to make it work is to Await the future; however, then I would be blocking the stream. Any ideas? On Thursday, December 8, 2016 at 1:21:08 PM UTC-7, sub...@gmail.c

[akka-user] How to deal with Futures in Sources/Flows

2016-12-08 Thread subopt1
Hi, I'm creating a Source via GraphStageLogic which makes calls to another api, which happens to return a Source. However I'm unsure how to deal with Source/Futures in a GraphStageLogic. It seems that I want my shape to look like val shape: SourceShape[Seq[String]] = SourceShape(out) but I g

Re: [akka-user] Akka Stream stalling with JsonFraming

2016-12-06 Thread subopt1
I figured it out. Submitting a pull request! On Tuesday, December 6, 2016 at 12:29:28 PM UTC-7, Konrad Malawski wrote: > > Is the JSON well formed and "normal" or something weird or maybe huge > objects or something in there etc? > Try to debug at which point it gets stuck. > > A minimized reprod

[akka-user] Akka Stream stalling with JsonFraming

2016-12-06 Thread subopt1
I'm working on an Akka Streams project that reads gzipped files from S3 and parses json. The issue I'm running into is that the stream stalls at about 24523530 bytes and then times out after 1 minute (java.util.concurrent.TimeoutException: No elements passed in the last 1 minute), but there is no error

[akka-user] Re: http proxy support

2016-11-02 Thread subopt1
http://doc.akka.io/docs/akka-stream-and-http-experimental/2.0/scala/http/configuration.html It's also in the 2.4.11 docs, but I can't find that link at the moment. Prior to the client section, under host-connection-pool, it says: # Please note that this section mirrors `akka.http.client` however is

[akka-user] http proxy support

2016-10-31 Thread subopt1
I'm interested in proxying http requests with akka-http. Based on what I've read so far, I need to enable the configuration and then it should work with Http().singleRequest; however, it's not connecting through the proxy. I have the following config in application.conf: akka.http { host-connectio