This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 934007cf2b974d3c24b5db350498b430dec4d3bb
Author: Piotr Nowojski <piotr.nowoj...@gmail.com>
AuthorDate: Mon Aug 5 16:52:41 2024 +0200

    [FLINK-35886][task] Use RelativeClock in WatermarksWithIdleness
---
 .../eventtime/WatermarkGeneratorSupplier.java      | 12 ++++++++++-
 .../eventtime/WatermarkStrategyWithIdleness.java   |  4 +++-
 .../common/eventtime/WatermarksWithIdleness.java   | 25 +++++++++++++++-------
 .../common/eventtime/WatermarkStrategyTest.java    | 15 ++++++++++++-
 .../flink/table/toolbox/TestSourceFunction.java    | 17 ++++++++++++++-
 .../source/TimestampsAndWatermarksContext.java     |  7 ++++++
 .../operators/TimestampsAndWatermarksOperator.java | 17 ++++++++++++++-
 .../factories/TestValuesRuntimeFunctions.java      | 17 ++++++++++++++-
 .../codegen/WatermarkGeneratorCodeGenTest.scala    |  5 ++++-
 9 files changed, 104 insertions(+), 15 deletions(-)

diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkGeneratorSupplier.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkGeneratorSupplier.java
index 0b0a1fd3ed8..4d149d39259 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkGeneratorSupplier.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkGeneratorSupplier.java
@@ -20,6 +20,7 @@ package org.apache.flink.api.common.eventtime;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.util.clock.RelativeClock;
 
 import java.io.Serializable;
 
