StephanEwen commented on a change in pull request #12412: URL: https://github.com/apache/flink/pull/12412#discussion_r433877916
########## File path: flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategy.java ########## @@ -49,4 +53,158 @@ */ @Override WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context); + + /** + * Creates a watermark strategy for situations with monotonously ascending timestamps. + * + * <p>The watermarks are generated periodically and tightly follow the latest + * timestamp in the data. The delay introduced by this strategy is mainly the periodic interval + * in which the watermarks are generated. + * + * @see AscendingTimestampsWatermarks + */ + static <T> WatermarkStrategy<T> forMonotonousTimestamps() { + return (ctx) -> new AscendingTimestampsWatermarks<>(); + } + + /** + * Creates a watermark strategy for situations where records are out of order, but you + * can place an upper bound on how far the events are out of order. An out-of-order bound B + * means that once an event with timestamp T was encountered, no events older than {@code T + * - B} will follow any more. + * + * <p>The watermarks are generated periodically. The delay introduced by this watermark + * strategy + * is the periodic interval length, plus the out of orderness bound. + * + * @see BoundedOutOfOrdernessWatermarks + */ + static <T> WatermarkStrategy<T> forBoundedOutOfOrderness(Duration maxOutOfOrderness) { + return (ctx) -> new BoundedOutOfOrdernessWatermarks<>(maxOutOfOrderness); + } + + /** + * Creates a watermark strategy based on an existing {@link + * WatermarkGeneratorSupplier}. + */ + static <T> WatermarkStrategy<T> forGenerator(WatermarkGeneratorSupplier<T> generatorSupplier) { + return generatorSupplier::createWatermarkGenerator; + } + + /** + * Creates a watermark strategy that generates no watermarks at all. + * This may be useful in scenarios that do pure processing-time based stream processing. 
+ */ + static <T> WatermarkStrategy<T> noWatermarks() { + return (ctx) -> new NoWatermarksGenerator<>(); + } + + /** + * Adds the given {@link TimestampAssigner} (via a {@link TimestampAssignerSupplier}) to + * this {@link WatermarkStrategy}. + * + * <p>You can use this when a {@link TimestampAssigner} needs additional context, for example + * access to the metrics system. + * + * <pre> + * {@code WatermarkStrategy<Object> wmStrategy = WatermarkStrategy + * .forMonotonousTimestamps() + * .withTimestampAssigner((ctx) -> new MetricsReportingAssigner(ctx)); + * }</pre> + */ + default WatermarkStrategy<T> withTimestampAssigner(TimestampAssignerSupplier<T> timestampAssigner) { + checkNotNull(timestampAssigner, "timestampAssigner"); + return new WithTimestampAssigner<>(this, timestampAssigner); + } + + /** + * Creates a new {@link WatermarkStrategy} with the {@link TimestampAssigner} overridden by the + * provided assigner. + * + * <p>You can use this in case you want to specify a {@link TimestampAssigner} via a lambda + * function. + * + * <pre> + * {@code WatermarkStrategy<CustomObject> wmStrategy = WatermarkStrategy + * .forMonotonousTimestamps() + * .withTimestampAssigner((event, timestamp) -> event.getTimestamp()); + * }</pre> + */ + default WatermarkStrategy<T> withTimestampAssigner(SerializableTimestampAssigner<T> timestampAssigner) { + checkNotNull(timestampAssigner, "timestampAssigner"); + return new WithTimestampAssigner<>(this, TimestampAssignerSupplier.of(timestampAssigner)); + } + + /** + * Add an idle timeout to the watermark strategy. If no records flow in a partition of a stream + * for that amount of time, then that partition is considered "idle" and will not hold back the + * progress of watermarks in downstream operators. + * + * <p>Idleness can be important if some partitions have little data and might not have events + * during + * some periods. Without idleness, these streams can stall the overall event time progress of + * the application. 
+ */ + default WatermarkStrategy<T> withIdleness(Duration idleTimeout) { + checkNotNull(idleTimeout, "idleTimeout"); + checkArgument(!(idleTimeout.isZero() || idleTimeout.isNegative()), + "idleTimeout must be greater than zero"); + return new WithIdlenessStrategy<>(this, idleTimeout); + } + + /** + * A {@link WatermarkStrategy} that adds idleness detection on top of the wrapped strategy. + */ + class WithIdlenessStrategy<T> implements WatermarkStrategy<T> { + + private static final long serialVersionUID = 1L; + + private final WatermarkStrategy<T> baseStrategy; + private final Duration idlenessTimeout; + + private WithIdlenessStrategy(WatermarkStrategy<T> baseStrategy, Duration idlenessTimeout) { + this.baseStrategy = baseStrategy; + this.idlenessTimeout = idlenessTimeout; + } + + @Override + public TimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context) { + return baseStrategy.createTimestampAssigner(context); + } + + @Override + public WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) { + return new WatermarksWithIdleness<>(baseStrategy.createWatermarkGenerator(context), + idlenessTimeout); + } Review comment: Not sure. Are `equals` and `hashCode` well-defined here? These are not classes describing data or state, but classes mainly representing logic. If testing is the sole purpose, then the right way to go, in my opinion, is writing a `Matcher` or a utility method that tests for the equality. Pushing a not-very-well-defined `equals()` implementation into production code just to simplify tests is not the right way. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org