On Thu, Oct 16, 2014 at 10:49 AM, Alec Zorab <aleczo...@gmail.com> wrote:

> I think this is a very common use case for applications streaming time
> sensitive data (for example prices) where we would want to do an n-way zip
> and then receive the most recent set of data for those n things.
>

I agree, and it is also useful in other timing related contexts
(throttling, implementing traffic shaping, etc.). The simple solution is to
have Zip as it is now for deterministic inputs (since it has higher
throughput) and have a counterpart that is useful for time-oriented,
non-deterministic streams. As Martynas said, this latter is simply a
buffer-1 version of the Zip internally, so this is trivial to implement.

-Endre


>
> In that case you'd want to do several instruments conflating with
> (identity, _._2) and a synchronised zip - the timer here is a bit of a red
> herring, there are plenty of cases where I just want the most recent
> element of several streams when I pull from them.
>
> On 15 October 2014 21:03, Vladimir Koshelev <vr.koshe...@googlemail.com>
> wrote:
>
>> Hi Martynas,
>>
>> thank you a lot for the clarification! Now it all make sense to me again
>> :)
>> I'm not sure, that it worth of creating an issue. As you noticed, this is
>> an unusual use-case - having conflate on the one side of a zip and a kind
>> of timer on the other. I think a TimedTransform is a better fit for
>> implementing such functionality (folding on stream and emitting
>> intermediate result periodically). But even in the case of conflate and zip
>> - downstream should be able to deal with those kind of elements anyway.
>> What do you think?
>>
>> On Wednesday, October 15, 2014 12:38:31 PM UTC+2, Martynas Mickevičius
>> wrote:
>>>
>>> Hi Vladimir,
>>>
>>> you have hit on the interesting issue. Your understanding is correct but
>>> the output of your example is not what you expect, because every stage in
>>> the stream works in batches. The size of the batch is the size of the input
>>> buffer which can be set in *akka.stream.materializer.{initial,
>>> max}-input-buffer-size.*
>>>
>>> So when zip asks upstream for more elements it asks for more than one
>>> element. That request can be fulfilled by the faster source with an element
>>> every 100 milliseconds which it does and conflate does not have time to
>>> kick in. That is why you see lots of 1s printed out.
>>>
>>> From time to time buffers align and zip's request for more elements is
>>> fulfilled first by accumulated value by the conflate and then again by 1s
>>> every 100 milliseconds.
>>>
>>> A quick fix is setting input buffer size to 1 for a whole stream.
>>> However that makes the whole stream slower.
>>>
>>> val set = MaterializerSettings(system).withInputBuffer(1, 1)
>>> implicit val fm = FlowMaterializer(set)
>>>
>>> A more proper fix would be to provide "synchronize" element which would
>>> be the same zip under the hood but with the buffer size of one. Could you
>>> create an issue in Akka issue tracker for that?
>>>
>>> On Tue, Oct 14, 2014 at 11:07 PM, Vladimir Koshelev <
>>> vr.ko...@googlemail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I'm trying to use akka-stream and struggling to understand how conflate
>>>> works. Here a snippet of code I'm using:
>>>>
>>>> object ConflateDemo extends App {
>>>>
>>>>   import scala.concurrent.duration._
>>>>
>>>>   implicit val system = ActorSystem("demo")
>>>>
>>>>   implicit val fm = FlowMaterializer()
>>>>
>>>>   case object Tick
>>>>
>>>>   val fast = FlowFrom(Duration.Zero, 100.millis, () =>
>>>> Tick).conflate[Int](_ => 1, (acc,_) => acc + 1)
>>>>
>>>>   val slow = FlowFrom(Duration.Zero, 1.second, () => Tick)
>>>>
>>>>   import akka.stream.scaladsl2.FlowGraphImplicits._
>>>>
>>>>   val g = FlowGraph { implicit b =>
>>>>
>>>>     val zip = Zip[Int, Tick.type]
>>>>
>>>>     slow ~> zip.right
>>>>
>>>>     fast ~> zip.left
>>>>
>>>>     zip.out ~> FlowFrom[(Int, Tick.type)].map(_._1) ~>
>>>> ForeachSink[Int]( v =>  print(" " + v))
>>>>
>>>>   }.run
>>>>
>>>> }
>>>>
>>>> }
>>>>
>>>> My intention is to create two streams, first produces events 10 times
>>>> faster than the second one. Conflate on the fast stream should count
>>>> events. Then I zip those streams and print results of conflate out.
>>>>
>>>> My understanding of how it should work:
>>>>
>>>>
>>>>    - conflate consumes events produced by fast stream as fast as they
>>>>    can be produced, so there is always demand and all ticks are emitted and
>>>>    counted.
>>>>    - As soon as an event from the slow flow arrives to the zip, zip
>>>>    demands an element from conflate and sends them into sink.
>>>>
>>>> So, I have expected output looking like: 10 10 10 10 10 ...
>>>> What I get:  1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 55 1 1 1 1 1 1 1 73 1 1 1
>>>> 1 1 1 1 73 1 1 1 1 1 1 1 73 1 1 1 1 1 1 1 73 1 1 1 1 1 1 1 72 1 1 1 1 1 1 1
>>>> 74 1 1 1 1 1 1 1 72 1 1 ...
>>>>
>>>> What am i missing? Is there some mistake in the code or do I
>>>> misunderstand how conflate and zip are working?
>>>>
>>>>
>>>> Vladimir
>>>>
>>>>
>>>>
>>>>  --
>>>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>>>> >>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/
>>>> current/additional/faq.html
>>>> >>>>>>>>>> Search the archives: https://groups.google.com/
>>>> group/akka-user
>>>> ---
>>>> You received this message because you are subscribed to the Google
>>>> Groups "Akka User List" group.
>>>> To unsubscribe from this group and stop receiving emails from it, send
>>>> an email to akka-user+...@googlegroups.com.
>>>> To post to this group, send email to akka...@googlegroups.com.
>>>> Visit this group at http://groups.google.com/group/akka-user.
>>>> For more options, visit https://groups.google.com/d/optout.
>>>>
>>>
>>>
>>>
>>> --
>>> Martynas Mickevičius
>>> Typesafe <http://typesafe.com/> – Reactive
>>> <http://www.reactivemanifesto.org/> Apps on the JVM
>>>
>>  --
>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>> >>>>>>>>>> Check the FAQ:
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
>> ---
>> You received this message because you are subscribed to the Google Groups
>> "Akka User List" group.
>> To unsubscribe from this group and stop receiving emails from it, send an
>> email to akka-user+unsubscr...@googlegroups.com.
>> To post to this group, send email to akka-user@googlegroups.com.
>> Visit this group at http://groups.google.com/group/akka-user.
>> For more options, visit https://groups.google.com/d/optout.
>>
>
>  --
> >>>>>>>>>> Read the docs: http://akka.io/docs/
> >>>>>>>>>> Check the FAQ:
> http://doc.akka.io/docs/akka/current/additional/faq.html
> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to akka-user+unsubscr...@googlegroups.com.
> To post to this group, send email to akka-user@googlegroups.com.
> Visit this group at http://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to