Ah, good factoid! Thanks!

Brian Maso

On Mon, Apr 13, 2020, 03:29 Alexey Shuksto <seig...@gmail.com> wrote:

> > To be honest I'm not sure of how messages are handled when there are no
> active Source[T] instances consuming messages sent to Broadcast.sink[T]...
> > My impression is that they would be thrown away, and the bufferSize
> parameter to BroadcastHub.sink[T] only comes into play when there are one
> or more active graphs from (3) consuming messages.
>
> BroadcastHub is blocked until _all_ of the connected to it's materialized
> Source sinks signaled a demand.
>
> Thus, if you want hub to discard all elements until there is some
> meaningful consumer, you need to attach draining no-op sink to it right
> after materialization of BroadcastHub.sink[T].
>
> пятница, 10 апреля 2020 г., 21:11:40 UTC+3 пользователь Brian Maso написал:
>>
>> I suggest you post your question on the gitter channel (
>> gitter.im/akka/akka). There are a lot of knowledgeable people who can
>> answer, and I think it is a much more active space than this (deprecated)
>> list.
>>
>> But to answer your question: the materialized value of an asynchronous
>> Sink is a Future[T]. The materialized value of a BroadcastHub.sink[T] is a
>> Source[T]. Not every materialized value is a Future[_].
>>
>> So you have basically three parts:
>> 1) Your original runnable graph into which the BroadcastHub.sink[T] is
>> embedded
>> 2) A single Source[T] materialized when (1) is run -- this is a
>> re-usable "blueprint" which can be used to define multiple new runnable
>> graphs.
>> 3) 0 or more runnable graphs that receive messages through (2)
>>
>> Each message sent the BroadcastHub.sink[T] during the course of (1)'s run
>> will be queued up and delivered (ie "broadcast") to all of the runnable
>> graphs in (3) when they are run. You can re-use the Source[T] from (2)
>> multiple times, effectively allowing you to dynamically "tap" the flow of
>> messages being sent to the Broadcast.sink[T] from (1).
>>
>> (To be honest I'm not sure of how messages are handled when there are no
>> active Source[T] instances consuming messages sent to Broadcast.sink[T]...
>> My impression is that they would be thrown away, and the bufferSize
>> parameter to BroadcastHub.sink[T] only comes into play when there are one
>> or more active graphs from (3) consuming messages. Experimentation is
>> probably necessary to confirm that.)
>>
>> Best regards,
>> Brian Maso
>>
>> On Fri, Apr 10, 2020 at 10:20 AM Christophe De Troyer <
>> christoph...@gmail.com> wrote:
>>
>>> Hi all,
>>>
>>> I've been looking at MergeHub and BroadcastHub for Akka Stream and I am
>>> a bit confused.
>>>
>>> In the beginning of the documentation the following is mentioned:
>>>
>>> It is important to remember that even after constructing the
>>>> RunnableGraph by connecting all the source, sink and different
>>>> operators, no data will flow through it until it is materialized.
>>>> Materialization is the process of allocating all resources needed to run
>>>> the computation described by a Graph (in Akka Streams this will often
>>>> involve starting up Actors).
>>>> ...
>>>> After running (materializing) the RunnableGraph[T] we get back the
>>>> materialized value of type T.
>>>>
>>>
>>> This makes perfect sense. But I'm having issues uniting this with the
>>> code sample from the MergeHub documentation.
>>>
>>> // A simple producer that publishes a new "message" every second
>>> val producer : Source[String, Cancellable]= Source.tick(1.second, 1.second, 
>>> "New message")
>>>
>>> // Attach a BroadcastHub Sink to the producer. This will materialize to a
>>> // corresponding Source.
>>> // (We need to use toMat and Keep.right since by default the materialized
>>> // value to the left is used)
>>> val runnableGraph: RunnableGraph[Source[String, NotUsed]] =
>>>   producer.toMat(BroadcastHub.sink(bufferSize = 256))(Keep.right)
>>>
>>> // By running/materializing the producer, we get back a Source, which
>>> // gives us access to the elements published by the producer.
>>> val fromProducer: Source[String, NotUsed] = runnableGraph.run()
>>>
>>> // Print out messages from the producer in two independent consumers
>>> fromProducer.runForeach(msg => println("consumer1: " + msg))
>>> fromProducer.runForeach(msg => println("consumer2: " + msg))
>>>
>>>
>>> In the above snippet a Source is created, and that is used as the Source
>>> for a runnableGraph (meaning it has a source and a sink). Conceptually I
>>> understand that the BroadcastHub is indeed a sink. But what I do not
>>> understand is that when you run/materialize that RunnableGraph, you get
>>> back a Source.
>>>
>>> The way I see it, running a graph should return a future of the types of
>>> values flowing through that graph. In this case Strings.
>>>
>>> Can somebody shed some light on this, please?
>>>
>>>  Thanks,
>>> Christophe
>>>
>>> --
>>>
>>> *****************************************************************************************************
>>> ** New discussion forum: https://discuss.akka.io/ replacing akka-user
>>> google-group soon.
>>> ** This group will soon be put into read-only mode, and replaced by
>>> discuss.akka.io
>>> ** More details:
>>> https://akka.io/blog/news/2018/03/13/discuss.akka.io-announced
>>>
>>> *****************************************************************************************************
>>> >>>>>>>>>>
>>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>>> >>>>>>>>>> Check the FAQ:
>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>> >>>>>>>>>> Search the archives:
>>> https://groups.google.com/group/akka-user
>>> ---
>>> You received this message because you are subscribed to the Google
>>> Groups "Akka User List" group.
>>> To unsubscribe from this group and stop receiving emails from it, send
>>> an email to akka...@googlegroups.com.
>>> To view this discussion on the web visit
>>> https://groups.google.com/d/msgid/akka-user/facaf52d-b49d-4980-93e0-30f9ead85fed%40googlegroups.com
>>> <https://groups.google.com/d/msgid/akka-user/facaf52d-b49d-4980-93e0-30f9ead85fed%40googlegroups.com?utm_medium=email&utm_source=footer>
>>> .
>>>
>> --
>
> *****************************************************************************************************
> ** New discussion forum: https://discuss.akka.io/ replacing akka-user
> google-group soon.
> ** This group will soon be put into read-only mode, and replaced by
> discuss.akka.io
> ** More details:
> https://akka.io/blog/news/2018/03/13/discuss.akka.io-announced
>
> *****************************************************************************************************
> >>>>>>>>>>
> >>>>>>>>>> Read the docs: http://akka.io/docs/
> >>>>>>>>>> Check the FAQ:
> http://doc.akka.io/docs/akka/current/additional/faq.html
> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to akka-user+unsubscr...@googlegroups.com.
> To view this discussion on the web visit
> https://groups.google.com/d/msgid/akka-user/e66ff705-6794-4b99-9ef2-3e8d168ab65e%40googlegroups.com
> <https://groups.google.com/d/msgid/akka-user/e66ff705-6794-4b99-9ef2-3e8d168ab65e%40googlegroups.com?utm_medium=email&utm_source=footer>
> .
>

-- 
*****************************************************************************************************
** New discussion forum: https://discuss.akka.io/ replacing akka-user 
google-group soon.
** This group will soon be put into read-only mode, and replaced by 
discuss.akka.io
** More details: https://akka.io/blog/news/2018/03/13/discuss.akka.io-announced
*****************************************************************************************************
>>>>>>>>>> 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To view this discussion on the web visit 
https://groups.google.com/d/msgid/akka-user/CAPMeo28aRcUcHi%3Dxb5sMs8t4SDE-UFt%3DAXa%3Ddsp6RB9RWKhY_Q%40mail.gmail.com.

Reply via email to