Hi Roland, thank you for your detailed answer. In my timestamp example timestamps are processed while new timestamps arrive and at this point processing of old timestamps is obsolete but they are somewhere in the processing chain. Of course I can discard them at the end when dealing with timestamps (comparing with the current time) and perhaps (have to think about that) even when not dealing with timestamps but rather with some other inputstream with new incoming elements invalidating the processing of not fully processed elements before they are stucked in the buffers of some stage(s) of my partial flowgraph. A synchronous processing of my partial(!) flowgraph avoids having elements that are obsolete AND not fully processed. I this manner only required elements are processed and CPU load is lower and more important a more realtime outcome. For example, say you have a stream of stock prices and you do some processing in a partial flowgraph and finally you show the result as some remolded "value" on a screen. If stockprices are faster generated than your processing yielding a screen "value" can handle you won't see real time prices anymore and if the prices stream pauses emitting at stock exchange closing your screen is still refreshing until all involved intermediate buffers are drained. That won't happen with a certain buffer strategy (DropHead) for the stock prices stream and synchronous processing for the following partial flowgraph. So my program is not broken but it is not realtime and heats up my CPU more than necessary. Delay is inevitable because processing takes time but unnecessary delay of having obsolete elements in internal buffers is evitable.
I'll act on your advice and will have a look at OneBoundedInterpreted and mapConcat. Looking forward to operator fusion. Peter Am Montag, 30. März 2015 12:48:12 UTC+2 schrieb rkuhn: > > Hi Peter, > > thanks for this explanation, but it is not yet clear to me how exactly > your program would be broken with bufferSize=1 (implementation-wise). What > exactly is the invariant that is violated, and how would that hold with a > hypothetical bufferSize=0? > > If all you are interested in is synchronous operation of a chain of > Stages, then you might look into manually operating a OneBoundedInterpreter > in a mapConcat stage—while operator fusion is on the horizon (but out of > scope for Streams 1.0) it will not strictly guarantee buffer-freedom > because it will be a best-effort optimization. > > OTOH if I imagine a system that takes in some stream of elements and does > processing based on timestamps then why would it not be sufficient to > timestamp things when they enter and discard them along the way? Why does > the processor need to be buffer-free? In particular: the processing will > time, too, so what differentiates the delay introduced by buffering from > the processing latency? > > Regards, > > Roland > > 30 mar 2015 kl. 12:22 skrev Peter Schmitz <[email protected] > <javascript:>>: > > Always assumed import FlowGraph.Implicits._ and all other necessary > imports: > I can pack all my logic into a function: > > val f: FlowGraph.Builder => Outlet[T] => Outlet[S] > > for some custom type T and type S. Now I can wrap/use this function as a > Flow or PartialFlowGraph: > > val flow: Flow[Input, Trigger.type, Unit] = Flow() { implicit b => > val broadcast = b.add(Broadcast[Input](1)) > (broadcast.in, f(b)(broadcast.out(0))) > } > > or > > val partialFlowGraph = FlowGraph.partial() { implicit b => > val broadcast = b.add(Broadcast[Input](1)) > UniformFanInShape(f(b)(broadcast.out(0)), broadcast.in) > } > > Now in my main flowgraph: > > FlowGraph.closed() { implicit b => > val in = Source.empty[T] > val out = Sink.ignore > > in ~> flow ~> out > // or : > val fg = b.add(partialFlowGraph) > in ~> fg.in(0) > fg.out ~> out > } > > In reality the above flowgraph is more complex but for this showcase a > dummy in and out is used, e.g. actually "in" is a PropsSource[T] externally > feeded by an actor. > I want flow or partialFlowGraph to have zero buffers internally but at the > outside as usual and thus behave as a single Flow[T].map(f). > So to answer your first question: "should act like a single stream stage" > from the perspective of the main flowgraph. > And my problem (second question) is that I have a PropsSource[T] feeding > my main flowgraph which buffers external incomming events with a custom > bufferlogic. > A example will make that clear: > Lets assume for the moment this events are timestamps every 30ms and the > processing of these timestamps by my function f respectively > flow/partialFlowGraph takes more than 30ms (time for all internal stages > together for one element) > Then the buffers of all involved stages in main and especially in my > partialFlowGraph get filled up over time. > I like to discard old timestamps in my PropsSource-Bufferlogic in that > case. But the old timestamps are stucked in the processing chain of my > partial flowgraph. > So in.buffer(1, OverflowStrategy.dropHead) won't help here. > Hope this simplified example shows my need for something like FuseStage. > > > -- > >>>>>>>>>> Read the docs: http://akka.io/docs/ > >>>>>>>>>> Check the FAQ: > http://doc.akka.io/docs/akka/current/additional/faq.html > >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user > --- > You received this message because you are subscribed to the Google Groups > "Akka User List" group. > To unsubscribe from this group and stop receiving emails from it, send an > email to [email protected] <javascript:>. > To post to this group, send email to [email protected] > <javascript:>. > Visit this group at http://groups.google.com/group/akka-user. > For more options, visit https://groups.google.com/d/optout. > > > > > *Dr. Roland Kuhn* > *Akka Tech Lead* > Typesafe <http://typesafe.com/> – Reactive apps on the JVM. > twitter: @rolandkuhn > <http://twitter.com/#!/rolandkuhn> > > -- >>>>>>>>>> Read the docs: http://akka.io/docs/ >>>>>>>>>> Check the FAQ: >>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups "Akka User List" group. To unsubscribe from this group and stop receiving emails from it, send an email to [email protected]. To post to this group, send email to [email protected]. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
