Hi Tommy,

I don't have a sure solution, but I've got a number of ideas on how to 
continue and shed light on the matter:


  *   With respect to diagnostics, have you enabled flame graphs 
(rest.flamegraph.enabled in the cluster configuration)?
     *   They let you see the call tree of each task and where most of the 
time is spent
     *   That usually gives me quite some insight
  *   You mention serialization could be a problem:
     *   Which serialization are you using currently?
     *   I could imagine using one of the (almost) zero-copy types such as RowData
        *   I considered this once but didn’t try it
     *   Nico published a nice comparison of the serializer choices [1]
  *   Just for completeness: pipeline.object-reuse can cut down quite a bit on 
GC cost, at the price of requiring more discipline with object mutation and 
with caching un-serialized objects in arbitrary data structures
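
To illustrate the object-reuse caveat, here is a minimal plain-Java sketch of the aliasing problem. It has no Flink dependency; the `Event` class and both buffering methods are hypothetical stand-ins, and the loop only imitates an upstream that hands the same mutable instance to a downstream function for every record. Whether and where Flink actually reuses an instance depends on the job, so treat this as a picture of the hazard rather than of Flink's exact behavior.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java illustration of the aliasing hazard that object reuse can expose. */
final class ObjectReuseHazard {

    /** Hypothetical mutable record type, standing in for an event passed between operators. */
    static final class Event {
        long value;

        Event copy() {
            Event e = new Event();
            e.value = value;
            return e;
        }
    }

    /** Buffers references to the incoming instance: unsafe when that instance is reused. */
    static List<Event> bufferReferences(long[] inputs) {
        Event reused = new Event();      // a single instance, reused for every record
        List<Event> buffer = new ArrayList<>();
        for (long v : inputs) {
            reused.value = v;            // "upstream" overwrites the same object
            buffer.add(reused);          // "downstream" keeps only a reference
        }
        return buffer;
    }

    /** Buffers defensive copies: safe, at the cost of one allocation per buffered record. */
    static List<Event> bufferCopies(long[] inputs) {
        Event reused = new Event();
        List<Event> buffer = new ArrayList<>();
        for (long v : inputs) {
            reused.value = v;
            buffer.add(reused.copy());   // the copy is decoupled from later mutation
        }
        return buffer;
    }

    public static void main(String[] args) {
        long[] inputs = {1, 2, 3};
        StringBuilder refs = new StringBuilder();
        for (Event e : bufferReferences(inputs)) {
            refs.append(e.value).append(' ');
        }
        StringBuilder copies = new StringBuilder();
        for (Event e : bufferCopies(inputs)) {
            copies.append(e.value).append(' ');
        }
        System.out.println("references: " + refs.toString().trim()); // references: 3 3 3
        System.out.println("copies:     " + copies.toString().trim()); // copies:     1 2 3
    }
}
```

The point is only that anything kept in a list, map or cache needs to be copied (or be immutable) once the same instance may be mutated afterwards.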

Hope this helps

Thias




[1] 
https://flink.apache.org/2020/04/15/flink-serialization-tuning-vol.-1-choosing-your-serializer-if-you-can/




From: Tommy May <tvma...@gmail.com>
Sent: Tuesday, March 7, 2023 3:25 AM
To: David Morávek <d...@apache.org>
Cc: Ken Krugler <kkrugler_li...@transpac.com>; Flink User List 
<user@flink.apache.org>
Subject: Re: Avoiding data shuffling when reading pre-partitioned data from 
Kafka



Hi Ken & David,

Thanks for following up. I've responded to your questions below.

> If the number of unique keys isn’t huge, I could think of yet another 
> helicopter stunt that you could try :)

Unfortunately the number of keys in our case is huge; each key is unique to a 
handful of events.

> If your data are already pre-partitioned and the partitioning matches (hash 
> partitioning on the Java representation of the key yielded by the KeySelector), 
> you can use `reinterpretAsKeyedStream` [1] to skip the shuffle.

That comes with the additional constraints that Ken mentioned, correct? It 
could break immediately if a key comes through on a different partition, or 
if the number of partitions happens to change? I'm concerned about that for 
our use case as we don't have 100% control of the upstream data source.

> I feel you'd be blocked by the state access downstream (with RocksDB). Are you 
> sure it isn't the case?

Yes, you are right that state access is also a limiting factor and some 
optimizations to limit that have helped quite a bit (both in our implementation 
and in using local SSDs for rocksdb). One other path we looked at is using 
memory-backed volumes for rocksdb, but ran into a limitation that we cannot 
configure Flink's process memory lower than the k8s container memory, leading 
to OOMs. More details at 
https://stackoverflow.com/questions/74118022/flink-pods-ooming-using-memory-backed-volume-with-k8s-operator.

I don't have a dashboard currently to immediately point to data shuffling as 
the primary bottleneck, but I thought it could be a huge optimization if we can 
tell Flink to take advantage of the pre-partitioned datasource, given we're 
shuffling near 1 Gb/sec right now. I can see that the join is causing the 
backpressure on the sources though, and figured that network and state access 
would be the two primary contributors there. Let me know if you have any good 
debugging tools to narrow in on this more.

Thanks,
Tommy


On Mon, Mar 6, 2023 at 4:42 AM David Morávek 
<d...@apache.org> wrote:
Using an operator state for a stateful join isn't great because it's meant to 
hold only a minimal state related to the operator (e.g., partition tracking).

If your data are already pre-partitioned and the partitioning matches (hash 
partitioning on the Java representation of the key yielded by the KeySelector), 
you can use `reinterpretAsKeyedStream` [1] to skip the shuffle.

> What we see is that the join step causes backpressure on the kafka sources 
> and lag slowly starts to accumulate.

I feel you'd be blocked by the state access downstream (with RocksDB). Are you 
sure it isn't the case?

[1] 
https://javadoc.io/static/org.apache.flink/flink-streaming-java/1.16.1/org/apache/flink/streaming/api/datastream/DataStreamUtils.html#reinterpretAsKeyedStream-org.apache.flink.streaming.api.datastream.DataStream-org.apache.flink.api.java.functions.KeySelector-

Best,
D.

On Sun, Mar 5, 2023 at 5:31 AM Ken Krugler 
<kkrugler_li...@transpac.com> wrote:
Hi Tommy,

To use stateful timers, you need to have a keyed stream, which gets tricky when 
you’re trying to avoid network traffic caused by the keyBy().

If the number of unique keys isn’t huge, I could think of yet another 
helicopter stunt that you could try :)

It’s possible to calculate a composite key, based on the “real” key and a 
synthetic value, that will wind up in the same slot where you’re doing this 
calculation.

So that would let you create a keyed stream which would have 
serialization/deserialization cost, but wouldn’t actually go through the 
network stack.

Since the composite key generation is deterministic, you can do the same thing 
on both streams, and join on the composite key.

You’d want to cache the mapping from the real key to the synthetic value, to 
avoid doing this calculation for every record.
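
A rough sketch of that composite-key search, in plain Java and under stated assumptions: the class, the `"#"` separator and `standInSubtask` are all hypothetical, and `standInSubtask` is only a stand-in for Flink's key-to-subtask assignment, not Flink's actual hash. In a real job the assignment function would presumably be `KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, parallelism)`, and the target would be the index of the subtask doing the calculation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.ToIntFunction;

/** Finds, per real key, a synthetic value whose composite key maps to the local subtask. */
final class CompositeKeySketch {
    private final int localSubtask;
    private final ToIntFunction<String> subtaskForKey;
    // Cache from real key to synthetic value, so the search runs once per key.
    private final Map<String, Integer> syntheticByRealKey = new HashMap<>();

    CompositeKeySketch(int localSubtask, ToIntFunction<String> subtaskForKey) {
        this.localSubtask = localSubtask;
        this.subtaskForKey = subtaskForKey;
    }

    /** Returns "realKey#synthetic"; deterministic for a given key and target subtask. */
    String compositeKey(String realKey) {
        int synthetic = syntheticByRealKey.computeIfAbsent(realKey, this::findSynthetic);
        return realKey + "#" + synthetic;
    }

    /** Linear search over synthetic values until the composite key lands on the local subtask. */
    private int findSynthetic(String realKey) {
        for (int s = 0; s < 100_000; s++) {
            if (subtaskForKey.applyAsInt(realKey + "#" + s) == localSubtask) {
                return s;
            }
        }
        throw new IllegalStateException("no synthetic value found for " + realKey);
    }

    /** ASSUMPTION: stand-in assignment (key -> key group -> subtask), not Flink's real hash. */
    static int standInSubtask(String key, int maxParallelism, int parallelism) {
        int mixed = Integer.rotateLeft(key.hashCode() * 0x9E3779B1, 15);
        int keyGroup = Math.floorMod(mixed, maxParallelism);
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        ToIntFunction<String> assign = k -> standInSubtask(k, 128, 4);
        CompositeKeySketch sketch = new CompositeKeySketch(2, assign);
        String composite = sketch.compositeKey("user-1");
        System.out.println(composite + " -> subtask " + assign.applyAsInt(composite));
    }
}
```

Because the search is deterministic, two instances running on the same subtask (one per input stream) arrive at the same composite key for the same real key. Note that the cache grows with the number of distinct keys, which matters if the key space is large.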

If that sounds promising, lmk and I can post some code.

— Ken


On Mar 4, 2023, at 12:37 PM, Tommy May 
<tvma...@gmail.com> wrote:

Hello Ken,

Thanks for the quick response! That is an interesting workaround. In our case 
though we are using a CoProcessFunction with stateful timers. Is there a 
similar workaround path available in that case? The one possible way I could 
find requires partitioning the data in kafka in a very specific way to match 
what Flink's keyBy is doing, and it would add further constraints, on top of 
those of the method you described, that would be difficult to handle in a prod 
environment where we don't have full control over the producers & input topics.

Regarding the addition of a more flexible way to take advantage of 
pre-partitioned sources like in FLIP-186, would you suggest I forward this 
chain over to the dev Flink mailing list?

Thanks,
Tommy



On Sat, Mar 4, 2023 at 11:32 AM Ken Krugler 
<kkrugler_li...@transpac.com> wrote:
Hi Tommy,

I believe there is a way to make this work currently, but with lots of caveats 
and constraints.

This assumes you want to avoid any network shuffle.

1. Both topics have names that return the same value for ((topicName.hashCode() 
* 31) & 0x7FFFFFFF) % parallelism.
2. Both topics have the same number of partitions.
3. The parallelism of your join function exactly matches the number of 
partitions.
4. You can’t change any of the above without losing state.
5. You don’t need stateful timers.

If the above is true, then you could use a CoFlatMapFunction and operator state 
to implement a stateful join.

If it’s something like a left outer join without any state TTL or need to keep 
both sides in state, then it’s pretty easy.

— Ken

PS - it’s pretty easy to figure out a “-xxx” value to append to a topic name to 
get the hashCode() result you need.
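
As a sketch of that suffix search (plain Java, no Flink or Kafka dependency): the class and method names are hypothetical, and the mask is written as 0x7FFFFFFF, which I believe is what the Kafka connector uses when assigning partitions to subtasks. Treat that as an assumption and check it against the connector source for your Flink version.

```java
/** Brute-force search for a "-N" topic suffix that yields a wanted start index. */
final class TopicSuffixFinder {

    /** Start index per the formula in point 1; the 0x7FFFFFFF mask is an assumption. */
    static int startIndex(String topicName, int parallelism) {
        return ((topicName.hashCode() * 31) & 0x7FFFFFFF) % parallelism;
    }

    /** Tries baseName + "-0", "-1", ... until the start index equals wantedIndex. */
    static String findSuffixedName(String baseName, int wantedIndex, int parallelism) {
        for (int i = 0; i < 1_000_000; i++) {
            String candidate = baseName + "-" + i;
            if (startIndex(candidate, parallelism) == wantedIndex) {
                return candidate;
            }
        }
        throw new IllegalStateException("no suffix found for " + baseName);
    }

    public static void main(String[] args) {
        int parallelism = 12;                        // example value only
        String first = "orders";                     // hypothetical topic names
        String second = "payments";
        int target = startIndex(first, parallelism);
        String renamed = findSuffixedName(second, target, parallelism);
        System.out.println(first + " -> " + target);
        System.out.println(renamed + " -> " + startIndex(renamed, parallelism));
    }
}
```

Both printed indices are equal by construction; only the second topic needs a suffix, since the first one defines the target.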


On Mar 3, 2023, at 4:56 PM, Tommy May 
<tvma...@gmail.com> wrote:

Hello,

My team has a Flink streaming job that does a stateful join across two high 
throughput kafka topics. This results in a large amount of data ser/de and 
shuffling (about 1gb/s for context). We're running into a bottleneck on this 
shuffling step. We've attempted to optimize our flink configuration and join 
logic, scale out the kafka topics & flink job, and speed up state access. What 
we see is that the join step causes backpressure on the kafka sources and lag 
slowly starts to accumulate.

One idea we had to optimize this is to pre-partition the data in kafka on the 
same key that the join is happening on. This'll effectively reduce data 
shuffling to 0 and remove the bottleneck that we're seeing. I've done some 
research into the topic and from what I understand this is not straightforward 
to take advantage of in Flink. It looks to be a fairly commonly requested 
feature based on the many StackOverflow posts and slack questions, and I 
noticed there is FLIP-186 which attempts to address this topic as well.

Are there any upcoming plans to add this feature to a future Flink release? I 
believe it'd be super impactful for similar large scale jobs out there. I'd be 
interested in helping as well, but admittedly I'm relatively new to Flink.  I 
poked around the code a bit, and it certainly did not seem like a 
straightforward addition, so it may be best handled by someone with more 
internal knowledge.

Thanks,
Tommy

--------------------------
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink, Pinot, Solr, Elasticsearch





