Hi Yun!

That advice was useful.  The state for that operator is very small (31kb).
Most of the checkpoint size is in a couple simple DataStream.intervalJoin
operators.  The time intervals are fairly short.

I'm going to try running the code with some small configuration changes.
One thing I did notice is that I set a positive value for the
relativeUpperBound.  I'm not sure if I found a bug in IntervalJoinOperator
<https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java>.
The logic in IntervalJoinOperator.onEventTime needs an exact timestamp for
clean up.  It has some logic around cleaning up the right side that
uses timerTimestamp
+ lowerBound
<https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java#L310>.
However, processElement doesn’t use the same logic when creating a timer (I
only see + lowerBound
<https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java#L253>).
Maybe I'm misreading the code.  It feels like a bug.


On Mon, Mar 8, 2021 at 10:29 PM Yun Gao <yungao...@aliyun.com> wrote:

> Hi Dan,
>
> Regarding the original checkpoint size problem, could you also have a
> check
> which tasks' state are increasing from the checkpoint UI ? For example,
> the
> attached operator has a `alreadyOutputed` value state, which seems to keep
> increasing if there are always new keys ?
>
> Best,
> Yun
>
>
> ------------------Original Mail ------------------
> *Sender:*Dan Hill <quietgol...@gmail.com>
> *Send Date:*Tue Mar 9 00:59:24 2021
> *Recipients:*Yun Gao <yungao...@aliyun.com>
> *CC:*user <user@flink.apache.org>
> *Subject:*Re: Gradually increasing checkpoint size
>
>> Hi Yun!
>>
>> Thanks for the quick reply.
>>
>> One of the lowerBounds is large but the table being joined with is ~500
>> rows.  I also have my own operator that only outputs the first value.
>>
>> public class OnlyFirstUser<T extends GeneratedMessageV3> extends
>> RichFlatMapFunction<T, T> {
>>
>>
>>     private transient ValueState<Boolean> alreadyOutputted;
>>
>>
>>     @Override
>>
>>     public void flatMap(T value, Collector<T> out) throws Exception {
>>
>>         if (!alreadyOutputted.value()) {
>>
>>             alreadyOutputted.update(true);
>>
>>             out.collect(value);
>>
>>         }
>>
>>     }
>>
>>
>>     @Override
>>
>>     public void open(Configuration config) {
>>
>>         ValueStateDescriptor<Boolean> descriptor =
>>
>>                 new ValueStateDescriptor<>(
>>
>>                         "alreadyOutputted", // the state name
>>
>>                         TypeInformation.of(new TypeHint<Boolean>() {}),
>> // type information
>>
>>                         false); // default value of the state, if
>> nothing was set
>>
>>         alreadyOutputted = getRuntimeContext().getState(descriptor);
>>
>>     }
>>
>> }
>>
>> All of my inputs have this watermark strategy.  In the Flink UI, early in
>> the job run, I see "Low Watermarks" on each node and they increase.  After
>> some checkpoint failures, low watermarks stop appearing in the UI
>> <https://drive.google.com/file/d/1fLnT3068g3ddlMhfMH5j__kb-gMvmVXm/view?usp=sharing>
>> .
>>
>>
>> .assignTimestampsAndWatermarks(
>>
>>
>> WatermarkStrategy.<GeneratedMessageV3>forBoundedOutOfOrderness(Duration.ofSeconds(1)).withIdleness(Duration.ofMinutes(1)));
>>
>>
>>
>> Thanks Yun!
>>
>>
>> On Mon, Mar 8, 2021 at 7:27 AM Yun Gao <yungao...@aliyun.com> wrote:
>>
>>> Hi Dan,
>>>
>>> Have you use a too large upperBound or lowerBound?
>>>
>>> If not, could you also check the watermark strategy ?
>>> The interval join operator depends on the event-time
>>> timer for cleanup, and the event-time timer would be
>>> triggered via watermark.
>>>
>>> Best,
>>> Yun
>>>
>>>
>>> ------------------Original Mail ------------------
>>> *Sender:*Dan Hill <quietgol...@gmail.com>
>>> *Send Date:*Mon Mar 8 14:59:48 2021
>>> *Recipients:*user <user@flink.apache.org>
>>> *Subject:*Gradually increasing checkpoint size
>>>
>>>> Hi!
>>>>
>>>> I'm running a backfill Flink stream job over older data.  It has
>>>> multiple interval joins.  I noticed my checkpoint is regularly gaining in
>>>> size.  I'd expect my checkpoints to stabilize and not grow.
>>>>
>>>> Is there a setting to prune useless data from the checkpoint?  My top
>>>> guess is that my checkpoint has a bunch of useless state in it.
>>>>
>>>> - Dan
>>>>
>>>

Reply via email to