pnowojski commented on a change in pull request #18726:
URL: https://github.com/apache/flink/pull/18726#discussion_r804583848



##########
File path: docs/content/docs/ops/metrics.md
##########
@@ -1482,6 +1482,17 @@ Note that the metrics are only available via reporters.
       </td>
       <td>Gauge</td>
     </tr>
+    <tr>
+      <td>watermarkDrift</td>

Review comment:
       I guess you need to bump `rowspan="3"` above.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalSourceReaderMetricGroup.java
##########
@@ -131,6 +133,15 @@ public void watermarkEmitted(long watermark) {
         }
     }
 
+    public void updateMaxDesiredWatermark(long currentMaxDesiredWatermark) {
+        this.currentMaxDesiredWatermark = currentMaxDesiredWatermark;
+
+        if (firstDesiredWatermark) {
+            parentMetricGroup.gauge(MetricNames.WATERMARK_DRIFT, this::getAlignedWatermarkDrift);
+            firstDesiredWatermark = false;
+        }

Review comment:
       nit: I wonder if it would be better for tooling to always have the metric defined, instead of registering it lazily?
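To illustrate the suggestion, here is a minimal, self-contained sketch of registering the gauge eagerly (once, up front) rather than on the first `updateMaxDesiredWatermark` call. `SimpleMetricGroup`, `EagerDriftMetrics` and the `UNDEFINED` sentinel are hypothetical stand-ins, not Flink classes; only the drift formula is taken from `getAlignedWatermarkDrift` in this PR.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongSupplier;

/** Hypothetical stand-in for a metric group: a plain name -> gauge map. */
class SimpleMetricGroup {
    final Map<String, LongSupplier> gauges = new HashMap<>();

    void gauge(String name, LongSupplier gauge) {
        gauges.put(name, gauge);
    }
}

/** Sketch: the drift gauge is registered once, in the constructor, instead of lazily. */
class EagerDriftMetrics {
    // Hypothetical sentinel reported until both watermarks are known.
    static final long UNDEFINED = Long.MIN_VALUE;

    private long lastWatermark;
    private long currentMaxDesiredWatermark;
    private boolean hasWatermark;
    private boolean hasDesiredWatermark;

    EagerDriftMetrics(SimpleMetricGroup group) {
        // Always registered, so tooling can rely on the metric being present.
        group.gauge("watermarkDrift", this::getAlignedWatermarkDrift);
    }

    void watermarkEmitted(long watermark) {
        lastWatermark = watermark;
        hasWatermark = true;
    }

    void updateMaxDesiredWatermark(long maxDesiredWatermark) {
        currentMaxDesiredWatermark = maxDesiredWatermark;
        hasDesiredWatermark = true;
    }

    long getAlignedWatermarkDrift() {
        if (!hasWatermark || !hasDesiredWatermark) {
            return UNDEFINED;
        }
        // Same formula as in the PR (negative values clamped to zero).
        return Math.max(0, lastWatermark - currentMaxDesiredWatermark);
    }
}
```

The trade-off of this design is that reporters would see the sentinel while alignment is disabled or before the first max desired watermark is announced, so that value would need to be documented.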

##########
File path: flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarksWithWatermarkAlignment.java
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.eventtime;
+
+import org.apache.flink.annotation.Internal;
+
+import java.time.Duration;
+
+/** A helper class to pass a watermark group and max allowed watermark drift to the runtime. */
+@Internal
+public final class WatermarksWithWatermarkAlignment<T> implements WatermarkStrategy<T> {
+
+    static final Duration DEFAULT_UPDATE_INTERVAL = Duration.ofMillis(1000);
+
+    private final WatermarkStrategy<T> strategy;
+
+    private final String watermarkGroup;
+
+    private final Duration maxAllowedWatermarkDrift;
+
+    private final Duration updateInterval;
+
+    public WatermarksWithWatermarkAlignment(
+            String watermarkGroup,
+            Duration maxAllowedWatermarkDrift,
+            Duration updateInterval,
+            WatermarkStrategy<T> strategy) {

Review comment:
       nit: for the sake of consistency, move `strategy` to be the first parameter?

##########
File path: flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarksWithWatermarkAlignment.java
##########
@@ -0,0 +1,101 @@
+    public WatermarksWithWatermarkAlignment(
+            String watermarkGroup,
+            Duration maxAllowedWatermarkDrift,
+            Duration updateInterval,
+            WatermarkStrategy<T> strategy) {
+        this.strategy = strategy;
+        this.watermarkGroup = watermarkGroup;
+        this.maxAllowedWatermarkDrift = maxAllowedWatermarkDrift;
+        this.updateInterval = updateInterval;
+    }
+
+    public String getWatermarkGroup() {
+        return watermarkGroup;
+    }
+
+    public Duration getMaxAllowedWatermarkDrift() {
+        return maxAllowedWatermarkDrift;
+    }
+
+    public Duration getUpdateInterval() {
+        return updateInterval;
+    }
+
+    @Override
+    public WatermarkGenerator<T> createWatermarkGenerator(
+            WatermarkGeneratorSupplier.Context context) {
+        return strategy.createWatermarkGenerator(context);
+    }
+
+    @Override
+    public TimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
+        return strategy.createTimestampAssigner(context);
+    }
+
+    @Override
+    public WatermarkStrategy<T> withTimestampAssigner(
+            TimestampAssignerSupplier<T> timestampAssigner) {
+        return new WatermarksWithWatermarkAlignment<T>(
+                watermarkGroup,
+                maxAllowedWatermarkDrift,
+                updateInterval,
+                strategy.withTimestampAssigner(timestampAssigner));
+    }

Review comment:
       I'm not sure if such an approach would scale well. I guess it would work for now, but:
   1. if someone adds a new method with a default implementation to the `WatermarkStrategy` interface, this will break;
   2. if someone adds another strategy that is implemented the same way as `WatermarksWithWatermarkAlignment`, the two will not work together either.
   
   Instead, can't we keep the same pattern as before, so that this class would only implement `createTimestampAssigner`, `createWatermarkGenerator` and a new method `getAlignmentParams()`, while replacing the `watermarkStrategy instanceof WatermarksWithWatermarkAlignment` check with a `watermarkStrategy.getAlignmentParams()` call:
   ```
   final class WatermarkStrategyWithAlignment<T> implements WatermarkStrategy<T> {
   
       private static final long serialVersionUID = 1L;
   
       private final WatermarkStrategy<T> baseStrategy;
       private final String watermarkGroup;
       private final Duration maxAllowedWatermarkDrift;
       private final Duration updateInterval;
   
       public WatermarkStrategyWithAlignment(
               String watermarkGroup,
               Duration maxAllowedWatermarkDrift,
               Duration updateInterval,
               WatermarkStrategy<T> strategy) {
           this.baseStrategy = strategy;
           this.watermarkGroup = watermarkGroup;
           this.maxAllowedWatermarkDrift = maxAllowedWatermarkDrift;
           this.updateInterval = updateInterval;
       }
   
       @Override
       public WatermarkAlignmentParams getAlignmentParams() {
           // The default implementation in WatermarkStrategy would return WatermarkAlignmentParams#DISABLED.
           return new WatermarkAlignmentParams(watermarkGroup, maxAllowedWatermarkDrift, updateInterval);
       }
   
       @Override
       public TimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
           return baseStrategy.createTimestampAssigner(context);
       }
   
       @Override
       public WatermarkGenerator<T> createWatermarkGenerator(
               WatermarkGeneratorSupplier.Context context) {
           ...
       }
   }
   ```
   ?
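Point 1 can be illustrated with a self-contained sketch. All types below (`Strategy`, `AlignmentParams`, `AlignedStrategy`, `IdleStrategy`, `BaseStrategy`) are hypothetical simplifications, loosely modelled on `WatermarkStrategy`, its `withIdleness` default method and the proposed `getAlignmentParams()`; they are not the actual Flink API. Once an aligned strategy is wrapped by another decorating default method, an `instanceof` check no longer finds it, while a `getAlignmentParams()` call that the wrapper forwards still does.

```java
import java.time.Duration;

/** Hypothetical, simplified alignment parameters. */
final class AlignmentParams {
    static final AlignmentParams DISABLED = new AlignmentParams(null, Duration.ZERO, Duration.ZERO);

    final String watermarkGroup;
    final Duration maxAllowedWatermarkDrift;
    final Duration updateInterval;

    AlignmentParams(
            String watermarkGroup, Duration maxAllowedWatermarkDrift, Duration updateInterval) {
        this.watermarkGroup = watermarkGroup;
        this.maxAllowedWatermarkDrift = maxAllowedWatermarkDrift;
        this.updateInterval = updateInterval;
    }

    boolean isEnabled() {
        return watermarkGroup != null;
    }
}

/** Hypothetical stand-in for WatermarkStrategy. */
interface Strategy {
    /** Alignment is off unless a strategy says otherwise. */
    default AlignmentParams getAlignmentParams() {
        return AlignmentParams.DISABLED;
    }

    default Strategy withAlignment(String group, Duration maxDrift, Duration updateInterval) {
        return new AlignedStrategy(this, new AlignmentParams(group, maxDrift, updateInterval));
    }

    /** A decorating default method, similar in spirit to withIdleness(...). */
    default Strategy withIdleness(Duration idleTimeout) {
        return new IdleStrategy(this, idleTimeout);
    }
}

final class BaseStrategy implements Strategy {}

final class AlignedStrategy implements Strategy {
    private final Strategy base;
    private final AlignmentParams params;

    AlignedStrategy(Strategy base, AlignmentParams params) {
        this.base = base;
        this.params = params;
    }

    @Override
    public AlignmentParams getAlignmentParams() {
        return params;
    }
}

final class IdleStrategy implements Strategy {
    private final Strategy base;
    private final Duration idleTimeout;

    IdleStrategy(Strategy base, Duration idleTimeout) {
        this.base = base;
        this.idleTimeout = idleTimeout;
    }

    /** Forwarding keeps the alignment settings visible through this wrapper. */
    @Override
    public AlignmentParams getAlignmentParams() {
        return base.getAlignmentParams();
    }
}
```

One caveat of this sketch: the `getAlignmentParams()` approach only survives wrapping if every decorating strategy forwards the call, as `IdleStrategy` does here; without that override the wrapper would fall back to `DISABLED`.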

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalSourceReaderMetricGroup.java
##########
@@ -157,4 +168,8 @@ long getEmitTimeLag() {
     long getWatermarkLag() {
         return getLastEmitTime() - lastWatermark;
     }
+
+    long getAlignedWatermarkDrift() {
+        return Math.max(0, lastWatermark - currentMaxDesiredWatermark);

Review comment:
       Don't negative values also carry meaning?
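A small sketch of the two options, using the formula from `getAlignedWatermarkDrift` above. `AlignmentDrift` is a hypothetical helper, and the reading of the sign assumes (as the name `maxDesiredWatermark` suggests) that a source is held back once its watermark gets ahead of the max desired watermark: a positive value is then how far it is ahead, and a negative value is the headroom left before it would be paused.

```java
/** Hypothetical helper contrasting a clamped and a signed alignment drift. */
final class AlignmentDrift {
    private AlignmentDrift() {}

    /** As in the PR: negative values are reported as 0. */
    static long clamped(long lastWatermark, long maxDesiredWatermark) {
        return Math.max(0, lastWatermark - maxDesiredWatermark);
    }

    /**
     * Signed variant: a positive result is how far the source is ahead of the max desired
     * watermark; a negative result is how much headroom it still has.
     */
    static long signed(long lastWatermark, long maxDesiredWatermark) {
        return lastWatermark - maxDesiredWatermark;
    }
}
```

For example, with a max desired watermark of 100 and a last watermark of 70, the clamped gauge reports 0 while the signed one reports -30, which would tell a user how close the source is to being throttled.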

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
##########
@@ -453,6 +438,18 @@ private DataInputStatus emitNextNotReading(DataOutput<OUT> output) throws Except
         }
     }
 
+    private void initializeWatermarkAlignment(DataOutput<OUT> output) {
+        if (watermarkStrategy instanceof WatermarksWithWatermarkAlignment) {
+            watermarkTrackingOutput = new WatermarkTrackingOutput<>(output);
+            final long updateInterval =
+                    ((WatermarksWithWatermarkAlignment<OUT>) watermarkStrategy)
+                            .getUpdateInterval()
+                            .toMillis();
+            processingTimeService.scheduleWithFixedDelay(
+                    this::emitLatestWatermark, updateInterval, updateInterval);
+        }
+    }

Review comment:
       Is this being used? I believe it is not.

##########
File path: docs/content/docs/ops/metrics.md
##########
@@ -1482,6 +1482,17 @@ Note that the metrics are only available via reporters.
       </td>
       <td>Gauge</td>
     </tr>
+    <tr>
+      <td>watermarkDrift</td>
+      <td>
+        The current drift from the minimal watermark emitted by all sources/tasks/splits that belong
+        to the same watermark group.
+
+        Note: Available only when watermark alignment is enabled and the first common watermark is
+        announced. You can configure the update interval in the WatermarkStrategy.
+      </td>
+      <td>Gauge</td>
+    </tr>

Review comment:
       Please also update the Chinese version.

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
##########
@@ -167,6 +164,9 @@
     private InternalSourceReaderMetricGroup sourceMetricGroup;
 
     @Nullable private WatermarkTrackingOutput<OUT> watermarkTrackingOutput;
+    private final boolean alignmentEnabled;
+    private final long alignmentUpdateInterval;

Review comment:
       nit: for the sake of consistency with `SourceCoordinator` and completeness, I would keep the `WatermarkAlignmentParams` here. You could reuse the strategy -> params conversion code in both places.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java
##########
@@ -95,4 +95,7 @@ public static String currentInputWatermarkName(int index) {
     public static final String PENDING_RECORDS = "pendingRecords";
     public static final String PENDING_BYTES = "pendingBytes";
     public static final String SOURCE_IDLE_TIME = "sourceIdleTime";
+
+    // FLIP-182 (watermark alignment)
+    public static final String WATERMARK_DRIFT = "watermarkDrift";

Review comment:
       `watermarkAlignmentDrift`? 

##########
File path: flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/AlignedWatermarksITCase.java
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.reader;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.common.eventtime.WatermarkGenerator;
+import org.apache.flink.api.common.eventtime.WatermarkOutput;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.AllCallbackWrapper;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.metrics.MetricNames;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.runtime.testutils.InMemoryReporter;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.test.util.MiniClusterWithClientExtension;
+import org.apache.flink.testutils.logging.LoggerAuditingExtension;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.slf4j.event.Level;
+
+import java.time.Duration;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * An IT test to verify watermark alignment works. The idea is that we check the {@link
+ * MetricNames#WATERMARK_DRIFT} metric does not grow beyond the maximal configured drift. It can
+ * initially increase beyond that value, because of the update interval, but once established it
+ * should never increase any further, but gradually decrease to the configured threshold, as the
+ * slower source catches up.
+ */
+public class AlignedWatermarksITCase {
+    public static final String SLOW_SOURCE_NAME = "SlowNumberSequenceSource";
+    public static final String FAST_SOURCE_NAME = "FastNumberSequenceSource";
+    public static final int MAX_DRIFT = 100;
+
+    @RegisterExtension
+    LoggerAuditingExtension loggerAuditingExtension =
+            new LoggerAuditingExtension(AlignedWatermarksITCase.class, Level.INFO);
+
+    private static final InMemoryReporter reporter = InMemoryReporter.createWithRetainedMetrics();
+
+    public static final MiniClusterWithClientExtension MINI_CLUSTER_RESOURCE =
+            new MiniClusterWithClientExtension(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setNumberTaskManagers(1)
+                            .setConfiguration(reporter.addToConfiguration(new Configuration()))
+                            .build());
+
+    @RegisterExtension
+    static AllCallbackWrapper<MiniClusterWithClientExtension> allWrapper =
+            new AllCallbackWrapper<>(MINI_CLUSTER_RESOURCE);
+
+    @Test
+    public void testAlignment() throws Exception {
+        final JobGraph jobGraph = getJobGraph();
+        final MiniCluster miniCluster = MINI_CLUSTER_RESOURCE.getMiniCluster();
+        final CompletableFuture<JobSubmissionResult> submission = miniCluster.submitJob(jobGraph);
+        final JobID jobID = submission.get().getJobID();
+        CommonTestUtils.waitForAllTaskRunning(miniCluster, jobID, false);
+
+        long oldDrift = Long.MAX_VALUE;
+        do {
+            final Optional<Metric> drift =
+                    reporter.findMetric(
+                            jobID, FAST_SOURCE_NAME + ".*" + MetricNames.WATERMARK_DRIFT);
+            Thread.sleep(200);
+
+            final Optional<Long> newDriftOptional = drift.map(m -> ((Gauge<Long>) m).getValue());
+            if (newDriftOptional.isPresent()) {
+                final Long newDrift = newDriftOptional.get();
+                assertThat(newDrift).isLessThanOrEqualTo(oldDrift);
+                oldDrift = newDrift;
+            }
+        } while (oldDrift >= MAX_DRIFT);
+    }
+
+    private JobGraph getJobGraph() {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.getConfig().setAutoWatermarkInterval(0L);
+        env.setParallelism(1);
+
+        DataStream<Long> slowSource =
+                env.fromSource(
+                                new NumberSequenceSource(0, Long.MAX_VALUE),
+                                WatermarkStrategy.forGenerator(ctx -> new PunctuatedGenerator())
+                                        .withWatermarkAlignment(
+                                                "group-1", Duration.ofMillis(MAX_DRIFT))

Review comment:
       As we discussed offline, I would suggest changing:
   
   1. `Thread.sleep(5)` -> `Thread.sleep(10)`
   2. `Thread.sleep(2)` -> `Thread.sleep(1)`
   3. update interval from 1s to 100ms
   
   On paper, the test should then be both more thorough and faster to execute.
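The polling loop in `testAlignment` boils down to a check over a sequence of drift samples, which may make the suggested timings easier to reason about. A minimal sketch, where `DriftCheck` is a hypothetical helper and the samples stand in for values read from the `InMemoryReporter`: polls before the gauge exists are skipped, the drift must never increase, and the check passes once it drops below the configured maximum, mirroring the `while (oldDrift >= MAX_DRIFT)` condition.

```java
import java.util.List;
import java.util.OptionalLong;

/** Hypothetical helper mirroring the loop in AlignedWatermarksITCase#testAlignment. */
final class DriftCheck {
    private DriftCheck() {}

    /**
     * Returns true if the drift never increases once reported and eventually drops below
     * maxDrift; empty samples model polls before the gauge was registered.
     */
    static boolean converges(List<OptionalLong> samples, long maxDrift) {
        long oldDrift = Long.MAX_VALUE;
        for (OptionalLong sample : samples) {
            if (!sample.isPresent()) {
                continue; // metric not available yet
            }
            long newDrift = sample.getAsLong();
            if (newDrift > oldDrift) {
                return false; // drift grew after being established
            }
            oldDrift = newDrift;
            if (oldDrift < maxDrift) {
                return true; // same exit condition as the test loop
            }
        }
        return false; // never dropped below maxDrift within the samples
    }
}
```

Under this reading, a 100ms update interval would presumably give a fresh drift value on nearly every 200ms poll (instead of roughly one in five with 1s), and, since the Javadoc attributes the initial overshoot to the update interval, a smaller overshoot before the values start decreasing.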