@@ -39,7 +40,7 @@ public interface WatermarkGeneratorSupplier<T> extends 
Serializable {
 
     /**
      * Additional information available to {@link 
#createWatermarkGenerator(Context)}. This can be
-     * access to {@link org.apache.flink.metrics.MetricGroup MetricGroups}, 
for example.
+     * access to {@link MetricGroup MetricGroups}, for example.
      */
     interface Context {
 
@@ -54,5 +55,14 @@ public interface WatermarkGeneratorSupplier<T> extends 
Serializable {
          * @see MetricGroup
          */
         MetricGroup getMetricGroup();
+
+        /**
+         * Returns a {@link RelativeClock} that hides periods when input was 
not active and {@link
+         * WatermarkGenerator} could not have been executed due to execution 
being blocked by the
+         * runtime. For example a backpressure or watermark alignment blocking 
the progress.
+         *
+         * @see RelativeClock
+         */
+        RelativeClock getInputActivityClock();
     }
 }
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategyWithIdleness.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategyWithIdleness.java
index 660a7890277..7c0ae861952 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategyWithIdleness.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategyWithIdleness.java
@@ -42,7 +42,9 @@ final class WatermarkStrategyWithIdleness<T> implements 
WatermarkStrategy<T> {
     public WatermarkGenerator<T> createWatermarkGenerator(
             WatermarkGeneratorSupplier.Context context) {
         return new WatermarksWithIdleness<>(
-                baseStrategy.createWatermarkGenerator(context), 
idlenessTimeout);
+                baseStrategy.createWatermarkGenerator(context),
+                idlenessTimeout,
+                context.getInputActivityClock());
     }
 
     @Override
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarksWithIdleness.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarksWithIdleness.java
index 3d61217ce04..a4d5639a5e1 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarksWithIdleness.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarksWithIdleness.java
@@ -21,6 +21,7 @@ package org.apache.flink.api.common.eventtime;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.util.clock.Clock;
+import org.apache.flink.util.clock.RelativeClock;
 import org.apache.flink.util.clock.SystemClock;
 
 import java.time.Duration;
@@ -42,19 +43,27 @@ public class WatermarksWithIdleness<T> implements 
WatermarkGenerator<T> {
 
     private boolean isIdleNow = false;
 
+    /**
+     * This is not used anymore, but it's technically part of the {@link 
Public} API. Please use
+     * {@link #WatermarksWithIdleness(WatermarkGenerator, Duration, 
RelativeClock)} instead.
+     */
+    @Deprecated
+    public WatermarksWithIdleness(WatermarkGenerator<T> watermarks, Duration 
idleTimeout) {
+        this(watermarks, idleTimeout, SystemClock.getInstance());
+    }
+
     /**
      * Creates a new WatermarksWithIdleness generator to the given generator 
idleness detection with
      * the given timeout.
      *
      * @param watermarks The original watermark generator.
      * @param idleTimeout The timeout for the idleness detection.
+     * @param clock The clock that will be used to measure idleness period. It 
is expected that this
+     *     clock will hide periods when this {@link WatermarkGenerator} has 
been blocked from making
+     *     any progress despite availability of records on the input.
      */
-    public WatermarksWithIdleness(WatermarkGenerator<T> watermarks, Duration 
idleTimeout) {
-        this(watermarks, idleTimeout, SystemClock.getInstance());
-    }
-
-    @VisibleForTesting
-    WatermarksWithIdleness(WatermarkGenerator<T> watermarks, Duration 
idleTimeout, Clock clock) {
+    public WatermarksWithIdleness(
+            WatermarkGenerator<T> watermarks, Duration idleTimeout, 
RelativeClock clock) {
         checkNotNull(idleTimeout, "idleTimeout");
         checkArgument(
                 !(idleTimeout.isZero() || idleTimeout.isNegative()),
@@ -88,7 +97,7 @@ public class WatermarksWithIdleness<T> implements 
WatermarkGenerator<T> {
     static final class IdlenessTimer {
 
         /** The clock used to measure elapsed time. */
-        private final Clock clock;
+        private final RelativeClock clock;
 
         /** Counter to detect change. No problem if it overflows. */
         private long counter;
@@ -105,7 +114,7 @@ public class WatermarksWithIdleness<T> implements 
WatermarkGenerator<T> {
         /** The duration before the output is marked as idle. */
         private final long maxIdleTimeNanos;
 
-        IdlenessTimer(Clock clock, Duration idleTimeout) {
+        IdlenessTimer(RelativeClock clock, Duration idleTimeout) {
             this.clock = clock;
 
             long idleNanos;
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/eventtime/WatermarkStrategyTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/eventtime/WatermarkStrategyTest.java
index 2ce7b88d3f6..2ddafea1017 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/eventtime/WatermarkStrategyTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/eventtime/WatermarkStrategyTest.java
@@ -20,7 +20,10 @@ package org.apache.flink.api.common.eventtime;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.util.clock.RelativeClock;
+import org.apache.flink.util.clock.SystemClock;
 
 import org.junit.jupiter.api.Test;
 
@@ -171,6 +174,16 @@ public class WatermarkStrategyTest {
     }
 
     static WatermarkGeneratorSupplier.Context generatorContext() {
-        return UnregisteredMetricsGroup::new;
+        return new WatermarkGeneratorSupplier.Context() {
+            @Override
+            public MetricGroup getMetricGroup() {
+                return new UnregisteredMetricsGroup();
+            }
+
+            @Override
+            public RelativeClock getInputActivityClock() {
+                return SystemClock.getInstance();
+            }
+        };
     }
 }
diff --git 
a/flink-end-to-end-tests/flink-sql-client-test/src/main/java/org/apache/flink/table/toolbox/TestSourceFunction.java
 
b/flink-end-to-end-tests/flink-sql-client-test/src/main/java/org/apache/flink/table/toolbox/TestSourceFunction.java
index 347db1589c8..3fe4dce03aa 100644
--- 
a/flink-end-to-end-tests/flink-sql-client-test/src/main/java/org/apache/flink/table/toolbox/TestSourceFunction.java
+++ 
b/flink-end-to-end-tests/flink-sql-client-test/src/main/java/org/apache/flink/table/toolbox/TestSourceFunction.java
@@ -20,13 +20,17 @@ package org.apache.flink.table.toolbox;
 
 import org.apache.flink.api.common.eventtime.Watermark;
 import org.apache.flink.api.common.eventtime.WatermarkGenerator;
+import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
 import org.apache.flink.api.common.eventtime.WatermarkOutput;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.util.clock.RelativeClock;
+import org.apache.flink.util.clock.SystemClock;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -60,7 +64,18 @@ public class TestSourceFunction implements 
SourceFunction<RowData> {
     @Override
     public void run(SourceContext<RowData> ctx) {
         WatermarkGenerator<RowData> generator =
-                watermarkStrategy.createWatermarkGenerator(() -> null);
+                watermarkStrategy.createWatermarkGenerator(
+                        new WatermarkGeneratorSupplier.Context() {
+                            @Override
+                            public MetricGroup getMetricGroup() {
+                                return null;
+                            }
+
+                            @Override
+                            public RelativeClock getInputActivityClock() {
+                                return SystemClock.getInstance();
+                            }
+                        });
         WatermarkOutput output = new TestWatermarkOutput(ctx);
 
         int rowDataSize = DATA.get(0).size();
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/TimestampsAndWatermarksContext.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/TimestampsAndWatermarksContext.java
index 4685691385d..8ec6cc8c01f 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/TimestampsAndWatermarksContext.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/TimestampsAndWatermarksContext.java
@@ -22,6 +22,8 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;
 import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
 import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.util.clock.RelativeClock;
+import org.apache.flink.util.clock.SystemClock;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -43,4 +45,9 @@ public final class TimestampsAndWatermarksContext
     public MetricGroup getMetricGroup() {
         return metricGroup;
     }
+
+    @Override
+    public RelativeClock getInputActivityClock() {
+        return SystemClock.getInstance();
+    }
 }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperator.java
index 570cb6b90b3..02f72f3ba32 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperator.java
@@ -21,14 +21,18 @@ import 
org.apache.flink.api.common.eventtime.NoWatermarksGenerator;
 import org.apache.flink.api.common.eventtime.TimestampAssigner;
 import org.apache.flink.api.common.eventtime.Watermark;
 import org.apache.flink.api.common.eventtime.WatermarkGenerator;
+import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
 import org.apache.flink.api.common.eventtime.WatermarkOutput;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
+import org.apache.flink.util.clock.RelativeClock;
+import org.apache.flink.util.clock.SystemClock;
 
 import static 
org.apache.flink.api.common.operators.ProcessingTimeService.ProcessingTimeCallback;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -80,7 +84,18 @@ public class TimestampsAndWatermarksOperator<T> extends 
AbstractStreamOperator<T
         timestampAssigner = 
watermarkStrategy.createTimestampAssigner(this::getMetricGroup);
         watermarkGenerator =
                 emitProgressiveWatermarks
-                        ? 
watermarkStrategy.createWatermarkGenerator(this::getMetricGroup)
+                        ? watermarkStrategy.createWatermarkGenerator(
+                                new WatermarkGeneratorSupplier.Context() {
+                                    @Override
+                                    public MetricGroup getMetricGroup() {
+                                        return this.getMetricGroup();
+                                    }
+
+                                    @Override
+                                    public RelativeClock 
getInputActivityClock() {
+                                        return SystemClock.getInstance();
+                                    }
+                                })
                         : new NoWatermarksGenerator<>();
 
         wmOutput = new WatermarkEmitter(output);
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java
index 0f8d0dd70a9..cb12026b4a0 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.planner.factories;
 
 import org.apache.flink.api.common.eventtime.Watermark;
 import org.apache.flink.api.common.eventtime.WatermarkGenerator;
+import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
 import org.apache.flink.api.common.eventtime.WatermarkOutput;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.io.RichOutputFormat;
@@ -31,6 +32,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
@@ -60,6 +62,8 @@ import org.apache.flink.test.util.SuccessException;
 import org.apache.flink.types.Row;
 import org.apache.flink.types.RowKind;
 import org.apache.flink.types.RowUtils;
+import org.apache.flink.util.clock.RelativeClock;
+import org.apache.flink.util.clock.SystemClock;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -266,7 +270,18 @@ final class TestValuesRuntimeFunctions {
             ByteArrayInputStream bais = new 
ByteArrayInputStream(elementsSerialized);
             final DataInputView input = new DataInputViewStreamWrapper(bais);
             WatermarkGenerator<RowData> generator =
-                    watermarkStrategy.createWatermarkGenerator(() -> null);
+                    watermarkStrategy.createWatermarkGenerator(
+                            new WatermarkGeneratorSupplier.Context() {
+                                @Override
+                                public MetricGroup getMetricGroup() {
+                                    return null;
+                                }
+
+                                @Override
+                                public RelativeClock getInputActivityClock() {
+                                    return SystemClock.getInstance();
+                                }
+                            });
             WatermarkOutput output = new TestValuesWatermarkOutput(ctx);
             final Object lock = ctx.getCheckpointLock();
 
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/WatermarkGeneratorCodeGenTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/WatermarkGeneratorCodeGenTest.scala
index 98265f67299..75ae617a4f2 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/WatermarkGeneratorCodeGenTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/WatermarkGeneratorCodeGenTest.scala
@@ -31,9 +31,10 @@ import 
org.apache.flink.table.runtime.generated.WatermarkGenerator
 import org.apache.flink.table.types.logical.{IntType, TimestampType}
 import org.apache.flink.table.utils.CatalogManagerMocks
 import 
org.apache.flink.testutils.junit.extensions.parameterized.{ParameterizedTestExtension,
 Parameters}
+import org.apache.flink.util.clock.{RelativeClock, SystemClock}
 
-import org.junit.jupiter.api.{Test, TestTemplate}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
+import org.junit.jupiter.api.TestTemplate
 import org.junit.jupiter.api.extension.ExtendWith
 
 import java.lang.{Integer => JInt, Long => JLong}
@@ -167,6 +168,8 @@ class WatermarkGeneratorCodeGenTest(useDefinedConstructor: 
Boolean) {
       val newReferences = generated.getReferences :+
         new WatermarkGeneratorSupplier.Context {
           override def getMetricGroup: MetricGroup = null
+
+          override def getInputActivityClock: RelativeClock = 
SystemClock.getInstance()
         }
       generated.newInstance(Thread.currentThread().getContextClassLoader, 
newReferences)
     } else {

Reply via email to