Hello Ken,

Thanks for the quick response! That is an interesting workaround. In our
case, though, we are using a CoProcessFunction with stateful timers. Is there
a similar workaround available in that case? The one possible approach I
could find requires partitioning the data in Kafka in a very specific way
to match what Flink's keyBy does, and it would add constraints beyond those
of the method you described that would be difficult to handle
in a prod environment where we don't have full control over the producers &
input topics.

Regarding the addition of a more flexible way to take advantage of
pre-partitioned sources like in FLIP-186, would you suggest I forward this
thread to the Flink dev mailing list?

Thanks,
Tommy



On Sat, Mar 4, 2023 at 11:32 AM Ken Krugler <kkrugler_li...@transpac.com>
wrote:

> Hi Tommy,
>
> I believe there is a way to make this work currently, but with lots of
> caveats and constraints.
>
> This assumes you want to avoid any network shuffle.
>
> 1. Both topics have names that return the same value for
> ((topicName.hashCode() * 31) & 0x7FFFFFFF) % parallelism.
> 2. Both topics have the same number of partitions.
> 3. The parallelism of your join function exactly matches the number of
> partitions.
> 4. You can’t change any of the above without losing state.
> 5. You don’t need stateful timers.
>
> If all of the above are true, then you could use a CoFlatMapFunction and operator
> state to implement a stateful join.
>
> If it’s something like a left outer join without any state TTL or need to
> keep both sides in state, then it’s pretty easy.
>
> — Ken
>
> PS - it’s pretty easy to figure out a “-xxx” value to append to a topic
> name to get the hashCode() result you need.
>
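
Inline note on the PS above: a minimal sketch of how one might search for
such a suffix. It assumes the start index is computed as in point 1, with
0x7FFFFFFF as the mask that clears the sign bit. The class name, the topic
names "orders" and "payments", and the "-<number>" suffix scheme are
hypothetical and only for illustration; this is not taken from Flink itself.

```java
final class TopicSuffixFinder {

    // Start index for a topic name, following the formula in point 1.
    // Assumption: the mask is 0x7FFFFFFF, which keeps the result non-negative.
    static int startIndex(String topicName, int parallelism) {
        return ((topicName.hashCode() * 31) & 0x7FFFFFFF) % parallelism;
    }

    // Tries "-0", "-1", ... until the suffixed name hits the target start index.
    static String findSuffix(String baseTopic, int parallelism, int target, int maxAttempts) {
        for (int i = 0; i < maxAttempts; i++) {
            String suffix = "-" + i;
            if (startIndex(baseTopic + suffix, parallelism) == target) {
                return suffix;
            }
        }
        throw new IllegalStateException(
                "No suffix found for " + baseTopic + " within " + maxAttempts + " attempts");
    }

    public static void main(String[] args) {
        int parallelism = 8; // hypothetical: equals the partition count of both topics

        // Use the first topic's start index as the target for the second topic.
        int target = startIndex("orders", parallelism);
        String suffix = findSuffix("payments", parallelism, target, 10_000);

        System.out.println("orders   -> " + target);
        System.out.println("payments" + suffix + " -> "
                + startIndex("payments" + suffix, parallelism));
    }
}
```

Running main prints the start index of each topic name; the two values
should match, which is what condition 1 asks for. Conditions 2 through 5
still have to hold separately.
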
> On Mar 3, 2023, at 4:56 PM, Tommy May <tvma...@gmail.com> wrote:
>
> Hello,
>
> My team has a Flink streaming job that does a stateful join across two
> high-throughput Kafka topics. This results in a large amount of data ser/de
> and shuffling (about 1gb/s for context). We're running into a bottleneck on
> this shuffling step. We've attempted to optimize our Flink configuration,
> join logic, scale out the Kafka topics & Flink job, and speed up state
> access. What we see is that the join step causes backpressure on the Kafka
> sources and lag slowly starts to accumulate.
>
> One idea we had to optimize this is to pre-partition the data in Kafka on
> the same key that the join is happening on. This'll effectively reduce data
> shuffling to 0 and remove the bottleneck that we're seeing. I've done some
> research into the topic and from what I understand this is not
> straightforward to take advantage of in Flink. It looks to be a fairly
> commonly requested feature based on the many StackOverflow posts and slack
> questions, and I noticed there is FLIP-186 which attempts to address this
> topic as well.
>
> Are there any upcoming plans to add this feature to a future Flink
> release? I believe it'd be super impactful for similar large scale jobs out
> there. I'd be interested in helping as well, but admittedly I'm relatively
> new to Flink.  I poked around the code a bit, and it certainly did not seem
> like a straightforward addition, so it may be best handled by someone with
> more internal knowledge.
>
> Thanks,
> Tommy
>
>
> --------------------------
> Ken Krugler
> http://www.scaleunlimited.com
> Custom big data solutions
> Flink, Pinot, Solr, Elasticsearch
>
>
>
>
