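There are situations where the combined (minimum) watermark of an operator
with multiple inputs stalls because one of its inputs is a slow, almost static
control stream (or there are several of them). Since the operator's event time
is the minimum across its inputs, a single silent input holds back all
event-time processing in that operator and downstream.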
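
To make the stall concrete, here is a toy model in plain Java (not Flink code; the class and method names are hypothetical) of how a two-input operator derives its event time: it remembers the latest watermark from each input and only advances to the minimum of the two.

```java
// Toy model (not Flink code): a two-input operator's event-time clock
// is the minimum of the latest watermark seen on each input.
final class TwoInputWatermarkTracker {
    private long input1 = Long.MIN_VALUE; // latest watermark from the data stream
    private long input2 = Long.MIN_VALUE; // latest watermark from the control stream

    void onWatermark1(long wm) { input1 = Math.max(input1, wm); }

    void onWatermark2(long wm) { input2 = Math.max(input2, wm); }

    // Event time only advances as far as the slowest input allows.
    long current() { return Math.min(input1, input2); }
}
```

With watermarks 1000, 2000 and 3000 on the data input and nothing on the control input, `current()` stays at `Long.MIN_VALUE`; a first control watermark of 1500 only lets it advance to 1500.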

I've seen several options used or recommended:

1. Define a watermark generator for the control stream that always emits
   `MAX_WATERMARK`, as explained here:
   https://stackoverflow.com/questions/69765403/apache-flink-watermark-does-not-progress-with-broadcast-stream

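2. Alternatively, with a similar effect, apply `withIdleness` to the control
   stream's `WatermarkStrategy` with a very short idleness period, so the
   stream is marked idle and excluded from the minimum.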

3. Wrap the user-defined function in a custom operator (via the operator API)
   that ignores the watermarks of the control input, e.g.:

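```java
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator;
import org.apache.flink.streaming.api.watermark.Watermark;

/**
 * Runs a KeyedCoProcessFunction, but derives event time from the first
 * (data) input only; watermarks from the second (control) input are dropped.
 */
public class SingleWatermarkKeyedCoProcessOperator<K, IN1, IN2, OUT>
    extends KeyedCoProcessOperator<K, IN1, IN2, OUT> {

  public SingleWatermarkKeyedCoProcessOperator(
      KeyedCoProcessFunction<K, IN1, IN2, OUT> function) {
    super(function);
  }

  @Override
  public void processWatermark1(Watermark mark) throws Exception {
    // Skip the min(input1, input2) combination: advance timers and
    // forward the watermark downstream directly from input 1.
    super.processWatermark(mark);
  }

  @Override
  public void processWatermark2(Watermark mark) {
    // Ignore watermarks from the control stream.
  }
}
```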
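
The effect of the three options on that minimum can be sketched with a self-contained toy model in plain Java. It is a simplification, not Flink's implementation: the class, its methods and the input indices are hypothetical, and only `MAX_WATERMARK` mirrors a real Flink value (`Long.MAX_VALUE`).

```java
// Toy model (not Flink code): the event-time clock of a two-input operator,
// with a switch for each of the three workarounds listed above.
final class ToyCoOperatorClock {
    static final int DATA = 0;
    static final int CONTROL = 1;
    // Same timestamp as Flink's Watermark.MAX_WATERMARK.
    static final long MAX_WATERMARK = Long.MAX_VALUE;

    private final long[] latest = {Long.MIN_VALUE, Long.MIN_VALUE};
    private final boolean[] idle = {false, false};
    private final boolean ignoreControlWatermarks; // option 3
    private long combined = Long.MIN_VALUE;

    ToyCoOperatorClock(boolean ignoreControlWatermarks) {
        this.ignoreControlWatermarks = ignoreControlWatermarks;
    }

    void onWatermark(int input, long wm) {
        if (input == CONTROL && ignoreControlWatermarks) {
            return; // option 3: the operator drops control-side watermarks
        }
        latest[input] = Math.max(latest[input], wm);
        idle[input] = false; // in this toy, a watermark re-activates the input
    }

    // Option 2: an input marked idle no longer holds back the minimum.
    void markIdle(int input) {
        idle[input] = true;
    }

    long current() {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < latest.length; i++) {
            boolean excluded = idle[i] || (i == CONTROL && ignoreControlWatermarks);
            if (!excluded) {
                min = Math.min(min, latest[i]);
                anyActive = true;
            }
        }
        if (anyActive) {
            combined = Math.max(combined, min); // event time never goes backwards
        }
        return combined;
    }
}
```

Option 1 corresponds to `onWatermark(CONTROL, MAX_WATERMARK)`, option 2 to `markIdle(CONTROL)` and option 3 to constructing the clock with `true`. With a data watermark of 3000, the baseline clock stays at `Long.MIN_VALUE`, while each option lets it reach 3000. Option 2 only lasts while the control input stays idle: in the toy the next control watermark re-activates it (in Flink, new records do) and it holds back the minimum again, which is presumably why a very short idleness period is suggested.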

To me, this looks more like a property of the streams themselves than of the
operator, so I'd lean towards option 1 or 2, although I've personally used
option 3 in some projects.

Would it make sense to add a built-in watermark strategy for this use case?
E.g., for option 1:

```java
// Proposed static factory on WatermarkStrategy, next to the existing noWatermarks()
static <A> WatermarkStrategy<A> ignoreWatermarks() {
    return (ctx) -> new IgnoreWatermarksGenerator<>();
}
```

where:

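```java
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkOutput;

/** Pins the stream's watermark at MAX_WATERMARK so it never holds back a downstream minimum. */
public class IgnoreWatermarksGenerator<T> implements WatermarkGenerator<T> {
    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        // Event timestamps are irrelevant here; nothing to do per event.
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // Emitting periodically (not per event) also covers a control
        // stream that never produces a single record.
        output.emitWatermark(Watermark.MAX_WATERMARK);
    }
}
```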

Any thoughts on this?
Maybe the docs should cover this common pitfall more explicitly?

Regards,

Salva
