Hi,
I'm using a Source.queue with BroadcastHub to implement a pattern where a
web request can add an item to the queue, attach to the graph and get a
result. The problem I'm stuck on is that I'm not sure how to handle errors
without failing the graph. Example:
Without the supervision strategy
Hi,
This might be a silly question but I could not find an answer in the docs.
I have a custom Sink (GraphStageLogic[SinkShape[T]]) which publishes to an
event bus and runs for a predetermined duration. This works; however, there
is no materialized value other than NotUsed since SinkShape only s
To clarify, this is not a bug in Akka. The issue is that there are in-flight
futures that must be accounted for. To summarize, you need to keep track of
your in-flight operations (futures) and override onUpstreamFinish in the
InHandler, for example:
override def onUpstreamFinish(): Unit = {
if (infli
I'd like to be able to use akka-http with chunked responses. I'd like to know
if I can do the same thing in akka-http as with the Play Framework. The
following is from the Play Framework documentation.
def index = Action {
  val source = Source.apply(List("kiki", "foo", "bar"))
  Ok.chunked(source)
I'm not exactly sure what you were looking for. I'm relatively new to Akka
Streams. This is the minimal amount of code that reproduces the problem,
involving:
a custom source
a flow with AsyncCallback
and parallelizing
The only deps are Akka and Joda-Time.
On Friday, December 16, 2016 at 3:31:37 PM UT
Here's the code to reproduce. The issue only seems to occur with my custom
Source and using a callback in a Flow and while parallelizing. If this code
is run with .via(parallelizeFlow(parallelize = 1, asyncFlow)) it drops the
last minute, but when run with .via(asyncFlow) it does not.
So with
Hi,
I'm seeing an issue where the graph completes while there is still data in
one of the flows. The last element emitted by the source enters a custom
GraphStageLogic flow, where it is sent to a function that returns a Future.
That Future has a callback which invokes getAsyncCallback and then
Hi,
I've been looking for some guidance on optimizing Akka Streams for
throughput but have not found much info so far. For example, if I have a
non-blocking flow, would parallelizing by the number of CPU cores make sense?
So far what I've observed after several runs is a decrease in processing
tim
Hi,
I'm working with a flow that downloads data, parses JSON and adds ids to a
set (dedupe). It works fine; however, when I modify the flow to run
in parallel, I get different results.
Here's my graph:
val graph: RunnableGraph[Future[HashSet[Long]]] =
Source.fromGraph(new MinuteSource
I thought I needed to use a GraphStageLogic because I need to do the
following:
request url for page of data and push
if response has next token, grab another page and push
continue until no next token
complete the stream
I wasn't sure how to accomplish that logic in a Source.
I figured out a s
Well, that didn't work at all. onPull gets called continuously before any
Futures have been resolved. At this point the only way I know to make it
work is to Await the future; however, then I would be blocking the stream.
Any ideas?
On Thursday, December 8, 2016 at 1:21:08 PM UTC-7, sub...@gmail.c
Hi,
I'm creating a Source via GraphStageLogic which makes calls to another API,
which happens to return a Source. However, I'm unsure how to deal with
Source/Futures in a GraphStageLogic. It seems that I want my shape to look
like
val shape: SourceShape[Seq[String]] = SourceShape(out)
but I g
I figured it out. Submitting a pull request!
On Tuesday, December 6, 2016 at 12:29:28 PM UTC-7, Konrad Malawski wrote:
>
> Is the JSON well formed and "normal" or something weird or maybe huge
> objects or something in there etc?
> Try to debug at which point it gets stuck.
>
> A minimized reprod
I'm working on an Akka Streams project that reads gzipped files from S3 and
parses JSON. The issue I'm running into is that the stream stalls at about
24523530 bytes and then times out after 1 minute
(java.util.concurrent.TimeoutException: No elements passed in the last 1
minute), but there is no error
http://doc.akka.io/docs/akka-stream-and-http-experimental/2.0/scala/http/configuration.html
It's also in the 2.4.11 docs, but I can't find that link at the moment.
Prior to the client section, under host-connection-pool, it says:
# Please note that this section mirrors `akka.http.client` however is
I'm interested in proxying HTTP requests with akka-http. Based on what I've
read so far, I need to enable the configuration and then it should work with
Http().singleRequest.
However, it's not connecting through the proxy. I have the following config
in application.conf:
akka.http {
host-connectio