StephanEwen commented on a change in pull request #12412:
URL: https://github.com/apache/flink/pull/12412#discussion_r433877916



##########
File path: flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategy.java
##########
@@ -49,4 +53,158 @@
         */
        @Override
        WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);
+
+       /**
+        * Creates a watermark strategy for situations with monotonously ascending timestamps.
+        *
+        * <p>The watermarks are generated periodically and tightly follow the latest
+        * timestamp in the data. The delay introduced by this strategy is mainly the periodic interval
+        * in which the watermarks are generated.
+        *
+        * @see AscendingTimestampsWatermarks
+        */
+       static <T> WatermarkStrategy<T> forMonotonousTimestamps() {
+               return (ctx) -> new AscendingTimestampsWatermarks<>();
+       }
+
+       /**
+        * Creates a watermark strategy for situations where records are out of order, but you
+        * can place an upper bound on how far the events are out of order. An out-of-order bound B
+        * means that once the an event with timestamp T was encountered, no events older than {@code T
+        * - B} will follow any more.
+        *
+        * <p>The watermarks are generated periodically. The delay introduced by this watermark
+        * strategy
+        * is the periodic interval length, plus the out of orderness bound.
+        *
+        * @see BoundedOutOfOrdernessWatermarks
+        */
+       static <T> WatermarkStrategy<T> forBoundedOutOfOrderness(Duration maxOutOfOrderness) {
+               return (ctx) -> new BoundedOutOfOrdernessWatermarks<>(maxOutOfOrderness);
+       }
+
+       /**
+        * Creates a watermark strategy based on an existing {@link
+        * WatermarkGeneratorSupplier}.
+        */
+       static <T> WatermarkStrategy<T> forGenerator(WatermarkGeneratorSupplier<T> generatorSupplier) {
+               return generatorSupplier::createWatermarkGenerator;
+       }
+
+       /**
+        * Creates a watermark strategy that generates no watermarks at all.
+        * This may be useful in scenarios that do pure processing-time based stream processing.
+        */
+       static <T> WatermarkStrategy<T> noWatermarks() {
+               return (ctx) -> new NoWatermarksGenerator<>();
+       }
+
+       /**
+        * Creates Adds the given {@link TimestampAssigner} (via a {@link TimestampAssignerSupplier}) to
+        * this {@link WatermarkStrategy}.
+        *
+        * <p>You can use this when a {@link TimestampAssigner} needs additional context, for example
+        * access to the metrics system.
+        *
+        * <pre>
+        * {@code WatermarkStrategy<Object> wmStrategy = WatermarkStrategy
+        *   .forMonotonousTimestamps()
+        *   .withTimestampAssigner((ctx) -> new MetricsReportingAssigner(ctx));
+        * }</pre>
+        */
+       default WatermarkStrategy<T> withTimestampAssigner(TimestampAssignerSupplier<T> timestampAssigner) {
+               checkNotNull(timestampAssigner, "timestampAssigner");
+               return new WithTimestampAssigner<>(this, timestampAssigner);
+       }
+
+       /**
+        * Creates a new {@link WatermarkStrategy} with the {@link TimestampAssigner} overridden by the
+        * provided assigner.
+        *
+        * <p>You can use this in case you want to specify a {@link TimestampAssigner} via a lambda
+        * function.
+        *
+        * <pre>
+        * {@code WatermarkStrategy<CustomObject> wmStrategy = WatermarkStrategy
+        *   .forMonotonousTimestamps()
+        *   .withTimestampAssigner((event, timestamp) -> event.getTimestamp());
+        * }</pre>
+        */
+       default WatermarkStrategy<T> withTimestampAssigner(SerializableTimestampAssigner<T> timestampAssigner) {
+               checkNotNull(timestampAssigner, "timestampAssigner");
+               return new WithTimestampAssigner<>(this, TimestampAssignerSupplier.of(timestampAssigner));
+       }
+
+       /**
+        * Add an idle timeout to the watermark strategy. If no records flow in a partition of a stream
+        * for that amount of time, then that partition is considered "idle" and will not hold back the
+        * progress of watermarks in downstream operators.
+        *
+        * <p>Idleness can be important if some partitions have little data and might not have events
+        * during
+        * some periods. Without idleness, these streams can stall the overall event time progress of
+        * the application.
+        */
+       default WatermarkStrategy<T> withIdleness(Duration idleTimeout) {
+               checkNotNull(idleTimeout, "idleTimeout");
+               checkArgument(!(idleTimeout.isZero() || idleTimeout.isNegative()),
+                               "idleTimeout must be greater than zero");
+               return new WithIdlenessStrategy<>(this, idleTimeout);
+       }
+
+       /**
+        * A {@link WatermarkStrategy} that adds idleness detection on top of the wrapped strategy.
+        */
+       class WithIdlenessStrategy<T> implements WatermarkStrategy<T> {
+
+               private static final long serialVersionUID = 1L;
+
+               private final WatermarkStrategy<T> baseStrategy;
+               private final Duration idlenessTimeout;
+
+               private WithIdlenessStrategy(WatermarkStrategy<T> baseStrategy, Duration idlenessTimeout) {
+                       this.baseStrategy = baseStrategy;
+                       this.idlenessTimeout = idlenessTimeout;
+               }
+
+               @Override
+               public TimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
+                       return baseStrategy.createTimestampAssigner(context);
+               }
+
+               @Override
+               public WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
+                       return new WatermarksWithIdleness<>(baseStrategy.createWatermarkGenerator(context),
+                                       idlenessTimeout);
+               }

Review comment:
       Not sure. Are `equals` and `hashCode` well-defined here? These are not classes describing data or state, but classes mainly representing logic.
   
   If testing is the sole purpose, then the right way to go, in my opinion, is writing a `Matcher` or a utility method that tests for equality. Pushing a not-very-well-defined `equals()` implementation into production code just to simplify tests is not the right way.
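
   To illustrate the utility-method option, here is a minimal sketch. `IdleTimeoutStrategy` and `StrategyTestUtils` are hypothetical stand-ins invented for this example, not Flink classes, and the wrapped strategy is reduced to a plain name to keep the snippet self-contained. The point is only that the definition of "equal" can live next to the tests while the production class keeps the default `equals()`.

```java
import java.time.Duration;
import java.util.Objects;

// Hypothetical stand-in for a logic-carrying class such as WithIdlenessStrategy.
// It deliberately does NOT override equals() or hashCode().
final class IdleTimeoutStrategy {
    private final String baseStrategyName; // assumption: simplified stand-in for the wrapped strategy
    private final Duration idleTimeout;

    IdleTimeoutStrategy(String baseStrategyName, Duration idleTimeout) {
        this.baseStrategyName = Objects.requireNonNull(baseStrategyName, "baseStrategyName");
        this.idleTimeout = Objects.requireNonNull(idleTimeout, "idleTimeout");
    }

    String getBaseStrategyName() {
        return baseStrategyName;
    }

    Duration getIdleTimeout() {
        return idleTimeout;
    }
}

// Hypothetical test-only helper: the comparison rule is owned by the tests.
final class StrategyTestUtils {
    private StrategyTestUtils() {}

    // Returns true if both strategies were configured with the same base and timeout.
    static boolean hasSameConfiguration(IdleTimeoutStrategy a, IdleTimeoutStrategy b) {
        return a.getBaseStrategyName().equals(b.getBaseStrategyName())
                && a.getIdleTimeout().equals(b.getIdleTimeout());
    }

    // Throws an AssertionError with a readable message if the configurations differ.
    static void assertSameConfiguration(IdleTimeoutStrategy expected, IdleTimeoutStrategy actual) {
        if (!hasSameConfiguration(expected, actual)) {
            throw new AssertionError(
                    "Expected base=" + expected.getBaseStrategyName()
                            + ", timeout=" + expected.getIdleTimeout()
                            + " but got base=" + actual.getBaseStrategyName()
                            + ", timeout=" + actual.getIdleTimeout());
        }
    }
}
```

   A test would then call `StrategyTestUtils.assertSameConfiguration(expected, actual)` instead of `assertEquals(expected, actual)`. The same comparison could be wrapped in a Hamcrest `Matcher` if the test suite already uses matchers.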





