… mirrored back to user list …

One additional thing you can do is not to split into 10 additional tasks, but to:

  *   Fan-out your original event into 10 copies (original key, 
1-of-10-algorithm-key, event),
  *   key by the combined key (original key, algorithm key)
  *   have a single operator chain that internally switches by algorithm key
  *   then collect by event id to enrich a final result
  *   much like mentioned in [1]
This made all the difference for us with orders of magnitude better overall 
latency and backpressure because we avoided multiple layers of parallelism (job 
parallelism * algorithm parallelism).
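
As a rough illustration of the steps above, here is a minimal plain-Java sketch (no Flink dependency). The class `FanOutByAlgorithmKey`, the `Copy` type, the `"#"` key separator and the placeholder arithmetic in `process` are all assumptions made for this example, not part of the scheme from the thesis; in a real job the combined key would be what the stream is keyed by, and `process` would be the body of the single chained operator.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class FanOutByAlgorithmKey {
    static final int ALGORITHMS = 10;

    /** One fanned-out copy: (original key, algorithm key, event). */
    static final class Copy {
        final String originalKey;
        final int algorithmKey;
        final long eventId;
        final double payload;

        Copy(String originalKey, int algorithmKey, long eventId, double payload) {
            this.originalKey = originalKey;
            this.algorithmKey = algorithmKey;
            this.eventId = eventId;
            this.payload = payload;
        }

        /** Combined key (original key, algorithm key) used for partitioning. */
        String combinedKey() {
            return originalKey + "#" + algorithmKey;
        }
    }

    /** Fan one event out into ALGORITHMS copies, one per algorithm key. */
    static List<Copy> fanOut(String originalKey, long eventId, double payload) {
        List<Copy> copies = new ArrayList<>();
        for (int algo = 0; algo < ALGORITHMS; algo++) {
            copies.add(new Copy(originalKey, algo, eventId, payload));
        }
        return copies;
    }

    /** Single processing step that switches internally on the algorithm key. */
    static double process(Copy c) {
        switch (c.algorithmKey) {
            case 0:  return c.payload + 1.0;             // placeholder algorithm 0
            case 1:  return c.payload * 2.0;             // placeholder algorithm 1
            default: return c.payload - c.algorithmKey;  // placeholder for the rest
        }
    }

    /** Collect results per event id: algorithm key -> result. */
    static Map<Long, Map<Integer, Double>> collectByEventId(List<Copy> copies) {
        Map<Long, Map<Integer, Double>> byEvent = new TreeMap<>();
        for (Copy c : copies) {
            byEvent.computeIfAbsent(c.eventId, id -> new TreeMap<>())
                   .put(c.algorithmKey, process(c));
        }
        return byEvent;
    }

    public static void main(String[] args) {
        System.out.println(collectByEventId(fanOut("customer-1", 42L, 10.0)));
    }
}
```

The point of the sketch is that there is only one kind of worker: which algorithm runs is decided by data (the algorithm key), not by a separate operator per algorithm.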

Thias

[1] Master Thesis, Dominik Bünzli, University of Zurich, 2021: 
https://www.merlin.uzh.ch/contributionDocument/download/14168


From: Vignesh Kumar Kathiresan <vkath...@yahooinc.com>
Sent: Thursday, August 17, 2023 10:27 PM
To: Schwalbe Matthias <matthias.schwa...@viseca.ch>
Cc: liu ron <ron9....@gmail.com>; dominik.buen...@swisscom.com
Subject: Re: [E] RE: Recommendations on using multithreading in flink map 
functions in java


Hello Thias,

Thanks for the explanation. The objective of achieving 100 ms e2e latency was to 
establish the latency vs. ser/deser + I/O tradeoff: 1 sec (when you execute all 
10 algorithms in sequence) vs. ~100 ms (when you execute them in parallel).

My takeaway is that in streaming frameworks when you want to move your e2e 
latency towards the 100 ms end of the latency spectrum

  *   Separate the 10 algorithms into different tasks (unchain) so that they are 
executed in different threads
  *   Fan out (branch out) the element and send a copy to each algorithm task 
(separate task)
  *   Incur the serialize/deserialize cost and try to avoid a network shuffle 
as much as possible (by having the same parallelism in all 11 operators, so 
that they run in different threads but on the same worker?)
  *   Finally, combine the results using some stateful process function.
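
To make the last step concrete, here is a minimal plain-Java sketch of such a combiner. It is an assumption-laden stand-in: the class `ResultCombiner` and its methods are hypothetical, and a `HashMap` plays the role that keyed state (e.g. a `MapState` inside a `KeyedProcessFunction` keyed by event id) would play in a real Flink job. It buffers partial results per event id and emits only once all expected results have arrived.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

class ResultCombiner {
    private final int expectedResults;
    // Stand-in for keyed state: event id -> (algorithm id -> partial result).
    private final Map<Long, Map<Integer, Double>> pending = new HashMap<>();

    ResultCombiner(int expectedResults) {
        this.expectedResults = expectedResults;
    }

    /**
     * Adds one partial result. Returns the combined result once all expected
     * partial results for the event have arrived, otherwise an empty Optional.
     */
    Optional<Map<Integer, Double>> add(long eventId, int algorithmId, double value) {
        Map<Integer, Double> partial =
                pending.computeIfAbsent(eventId, id -> new TreeMap<>());
        partial.put(algorithmId, value);
        if (partial.size() < expectedResults) {
            return Optional.empty();
        }
        pending.remove(eventId); // clear the buffered state once the event is complete
        return Optional.of(partial);
    }

    /** Number of events still waiting for at least one partial result. */
    int pendingEvents() {
        return pending.size();
    }

    public static void main(String[] args) {
        ResultCombiner combiner = new ResultCombiner(2);
        System.out.println(combiner.add(1L, 0, 0.5)); // still incomplete
        System.out.println(combiner.add(1L, 1, 1.5)); // complete
    }
}
```

A production version would also need a timeout to discard events whose partial results never all arrive; that is left out here.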


On Wed, Aug 16, 2023 at 12:01 AM Schwalbe Matthias 
<matthias.schwa...@viseca.ch> wrote:
Hi Ron,

What you say is pretty much similar to what I’ve written 😊, the difference is 
the focus:


  *   When you use a concurrency library, things are not necessarily running in 
parallel, they serialize/schedule the execution of tasks to the available CPU 
cores, and need synchronization
  *   i.e. you end up with total latency bigger than the 100ms (even if you’ve 
got 10 dedicated CPU cores, because of synchronization)
  *   the whole matter is affected by Amdahl’s law [1]
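
For a feel of the numbers, here is a small Java sketch of Amdahl's law, speedup = 1 / ((1 - p) + p / n), applied to the 10 x 100 ms case. The class and method names are made up for this example, and the parallel fractions p used in `main` are purely illustrative assumptions, not measurements.

```java
class AmdahlSpeedup {
    /** Amdahl's law: speedup on n cores when a fraction p of the work is parallelizable. */
    static double speedup(double parallelFraction, int cores) {
        return 1.0 / ((1.0 - parallelFraction) + parallelFraction / cores);
    }

    /** Resulting latency, given the latency of the fully sequential execution. */
    static double latencyMillis(double sequentialMillis, double parallelFraction, int cores) {
        return sequentialMillis / speedup(parallelFraction, cores);
    }

    public static void main(String[] args) {
        // 10 algorithms x 100 ms = 1000 ms sequentially, run on 10 cores.
        for (double p : new double[] {1.0, 0.95, 0.90}) {
            System.out.printf("p=%.2f -> %.1f ms%n", p, latencyMillis(1000.0, p, 10));
        }
    }
}
```

Only with p = 1.0 (no synchronization at all) does the latency reach 100 ms; with an assumed 5 % serial share it is already about 145 ms.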

Back to your most prominent question: “What I want to know is how to achieve 
low latency per element processing”:
Strictly speaking, the only way to achieve the 100 ms overall latency is to have 
10 dedicated CPU cores that don’t do anything else, to avoid synchronization at 
any cost, and to have events arrive at least 100 ms apart.
This is not possible, but with a couple of nifty tricks you can come very close 
to it.
Another aspect, however, is that this practically only scales up to the maximum 
feasible number of CPU cores in a system (e.g. 512 cores), beyond which you 
cannot avoid serialization and synchronization.

The way I understand the design of Flink is:

  *   That the focus is on throughput with decent latency values
  *   Flink jobs can be scaled linearly in a wide range of parallelism
  *   i.e. within that range Flink does not run into the effects of Amdahl’s 
law, because it avoids synchronization among tasks
  *   This comes with a price: serialization efforts, and I/O cost
  *   A Flink (sub-)task is basically a message queue

     *   where incoming events sit in a buffer and are processed one after the 
other (=latency),
     *   buffering incurs serialization (latency),
     *   outgoing messages for non-chained operators are also serialized and 
buffered (latency)
     *   before they get sent out to a downstream (sub-)task (configurable size 
and time triggers on buffer (latency))

  *   the difference that makes the difference is that all these forms of 
latency are linear in the number of events, i.e. the effects of Amdahl’s law 
don’t kick in

Independent of the runtime (Flink or non-Flink) it is good to use only a single 
means/granularity of parallelism/concurrency.
This way we avoid a lot of synchronization cost and prevent one level from 
stealing resources from other levels in an unpredictable way (= latency/jitter).

The solution that I proposed in my previous mail does exactly this:

  *   it unifies the parallelism used for sharding (per key group parallelism) 
with the parallelism for the 10 calculation algorithms
  *   it scales linearly, avoids backpressure given enough resources, and has a 
decent overall latency (although not the 100ms)

In order to minimize serialization cost you could consider serialization-free 
types (see BinaryRowData [2] for ideas).

Hope this helps 😊

Thias


[1] https://en.wikipedia.org/wiki/Amdahl%27s_law
[2] https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/BinaryRowData.java



From: Vignesh Kumar Kathiresan via user <user@flink.apache.org>
Sent: Wednesday, August 16, 2023 01:49
To: Schwalbe Matthias <matthias.schwa...@viseca.ch>
Cc: liu ron <ron9....@gmail.com>; user@flink.apache.org; 
dominik.buen...@swisscom.com
Subject: Re: [E] RE: Recommendations on using multithreading in flink map 
functions in java


Thanks Ron and Thias,

I understand Flink parallelism works at the task level: you distribute n 
subtasks of an operator across your cluster and process elements in parallel by 
distributing them across the multiple instances of your operator. This gives me 
very high throughput. What I want to know is how to achieve low latency per 
element processing. Let's forget the Flink way of doing things for a minute. 
Given an element, you need to perform 10 independent functions on it, each 
taking 100 ms, and emit one output containing all 10 results. I would want the 
total e2e latency to be 100 ms, given they are all independent. The way we do it 
in any Java application is to use abstractions like CompletableFuture or similar 
constructs, perform the calls concurrently and combine the results at the end. 
My experience with Flink and other stream processing frameworks is that you do 
not work with concurrent threads, because threads are used to achieve task 
parallelism. So what's the way to achieve the 100 ms e2e latency? That's where 
the fan-out and aggregate pattern comes in handy: you fan out (branch out) your 
datastream into 10 copies, feed them to 10 different operators, and finally 
aggregate them by element id (union operator?). But this brings in 
de/serialization and possibly a shuffle between nodes (if the parallelism of 
each operator differs?). Hence the question around multithreading: if I can call 
these 10 functions concurrently from a single map function in Flink, 
I can achieve low latency without de/serialization + shuffle.
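
For reference, the plain-Java approach described above might look roughly like the following sketch. The class `ConcurrentEnrichment`, the helper `applyAll` and the 100 ms sleeps are assumptions made for illustration; this shows only the standalone `CompletableFuture` pattern, not a recommendation to run it inside a Flink map function, where the replies in this thread caution about synchronization cost and threads competing for CPU.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

class ConcurrentEnrichment {
    /** Runs all functions on the same element concurrently; results keep function order. */
    static <I, O> List<O> applyAll(I element, List<Function<I, O>> functions,
                                   ExecutorService pool) {
        List<CompletableFuture<O>> futures = new ArrayList<>();
        for (Function<I, O> f : functions) {
            futures.add(CompletableFuture.supplyAsync(() -> f.apply(element), pool));
        }
        List<O> results = new ArrayList<>();
        for (CompletableFuture<O> future : futures) {
            results.add(future.join()); // blocks until this function has finished
        }
        return results;
    }

    static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(10);
        try {
            List<Function<Integer, Integer>> functions = new ArrayList<>();
            for (int i = 0; i < 10; i++) {
                final int offset = i;
                functions.add(x -> { sleep(100); return x + offset; }); // simulated 100 ms work
            }
            long start = System.nanoTime();
            List<Integer> out = applyAll(1, functions, pool);
            long elapsedMs = (System.nanoTime() - start) / 1_000_000;
            System.out.println(out + " in about " + elapsedMs + " ms");
        } finally {
            pool.shutdown();
        }
    }
}
```

With 10 pool threads and enough free cores, the elapsed time should land near 100 ms rather than 1 s, plus whatever scheduling and synchronization overhead the machine adds.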

Thias, I went through the thesis of your student, a very interesting study on 
optimizations. 👍

On Tue, Aug 15, 2023 at 12:04 AM Schwalbe Matthias 
<matthias.schwa...@viseca.ch> wrote:

Hi Vignesh,

In addition to what Ron has said, there are a number of options to consider, 
depending on the nature of your calculations:

Given that your main focus seems to be latency:

  *   As Ron has said, Flink manages parallelism in a coarse grained way that 
is optimized for spending as little time as possible in synchronization, and 
takes away the need to manually synchronize
  *   If you spawn your own threads (it’s possible) you need to manage 
synchronization yourself, and this can add considerably to latency and create 
back-pressure

     *   You would combine two implementations of parallelism and probably end 
up in threads stealing CPU resources from each other

  *   When planning for horizontal scalability in Flink, plan CPU resources so 
they can manage the workload
Fanout and collection pattern in general is a good idea, given that in order to 
allow for horizontal scaling you need to have at least one network shuffle 
anyway

  *   Make sure you use the best serializer possible for your data.
  *   Out of the box, the Pojo serializer is hard to top; a hand-coded serializer 
might help (keep this for later in your dev process)
  *   If you can arrange your problem so that operators can be chained into a 
single chain you can avoid serialization within the chain
  *   In Flink, if you union() or connect() multiple streams, chaining is 
interrupted and this adds considerably to latency
There is a neat trick to combine the parallelism-per-key-group and the 
parallelism-per-algorithm into a single implementation and end up with single 
chains with little de-/serialization except for the fanout

  *   One of my students devised this scheme in his master’s thesis (see [1] 
chapter 4.4.1 pp. 69)
  *   With his implementation we reduced back-pressure and latency 
significantly, by some orders of magnitude

I hope this helps, feel free to discuss details 😊

Thias



[1] Master Thesis, Dominik Bünzli, University of Zurich, 2021: 
https://www.merlin.uzh.ch/contributionDocument/download/14168



From: liu ron <ron9....@gmail.com>
Sent: Tuesday, August 15, 2023 03:54
To: Vignesh Kumar Kathiresan <vkath...@yahooinc.com>
Cc: user@flink.apache.org
Subject: Re: Recommendations on using multithreading in flink map functions in 
java

Hi, Vignesh

Flink is a distributed parallel computing framework; each MapFunction instance 
actually runs in a separate thread. If you want more threads to process the 
data, you can increase the parallelism of the MapFunction without having to use 
multiple threads in a single MapFunction, which in itself violates the original 
design intent of Flink.

Best,
Ron

Vignesh Kumar Kathiresan via user <user@flink.apache.org> wrote on Tue, 
Aug 15, 2023 at 03:59:
Hello All,

Problem statement
For a given element, I have to perform multiple (let's say N) operations on it. 
All N operations are independent of each other, and to achieve the lowest 
latency I want to run them concurrently. I want to understand what the best 
way to do this in Flink is.

I understand Flink achieves huge parallelism across elements. But is it an 
anti-pattern to do parallel processing in a map function at the single-element 
level? I do not see anything on the internet about using multithreading inside a 
map function.

I can always fan out with multiple copies of the same element and send them to 
different operators. But that incurs at least a serialize/deserialize cost 
and may also incur a network shuffle. I am trying to see if a multithreaded 
approach is better.

Thanks,
Vignesh
This message is intended only for the named recipient and may contain 
confidential or privileged information. As the confidentiality of email 
communication cannot be guaranteed, we do not accept any responsibility for the 
confidentiality and the intactness of this message. If you have received it in 
error, please advise the sender by return e-mail and delete this message and 
any attachments. Any unauthorised use or dissemination of this information is 
strictly prohibited.