Approach #3 -- a custom operator -- is interesting. It hadn't occurred to
me. Too bad it's not an option for Flink SQL. This seems to be equivalent
to the MAX_WATERMARK approach, but less hacky.

I sometimes recommend the second approach, based on idleness. But I have
seen it cause some pretty weird behaviors, like the watermark going
backwards.

Consider this scenario:

   - the left side of a temporal join has produced some records with
   timestamps from the past
   - the watermark from the left side is small
   - then the left side becomes idle
   - the right side of the join produces records with larger timestamps,
   and it emits a large watermark
   - the join emits results based on that large watermark
   - then the right side becomes idle
   - finally, the left side of the join wakes up and produces more events
   with smaller timestamps, causing the watermark to "go backwards"
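
To make the steps above concrete, here is a toy model in plain Java. It is
only a sketch, not Flink's implementation: it assumes the operator's
candidate watermark is the minimum over the inputs that are not idle, and
the class name, method names, and the values 100, 1000, and 150 are all made
up for illustration.

```java
import java.util.OptionalLong;

// Toy model only: NOT Flink's implementation. Each input has a last-seen
// watermark and an idle flag; the operator's candidate watermark is the
// minimum over the inputs that are currently active.
class IdleJoinWatermarkSketch {

    static final class Input {
        long watermark = Long.MIN_VALUE; // no watermark received yet
        boolean idle = false;
    }

    // Minimum watermark over non-idle inputs; empty when every input is idle.
    static OptionalLong candidate(Input... inputs) {
        OptionalLong min = OptionalLong.empty();
        for (Input in : inputs) {
            if (in.idle) {
                continue;
            }
            min = min.isPresent()
                    ? OptionalLong.of(Math.min(min.getAsLong(), in.watermark))
                    : OptionalLong.of(in.watermark);
        }
        return min;
    }

    // Replays the scenario and returns {candidate while only the right side
    // is active, candidate after the left side wakes up}.
    static long[] runScenario() {
        Input left = new Input();
        Input right = new Input();

        left.watermark = 100;     // left: old records, small watermark (made-up value)
        left.idle = true;         // left goes idle and is ignored from now on

        right.watermark = 1_000;  // right: newer records, large watermark (made-up value)
        long whileLeftIdle = candidate(left, right).getAsLong();

        right.idle = true;        // right goes idle
        left.idle = false;        // left wakes up ...
        left.watermark = 150;     // ... with timestamps far below the right side's
        long afterLeftWakes = candidate(left, right).getAsLong();

        return new long[] {whileLeftIdle, afterLeftWakes};
    }

    public static void main(String[] args) {
        long[] wm = runScenario();
        System.out.println("while left is idle: " + wm[0]);
        System.out.println("after left wakes:   " + wm[1]);
    }
}
```

In this model the candidate drops from 1000 to 150. Whether a real runtime
forwards that lower value or simply treats the new left-side records as late
is outside the sketch.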


Admittedly, this is a somewhat contrived example.

David


On Wed, Mar 18, 2026 at 10:58 PM Salva Alcántara wrote:

> There are situations where the overall (minimum) watermark for an operator
> with multiple inputs might stall because of a slow, almost static, control
> stream—or multiple ones.
>
> There are multiple options that I've seen used/recommended out there:
>
> 1. Define a watermark generator returning `MAX_WATERMARK` as explained
> here:
>
>
> https://stackoverflow.com/questions/69765403/apache-flink-watermark-does-not-progress-with-broadcast-stream
>
>
> 2. Alternatively/similarly, use `withIdleness`—with a very short period
>
> 3. Wrap the user defined function using Operator API. E.g.,
>
> ```java
> public class SingleWatermarkKeyedCoProcessFunction<K, IN1, IN2, OUT>
>     extends KeyedCoProcessOperator<K, IN1, IN2, OUT> {
>   public SingleWatermarkKeyedCoProcessFunction(
>       KeyedCoProcessFunction<K, IN1, IN2, OUT> flatMapper) {
>     super(flatMapper);
>   }
>
>   // Forward input 1's watermark directly as the operator's watermark,
>   // instead of combining it with input 2's.
>   @Override
>   public void processWatermark1(Watermark mark) throws Exception {
>     super.processWatermark(mark);
>   }
>
>   // Ignore watermarks from input 2 (the slow control stream).
>   @Override
>   public void processWatermark2(Watermark mark) { }
> }
> ```
>
> To me, this looks more like a property of the streams themselves, so I'd
> be partial to either 1 or 2—although I've personally used 3 in some
> projects.
>
> Maybe it would make sense to add a new generator to cover this use case?
> E.g., for option 1:
>
> ```java
> static <A> DocumentWatermarkStrategy<A> ignoreWatermarks() {
>     return (ctx) -> new IgnoreWatermarksGenerator<>();
> }
> ```
>
> where:
>
> ```java
> public class IgnoreWatermarksGenerator<T> implements WatermarkGenerator<T> {
>     @Override
>     public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
>         /* Do nothing */
>     }
>
>     @Override
>     public void onPeriodicEmit(WatermarkOutput output) {
>         output.emitWatermark(Watermark.MAX_WATERMARK);
>     }
> }
> ```
>
> Any thoughts on this?
> Maybe the docs should cover this problem/common pitfall more explicitly?
>
> Regards,
>
> Salva
>
