I figured it out. I have some records with the same key and I was doing an IntervalJoin. In one of the IntervalJoin implementations I looked at, the runtime appears to increase exponentially when there are duplicate keys. I introduced a de-duping step and the job runs a lot faster.
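For what it's worth, the cost of duplicate keys is easy to see with a toy sketch (plain Java, not Flink code; the class and method names are made up for illustration): every buffered left record for a key is paired with every buffered right record inside the join window, so the work per key grows with the product of the duplicate counts.

```java
public class DuplicateKeyJoinBlowup {

    // Count the pairs an interval join must emit for one key when each side
    // buffers `dupes` records whose timestamps all fall inside the join window.
    static long pairsEmitted(int dupes) {
        long pairs = 0;
        // Every buffered left record is matched against every buffered right
        // record, so the work per key is dupes * dupes.
        for (int l = 0; l < dupes; l++) {
            for (int r = 0; r < dupes; r++) {
                pairs++;
            }
        }
        return pairs;
    }

    public static void main(String[] args) {
        System.out.println(pairsEmitted(1));   // 1 pair
        System.out.println(pairsEmitted(100)); // 10000 pairs
        // De-duplicating either side first collapses this back toward linear work.
    }
}
```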
On Thu, Mar 11, 2021 at 5:30 AM Dawid Wysakowicz <dwysakow...@apache.org> wrote:

> Hey Dan,
>
> I think the logic should be correct. Mind that in processElement we are
> using *relative* Upper/LowerBound, which are the inverted global bounds:
>
> relativeUpperBound = upperBound for left and -lowerBound for right
> relativeLowerBound = lowerBound for left and -upperBound for right
>
> Therefore the cleanup logic in onTimer effectively uses the same logic.
> If I understand it correctly, this trick was introduced to deduplicate
> the method.
>
> There might be a bug somewhere, but I don't think it's where you pointed.
> I'd suggest first investigating the progress of the watermarks.
>
> Best,
> Dawid
>
> On 09/03/2021 08:36, Dan Hill wrote:
>
> Hi Yun!
>
> That advice was useful. The state for that operator is very small
> (31 KB). Most of the checkpoint size is in a couple of simple
> DataStream.intervalJoin operators. The time intervals are fairly short.
>
> I'm going to try running the code with some small configuration changes.
> One thing I did notice is that I set a positive value for the
> relativeUpperBound. I'm not sure if I found a bug in IntervalJoinOperator
> <https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java>.
> The logic in IntervalJoinOperator.onEventTime needs an exact timestamp
> for cleanup. It has some logic around cleaning up the right side that
> uses timerTimestamp + lowerBound
> <https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java#L310>.
> However, processElement doesn't use the same logic when creating a timer
> (I only see + lowerBound
> <https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java#L253>).
> Maybe I'm misreading the code. It feels like a bug.
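To make the bound inversion described above concrete, here is a small self-contained Java check (not Flink source; just the join condition written out from both sides' perspectives) showing that the left-relative and right-relative formulations select exactly the same element pairs:

```java
public class RelativeBounds {

    // Interval join condition seen from the left element:
    // leftTs + lowerBound <= rightTs <= leftTs + upperBound.
    static boolean joinable(long leftTs, long rightTs, long lowerBound, long upperBound) {
        return leftTs + lowerBound <= rightTs && rightTs <= leftTs + upperBound;
    }

    // The same window seen from the right element uses inverted bounds:
    // relativeLowerBound = -upperBound, relativeUpperBound = -lowerBound.
    static boolean joinableFromRight(long rightTs, long leftTs, long lowerBound, long upperBound) {
        long relativeLowerBound = -upperBound;
        long relativeUpperBound = -lowerBound;
        return rightTs + relativeLowerBound <= leftTs && leftTs <= rightTs + relativeUpperBound;
    }

    public static void main(String[] args) {
        // The two formulations agree for every timestamp pair.
        for (long l = 0; l < 20; l++) {
            for (long r = 0; r < 20; r++) {
                if (joinable(l, r, -5, 3) != joinableFromRight(r, l, -5, 3)) {
                    throw new AssertionError("bounds mismatch at " + l + "," + r);
                }
            }
        }
        System.out.println("symmetric");
    }
}
```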
> > On Mon, Mar 8, 2021 at 10:29 PM Yun Gao <yungao...@aliyun.com> wrote:
> >
>> Hi Dan,
>>
>> Regarding the original checkpoint size problem, could you also check
>> which tasks' state is increasing from the checkpoint UI? For example,
>> the attached operator has an `alreadyOutputted` value state, which seems
>> to keep increasing if there are always new keys?
>>
>> Best,
>> Yun
>>
>> ------------------Original Mail ------------------
>> *Sender:* Dan Hill <quietgol...@gmail.com>
>> *Send Date:* Tue Mar 9 00:59:24 2021
>> *Recipients:* Yun Gao <yungao...@aliyun.com>
>> *CC:* user <user@flink.apache.org>
>> *Subject:* Re: Gradually increasing checkpoint size
>>
>>> Hi Yun!
>>>
>>> Thanks for the quick reply.
>>>
>>> One of the lowerBounds is large but the table being joined with is ~500
>>> rows. I also have my own operator that only outputs the first value.
>>>
>>> public class OnlyFirstUser<T extends GeneratedMessageV3>
>>>     extends RichFlatMapFunction<T, T> {
>>>
>>>   private transient ValueState<Boolean> alreadyOutputted;
>>>
>>>   @Override
>>>   public void flatMap(T value, Collector<T> out) throws Exception {
>>>     if (!alreadyOutputted.value()) {
>>>       alreadyOutputted.update(true);
>>>       out.collect(value);
>>>     }
>>>   }
>>>
>>>   @Override
>>>   public void open(Configuration config) {
>>>     ValueStateDescriptor<Boolean> descriptor =
>>>         new ValueStateDescriptor<>(
>>>             "alreadyOutputted", // the state name
>>>             TypeInformation.of(new TypeHint<Boolean>() {}), // type information
>>>             false); // default value of the state, if nothing was set
>>>     alreadyOutputted = getRuntimeContext().getState(descriptor);
>>>   }
>>> }
>>>
>>> All of my inputs have this watermark strategy. In the Flink UI, early
>>> in the job run, I see "Low Watermarks" on each node and they increase.
>>> After some checkpoint failures, low watermarks stop appearing in the UI
>>> <https://drive.google.com/file/d/1fLnT3068g3ddlMhfMH5j__kb-gMvmVXm/view?usp=sharing>.
>>>
>>> .assignTimestampsAndWatermarks(
>>>     WatermarkStrategy.<GeneratedMessageV3>forBoundedOutOfOrderness(Duration.ofSeconds(1))
>>>         .withIdleness(Duration.ofMinutes(1)));
>>>
>>> Thanks Yun!
>>>
>>> On Mon, Mar 8, 2021 at 7:27 AM Yun Gao <yungao...@aliyun.com> wrote:
>>>
>>>> Hi Dan,
>>>>
>>>> Have you used too large an upperBound or lowerBound?
>>>>
>>>> If not, could you also check the watermark strategy?
>>>> The interval join operator depends on the event-time
>>>> timer for cleanup, and the event-time timer would be
>>>> triggered via watermark.
>>>>
>>>> Best,
>>>> Yun
>>>>
>>>> ------------------Original Mail ------------------
>>>> *Sender:* Dan Hill <quietgol...@gmail.com>
>>>> *Send Date:* Mon Mar 8 14:59:48 2021
>>>> *Recipients:* user <user@flink.apache.org>
>>>> *Subject:* Gradually increasing checkpoint size
>>>>
>>>>> Hi!
>>>>>
>>>>> I'm running a backfill Flink stream job over older data. It has
>>>>> multiple interval joins. I noticed my checkpoint is regularly growing
>>>>> in size. I'd expect my checkpoints to stabilize and not grow.
>>>>>
>>>>> Is there a setting to prune useless data from the checkpoint? My top
>>>>> guess is that my checkpoint has a bunch of useless state in it.
>>>>>
>>>>> - Dan
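As a side note on the event-time cleanup point raised above: here is a toy Java simulation (the class and method names are made up, and this is not Flink internals) of why a stalled watermark makes buffered interval-join state, and hence the checkpoint, keep growing — a buffered element is only released once the watermark passes its timestamp plus the upper bound.

```java
import java.util.TreeMap;

public class TimerCleanupSketch {

    // Simulate how many buffered records survive cleanup: an element with
    // timestamp ts is cleaned only once the watermark passes ts + upperBound.
    static int remainingAfter(long watermark, long upperBound, long n) {
        TreeMap<Long, Integer> buffer = new TreeMap<>();
        for (long ts = 0; ts < n; ts++) {
            buffer.merge(ts, 1, Integer::sum);
        }
        // Drop everything whose cleanup timer (ts + upperBound) has fired.
        buffer.headMap(watermark - upperBound, true).clear();
        return buffer.size();
    }

    public static void main(String[] args) {
        System.out.println(remainingAfter(50, 10, 100)); // 59 records still buffered
        // If the watermark stalls (e.g. an idle source without withIdleness),
        // no further timers fire, this number never shrinks, and checkpoints
        // keep growing.
    }
}
```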