Re: Kafka data sources, multiple interval joins and backfilling

2021-07-21 Thread David Morávek
Hi Dan,

unfortunately Flink currently provides no source-level synchronization,
except for Kinesis [1], so it's easy to run into large state when
processing historical data.

There is an ongoing effort to provide a generic watermark-based
alignment of FLIP-27 sources [2], which will most likely help mitigate
the issue.
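
For reference, here is a rough sketch of how the alignment API proposed
in FLIP-182 is expected to be used, assuming the proposal lands as
written (the withWatermarkAlignment call, the alignment group name, and
the topic and broker names below are illustrative and may change before
release):

import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class AlignedBackfill {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("broker:9092")
                .setTopics("events")
                .setGroupId("backfill")
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        // Every source that shares the alignment group "backfill-group"
        // pauses reading once its local watermark drifts more than 20s
        // ahead of the group's minimum watermark; the drift is
        // re-evaluated every second.
        WatermarkStrategy<String> aligned = WatermarkStrategy
                .<String>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withWatermarkAlignment("backfill-group",
                        Duration.ofSeconds(20), Duration.ofSeconds(1));

        env.fromSource(source, aligned, "events-source").print();
        env.execute("aligned backfill");
    }
}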

[1] https://issues.apache.org/jira/browse/FLINK-10886
[2]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-182%3A+Support+watermark+alignment+of+FLIP-27+Sources

Best,
D.



Re: Kafka data sources, multiple interval joins and backfilling

2021-07-21 Thread JING ZHANG
Hi Dan,
> I've tried playing around with parallelism and resources.  It does help.
Glad to hear your problem is solved.

> Does Flink have special logic with the built-in interval join code that
impacts how Kafka data sources are read?
No. If you are referring to the approach I mentioned in my last email, I
meant adding logic to control the consumption order of each source in a
custom Kafka connector.
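
If writing a custom connector is too heavy, one rough way to approximate
that ordering is to bound each source at a cutoff timestamp and run the
backfill in stages, so one source is fully drained before the next one
starts. A sketch using the FLIP-27 KafkaSource (the topic names, broker
address, and cutoff timestamp are made up):

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StagedBackfill {
    // A Kafka source that stops at the given timestamp, so each backfill
    // stage terminates instead of reading forever.
    static KafkaSource<String> boundedSource(String topic, long endTsMs) {
        return KafkaSource.<String>builder()
                .setBootstrapServers("broker:9092")
                .setTopics(topic)
                .setGroupId("backfill-" + topic)
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setBounded(OffsetsInitializer.timestamp(endTsMs))
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        long end = 1626822000000L; // end of the backfill window (example)

        // Stage 1: fully drain the smaller/upstream topic first.
        env.fromSource(boundedSource("small-topic", end),
                WatermarkStrategy.noWatermarks(), "small-topic").print();
        env.execute("staged backfill, stage 1");

        // Stage 2 (a follow-up run, e.g. restored from a savepoint)
        // would then consume the larger topic:
        //   env.fromSource(boundedSource("large-topic", end), ...);
    }
}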



Re: Kafka data sources, multiple interval joins and backfilling

2021-07-21 Thread Dan Hill
Thanks JING and Caizhi!

Yeah, I've tried playing around with parallelism and resources.  It does
help.

We have our own join operator that acts like an interval join (with fuzzy
matching).  We wrote our own KeyedCoProcessFunction and modeled it closely
after the internal interval join code.  Does Flink have special logic in
the built-in interval join code that impacts how Kafka data sources are
read?
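
For anyone following along, here is a much-simplified sketch of the
buffering pattern such a KeyedCoProcessFunction-based interval join ends
up with (the class, state, and event types are illustrative, not our
actual code; only the right side is buffered here for brevity, whereas a
full interval join buffers both sides):

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;

// Events are Tuple3<key, timestampMs, payload>. Right events are buffered
// in keyed state until the watermark (the minimum across BOTH inputs)
// passes them -- this buffer is exactly the state that grows when one
// Kafka source runs far ahead of the other.
public class SketchIntervalJoin extends KeyedCoProcessFunction<
        String, Tuple3<String, Long, String>, Tuple3<String, Long, String>,
        String> {

    private final long windowMs;
    private transient MapState<Long, List<String>> rightBuffer;

    public SketchIntervalJoin(long windowMs) {
        this.windowMs = windowMs;
    }

    @Override
    public void open(Configuration parameters) {
        rightBuffer = getRuntimeContext().getMapState(new MapStateDescriptor<>(
                "rightBuffer",
                TypeInformation.of(Long.class),
                TypeInformation.of(new TypeHint<List<String>>() {})));
    }

    @Override
    public void processElement1(Tuple3<String, Long, String> left, Context ctx,
            Collector<String> out) throws Exception {
        // Join the left event with all buffered right events inside
        // [left.ts - windowMs, left.ts].
        for (Map.Entry<Long, List<String>> e : rightBuffer.entries()) {
            if (e.getKey() >= left.f1 - windowMs && e.getKey() <= left.f1) {
                for (String payload : e.getValue()) {
                    out.collect(left.f2 + " joined " + payload);
                }
            }
        }
    }

    @Override
    public void processElement2(Tuple3<String, Long, String> right, Context ctx,
            Collector<String> out) throws Exception {
        // Buffer the right event; it can only be cleaned up once the
        // combined watermark passes right.ts + windowMs. If the left
        // source lags behind, that watermark stalls and this buffer
        // keeps growing.
        List<String> buffered = rightBuffer.get(right.f1);
        if (buffered == null) {
            buffered = new ArrayList<>();
        }
        buffered.add(right.f2);
        rightBuffer.put(right.f1, buffered);
        ctx.timerService().registerEventTimeTimer(right.f1 + windowMs);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx,
            Collector<String> out) throws Exception {
        // No future left event can match this entry any more; drop it.
        rightBuffer.remove(timestamp - windowMs);
    }
}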



On Tue, Jul 20, 2021 at 8:31 PM JING ZHANG  wrote:

> Hi Dan,
> You are right. In an interval join, if one input stream is far ahead of
> the other, its data is buffered in state until the watermark of the
> other input stream catches up.
> This is a known issue with interval joins, and the situation is even
> worse in your case for the following reasons:
> 1. It is running as a backfill.
> 2. There are cascading interval joins in the topology.
>
> There is a hacky workaround; hope it helps. Control the order in which
> each source is consumed, as follows:
> 1. Within a join, consume the larger data source only after the smaller
> source has been fully consumed.
> 2. Consume the sources of a downstream join only after the previous
> join has finished.
>
> BTW: Please double-check that you are using an interval join rather
> than a regular join. The latter happens if the join condition compares
> two fields of a regular timestamp type instead of time attributes.
>
> Best,
> JING ZHANG


Re: Kafka data sources, multiple interval joins and backfilling

2021-07-20 Thread Caizhi Weng
Hi!

Streaming joins will not throw away records in state unless they exceed
the TTL. Have you tried increasing the parallelism of the join operators
(and maybe decreasing the parallelism of the large Kafka source)?
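
In case it helps with the custom operator: keyed state can be given a
TTL via StateTtlConfig so that stale entries are eventually dropped. A
sketch (the one-hour value and state name are arbitrary; note that state
TTL is based on processing time, which may not match event-time
semantics during a backfill):

import java.util.List;

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;

public class TtlExample {
    // Descriptor for a join buffer whose entries expire one hour after
    // they are written, even if no cleanup timer ever fires for them.
    static MapStateDescriptor<Long, List<String>> bufferWithTtl() {
        StateTtlConfig ttl = StateTtlConfig
                .newBuilder(Time.hours(1)) // arbitrary example TTL
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(
                        StateTtlConfig.StateVisibility.NeverReturnExpired)
                .build();

        MapStateDescriptor<Long, List<String>> desc = new MapStateDescriptor<>(
                "rightBuffer",
                TypeInformation.of(Long.class),
                TypeInformation.of(new TypeHint<List<String>>() {}));
        desc.enableTimeToLive(ttl);
        return desc;
    }
}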



Kafka data sources, multiple interval joins and backfilling

2021-07-20 Thread Dan Hill
Hi.  My team's Flink job has cascading interval joins.  The problem I'm
outlining below is fine when streaming normally.  It's an issue with
backfills.  We've been running a bunch of backfills to evaluate the job
over older data.

When running backfills, I've noticed that sometimes one of the
downstream Kafka inputs will read in a lot of data from its Kafka source
before the upstream Kafka sources make much progress.  The downstream
Kafka source gets far ahead of the interval join window constrained by
the upstream sources.  This appears to cause the state to grow
unnecessarily and has caused checkpoint failures.  I assumed there was
built-in Flink code to keep a single downstream Kafka source from
getting too far ahead.  Looking through the code, I don't think this
exists.

Is this a known issue with trying to use Flink to backfill?  Am I
misunderstanding something?

Here's an example flow chart for a cascading join job.  One of the right
Kafka data sources receives 10x-100x more records than the left data
sources and causes the state to grow.
[image: Screen Shot 2021-07-20 at 1.02.27 PM.png]