There are situations where the overall (minimum) watermark for an operator with multiple inputs might stall because of a slow, almost static, control stream—or multiple ones.
There are multiple options that I've seen used/recommended out there: 1. Define a watermark generator returning `MAX_WATERMARK` as explained here: https://stackoverflow.com/questions/69765403/apache-flink-watermark-does-not-progress-with-broadcast-stream 2. Alternatively/similarly, use `withIdleness`—with a very short period 3. Wrap the user defined function using Operator API. E.g., ```java public class SingleWatermarkKeyedCoProcessFunction<K, IN1, IN2, OUT> extends KeyedCoProcessOperator<K, IN1, IN2, OUT> { public SingleWatermarkKeyedCoProcessFunction(KeyedCoProcessFunction<K, IN1, IN2, OUT> flatMapper) { super(flatMapper); } @Override public void processWatermark1(Watermark mark) throws Exception { super.processWatermark(mark); } @Override public void processWatermark2(Watermark mark) { } } ``` To me, this looks more like a property of the streams themselves, so I'd be partial to either 1 or 2—although I've personally used 3 in some projects. Maybe it would make sense to add a new generator to cover this use case? E.g., for option 1: ```java static <A> DocumentWatermarkStrategy<A> ignoreWatermarks() { return (ctx) -> new IgnoreWatermarksGenerator<>(); } ``` where: ```java public class IgnoreWatermarksGenerator<T> implements WatermarkGenerator<T> { @Override public void onEvent(T event, long eventTimestamp, WatermarkOutput output) { /* Do nothing */ } @Override public void onPeriodicEmit(WatermarkOutput output) { output.emitWatermark(Watermark.MAX_WATERMARK); } } ``` Any thoughts on this? Maybe the docs should cover this problem/common pitfall more explicitly? Regards, Salva
