Re: Gradually increasing checkpoint size
I figured it out. Some of my records share the same key, and I was doing an IntervalJoin. In one of the IntervalJoin implementations that I found, the runtime looks like it increases exponentially when there are duplicate keys. I introduced a de-duping step and it works a lot faster.

On Thu, Mar 11, 2021 at 5:30 AM Dawid Wysakowicz wrote:

> Hey Dan,
>
> I think the logic should be correct. Mind that in processElement we
> are using *relative*Upper/LowerBound, which are the inverted global
> bounds:
>
> relativeUpperBound = upperBound for left and -lowerBound for right
>
> relativeLowerBound = lowerBound for left and -upperBound for right
>
> Therefore the cleaning logic in onTimer effectively uses the same logic.
> If I understand it correctly, this trick was introduced to deduplicate
> the method.
>
> There might be a bug somewhere, but I don't think it's where you pointed.
> I'd suggest first investigating the progress of watermarks.
>
> Best,
>
> Dawid
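A toy model of the duplicate-key slowdown Dan describes in this message. This is a sketch, not Flink code: it assumes an interval join emits every left/right pair for a key whose timestamps fall within the bounds, so n duplicates on each side yield n × n pairs for that key. The names `DuplicateKeyJoinSketch`, `countJoinedPairs` and `keepFirst` are made up for illustration, and `keepFirst` only stands in for whatever de-duping step fits the job.

```java
import java.util.Collections;
import java.util.List;

/** Toy model of a keyed interval join for a single key; illustrative only, not Flink code. */
final class DuplicateKeyJoinSketch {

    /** Counts pairs (left, right) with left + lowerBound <= right <= left + upperBound. */
    static long countJoinedPairs(List<Long> leftTimestamps, List<Long> rightTimestamps,
                                 long lowerBound, long upperBound) {
        long pairs = 0;
        for (long left : leftTimestamps) {
            for (long right : rightTimestamps) {
                if (right >= left + lowerBound && right <= left + upperBound) {
                    pairs++;
                }
            }
        }
        return pairs;
    }

    /** Hypothetical de-duping step: keeps only the first record seen for the key. */
    static List<Long> keepFirst(List<Long> timestamps) {
        return timestamps.isEmpty()
                ? Collections.<Long>emptyList()
                : Collections.singletonList(timestamps.get(0));
    }

    public static void main(String[] args) {
        // 100 records with the same key and timestamp on each side.
        List<Long> left = Collections.nCopies(100, 1000L);
        List<Long> right = Collections.nCopies(100, 1000L);

        System.out.println("pairs with duplicates: "
                + countJoinedPairs(left, right, -10, 10));
        System.out.println("pairs after de-duping: "
                + countJoinedPairs(keepFirst(left), keepFirst(right), -10, 10));
    }
}
```

If the output of one join feeds another interval join, these factors multiply again at each stage, which may be why the slowdown looked exponential.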
Re: Gradually increasing checkpoint size
Hey Dan,

I think the logic should be correct. Mind that in processElement we are using *relative*Upper/LowerBound, which are the inverted global bounds:

    relativeUpperBound = upperBound for left and -lowerBound for right

    relativeLowerBound = lowerBound for left and -upperBound for right

Therefore the cleaning logic in onTimer effectively uses the same logic. If I understand it correctly, this trick was introduced to avoid duplicating the method.

There might be a bug somewhere, but I don't think it's where you pointed. I'd suggest first investigating the progress of watermarks.

Best,

Dawid

On 09/03/2021 08:36, Dan Hill wrote:

> Hi Yun!
>
> That advice was useful. The state for that operator is very small
> (31kb). Most of the checkpoint size is in a couple of simple
> DataStream.intervalJoin operators. The time intervals are fairly short.
>
> I'm going to try running the code with some small configuration
> changes. One thing I did notice is that I set a positive value for
> the relativeUpperBound. I'm not sure if I found a bug in
> IntervalJoinOperator
> <https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java>.
>
> The logic in IntervalJoinOperator.onEventTime needs an exact timestamp
> for cleanup. It has some logic around cleaning up the right side
> that uses timerTimestamp + lowerBound
> <https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java#L310>.
>
> However, processElement doesn’t use the same logic when creating a
> timer (I only see + lowerBound
> <https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java#L253>).
>
> Maybe I'm misreading the code. It feels like a bug.
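The round trip Dawid describes in this message can be checked with a small model. This is a simplified sketch written from the discussion, not a copy of IntervalJoinOperator: the relative bounds follow Dawid's two definitions, while the exact cleanup expressions in `cleanupTime`, `leftBufferKey` and `rightBufferKey` are assumptions about how the timer is registered and later resolved, and should be verified against the linked source. All names here are hypothetical.

```java
/** Simplified model of the cleanup-timer arithmetic; illustrative only, not the Flink source. */
final class RelativeBoundsSketch {

    /** Assumed processElement side: when the buffered element's cleanup timer fires. */
    static long cleanupTime(long elementTimestamp, long relativeUpperBound) {
        return relativeUpperBound > 0L ? elementTimestamp + relativeUpperBound : elementTimestamp;
    }

    /** Left elements use relativeUpperBound = upperBound. */
    static long leftTimer(long elementTimestamp, long upperBound) {
        return cleanupTime(elementTimestamp, upperBound);
    }

    /** Right elements use relativeUpperBound = -lowerBound. */
    static long rightTimer(long elementTimestamp, long lowerBound) {
        return cleanupTime(elementTimestamp, -lowerBound);
    }

    /** Assumed onEventTime side: which left-buffer timestamp the timer refers to. */
    static long leftBufferKey(long timerTimestamp, long upperBound) {
        return upperBound <= 0L ? timerTimestamp : timerTimestamp - upperBound;
    }

    /** Assumed onEventTime side: which right-buffer timestamp the timer refers to. */
    static long rightBufferKey(long timerTimestamp, long lowerBound) {
        return lowerBound <= 0L ? timerTimestamp + lowerBound : timerTimestamp;
    }

    /** True if both timers resolve back to the original element timestamp. */
    static boolean roundTrips(long elementTimestamp, long lowerBound, long upperBound) {
        long left = leftBufferKey(leftTimer(elementTimestamp, upperBound), upperBound);
        long right = rightBufferKey(rightTimer(elementTimestamp, lowerBound), lowerBound);
        return left == elementTimestamp && right == elementTimestamp;
    }

    public static void main(String[] args) {
        long[] bounds = {-5_000L, 0L, 5_000L};
        for (long lower : bounds) {
            for (long upper : bounds) {
                if (lower <= upper) {
                    System.out.println("lowerBound=" + lower + " upperBound=" + upper
                            + " roundTrips=" + roundTrips(1_000_000L, lower, upper));
                }
            }
        }
    }
}
```

Under these assumptions the `+ lowerBound` in the timer handler simply undoes the `- lowerBound` applied when a right-side timer is registered, so both sides land on the timestamp that was buffered.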
Re: Re: Gradually increasing checkpoint size
Hi Yun!

That advice was useful. The state for that operator is very small (31kb). Most of the checkpoint size is in a couple of simple DataStream.intervalJoin operators. The time intervals are fairly short.

I'm going to try running the code with some small configuration changes. One thing I did notice is that I set a positive value for the relativeUpperBound. I'm not sure if I found a bug in IntervalJoinOperator
<https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java>.

The logic in IntervalJoinOperator.onEventTime needs an exact timestamp for cleanup. It has some logic around cleaning up the right side that uses timerTimestamp + lowerBound
<https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java#L310>.

However, processElement doesn’t use the same logic when creating a timer (I only see + lowerBound
<https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java#L253>).

Maybe I'm misreading the code. It feels like a bug.

On Mon, Mar 8, 2021 at 10:29 PM Yun Gao wrote:

> Hi Dan,
>
> Regarding the original checkpoint size problem, could you also check
> in the checkpoint UI which tasks' state is increasing? For example,
> the attached operator has an `alreadyOutputted` value state, which
> seems to keep increasing if there are always new keys?
>
> Best,
> Yun
Re: Re: Gradually increasing checkpoint size
Hi Dan,

Regarding the original checkpoint size problem, could you also check in the checkpoint UI which tasks' state is increasing? For example, the attached operator has an `alreadyOutputted` value state, which seems to keep increasing if there are always new keys?

Best,
Yun

--Original Mail --
Sender: Dan Hill
Send Date: Tue Mar 9 00:59:24 2021
Recipients: Yun Gao
CC: user
Subject: Re: Gradually increasing checkpoint size

Hi Yun!

Thanks for the quick reply.

One of the lowerBounds is large but the table being joined with is ~500 rows. I also have my own operator that only outputs the first value.

    public class OnlyFirstUser<T> extends RichFlatMapFunction<T, T> {

        private transient ValueState<Boolean> alreadyOutputted;

        @Override
        public void flatMap(T value, Collector<T> out) throws Exception {
            if (!alreadyOutputted.value()) {
                alreadyOutputted.update(true);
                out.collect(value);
            }
        }

        @Override
        public void open(Configuration config) {
            ValueStateDescriptor<Boolean> descriptor =
                    new ValueStateDescriptor<>(
                            "alreadyOutputted", // the state name
                            TypeInformation.of(new TypeHint<Boolean>() {}), // type information
                            false); // default value of the state, if nothing was set
            alreadyOutputted = getRuntimeContext().getState(descriptor);
        }
    }

All of my inputs have this watermark strategy. In the Flink UI, early in the job run, I see "Low Watermarks" on each node and they increase. After some checkpoint failures, low watermarks stop appearing in the UI.

    .assignTimestampsAndWatermarks(
        WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(1)).withIdleness(Duration.ofMinutes(1)));

Thanks Yun!
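Yun's point about the `alreadyOutputted` state can be pictured with a plain-Java stand-in. This is a sketch under assumptions, not Flink code: a `HashMap` plays the role of keyed `ValueState<Boolean>`, and `OnlyFirstStateSketch`, `shouldEmit` and `stateEntries` are hypothetical names. It shows that a "first value only" operator keeps one small entry per distinct key and never drops it, so its state grows with the number of keys seen rather than with the number of records.

```java
import java.util.HashMap;
import java.util.Map;

/** Plain-Java stand-in for a keyed "emit only the first value" operator; not Flink code. */
final class OnlyFirstStateSketch<K> {

    // One flag per key, never removed: this models keyed ValueState<Boolean>.
    private final Map<K, Boolean> alreadyOutputted = new HashMap<>();

    /** Returns true only the first time a key is seen. */
    boolean shouldEmit(K key) {
        return alreadyOutputted.putIfAbsent(key, Boolean.TRUE) == null;
    }

    /** Number of keys that currently hold state. */
    int stateEntries() {
        return alreadyOutputted.size();
    }

    public static void main(String[] args) {
        OnlyFirstStateSketch<String> operator = new OnlyFirstStateSketch<>();
        int emitted = 0;
        // Three passes over the same 1000 keys: only the first pass emits.
        for (int pass = 0; pass < 3; pass++) {
            for (int i = 0; i < 1000; i++) {
                if (operator.shouldEmit("user-" + i)) {
                    emitted++;
                }
            }
        }
        System.out.println("emitted=" + emitted + " stateEntries=" + operator.stateEntries());
    }
}
```

Whether this matters depends on key cardinality; with a bounded key space the state levels off, which would be consistent with an operator whose state stays small.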
Re: Gradually increasing checkpoint size
Hi Yun!

Thanks for the quick reply.

One of the lowerBounds is large but the table being joined with is ~500 rows. I also have my own operator that only outputs the first value.

    public class OnlyFirstUser<T> extends RichFlatMapFunction<T, T> {

        private transient ValueState<Boolean> alreadyOutputted;

        @Override
        public void flatMap(T value, Collector<T> out) throws Exception {
            if (!alreadyOutputted.value()) {
                alreadyOutputted.update(true);
                out.collect(value);
            }
        }

        @Override
        public void open(Configuration config) {
            ValueStateDescriptor<Boolean> descriptor =
                    new ValueStateDescriptor<>(
                            "alreadyOutputted", // the state name
                            TypeInformation.of(new TypeHint<Boolean>() {}), // type information
                            false); // default value of the state, if nothing was set
            alreadyOutputted = getRuntimeContext().getState(descriptor);
        }
    }

All of my inputs have this watermark strategy. In the Flink UI, early in the job run, I see "Low Watermarks" on each node and they increase. After some checkpoint failures, low watermarks stop appearing in the UI
<https://drive.google.com/file/d/1fLnT3068g3ddlMhfMH5j__kb-gMvmVXm/view?usp=sharing>.

    .assignTimestampsAndWatermarks(
        WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(1)).withIdleness(Duration.ofMinutes(1)));

Thanks Yun!

On Mon, Mar 8, 2021 at 7:27 AM Yun Gao wrote:

> Hi Dan,
>
> Have you used a too-large upperBound or lowerBound?
>
> If not, could you also check the watermark strategy?
> The interval join operator depends on the event-time
> timer for cleanup, and the event-time timer is
> triggered by the watermark.
>
> Best,
> Yun
Re: Gradually increasing checkpoint size
Hi Dan,

Have you used a too-large upperBound or lowerBound?

If not, could you also check the watermark strategy? The interval join operator depends on the event-time timer for cleanup, and the event-time timer is triggered by the watermark.

Best,
Yun

--Original Mail --
Sender: Dan Hill
Send Date: Mon Mar 8 14:59:48 2021
Recipients: user
Subject: Gradually increasing checkpoint size

Hi!

I'm running a backfill Flink stream job over older data. It has multiple interval joins. I noticed my checkpoint is regularly gaining in size. I'd expect my checkpoints to stabilize and not grow.

Is there a setting to prune useless data from the checkpoint? My top guess is that my checkpoint has a bunch of useless state in it.

- Dan
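The dependency Yun describes, where buffered join state is only released when a watermark fires the cleanup timer, can be sketched in plain Java. This is a toy model and not Flink code: `WatermarkCleanupSketch` and its methods are hypothetical, each element is assumed to get a cleanup timer at `timestamp + upperBound` (with a non-negative `upperBound`), and timers fire only when the watermark reaches them.

```java
import java.util.NavigableMap;
import java.util.TreeMap;

/** Toy model of timer-driven cleanup in an interval join; illustrative only, not Flink code. */
final class WatermarkCleanupSketch {

    // cleanup time -> number of buffered elements that timer will remove
    private final TreeMap<Long, Integer> timers = new TreeMap<>();
    private int buffered = 0;

    /** Buffers one element and schedules its cleanup at timestamp + upperBound. */
    void buffer(long timestamp, long upperBound) {
        timers.merge(timestamp + upperBound, 1, Integer::sum);
        buffered++;
    }

    /** Fires every timer whose time is <= the new watermark. */
    void advanceWatermark(long watermark) {
        NavigableMap<Long, Integer> due = timers.headMap(watermark, true);
        for (int count : due.values()) {
            buffered -= count;
        }
        due.clear(); // removes the fired timers from the backing map
    }

    /** Number of elements still held in the buffer. */
    int bufferedCount() {
        return buffered;
    }

    public static void main(String[] args) {
        WatermarkCleanupSketch join = new WatermarkCleanupSketch();
        for (long ts = 0; ts < 1000; ts++) {
            join.buffer(ts, 10);
        }
        join.advanceWatermark(100);  // watermark stalls early: most state stays buffered
        System.out.println("buffered with stalled watermark: " + join.bufferedCount());
        join.advanceWatermark(2000); // watermark catches up: every timer fires
        System.out.println("buffered after watermark advances: " + join.bufferedCount());
    }
}
```

In this model a watermark that stops advancing leaves almost everything in the buffer, which is the pattern to look for when checkpoints keep growing.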
Gradually increasing checkpoint size
Hi!

I'm running a backfill Flink stream job over older data. It has multiple interval joins. I noticed my checkpoint is regularly gaining in size. I'd expect my checkpoints to stabilize and not grow.

Is there a setting to prune useless data from the checkpoint? My top guess is that my checkpoint has a bunch of useless state in it.

- Dan