Hi Roland,

thank you for your detailed answer.
In my timestamp example, timestamps are still being processed while new
ones arrive. At that point processing the old timestamps is obsolete, but
they are already somewhere in the processing chain. Of course I can discard
them at the end when dealing with timestamps (by comparing with the current
time), and perhaps (I have to think about that) even when dealing not with
timestamps but with some other input stream in which newly arriving
elements invalidate elements that are not yet fully processed, before those
get stuck in the buffers of some stage(s) of my partial flowgraph.
Synchronous processing of my partial(!) flowgraph avoids having elements
that are obsolete AND not fully processed. In this manner only the required
elements are processed, so CPU load is lower and, more importantly, the
outcome is closer to real time.
For example, say you have a stream of stock prices, do some processing in a
partial flowgraph, and finally show the result as some remolded "value" on
a screen. If stock prices are generated faster than your processing can
turn them into a screen "value", you won't see real-time prices anymore,
and if the price stream stops emitting when the stock exchange closes, your
screen keeps refreshing until all intermediate buffers involved are
drained.
That won't happen with a suitable buffer strategy (DropHead) for the stock
price stream and synchronous processing in the partial flowgraph that
follows.
So my program is not broken, but it is not real time and it heats up my CPU
more than necessary.
Delay is inevitable because processing takes time, but the unnecessary
delay caused by obsolete elements sitting in internal buffers is avoidable.
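
To make the difference concrete, here is a rough sketch in plain Scala (no
Akka involved) of a producer emitting a timestamp every 30ms and a
synchronous consumer that needs 100ms per element. The names
DropHeadSketch, Result and simulate and all the numbers are made up for
illustration; `capacity` stands in for the sum of all buffers in front of
the slow stage, and a full buffer drops its oldest element, which is how I
understand DropHead.

```scala
import scala.collection.immutable.Queue

// Plain-Scala simulation, no Akka involved. All names are hypothetical.
object DropHeadSketch {

  // One processed element: when it was produced and when processing finished.
  final case class Result(producedAt: Int, finishedAt: Int) {
    def staleness: Int = finishedAt - producedAt
  }

  // A producer emits `count` timestamps, one every `period` ms. A single
  // synchronous consumer needs `cost` ms per element. At most `capacity`
  // elements wait in between; when the buffer overflows, the oldest
  // waiting element is dropped (drop-head).
  def simulate(period: Int, cost: Int, count: Int, capacity: Int): Vector[Result] = {
    var buffer = Queue.empty[Int]
    var free = 0 // time at which the consumer becomes idle again
    val out = Vector.newBuilder[Result]

    // Let the consumer pick up waiting elements for as long as it becomes
    // idle no later than `until`.
    def drain(until: Int): Unit =
      while (buffer.nonEmpty && free <= until) {
        val (producedAt, rest) = buffer.dequeue
        buffer = rest
        free = math.max(free, producedAt) + cost
        out += Result(producedAt, free)
      }

    for (i <- 0 until count) {
      val now = i * period
      drain(now)
      buffer = buffer.enqueue(now)
      // Drop-head: discard the oldest waiting element on overflow.
      if (buffer.size > capacity) buffer = buffer.dequeue._2
    }
    drain(Int.MaxValue) // the producer has stopped; work off what is left
    out.result()
  }
}
```

With simulate(30, 100, 10, 1) only the timestamps 0, 90, 180 and 270 are
processed and no result is more than 130ms old. With
simulate(30, 100, 10, Int.MaxValue) all ten are processed, the last one
finishes 730ms after it was produced, and the consumer keeps working long
after the producer has stopped.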

I'll act on your advice and have a look at OneBoundedInterpreter and
mapConcat.

Looking forward to operator fusion.

Peter



On Monday, 30 March 2015 12:48:12 UTC+2, rkuhn wrote:
>
> Hi Peter,
>
> thanks for this explanation, but it is not yet clear to me how exactly 
> your program would be broken with bufferSize=1 (implementation-wise). What 
> exactly is the invariant that is violated, and how would that hold with a 
> hypothetical bufferSize=0?
>
> If all you are interested in is synchronous operation of a chain of 
> Stages, then you might look into manually operating a OneBoundedInterpreter 
> in a mapConcat stage—while operator fusion is on the horizon (but out of 
> scope for Streams 1.0) it will not strictly guarantee buffer-freedom 
> because it will be a best-effort optimization.
>
> OTOH if I imagine a system that takes in some stream of elements and does 
> processing based on timestamps then why would it not be sufficient to 
> timestamp things when they enter and discard them along the way? Why does 
> the processor need to be buffer-free? In particular: the processing will
> take time, too, so what differentiates the delay introduced by buffering from
> the processing latency?
>
> Regards,
>
> Roland
>
> On 30 Mar 2015, at 12:22, Peter Schmitz <[email protected]> wrote:
>
> Assuming import FlowGraph.Implicits._ and all other necessary imports
> throughout:
> I can pack all my logic into a function:
>
> val f: FlowGraph.Builder => Outlet[T] => Outlet[S]
>
> for some custom type T and type S. Now I can wrap/use this function as a 
> Flow or PartialFlowGraph:
>
> val flow: Flow[Input, Trigger.type, Unit] = Flow() { implicit b =>
>   val broadcast = b.add(Broadcast[Input](1))
>   (broadcast.in, f(b)(broadcast.out(0)))
> }
>
> or
>
> val partialFlowGraph = FlowGraph.partial() { implicit b =>
>   val broadcast = b.add(Broadcast[Input](1))
>   UniformFanInShape(f(b)(broadcast.out(0)), broadcast.in)
> }
>
> Now in my main flowgraph:
>
> FlowGraph.closed() { implicit b =>
>   val in = Source.empty[T]
>   val out = Sink.ignore
>
>   in ~> flow ~> out
>   // or : 
>   val fg = b.add(partialFlowGraph)
>   in ~> fg.in(0)
>   fg.out ~> out
> }
>
> In reality the above flowgraph is more complex, but for this showcase a
> dummy in and out is used; e.g. "in" is actually a PropsSource[T] fed
> externally by an actor.
> I want flow or partialFlowGraph to have zero buffers internally, but to
> buffer as usual at its boundaries, and thus to behave like a single
> Flow[T].map(f).
> So to answer your first question: it "should act like a single stream
> stage" from the perspective of the main flowgraph.
> And my problem (second question) is that I have a PropsSource[T] feeding
> my main flowgraph, which buffers incoming external events with custom
> buffer logic.
> An example will make that clear:
> Let's assume for the moment that these events are timestamps arriving
> every 30ms and that processing one timestamp with my function f, i.e. with
> flow/partialFlowGraph, takes more than 30ms (the time for all internal
> stages together for one element).
> Then the buffers of all involved stages in the main flowgraph, and
> especially in my partialFlowGraph, fill up over time.
> I would like to discard old timestamps in my PropsSource buffer logic in
> that case. But the old timestamps are stuck in the processing chain of my
> partial flowgraph.
> So in.buffer(1, OverflowStrategy.dropHead) won't help here.
> I hope this simplified example shows my need for something like FuseStage.
>
>
> -- 
> >>>>>>>>>> Read the docs: http://akka.io/docs/
> >>>>>>>>>> Check the FAQ: 
> http://doc.akka.io/docs/akka/current/additional/faq.html
> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
> --- 
> You received this message because you are subscribed to the Google Groups 
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to [email protected].
> To post to this group, send email to [email protected].
> Visit this group at http://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>
>
>
>
> *Dr. Roland Kuhn*
> *Akka Tech Lead*
> Typesafe <http://typesafe.com/> – Reactive apps on the JVM.
> twitter: @rolandkuhn
> <http://twitter.com/#!/rolandkuhn>
>  
>

