Thanks Ron and Thias,

I understand flink parallelism works at task level. Distribute n subtasks
of the operator across your cluster and parallel process the elements by
distributing them across multiple instances of your operator.  This gives
me very high throughput. What I want to know is how to achieve low latency
per element processing. Lets forget the flink way of doing things for a
minute. Given an element, you need to perform 10 independent functions on
it each taking 100 ms and emit one output containing all 10 outputs. I
would want the total e2e latency to be 100 ms given they all are
independent. The way we do it in any java application is use abstracts like
completablefutures or similar constructs and perform them concurrently and
combine the results at the end. My experience with flink or other stream
processing frameworks is that you do not work with concurrent threads
because threads are used to achieve task parallelism. So what's the way to
achieve the 100 ms e2e latency.  That's when the fan out and aggregate
pattern comes handy. You fan out(branch out) your datastream to 10 copies
and feed them to 10 different operators. And aggregate them by the element
id finally(union operator?). But this brings in the de/serialization and
possibly shuffle between nodes if parallelism of each operator differs?. So
hence the question around multithreading. If I can call these 10 functions
in a concurrent manner from a single map function in flink, I can achieve
low latency without de/serialization + shuffle.

Thias, I went through the thesis of your student, a very interesting study
on optimizations. 👍

On Tue, Aug 15, 2023 at 12:04 AM Schwalbe Matthias <
matthias.schwa...@viseca.ch> wrote:

>
>
> Hi Vignesh,
>
>
>
> In addition to what Ron has said, there are a number of options to
> consider, depending on the nature of your calculations:
>
>
>
> Given that your main focus seems to be latency:
>
>    - As Ron has said, Flink manages parallelism in a coarse grained way
>    that is optimized for spending as little time as possible in
>    synchronization, and takes away the need to manually synchronize
>    - If you spawn your own threads (it’s possible) you need to manage
>    synchronization yourself and this can add considerable to latency and
>    create back-pressure
>       - You would combine two implementations of parallelism and probably
>       end up in threads stealing CPU resources from each other
>    - When planning for horizontal scalability in Flink, plan CPU
>    resources so they can manage the workload
>
> Fanout and collection pattern in general is a good idea, given that in
> order to allow for horizontal scaling you need to have at least one network
> shuffle anyway
>
>    - Make sure you use the best serializer possible for your data.
>    - Out of the box, Pojo serializer is hard to top, a hand coded
>    serializer might help (keep this for later in you dev process)
>    - If you can arrange you problem so that operators can be chained into
>    a single chain you can avoid serialization within the chain
>    - In Flink, if you union() or connect() multiple streams, chaining is
>    interrupted and this adds considerably to latency
>
> There is a neat trick to combine the parallelism-per-key-group and the
> parallelism-per-algorithm into a single implementation and end up with
> single chains with little de-/serialization except for the fanout
>
>    - One of my students has devised this scheme in his masters thesis
>    (see [1] chapter 4.4.1 pp. 69)
>    - With his implementation we reduces back-pressure and latency
>    significantly for some orders of magnitude
>
>
>
> I hope this helps, feels free to discuss details 😊
>
>
>
> Thias
>
>
>
>
>
>
>
> [1] Master Thesis, Dominik Bünzli, University of Zurich, 2021:
> https://www.merlin.uzh.ch/contributionDocument/download/14168
> <https://urldefense.com/v3/__https://www.merlin.uzh.ch/contributionDocument/download/14168__;!!Op6eflyXZCqGR5I!D0WaqFZfknkd-7hl-VgoNQ_l5tszcDDoP-vY4yBoLTIBRev_Iqtkyrei7vIQtduLckRXkz5Q3SIo42ZmYhhONov02b1Cl1g$>
>
>
>
>
>
>
>
> *From:* liu ron <ron9....@gmail.com>
> *Sent:* Dienstag, 15. August 2023 03:54
> *To:* Vignesh Kumar Kathiresan <vkath...@yahooinc.com>
> *Cc:* user@flink.apache.org
> *Subject:* Re: Recommendations on using multithreading in flink map
> functions in java
>
>
>
> Hi, Vignesh
>
>
>
> Flink is a distributed parallel computing framework, each MapFunction is
> actually a separate thread. If you want more threads to process the data,
> you can increase the parallelism of the MapFunction without having to use
> multiple threads in a single MapFunction, which in itself violates the
> original design intent of Flink.
>
>
>
> Best,
>
> Ron
>
>
>
> Vignesh Kumar Kathiresan via user <user@flink.apache.org> 于2023年8月15日周二
> 03:59写道:
>
> Hello All,
>
> *Problem statement *
>
> For a given element, I have to perform multiple(lets say N) operations on
> it. All the N operations are independent of each other. And for achieving
> lowest latency, I want to do them concurrently. I want to understand what's
> the best way to perform it in flink?.
>
>
> I understand flink achieves huge parallelism across elements. But is it
> anti-pattern to do parallel processing in a map func at single element
> level? I do not see anything on the internet for using multithreading
> inside a map function.
>
> I can always fan out with multiple copies of the same element and send
> them to different operators. But it incurs at the least a
> serialize/deserialize cost and may also incur network shuffle. Trying to
> see if a multithreaded approach is better.
>
>
>
> Thanks,
>
> Vignesh
>
> Diese Nachricht ist ausschliesslich für den Adressaten bestimmt und
> beinhaltet unter Umständen vertrauliche Mitteilungen. Da die
> Vertraulichkeit von e-Mail-Nachrichten nicht gewährleistet werden kann,
> übernehmen wir keine Haftung für die Gewährung der Vertraulichkeit und
> Unversehrtheit dieser Mitteilung. Bei irrtümlicher Zustellung bitten wir
> Sie um Benachrichtigung per e-Mail und um Löschung dieser Nachricht sowie
> eventueller Anhänge. Jegliche unberechtigte Verwendung oder Verbreitung
> dieser Informationen ist streng verboten.
>
> This message is intended only for the named recipient and may contain
> confidential or privileged information. As the confidentiality of email
> communication cannot be guaranteed, we do not accept any responsibility for
> the confidentiality and the intactness of this message. If you have
> received it in error, please advise the sender by return e-mail and delete
> this message and any attachments. Any unauthorised use or dissemination of
> this information is strictly prohibited.
>

Reply via email to