Hi Yun! That advice was useful. The state for that operator is very small (31 KB). Most of the checkpoint size is in a couple of simple DataStream.intervalJoin operators. The time intervals are fairly short.
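For reference when reading the timer question below, here is a simplified model of the interval join's cleanup-timer arithmetic. It is hand-transcribed from the linked IntervalJoinOperator source, not Flink code. It assumes that processElement1 passes (lowerBound, upperBound) as the relative bounds and processElement2 passes (-upperBound, -lowerBound). That means a right-side element registers its timer with relativeUpperBound = -lowerBound. The class and method names are hypothetical.

```java
// Hand-transcribed model of the cleanup-timer arithmetic in IntervalJoinOperator
// (assumption: matches the linked source; this is not Flink code).
public class IntervalJoinCleanupModel {

    // Timer registered in processElement for an element with timestamp ts.
    // Left side: relativeUpperBound = upperBound.
    // Right side (assumed): relativeUpperBound = -lowerBound.
    static long cleanupTime(long ts, long relativeUpperBound) {
        return (relativeUpperBound > 0L) ? ts + relativeUpperBound : ts;
    }

    // Buffer key that onEventTime removes when a left-side timer fires.
    static long leftKeyToRemove(long timerTimestamp, long upperBound) {
        return (upperBound <= 0L) ? timerTimestamp : timerTimestamp - upperBound;
    }

    // Buffer key that onEventTime removes when a right-side timer fires.
    static long rightKeyToRemove(long timerTimestamp, long lowerBound) {
        return (lowerBound <= 0L) ? timerTimestamp + lowerBound : timerTimestamp;
    }

    public static void main(String[] args) {
        long ts = 100_000L;
        // {lowerBound, upperBound} pairs with lowerBound <= upperBound (hypothetical values).
        long[][] bounds = {{-5_000L, 2_000L}, {-5_000L, 0L}, {0L, 3_000L}, {1_000L, 4_000L}, {-9_000L, -2_000L}};
        for (long[] b : bounds) {
            long lower = b[0];
            long upper = b[1];
            long leftTimer = cleanupTime(ts, upper);
            long rightTimer = cleanupTime(ts, -lower);
            // The removed buffer key should equal the element's own timestamp.
            boolean leftOk = leftKeyToRemove(leftTimer, upper) == ts;
            boolean rightOk = rightKeyToRemove(rightTimer, lower) == ts;
            System.out.printf("lower=%d upper=%d leftTimer=%d (%s) rightTimer=%d (%s)%n",
                    lower, upper, leftTimer, leftOk ? "ok" : "MISMATCH",
                    rightTimer, rightOk ? "ok" : "MISMATCH");
        }
    }
}
```

If the transcription is accurate, every row reports ok. The model only stays consistent because of the assumed sign flip for the right side. Read on its own, without the (-upperBound, -lowerBound) arguments from processElement2, processElement makes the two sides look mismatched.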
I'm going to try running the code with a few small configuration changes. One thing I did notice is that I set a positive value for the relativeUpperBound. I'm not sure whether I've found a bug in IntervalJoinOperator <https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java>. The logic in IntervalJoinOperator.onEventTime needs an exact timestamp for cleanup. Its cleanup of the right side uses timerTimestamp + lowerBound <https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java#L310>. However, processElement doesn't use the same logic when creating the timer (I only see + lowerBound <https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java#L253>). Maybe I'm misreading the code, but it feels like a bug.

On Mon, Mar 8, 2021 at 10:29 PM Yun Gao <yungao...@aliyun.com> wrote:

> Hi Dan,
>
> Regarding the original checkpoint size problem, could you also check in
> the checkpoint UI which tasks' state is increasing? For example, the
> attached operator has an `alreadyOutputted` value state, which seems like
> it would keep increasing if there are always new keys?
>
> Best,
> Yun
>
>
> ------------------Original Mail ------------------
> *Sender:*Dan Hill <quietgol...@gmail.com>
> *Send Date:*Tue Mar 9 00:59:24 2021
> *Recipients:*Yun Gao <yungao...@aliyun.com>
> *CC:*user <user@flink.apache.org>
> *Subject:*Re: Gradually increasing checkpoint size
>
>> Hi Yun!
>>
>> Thanks for the quick reply.
>>
>> One of the lowerBounds is large, but the table being joined with is ~500
>> rows. I also have my own operator that only outputs the first value.
>>
>> public class OnlyFirstUser<T extends GeneratedMessageV3> extends
>>         RichFlatMapFunction<T, T> {
>>
>>     private transient ValueState<Boolean> alreadyOutputted;
>>
>>     @Override
>>     public void flatMap(T value, Collector<T> out) throws Exception {
>>         if (!alreadyOutputted.value()) {
>>             alreadyOutputted.update(true);
>>             out.collect(value);
>>         }
>>     }
>>
>>     @Override
>>     public void open(Configuration config) {
>>         ValueStateDescriptor<Boolean> descriptor =
>>                 new ValueStateDescriptor<>(
>>                         "alreadyOutputted", // the state name
>>                         TypeInformation.of(new TypeHint<Boolean>() {}), // type information
>>                         false); // default value of the state, if nothing was set
>>         alreadyOutputted = getRuntimeContext().getState(descriptor);
>>     }
>> }
>>
>> All of my inputs have this watermark strategy. In the Flink UI, early in
>> the job run, I see "Low Watermarks" on each node and they increase. After
>> some checkpoint failures, low watermarks stop appearing in the UI
>> <https://drive.google.com/file/d/1fLnT3068g3ddlMhfMH5j__kb-gMvmVXm/view?usp=sharing>.
>>
>> .assignTimestampsAndWatermarks(
>>         WatermarkStrategy.<GeneratedMessageV3>forBoundedOutOfOrderness(Duration.ofSeconds(1))
>>                 .withIdleness(Duration.ofMinutes(1)));
>>
>> Thanks Yun!
>>
>>
>> On Mon, Mar 8, 2021 at 7:27 AM Yun Gao <yungao...@aliyun.com> wrote:
>>
>>> Hi Dan,
>>>
>>> Have you used a too-large upperBound or lowerBound?
>>>
>>> If not, could you also check the watermark strategy?
>>> The interval join operator depends on event-time
>>> timers for cleanup, and event-time timers are
>>> triggered by the watermark.
>>>
>>> Best,
>>> Yun
>>>
>>>
>>> ------------------Original Mail ------------------
>>> *Sender:*Dan Hill <quietgol...@gmail.com>
>>> *Send Date:*Mon Mar 8 14:59:48 2021
>>> *Recipients:*user <user@flink.apache.org>
>>> *Subject:*Gradually increasing checkpoint size
>>>
>>>> Hi!
>>>>
>>>> I'm running a backfill Flink streaming job over older data. It has
>>>> multiple interval joins. I noticed my checkpoint is steadily growing in
>>>> size. I'd expect my checkpoints to stabilize and not grow.
>>>>
>>>> Is there a setting to prune useless data from the checkpoint? My top
>>>> guess is that my checkpoint has a bunch of useless state in it.
>>>>
>>>> - Dan
>>>>
>>>
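Yun's point is that interval-join cleanup depends on event-time timers, and those timers fire only as the watermark advances. A toy model in plain Java can illustrate this. It is not Flink's API, and all class and method names here are hypothetical. It assumes a two-input operator whose watermark is the minimum over inputs not marked idle (a simplification of what withIdleness affects), plus a hypothetical 2-second cleanup offset. When one input's watermark stays at 0, no timer fires and the buffer grows with every element. That would match a checkpoint that keeps growing after low watermarks stop advancing.

```java
import java.util.TreeMap;

// Toy model, not Flink code: buffered join entries are dropped only when an
// event-time timer fires, and a timer fires only once the watermark reaches it.
public class WatermarkCleanupModel {

    // cleanup timestamp -> number of buffered entries waiting on that timer
    private final TreeMap<Long, Integer> timers = new TreeMap<>();
    private int buffered = 0;

    void add(long cleanupTime) {
        timers.merge(cleanupTime, 1, Integer::sum);
        buffered++;
    }

    // Fire every timer whose timestamp is <= the watermark and drop its entries.
    void advanceWatermark(long watermark) {
        while (!timers.isEmpty() && timers.firstKey() <= watermark) {
            buffered -= timers.pollFirstEntry().getValue();
        }
    }

    int buffered() {
        return buffered;
    }

    // Simplified combined watermark: minimum over inputs not marked idle.
    // If every input is idle, report Long.MIN_VALUE (no progress in this model).
    static long combinedWatermark(long[] inputWatermarks, boolean[] idle) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < inputWatermarks.length; i++) {
            if (!idle[i]) {
                min = Math.min(min, inputWatermarks[i]);
                anyActive = true;
            }
        }
        return anyActive ? min : Long.MIN_VALUE;
    }

    public static void main(String[] args) {
        WatermarkCleanupModel advancing = new WatermarkCleanupModel();
        WatermarkCleanupModel stalled = new WatermarkCleanupModel();
        long stuckWatermark = 0L;    // one input that never advances (hypothetical)
        long cleanupOffset = 2_000L; // hypothetical ts + upperBound offset
        for (long t = 1_000L; t <= 10_000L; t += 1_000L) {
            advancing.add(t + cleanupOffset);
            stalled.add(t + cleanupOffset);
            advancing.advanceWatermark(combinedWatermark(new long[] {t, t}, new boolean[] {false, false}));
            stalled.advanceWatermark(combinedWatermark(new long[] {t, stuckWatermark}, new boolean[] {false, false}));
        }
        System.out.println("advancing watermark: " + advancing.buffered() + " entries buffered");
        System.out.println("stalled watermark:   " + stalled.buffered() + " entries buffered");
    }
}
```

With these values, the advancing case ends with 2 buffered entries and the stalled case with 10. The stalled count keeps growing for as long as the loop runs.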