Hi Peter,

thanks for this explanation, but it is not yet clear to me how exactly your 
program would be broken with bufferSize=1 (implementation-wise). What exactly 
is the invariant that is violated, and how would that hold with a hypothetical 
bufferSize=0?

If all you are interested in is synchronous operation of a chain of Stages, 
then you might look into manually operating a OneBoundedInterpreter in a 
mapConcat stage. Operator fusion is on the horizon (but out of scope for 
Streams 1.0), but it will not strictly guarantee buffer-freedom because it 
will be a best-effort optimization.

OTOH if I imagine a system that takes in some stream of elements and does 
processing based on timestamps then why would it not be sufficient to timestamp 
things when they enter and discard them along the way? Why does the processor 
need to be buffer-free? In particular: the processing will take time, too, so what 
differentiates the delay introduced by buffering from the processing latency?
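
To make the timestamping idea concrete, here is a minimal sketch in plain 
Scala with no Akka dependency. The names Stamped, StaleFilter, stampOnEntry, 
isStale, dropStale and the maxAgeMillis threshold are hypothetical and only 
for illustration; they are not part of any Akka API. The clock value is 
passed in explicitly so that the logic does not depend on wall-clock time.

```scala
// Hypothetical wrapper: an element plus the time it entered the pipeline.
final case class Stamped[A](enteredAtMillis: Long, value: A)

object StaleFilter {
  // Attach the entry time to an element when it enters the system.
  def stampOnEntry[A](value: A, nowMillis: Long): Stamped[A] =
    Stamped(nowMillis, value)

  // True if the element has spent longer than maxAgeMillis in the system,
  // regardless of whether that time was spent in a buffer or in processing.
  def isStale[A](e: Stamped[A], nowMillis: Long, maxAgeMillis: Long): Boolean =
    nowMillis - e.enteredAtMillis > maxAgeMillis

  // Keep only the elements that are still fresh at the given instant.
  def dropStale[A](es: Seq[Stamped[A]], nowMillis: Long, maxAgeMillis: Long): Seq[Stamped[A]] =
    es.filterNot(isStale(_, nowMillis, maxAgeMillis))
}
```

In a stream, the assumption is that stampOnEntry would run in a map at the 
entry point and isStale would be checked in a filter at whichever later stage 
should discard old elements. The age check cannot tell buffering delay from 
processing latency, which is the point of the question above.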

Regards,

Roland

> 30 mar 2015 kl. 12:22 skrev Peter Schmitz <petrischm...@gmail.com>:
> 
> Always assumed import FlowGraph.Implicits._ and all other necessary imports:
> I can pack all my logic into a function:
> 
> val f: FlowGraph.Builder => Outlet[T] => Outlet[S]
> 
> for some custom type T and type S. Now I can wrap/use this function as a Flow 
> or PartialFlowGraph:
> 
> val flow: Flow[Input, Trigger.type, Unit] = Flow() { implicit b =>
>   val broadcast = b.add(Broadcast[Input](1))
>   (broadcast.in, f(b)(broadcast.out(0)))
> }
> 
> or
> 
> val partialFlowGraph = FlowGraph.partial() { implicit b =>
>   val broadcast = b.add(Broadcast[Input](1))
>   UniformFanInShape(f(b)(broadcast.out(0)), broadcast.in)
> }
> 
> Now in my main flowgraph:
> 
> FlowGraph.closed() { implicit b =>
>   val in = Source.empty[T]
>   val out = Sink.ignore
> 
>   in ~> flow ~> out
>   // or : 
>   val fg = b.add(partialFlowGraph)
>   in ~> fg.in(0)
>   fg.out ~> out
> }
> 
> In reality the above flowgraph is more complex, but for this showcase a dummy 
> in and out is used, e.g. actually "in" is a PropsSource[T] externally fed 
> by an actor.
> I want flow or partialFlowGraph to have zero buffers internally but at the 
> outside as usual and thus behave as a single Flow[T].map(f).
> So to answer your first question: "should act like a single stream stage" 
> from the perspective of the main flowgraph.
> And my problem (second question) is that I have a PropsSource[T] feeding my 
> main flowgraph which buffers external incoming events with custom buffer 
> logic.
> An example will make that clear: 
> Let's assume for the moment these events are timestamps arriving every 30ms 
> and that processing one timestamp with my function f (i.e. with 
> flow/partialFlowGraph) takes more than 30ms (time for all internal stages 
> together for one element).
> Then the buffers of all involved stages in main and especially in my 
> partialFlowGraph get filled up over time.
> I would like to discard old timestamps in my PropsSource buffer logic in that 
> case. But the old timestamps are stuck in the processing chain of my partial 
> flowgraph.
> So in.buffer(1, OverflowStrategy.dropHead) won't help here.
> Hope this simplified example shows my need for something like FuseStage.
> 
> 
> -- 
> >>>>>>>>>> Read the docs: http://akka.io/docs/ <http://akka.io/docs/>
> >>>>>>>>>> Check the FAQ: 
> >>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html 
> >>>>>>>>>> <http://doc.akka.io/docs/akka/current/additional/faq.html>
> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user 
> >>>>>>>>>> <https://groups.google.com/group/akka-user>
> --- 
> You received this message because you are subscribed to the Google Groups 
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an 
> email to akka-user+unsubscr...@googlegroups.com 
> <mailto:akka-user+unsubscr...@googlegroups.com>.
> To post to this group, send email to akka-user@googlegroups.com 
> <mailto:akka-user@googlegroups.com>.
> Visit this group at http://groups.google.com/group/akka-user 
> <http://groups.google.com/group/akka-user>.
> For more options, visit https://groups.google.com/d/optout 
> <https://groups.google.com/d/optout>.



Dr. Roland Kuhn
Akka Tech Lead
Typesafe <http://typesafe.com/> – Reactive apps on the JVM.
twitter: @rolandkuhn
 <http://twitter.com/#!/rolandkuhn>

