I figured it out.  I have some records with the same key and I was doing an
IntervalJoin.  In one of the IntervalJoin implementations I found, the runtime
appears to increase exponentially when there are duplicate keys.  I introduced
a de-duping step and the job runs a lot faster.
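For context, the slowdown described above is consistent with how a buffered join handles duplicate keys: each arriving element on one side is checked against every buffered element with the same key on the other side, so duplicates multiply the pairwise work (at least quadratically in the number of duplicates per key). A minimal standalone sketch of that cost, not actual Flink code (names are illustrative):

```java
import java.util.ArrayList;
import java.util.List;

public class DuplicateKeyJoinCost {
    // A naive buffered join for a single key compares every arriving left
    // element against every buffered right element with the same key, so
    // d duplicates on each side cost on the order of d * d comparisons.
    static long countComparisons(int leftDuplicates, int rightDuplicates) {
        List<Long> rightBuffer = new ArrayList<>();
        for (int r = 0; r < rightDuplicates; r++) {
            rightBuffer.add((long) r);
        }
        long comparisons = 0;
        for (int l = 0; l < leftDuplicates; l++) {
            for (long rightTimestamp : rightBuffer) {
                comparisons++; // in a real join: check the interval condition here
            }
        }
        return comparisons;
    }

    public static void main(String[] args) {
        System.out.println(countComparisons(10, 10));   // 100
        System.out.println(countComparisons(100, 100)); // 10000
    }
}
```

This is why a keep-first de-duping step ahead of the join cuts the work so dramatically: it caps the per-key buffer at one element per side.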

On Thu, Mar 11, 2021 at 5:30 AM Dawid Wysakowicz <dwysakow...@apache.org>
wrote:

> Hey Dan,
>
> I think the logic should be correct. Mind that in processElement we are
> using *relative* upper/lower bounds, which are the inverted global bounds:
>
> relativeUpperBound = upperBound for left and -lowerBound for right
>
> relativeLowerBound = lowerBound for left and -upperBound for right
>
> Therefore the cleanup logic in onTimer is effectively the same. If I
> understand it correctly, this trick was introduced to avoid duplicating the
> method for each side.
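To make the inversion concrete, here is a minimal standalone sketch of the bound definitions quoted above (class and field names are illustrative, not the actual Flink internals): with a global interval [lowerBound, upperBound], the right side gets the global bounds negated and swapped, so one processElement implementation can serve both inputs.

```java
public class RelativeBounds {
    final long relativeLowerBound;
    final long relativeUpperBound;

    RelativeBounds(long lowerBound, long upperBound, boolean isLeft) {
        if (isLeft) {
            // Left side uses the global bounds directly.
            relativeLowerBound = lowerBound;
            relativeUpperBound = upperBound;
        } else {
            // Right side: negate and swap, mirroring the interval, so the
            // same buffering/cleanup code path works for both inputs.
            relativeLowerBound = -upperBound;
            relativeUpperBound = -lowerBound;
        }
    }

    public static void main(String[] args) {
        RelativeBounds left = new RelativeBounds(-5, 10, true);
        RelativeBounds right = new RelativeBounds(-5, 10, false);
        System.out.println(left.relativeLowerBound + " " + left.relativeUpperBound);   // -5 10
        System.out.println(right.relativeLowerBound + " " + right.relativeUpperBound); // -10 5
    }
}
```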
>
> There might be a bug somewhere, but I don't think it's where you pointed.
> I'd suggest first investigating the progress of the watermarks.
>
> Best,
>
> Dawid
> On 09/03/2021 08:36, Dan Hill wrote:
>
> Hi Yun!
>
> That advice was useful.  The state for that operator is very small
> (31 KB).  Most of the checkpoint size is in a couple of simple
> DataStream.intervalJoin operators.  The time intervals are fairly short.
>
> I'm going to try running the code with some small configuration changes.
> One thing I did notice is that I set a positive value for the
> relativeUpperBound.  I'm not sure if I found a bug in IntervalJoinOperator
> <https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java>.
> The logic in IntervalJoinOperator.onEventTime needs an exact timestamp for
> cleanup.  It has logic for cleaning up the right side that uses timerTimestamp
> + lowerBound
> <https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java#L310>.
> However, processElement doesn't use the same offset when registering the timer
> (I only see + lowerBound
> <https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java#L253>).
> Maybe I'm misreading the code.  It feels like a bug.
>
>
> On Mon, Mar 8, 2021 at 10:29 PM Yun Gao <yungao...@aliyun.com> wrote:
>
>> Hi Dan,
>>
>> Regarding the original checkpoint size problem, could you also check
>> which tasks' state is increasing from the checkpoint UI?  For example, the
>> attached operator has an `alreadyOutputted` value state, which seems to keep
>> increasing if there are always new keys.
>>
>> Best,
>> Yun
>>
>>
>> ------------------Original Mail ------------------
>> *Sender:*Dan Hill <quietgol...@gmail.com>
>> *Send Date:*Tue Mar 9 00:59:24 2021
>> *Recipients:*Yun Gao <yungao...@aliyun.com>
>> *CC:*user <user@flink.apache.org>
>> *Subject:*Re: Gradually increasing checkpoint size
>>
>>> Hi Yun!
>>>
>>> Thanks for the quick reply.
>>>
>>> One of the lowerBounds is large but the table being joined with is ~500
>>> rows.  I also have my own operator that only outputs the first value.
>>>
>>> public class OnlyFirstUser<T extends GeneratedMessageV3>
>>>         extends RichFlatMapFunction<T, T> {
>>>
>>>     private transient ValueState<Boolean> alreadyOutputted;
>>>
>>>     @Override
>>>     public void flatMap(T value, Collector<T> out) throws Exception {
>>>         if (!alreadyOutputted.value()) {
>>>             alreadyOutputted.update(true);
>>>             out.collect(value);
>>>         }
>>>     }
>>>
>>>     @Override
>>>     public void open(Configuration config) {
>>>         ValueStateDescriptor<Boolean> descriptor =
>>>                 new ValueStateDescriptor<>(
>>>                         "alreadyOutputted", // the state name
>>>                         TypeInformation.of(new TypeHint<Boolean>() {}), // type information
>>>                         false); // default value of the state, if nothing was set
>>>         alreadyOutputted = getRuntimeContext().getState(descriptor);
>>>     }
>>> }
>>>
>>> All of my inputs have this watermark strategy.  In the Flink UI, early
>>> in the job run, I see "Low Watermarks" on each node and they increase.
>>> After some checkpoint failures, the low watermarks stop appearing in the UI
>>> <https://drive.google.com/file/d/1fLnT3068g3ddlMhfMH5j__kb-gMvmVXm/view?usp=sharing>.
>>>
>>>
>>> .assignTimestampsAndWatermarks(
>>>     WatermarkStrategy.<GeneratedMessageV3>forBoundedOutOfOrderness(Duration.ofSeconds(1))
>>>         .withIdleness(Duration.ofMinutes(1)));
>>>
>>>
>>>
>>> Thanks Yun!
>>>
>>>
>>> On Mon, Mar 8, 2021 at 7:27 AM Yun Gao <yungao...@aliyun.com> wrote:
>>>
>>>> Hi Dan,
>>>>
>>>> Are you using a too-large upperBound or lowerBound?
>>>>
>>>> If not, could you also check the watermark strategy?
>>>> The interval join operator depends on event-time
>>>> timers for cleanup, and event-time timers are
>>>> triggered by watermarks.
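Since cleanup timers only fire as watermarks advance, stalled watermarks mean state accumulates. As a point of reference, a bounded-out-of-orderness strategy emits a watermark that trails the maximum event timestamp seen so far by the configured delay. A minimal non-Flink sketch of that behavior (the class name is illustrative; the trailing -1 mirrors Flink's convention of emitting the latest timestamp that is safe to consider complete):

```java
public class BoundedOutOfOrdernessSketch {
    private long maxTimestamp = Long.MIN_VALUE;
    private final long maxOutOfOrderness;

    BoundedOutOfOrdernessSketch(long maxOutOfOrdernessMillis) {
        this.maxOutOfOrderness = maxOutOfOrdernessMillis;
    }

    // Track the highest event timestamp observed so far.
    void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    // The watermark trails the max timestamp by the allowed lateness.
    long currentWatermark() {
        return maxTimestamp - maxOutOfOrderness - 1;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSketch w = new BoundedOutOfOrdernessSketch(1000);
        w.onEvent(5000);
        System.out.println(w.currentWatermark()); // 3999
        w.onEvent(4000); // a late event does not move the watermark backwards
        System.out.println(w.currentWatermark()); // 3999
    }
}
```

If no events arrive on an input, the watermark stops advancing, which is what the withIdleness setting in the snippet earlier in the thread is meant to work around.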
>>>>
>>>> Best,
>>>> Yun
>>>>
>>>>
>>>> ------------------Original Mail ------------------
>>>> *Sender:*Dan Hill <quietgol...@gmail.com>
>>>> *Send Date:*Mon Mar 8 14:59:48 2021
>>>> *Recipients:*user <user@flink.apache.org>
>>>> *Subject:*Gradually increasing checkpoint size
>>>>
>>>>> Hi!
>>>>>
>>>>> I'm running a backfill Flink streaming job over older data.  It has
>>>>> multiple interval joins.  I noticed my checkpoint size is steadily
>>>>> growing.  I'd expect my checkpoints to stabilize and not grow.
>>>>>
>>>>> Is there a setting to prune useless data from the checkpoint?  My top
>>>>> guess is that my checkpoint has a bunch of useless state in it.
>>>>>
>>>>> - Dan
>>>>>
>>>>
