Assuming that the flow is local, you can always skip stale entries on a
per-stage basis (I'd advise against using wall-clock time for this;
System.nanoTime is only meaningful within a single JVM).
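
For example, a minimal sketch of such per-stage skipping (assuming elements
are stamped with System.nanoTime() when they enter the stream; Stamped,
dropStale, maxAge and expensiveStage are illustrative names, not existing
APIs):

import scala.concurrent.duration._
import akka.stream.scaladsl.Flow

final case class Stamped[T](value: T, enteredAt: Long = System.nanoTime())

// Drop elements that have been in flight longer than maxAge before handing
// them to the next (expensive) stage.
def dropStale[T](maxAge: FiniteDuration): Flow[Stamped[T], Stamped[T], Unit] =
  Flow[Stamped[T]].filter(s => System.nanoTime() - s.enteredAt <= maxAge.toNanos)

// usage (inside a graph): timestampedSource ~> dropStale(30.millis) ~> expensiveStage ~> ...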

On Mon, Mar 30, 2015 at 2:57 PM, Endre Varga <endre.va...@typesafe.com>
wrote:

> Hi Peter,
>
> Why don't you just create a custom stage where you bundle up all the
> calculations that need to be done synchronously?
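>
> For illustration, a rough sketch of what such a stage could look like (the
> element types and step functions are placeholders, and the stage API
> signatures may still shift slightly across the 1.0 milestones):
>
> import akka.stream.scaladsl.Flow
> import akka.stream.stage.{ Context, PushStage }
>
> // Placeholder element types and calculation steps.
> type In = Long
> type Out = String
> def step1(x: In): Double = x.toDouble / 1000
> def step2(x: Double): Out = f"$x%.2f s"
>
> // All calculations run synchronously inside this one stage, so no stream
> // buffers sit between the individual steps.
> class MyCalculations extends PushStage[In, Out] {
>   override def onPush(elem: In, ctx: Context[Out]) =
>     ctx.push(step2(step1(elem)))
> }
>
> val fused: Flow[In, Out, Unit] = Flow[In].transform(() => new MyCalculations)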
>
> (Btw, I agree with Roland that you probably over-worry about those buffers)
>
> -Endre
>
> On Mon, Mar 30, 2015 at 2:54 PM, Peter Schmitz <petrischm...@gmail.com>
> wrote:
>
>> Hi Endre,
>>
>> I am aware of this, and I know that I want to disable a core feature of akka
>> streams for a certain section of my flow.
>> That said, the actor buffering in front of the processing is fine, but the
>> processing done by my partial flowgraph in between these two actors should
>> happen without buffering, due to the staleness issue I mentioned.
>>
>> Peter
>>
>>
>> On Monday, March 30, 2015 at 14:39:53 UTC+2, drewhk wrote:
>>>
>>> Hi Peter,
>>>
>>> The issue is that these processing elements are actors, which have a
>>> mailbox, so there is buffering in any case. You cannot have meaningful
>>> asynchronous processing without at least a buffer size of one.
>>>
>>> -Endre
>>>
>>> On Mon, Mar 30, 2015 at 2:36 PM, Peter Schmitz <petris...@gmail.com>
>>> wrote:
>>>
>>>> Hi Roland,
>>>>
>>>> I've tried different OverflowStrategies in front of my partial flow, but
>>>> none worked, as expected, because that doesn't change my problem: there are
>>>> already stale elements in the processing chain. I guess the chain is about
>>>> 10 stages long (but it is user defined and not known in advance), so even
>>>> with buffer = 1 for each stage there are 10 stale elements once the buffers
>>>> have filled up (> 10 fast inputs). That is an unacceptable lag on the
>>>> "screen" in my example.
>>>> Recently I tried to solve this by using a FlowGraph with one in-actor and
>>>> one out-actor (PropsSource/PropsSink) passed in. All in-elements are piped
>>>> through my partial flow graph and passed to the out-actor. Outside of the
>>>> main flowgraph, after materialization, I connected the out-actor back to the
>>>> in-actor, so whenever the out-actor gets an element, the in-actor is
>>>> notified to send the next element into the processing chain. In addition,
>>>> the in-actor buffers all external incoming events and discards old ones.
>>>> My remaining problem is that the processing is user defined and I can't
>>>> assume that one input element will generate exactly one output element, so
>>>> I can't determine when the processing of an element is done.
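>>>>
>>>> (For concreteness, a simplified sketch of what the in-actor described above
>>>> could look like, using the ActorPublisher API; the Ack message and the
>>>> keep-only-the-newest buffering are illustrative, not the actual code:)
>>>>
>>>> import akka.stream.actor.ActorPublisher
>>>> import akka.stream.actor.ActorPublisherMessage.{ Cancel, Request }
>>>>
>>>> case object Ack // sent by the out-actor when an element is fully processed
>>>>
>>>> class InActor extends ActorPublisher[Long] {
>>>>   private var latest: Option[Long] = None // keep only the newest event
>>>>   private var inFlight = false            // an element is still in the chain
>>>>
>>>>   def receive = {
>>>>     case Request(_) => emitIfPossible()
>>>>     case Ack        => inFlight = false; emitIfPossible()
>>>>     case Cancel     => context.stop(self)
>>>>     case ts: Long   => latest = Some(ts); emitIfPossible() // newer overwrites older
>>>>   }
>>>>
>>>>   private def emitIfPossible(): Unit =
>>>>     if (!inFlight && totalDemand > 0) latest foreach { ts =>
>>>>       onNext(ts); latest = None; inFlight = true
>>>>     }
>>>> }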
>>>>
>>>>
>>>>
>>>> On Monday, March 30, 2015 at 14:06:59 UTC+2, rkuhn wrote:
>>>>>
>>>>> Hi Peter,
>>>>>
>>>>> On 30 Mar 2015, at 13:30, Peter Schmitz <petris...@gmail.com> wrote:
>>>>>
>>>>> Hi Roland,
>>>>>
>>>>> thank you for your detailed answer.
>>>>> In my timestamp example, timestamps are still being processed while new
>>>>> timestamps arrive; at that point the processing of the old timestamps is
>>>>> obsolete, but they are already somewhere in the processing chain. Of course
>>>>> I can discard them at the end when dealing with timestamps (by comparing
>>>>> with the current time), and perhaps (I have to think about that) even when
>>>>> not dealing with timestamps but rather with some other input stream whose
>>>>> new incoming elements invalidate the processing of the not yet fully
>>>>> processed elements before they are stuck in the buffers of some stage(s)
>>>>> of my partial flowgraph.
>>>>> Synchronous processing of my partial(!) flowgraph avoids having elements
>>>>> that are obsolete AND not fully processed. In this manner only the required
>>>>> elements are processed, the CPU load is lower and, more importantly, the
>>>>> outcome is closer to real time.
>>>>> For example, say you have a stream of stock prices, you do some processing
>>>>> in a partial flowgraph, and finally you show the result as some remolded
>>>>> "value" on a screen. If stock prices are generated faster than the
>>>>> processing that yields a screen "value" can handle, you won't see real-time
>>>>> prices anymore, and if the price stream stops emitting when the stock
>>>>> exchange closes, your screen keeps refreshing until all involved
>>>>> intermediate buffers are drained.
>>>>> That won't happen with a suitable buffer strategy (dropHead) for the stock
>>>>> price stream and synchronous processing for the following partial
>>>>> flowgraph.
>>>>> So my program is not broken, but it is not real-time and it heats up my CPU
>>>>> more than necessary.
>>>>> Delay is inevitable because processing takes time, but the unnecessary
>>>>> delay of having obsolete elements sit in internal buffers is avoidable.
>>>>>
>>>>>
>>>>> It seems that you are worrying a little too much: in practice all these
>>>>> buffers of size one will ever do is to allow pipelining of the processing
>>>>> of stages, which means that the latency of pushing something through the
>>>>> pipeline should not change much between this minimal buffer and no
>>>>> buffering at all, while the throughput will increase due to the ability of
>>>>> processing multiple steps in parallel. For the use-case you describe I’d
>>>>> recommend just placing a buffer with an adequate dropping strategy in front
>>>>> of your partial flow (I’d try out OverflowStrategy.dropBuffer first) and
>>>>> see if that is not good enough. Fusing stages together may benefit latency
>>>>> (if each of them does very little processing) but will cost in
>>>>> throughput—meaning that your maximal refresh rate will decrease. Of course
>>>>> you’ll need to pick the right approach according to your specific
>>>>> requirements.
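>>>>>
>>>>> Concretely, something along these lines (just a sketch; prices, partialFlow
>>>>> and screenSink are placeholder names):
>>>>>
>>>>> import akka.stream.OverflowStrategy
>>>>>
>>>>> // One dropping buffer right in front of the partial flow: when downstream
>>>>> // is slow the buffered (stale) element is dropped, so only fresh elements
>>>>> // enter the flow.
>>>>> prices
>>>>>   .buffer(1, OverflowStrategy.dropBuffer)
>>>>>   .via(partialFlow)
>>>>>   .runWith(screenSink)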
>>>>>
>>>>> Regards,
>>>>>
>>>>> Roland
>>>>>
>>>>>
>>>>> I'll act on your advice and will have a look at OneBoundedInterpreter
>>>>> and mapConcat.
>>>>>
>>>>> Looking forward to operator fusion.
>>>>>
>>>>> Peter
>>>>>
>>>>>
>>>>>
>>>>> On Monday, March 30, 2015 at 12:48:12 UTC+2, rkuhn wrote:
>>>>>>
>>>>>> Hi Peter,
>>>>>>
>>>>>> thanks for this explanation, but it is not yet clear to me how exactly
>>>>>> your program would be broken with bufferSize=1 (implementation-wise).
>>>>>> What exactly is the invariant that is violated, and how would that hold
>>>>>> with a hypothetical bufferSize=0?
>>>>>>
>>>>>> If all you are interested in is synchronous operation of a chain of
>>>>>> Stages, then you might look into manually operating a OneBoundedInterpreter
>>>>>> in a mapConcat stage—while operator fusion is on the horizon (but out of
>>>>>> scope for Streams 1.0) it will not strictly guarantee buffer-freedom
>>>>>> because it will be a best-effort optimization.
>>>>>>
>>>>>> OTOH if I imagine a system that takes in some stream of elements and
>>>>>> does processing based on timestamps, then why would it not be sufficient
>>>>>> to timestamp things when they enter and discard them along the way? Why
>>>>>> does the processor need to be buffer-free? In particular: the processing
>>>>>> will take time, too, so what differentiates the delay introduced by
>>>>>> buffering from the processing latency?
>>>>>>
>>>>>> Regards,
>>>>>>
>>>>>> Roland
>>>>>>
>>>>>> On 30 Mar 2015, at 12:22, Peter Schmitz <petris...@gmail.com> wrote:
>>>>>>
>>>>>> (Throughout, assume import FlowGraph.Implicits._ and all other necessary
>>>>>> imports.)
>>>>>> I can pack all my logic into a function:
>>>>>>
>>>>>> val f: FlowGraph.Builder => Outlet[T] => Outlet[S]
>>>>>>
>>>>>> for some custom type T and type S. Now I can wrap/use this function
>>>>>> as a Flow or PartialFlowGraph:
>>>>>>
>>>>>> val flow: Flow[T, S, Unit] = Flow() { implicit b =>
>>>>>>   val broadcast = b.add(Broadcast[T](1))
>>>>>>   (broadcast.in, f(b)(broadcast.out(0)))
>>>>>> }
>>>>>>
>>>>>> or
>>>>>>
>>>>>> val partialFlowGraph = FlowGraph.partial() { implicit b =>
>>>>>>   val broadcast = b.add(Broadcast[T](1))
>>>>>>   UniformFanInShape(f(b)(broadcast.out(0)), broadcast.in)
>>>>>> }
>>>>>>
>>>>>> Now in my main flowgraph:
>>>>>>
>>>>>> FlowGraph.closed() { implicit b =>
>>>>>>   val in = Source.empty[T]
>>>>>>   val out = Sink.ignore
>>>>>>
>>>>>>   in ~> flow ~> out
>>>>>>   // or :
>>>>>>   val fg = b.add(partialFlowGraph)
>>>>>>   in ~> fg.in(0)
>>>>>>   fg.out ~> out
>>>>>> }
>>>>>>
>>>>>> In reality the above flowgraph is more complex, but for this showcase a
>>>>>> dummy in and out are used; in my actual code "in" is a PropsSource[T] fed
>>>>>> externally by an actor.
>>>>>> I want flow or partialFlowGraph to have zero internal buffers but to
>>>>>> behave as usual from the outside, i.e. like a single Flow[T].map(f).
>>>>>> So to answer your first question: it "should act like a single stream
>>>>>> stage" from the perspective of the main flowgraph.
>>>>>> And my problem (second question) is that I have a PropsSource[T] feeding
>>>>>> my main flowgraph which buffers external incoming events with custom
>>>>>> buffer logic.
>>>>>> An example will make that clear:
>>>>>> Let's assume for the moment that these events are timestamps arriving
>>>>>> every 30ms, and that processing one element through my function f, i.e.
>>>>>> through the flow/partialFlowGraph (all internal stages together), takes
>>>>>> more than 30ms.
>>>>>> Then the buffers of all involved stages in the main graph, and especially
>>>>>> in my partialFlowGraph, fill up over time.
>>>>>> In that case I'd like to discard old timestamps in my PropsSource buffer
>>>>>> logic, but the old timestamps are stuck in the processing chain of my
>>>>>> partial flowgraph.
>>>>>> So in.buffer(1, OverflowStrategy.dropHead) won't help here.
>>>>>> Hope this simplified example shows my need for something like FuseStage.
>>>>>>
>>>>>>
>>>>>> *Dr. Roland Kuhn*
>>>>>> *Akka Tech Lead*
>>>>>> Typesafe <http://typesafe.com/> – Reactive apps on the JVM.
>>>>>> twitter: @rolandkuhn
>>>>>> <http://twitter.com/#!/rolandkuhn>
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>>
>>>>> *Dr. Roland Kuhn*
>>>>> *Akka Tech Lead*
>>>>> Typesafe <http://typesafe.com/> – Reactive apps on the JVM.
>>>>> twitter: @rolandkuhn
>>>>> <http://twitter.com/#!/rolandkuhn>
>>>>>
>



-- 
Cheers,
√
