Hi Dan,
> I've tried playing around with parallelism and resources.  It does help.
Glad to hear your problem is solved 😀.

> Does Flink have special logic with the built in interval join code that
> impacts how kafka data sources are read?
No. As for the approach I mentioned in my last email, I meant adding logic
to a custom Kafka connector that controls the consumption order of each source.
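
To make the effect of consumption order concrete, here is a toy simulation in
plain Python. It is not Flink code and does not use any Flink API; the function
name `peak_buffered`, the `bound` parameter, and the sample data are all made up
for this sketch. It assumes a simplified model in which every record stays in
state until the joint watermark (the minimum over both inputs) passes its
timestamp plus the interval bound.

```python
def peak_buffered(first, second, bound):
    """Read `first` to completion, then `second`; return peak buffered records.

    Each source is a sorted list of event timestamps (hypothetical data).
    A record with timestamp t is dropped once the joint watermark, i.e. the
    minimum of the two inputs' watermarks, exceeds t + bound. This is a
    simplified stand-in for interval-join state cleanup, not Flink's code.
    """
    buffered = []  # timestamps currently held in state, both sides together
    watermark = {0: float("-inf"), 1: float("-inf")}
    peak = 0
    for side, source in enumerate((first, second)):
        for ts in source:
            watermark[side] = ts
            buffered.append(ts)
            joint = min(watermark.values())
            # Keep only records that may still match future input.
            buffered = [t for t in buffered if t + bound >= joint]
            peak = max(peak, len(buffered))
        watermark[side] = float("inf")  # this source is finished
    return peak


small = list(range(0, 100, 10))  # 10 records
large = list(range(100))         # 100 records, same time range

print("small first:", peak_buffered(small, large, bound=5))
print("large first:", peak_buffered(large, small, bound=5))
```

In this model, reading the smaller source first means only the small input has
to wait in state, so the peak stays close to the size of the smaller source.
Reading the larger source first buffers essentially all of it.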

Dan Hill <quietgol...@gmail.com> wrote on Wed, Jul 21, 2021 at 2:10 PM:

> Thanks JING and Caizhi!
>
> Yea, I've tried playing around with parallelism and resources.  It does
> help.
>
> We have our own join operator that acts like an interval join (with fuzzy
> matching).  We wrote our own KeyedCoProcessFunction and modeled it closely
> after the internal interval join code.  Does Flink have special logic with
> the built-in interval join code that impacts how Kafka data sources are
> read?
>
>
>
> On Tue, Jul 20, 2021 at 8:31 PM JING ZHANG <beyond1...@gmail.com> wrote:
>
>> Hi Dan,
>> You are right. In an interval join, if one input stream is far ahead of
>> the other, its data is buffered in state until the watermark of the
>> other input stream catches up.
>> This is a known issue with interval joins. The situation is even worse
>> in your case for the following reasons:
>> 1. The job is running as a backfill.
>> 2. There are cascading interval joins in the topology.
>>
>> There is a hacky workaround that may help. Control the order in which
>> each source is consumed, as follows:
>> 1. Within a single join, consume the larger data source only after the
>> smaller source has been fully consumed.
>> 2. Consume the sources of a downstream join only after the previous join
>> has finished.
>>
>> BTW: Please double-check that you are using an interval join rather than a
>> regular join. You get a regular join if the join condition compares two
>> fields of a regular timestamp type instead of time attributes.
>>
>> Best,
>> JING ZHANG
>>
>> Dan Hill <quietgol...@gmail.com> wrote on Wed, Jul 21, 2021 at 4:25 AM:
>>
>>> Hi.  My team's Flink job has cascading interval joins.  The problem I'm
>>> outlining below is fine when streaming normally.  It's an issue with
>>> backfills.  We've been running a bunch of backfills to evaluate the
>>> job over older data.
>>>
>>> When running as backfills, I've noticed that sometimes one of the downstream
>>> Kafka inputs will read in a lot of data from its Kafka source before the
>>> upstream Kafka sources make much progress.  The downstream Kafka source
>>> gets far ahead of the interval join window constrained by the upstream
>>> sources.  This appears to cause the state to grow unnecessarily and has
>>> caused checkpoint failures.  I assumed there was built-in Flink code to keep
>>> a single downstream Kafka source from getting too far ahead.  Looking through
>>> the code, I don't think this exists.
>>>
>>> Is this a known issue with trying to use Flink to backfill?  Am I
>>> misunderstanding something?
>>>
>>> Here's an example flow chart for a cascading join job.  One of the right
>>> Kafka data sources has 10x-100x more records than the left data sources
>>> and causes state to grow.
>>> [image: Screen Shot 2021-07-20 at 1.02.27 PM.png]
>>>
>>
