Re: [akka-user] Re: streams - shutting down a flow / exposing a materialized value

2015-07-26 Thread Rafał Krzewski
Since we have √'s attention, allow me to repeat my original question: > The question now remains, how do I fabricate a Graph[FlowShape[T, T], >> Cancellable] that will generate an instance of Cancellable on each >> materialization connected to a PushPullStage, in such way that cancel() >> wou

Re: [akka-user] Re: streams - shutting down a flow / exposing a materialized value

2015-07-26 Thread Viktor Klang
Requesting one element at a time would lead to low-to-no concurrency and high overhead per element. Check input buffer size in Attributes. -- Cheers, √ On 26 Jul 2015 17:50, "David Pinn" wrote: > I think this could happen if a stage immediately requests the next element > once it starts working

Re: [akka-user] Re: streams - shutting down a flow / exposing a materialized value

2015-07-26 Thread David Pinn
I think this could happen if a stage immediately requests the next element once it starts working on a received element. Is that the case? On Sunday, 26 July 2015 23:39:00 UTC+10, David Pinn wrote: > > b) the stream processes one more element than I expect it to. > > The source under test spits o

Re: [akka-user] Re: streams - shutting down a flow / exposing a materialized value

2015-07-26 Thread David Pinn
I've created a test that exercises a composite source that has an internal cycle. It processes integers, starting at 1 and doubling it until it is cancelled. The code can be viewed as a gist . Two things to note: a) the stream completes succes

Re: [akka-user] Re: streams - shutting down a flow / exposing a materialized value

2015-07-26 Thread David Pinn
Yes, that's what I'm doing. More precisely, I'm defining a composite Source like so: Initial ~> Merge ~> Akka HTTP ~> Broadcast ~> Consumer +~> ~>+ | | +~~~ Throttler <~~+ The Th

Re: [akka-user] Re: streams - shutting down a flow / exposing a materialized value

2015-07-26 Thread Rafał Krzewski
David, does your flow contain an actual internal feedback loop? What I understand from your description is that you have something like this: { tick source ~> mapAsync calling external service ~> } ~> consumer And your return the Cancellable provided by the tick source as materialized value as

Re: [akka-user] Re: streams - shutting down a flow / exposing a materialized value

2015-07-26 Thread David Pinn
The JavaDoc for the Source.from(...) method says this: "Elements are emitted periodically with the specified interval. The tick element will be delivered to downstream consumers that has requested any elements. If a consumer has not requested any elements at the point in time when the tick elem

Re: [akka-user] Re: streams - shutting down a flow / exposing a materialized value

2015-07-26 Thread Viktor Klang
There is no stream cancellation, there is only erronous or normal completion. What does the java/scaladoc say? -- Cheers, √ On 26 Jul 2015 09:17, "David Pinn" wrote: > No, but I might try to put one together. When the tick source is > cancelled, should that cause cancellation of the stream, or

Re: [akka-user] Re: streams - shutting down a flow / exposing a materialized value

2015-07-26 Thread David Pinn
No, but I might try to put one together. When the tick source is cancelled, should that cause cancellation of the stream, or completion of the stream? -- >> Read the docs: http://akka.io/docs/ >> Check the FAQ: >> http://doc.akka.io/docs/akka/current/additional

Re: [akka-user] Re: streams - shutting down a flow / exposing a materialized value

2015-07-25 Thread Viktor Klang
Do you happen to have a minimized failing test case for that? On Sat, Jul 25, 2015 at 6:36 PM, David Pinn wrote: > This is pretty much exactly what I'm trying to do. I'm polling an external > system every 20 seconds. I use a tick source to control the timing, zipping > the ticks with the equival

Re: [akka-user] Re: streams - shutting down a flow / exposing a materialized value

2015-07-25 Thread David Pinn
This is pretty much exactly what I'm trying to do. I'm polling an external system every 20 seconds. I use a tick source to control the timing, zipping the ticks with the equivalent of your WatchRequest. The tick source materializes to a Cancellable, so that's nice. Tragically, cancelling the Ca

Re: [akka-user] Re: streams - shutting down a flow / exposing a materialized value

2015-04-24 Thread Rafał Krzewski
W dniu piątek, 24 kwietnia 2015 14:24:35 UTC+2 użytkownik drewhk napisał: > > >> The question now remains, how do I fabricate a Graph[FlowShape[T, T], >> Cancellable] that will generate an instance of Cancellable on each >> materialization connected to a PushPullStage, in such way that cancel()

Re: [akka-user] Re: streams - shutting down a flow / exposing a materialized value

2015-04-24 Thread Endre Varga
Hi Rafal On Fri, Apr 24, 2015 at 2:21 PM, Rafał Krzewski wrote: > I think I've moved one step closer: I think I know how to weld a flow > breaker into my graph: > > def watch(key: String, waitIndex: Option[Int] = None, recursive: > Option[Boolean] = None, quorum: Option[Boolean] = None): > Sou

[akka-user] Re: streams - shutting down a flow / exposing a materialized value

2015-04-24 Thread Rafał Krzewski
I think I've moved one step closer: I think I know how to weld a flow breaker into my graph: def watch(key: String, waitIndex: Option[Int] = None, recursive: Option[Boolean] = None, quorum: Option[Boolean] = None): Source[EtcdResponse, Cancellable] = { case class WatchRequest(key: String,