This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 10d6d4f  [FLINK-24440][source] Announce and combine latest watermarks 
across SourceOperators
10d6d4f is described below

commit 10d6d4f2ef996519bff066ae28b0be702c2f0e16
Author: Piotr Nowojski <piotr.nowoj...@gmail.com>
AuthorDate: Tue Feb 8 09:54:53 2022 +0100

    [FLINK-24440][source] Announce and combine latest watermarks across 
SourceOperators
---
 .../source/coordinator/SourceCoordinator.java      | 149 ++++++++++++++++++++-
 .../coordinator/SourceCoordinatorContext.java      |  13 ++
 .../coordinator/SourceCoordinatorProvider.java     |   6 +-
 .../source/event/ReportedWatermarkEvent.java       |  64 +++++++++
 .../source/event/WatermarkAlignmentEvent.java      |  60 +++++++++
 .../SourceCoordinatorAlignmentTest.java            | 115 ++++++++++++++++
 .../coordinator/SourceCoordinatorTestBase.java     |  15 ++-
 7 files changed, 412 insertions(+), 10 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
index ac754e9..44c9ef7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.source.coordinator;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.eventtime.Watermark;
 import org.apache.flink.api.connector.source.ReaderInfo;
 import org.apache.flink.api.connector.source.Source;
 import org.apache.flink.api.connector.source.SourceEvent;
@@ -32,8 +33,10 @@ import 
org.apache.flink.runtime.operators.coordination.CoordinatorStore;
 import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
 import org.apache.flink.runtime.operators.coordination.OperatorEvent;
 import org.apache.flink.runtime.source.event.ReaderRegistrationEvent;
+import org.apache.flink.runtime.source.event.ReportedWatermarkEvent;
 import org.apache.flink.runtime.source.event.RequestSplitEvent;
 import org.apache.flink.runtime.source.event.SourceEventWrapper;
+import org.apache.flink.runtime.source.event.WatermarkAlignmentEvent;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TemporaryClassLoaderContext;
@@ -49,10 +52,15 @@ import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.util.Comparator;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
-import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import static 
org.apache.flink.runtime.source.coordinator.SourceCoordinatorSerdeUtils.readAndVerifyCoordinatorSerdeVersion;
@@ -76,13 +84,19 @@ import static 
org.apache.flink.util.Preconditions.checkState;
 @Internal
 public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT>
         implements OperatorCoordinator {
+    public static final WatermarkAlignmentParams WATERMARK_ALIGNMENT_DISABLED =
+            new WatermarkAlignmentParams(Long.MAX_VALUE, "", 0);
 
     private static final Logger LOG = 
LoggerFactory.getLogger(SourceCoordinator.class);
 
+    private final WatermarkAggregator<Integer> combinedWatermark = new 
WatermarkAggregator<>();
+
+    private final WatermarkAlignmentParams watermarkAlignmentParams;
+
     /** The name of the operator this SourceCoordinator is associated with. */
     private final String operatorName;
     /** A single-thread executor to handle all the changes to the coordinator. 
*/
-    private final ExecutorService coordinatorExecutor;
+    private final ScheduledExecutorService coordinatorExecutor;
     /** The Source that is associated with this SourceCoordinator. */
     private final Source<?, SplitT, EnumChkT> source;
     /** The serializer that handles the serde of the SplitEnumerator 
checkpoints. */
@@ -101,16 +115,70 @@ public class SourceCoordinator<SplitT extends 
SourceSplit, EnumChkT>
 
     public SourceCoordinator(
             String operatorName,
-            ExecutorService coordinatorExecutor,
+            ScheduledExecutorService coordinatorExecutor,
             Source<?, SplitT, EnumChkT> source,
             SourceCoordinatorContext<SplitT> context,
             CoordinatorStore coordinatorStore) {
+        this(
+                operatorName,
+                coordinatorExecutor,
+                source,
+                context,
+                coordinatorStore,
+                WATERMARK_ALIGNMENT_DISABLED);
+    }
+
+    public SourceCoordinator(
+            String operatorName,
+            ScheduledExecutorService coordinatorExecutor,
+            Source<?, SplitT, EnumChkT> source,
+            SourceCoordinatorContext<SplitT> context,
+            CoordinatorStore coordinatorStore,
+            WatermarkAlignmentParams watermarkAlignmentParams) {
         this.operatorName = operatorName;
         this.coordinatorExecutor = coordinatorExecutor;
         this.source = source;
         this.enumCheckpointSerializer = 
source.getEnumeratorCheckpointSerializer();
         this.context = context;
         this.coordinatorStore = coordinatorStore;
+        this.watermarkAlignmentParams = watermarkAlignmentParams;
+
+        if (watermarkAlignmentParams.isEnabled()) {
+            coordinatorStore.putIfAbsent(
+                    watermarkAlignmentParams.watermarkGroup, new 
WatermarkAggregator<>());
+            coordinatorExecutor.scheduleAtFixedRate(
+                    this::announceCombinedWatermark,
+                    watermarkAlignmentParams.updateInterval,
+                    watermarkAlignmentParams.updateInterval,
+                    TimeUnit.MILLISECONDS);
+        }
+    }
+
+    @VisibleForTesting
+    void announceCombinedWatermark() {
+        checkState(watermarkAlignmentParams != WATERMARK_ALIGNMENT_DISABLED);
+
+        Watermark globalCombinedWatermark =
+                coordinatorStore.apply(
+                        watermarkAlignmentParams.watermarkGroup,
+                        (value) -> {
+                            WatermarkAggregator aggregator = 
(WatermarkAggregator) value;
+                            return new Watermark(
+                                    
aggregator.getAggregatedWatermark().getTimestamp());
+                        });
+
+        long maxAllowedWatermark =
+                globalCombinedWatermark.getTimestamp()
+                        + watermarkAlignmentParams.maxAllowedWatermarkDrift;
+        Set<Integer> subTaskIds = combinedWatermark.keySet();
+        LOG.info(
+                "Distributing maxAllowedWatermark={} to subTaskIds={}",
+                maxAllowedWatermark,
+                subTaskIds);
+        for (Integer subtaskId : subTaskIds) {
+            context.sendEventToSourceOperator(
+                    subtaskId, new 
WatermarkAlignmentEvent(maxAllowedWatermark));
+        }
     }
 
     @Override
@@ -194,6 +262,10 @@ public class SourceCoordinator<SplitT extends SourceSplit, 
EnumChkT>
                                 subtask,
                                 registrationEvent.location());
                         handleReaderRegistrationEvent(registrationEvent);
+                    } else if (event instanceof ReportedWatermarkEvent) {
+                        handleReportedWatermark(
+                                subtask,
+                                new Watermark(((ReportedWatermarkEvent) 
event).getWatermark()));
                     } else {
                         throw new FlinkException("Unrecognized Operator Event: 
" + event);
                     }
@@ -440,9 +512,80 @@ public class SourceCoordinator<SplitT extends SourceSplit, 
EnumChkT>
         enumerator.addReader(event.subtaskId());
     }
 
+    private void handleReportedWatermark(int subtask, Watermark watermark) {
+        LOG.debug("New reported watermark={} from subTaskId={}", watermark, 
subtask);
+
+        checkState(watermarkAlignmentParams.isEnabled());
+
+        combinedWatermark
+                .aggregate(subtask, watermark)
+                .ifPresent(
+                        newCombinedWatermark ->
+                                coordinatorStore.computeIfPresent(
+                                        
watermarkAlignmentParams.watermarkGroup,
+                                        (key, oldValue) -> {
+                                            WatermarkAggregator<String> 
watermarkAggregator =
+                                                    
(WatermarkAggregator<String>) oldValue;
+                                            watermarkAggregator.aggregate(
+                                                    operatorName, 
newCombinedWatermark);
+                                            return watermarkAggregator;
+                                        }));
+    }
+
     private void ensureStarted() {
         if (!started) {
             throw new IllegalStateException("The coordinator has not started 
yet.");
         }
     }
+
+    private static class WatermarkAggregator<T> {
+        private final Map<T, Watermark> watermarks = new HashMap<>();
+        private Watermark aggregatedWatermark = new Watermark(Long.MIN_VALUE);
+
+        /**
+         * Update the {@link Watermark} for the given {@code key}.
+         *
+         * @return the new updated combined {@link Watermark} if the value has 
changed. {@code
+         *     Optional.empty()} otherwise.
+         */
+        public Optional<Watermark> aggregate(T key, Watermark watermark) {
+            watermarks.put(key, watermark);
+            Watermark newMinimum =
+                    watermarks.values().stream()
+                            
.min(Comparator.comparingLong(Watermark::getTimestamp))
+                            .orElseThrow(IllegalStateException::new);
+            if (newMinimum.equals(aggregatedWatermark)) {
+                return Optional.empty();
+            } else {
+                aggregatedWatermark = newMinimum;
+                return Optional.of(aggregatedWatermark);
+            }
+        }
+
+        public Set<T> keySet() {
+            return watermarks.keySet();
+        }
+
+        public Watermark getAggregatedWatermark() {
+            return aggregatedWatermark;
+        }
+    }
+
+    /** Configuration parameters for watermark alignment. */
+    public static class WatermarkAlignmentParams {
+        private final long maxAllowedWatermarkDrift;
+        private final String watermarkGroup;
+        private final long updateInterval;
+
+        public WatermarkAlignmentParams(
+                long maxAllowedWatermarkDrift, String watermarkGroup, long 
updateInterval) {
+            this.maxAllowedWatermarkDrift = maxAllowedWatermarkDrift;
+            this.watermarkGroup = watermarkGroup;
+            this.updateInterval = updateInterval;
+        }
+
+        public boolean isEnabled() {
+            return maxAllowedWatermarkDrift < Long.MAX_VALUE;
+        }
+    }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
index 2f9bf5d..83823f7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
@@ -160,6 +160,19 @@ public class SourceCoordinatorContext<SplitT extends 
SourceSplit>
                 String.format("Failed to send event %s to subtask %d", event, 
subtaskId));
     }
 
+    void sendEventToSourceOperator(int subtaskId, OperatorEvent event) {
+        checkSubtaskIndex(subtaskId);
+
+        callInCoordinatorThread(
+                () -> {
+                    final OperatorCoordinator.SubtaskGateway gateway =
+                            getGatewayAndCheckReady(subtaskId);
+                    gateway.sendEvent(event);
+                    return null;
+                },
+                String.format("Failed to send event %s to subtask %d", event, 
subtaskId));
+    }
+
     @Override
     public int currentParallelism() {
         return operatorCoordinatorContext.currentParallelism();
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java
index c1936c5..a02cbf3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java
@@ -30,8 +30,8 @@ import org.apache.flink.util.FatalExitExceptionHandler;
 import javax.annotation.Nullable;
 
 import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
 import java.util.function.BiConsumer;
 
@@ -71,8 +71,8 @@ public class SourceCoordinatorProvider<SplitT extends 
SourceSplit>
         CoordinatorExecutorThreadFactory coordinatorThreadFactory =
                 new CoordinatorExecutorThreadFactory(
                         coordinatorThreadName, 
context.getUserCodeClassloader());
-        ExecutorService coordinatorExecutor =
-                Executors.newSingleThreadExecutor(coordinatorThreadFactory);
+        ScheduledExecutorService coordinatorExecutor =
+                Executors.newScheduledThreadPool(1, coordinatorThreadFactory);
 
         SimpleVersionedSerializer<SplitT> splitSerializer = 
source.getSplitSerializer();
         SourceCoordinatorContext<SplitT> sourceCoordinatorContext =
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/source/event/ReportedWatermarkEvent.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/source/event/ReportedWatermarkEvent.java
new file mode 100644
index 0000000..7ccf8c5
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/source/event/ReportedWatermarkEvent.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.source.event;
+
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+
+import java.util.Objects;
+
+/**
+ * Reports last emitted {@link Watermark} from a subtask to the {@link
+ * org.apache.flink.runtime.source.coordinator.SourceCoordinator}.
+ */
+public class ReportedWatermarkEvent implements OperatorEvent {
+
+    private static final long serialVersionUID = 1L;
+
+    private final long watermark;
+
+    public ReportedWatermarkEvent(long watermark) {
+        this.watermark = watermark;
+    }
+
+    public long getWatermark() {
+        return watermark;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        ReportedWatermarkEvent that = (ReportedWatermarkEvent) o;
+        return watermark == that.watermark;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(watermark);
+    }
+
+    @Override
+    public String toString() {
+        return getClass().getSimpleName() + "{" + "watermark=" + watermark + 
'}';
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/source/event/WatermarkAlignmentEvent.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/source/event/WatermarkAlignmentEvent.java
new file mode 100644
index 0000000..0055c66
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/source/event/WatermarkAlignmentEvent.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.source.event;
+
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+
+import java.util.Objects;
+
+/** Signals source operators the maximum watermark that emitted records can 
have. */
+public class WatermarkAlignmentEvent implements OperatorEvent {
+
+    private static final long serialVersionUID = 1L;
+
+    private final long maxWatermark;
+
+    public WatermarkAlignmentEvent(long maxWatermark) {
+        this.maxWatermark = maxWatermark;
+    }
+
+    public long getMaxWatermark() {
+        return maxWatermark;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        WatermarkAlignmentEvent that = (WatermarkAlignmentEvent) o;
+        return maxWatermark == that.maxWatermark;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(maxWatermark);
+    }
+
+    @Override
+    public String toString() {
+        return "WatermarkAlignmentEvent{" + "maxWatermark=" + maxWatermark + 
'}';
+    }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorAlignmentTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorAlignmentTest.java
new file mode 100644
index 0000000..bf98466
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorAlignmentTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.source.coordinator;
+
+import org.apache.flink.core.fs.AutoCloseableRegistry;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import 
org.apache.flink.runtime.source.coordinator.SourceCoordinator.WatermarkAlignmentParams;
+import org.apache.flink.runtime.source.event.ReportedWatermarkEvent;
+import org.apache.flink.runtime.source.event.WatermarkAlignmentEvent;
+
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+/** Unit tests for watermark alignment of the {@link SourceCoordinator}. */
+@SuppressWarnings("serial")
+public class SourceCoordinatorAlignmentTest extends SourceCoordinatorTestBase {
+
+    @Test
+    public void testWatermarkAlignment() throws Exception {
+        try (AutoCloseableRegistry closeableRegistry = new 
AutoCloseableRegistry()) {
+            SourceCoordinator<?, ?> sourceCoordinator1 =
+                    getAndStartNewSourceCoordinator(
+                            new WatermarkAlignmentParams(1000L, "group1", 
Long.MAX_VALUE),
+                            closeableRegistry);
+
+            int subtask0 = 0;
+            int subtask1 = 1;
+            reportWatermarkEvent(sourceCoordinator1, subtask0, 42);
+            assertLatestWatermarkAlignmentEvent(subtask0, 1042);
+
+            reportWatermarkEvent(sourceCoordinator1, subtask1, 44);
+            assertLatestWatermarkAlignmentEvent(subtask0, 1042);
+            assertLatestWatermarkAlignmentEvent(subtask1, 1042);
+
+            reportWatermarkEvent(sourceCoordinator1, subtask0, 5000);
+            assertLatestWatermarkAlignmentEvent(subtask0, 1044);
+            assertLatestWatermarkAlignmentEvent(subtask1, 1044);
+        }
+    }
+
+    @Test
+    public void testWatermarkAlignmentWithTwoGroups() throws Exception {
+        try (AutoCloseableRegistry closeableRegistry = new 
AutoCloseableRegistry()) {
+            long maxDrift = 1000L;
+            SourceCoordinator<?, ?> sourceCoordinator1 =
+                    getAndStartNewSourceCoordinator(
+                            new WatermarkAlignmentParams(maxDrift, "group1", 
Long.MAX_VALUE),
+                            closeableRegistry);
+
+            SourceCoordinator<?, ?> sourceCoordinator2 =
+                    getAndStartNewSourceCoordinator(
+                            new WatermarkAlignmentParams(maxDrift, "group2", 
Long.MAX_VALUE),
+                            closeableRegistry);
+
+            int subtask0 = 0;
+            int subtask1 = 1;
+            reportWatermarkEvent(sourceCoordinator1, subtask0, 42);
+            assertLatestWatermarkAlignmentEvent(subtask0, 1042);
+
+            reportWatermarkEvent(sourceCoordinator2, subtask1, 44);
+            assertLatestWatermarkAlignmentEvent(subtask0, 1042);
+            assertLatestWatermarkAlignmentEvent(subtask1, 1044);
+
+            reportWatermarkEvent(sourceCoordinator1, subtask0, 5000);
+            assertLatestWatermarkAlignmentEvent(subtask0, 6000);
+            assertLatestWatermarkAlignmentEvent(subtask1, 1044);
+        }
+    }
+
+    protected SourceCoordinator<?, ?> getAndStartNewSourceCoordinator(
+            WatermarkAlignmentParams watermarkAlignmentParams,
+            AutoCloseableRegistry closeableRegistry)
+            throws Exception {
+        SourceCoordinator<?, ?> sourceCoordinator =
+                getNewSourceCoordinator(watermarkAlignmentParams);
+        closeableRegistry.registerCloseable(sourceCoordinator);
+        sourceCoordinator.start();
+        setAllReaderTasksReady(sourceCoordinator);
+
+        return sourceCoordinator;
+    }
+
+    private void reportWatermarkEvent(
+            SourceCoordinator<?, ?> sourceCoordinator1, int subtask, long 
watermark) {
+        sourceCoordinator1.handleEventFromOperator(subtask, new 
ReportedWatermarkEvent(watermark));
+        waitForCoordinatorToProcessActions();
+        sourceCoordinator1.announceCombinedWatermark();
+    }
+
+    private void assertLatestWatermarkAlignmentEvent(int subtask, long 
expectedWatermark) {
+        List<OperatorEvent> events = 
receivingTasks.getSentEventsForSubtask(subtask);
+        assertFalse(events.isEmpty());
+        assertEquals(new WatermarkAlignmentEvent(expectedWatermark), 
events.get(events.size() - 1));
+    }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java
index eda26fe..bd81efa 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.operators.coordination.CoordinatorStoreImpl;
 import org.apache.flink.runtime.operators.coordination.EventReceivingTasks;
 import 
org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
+import 
org.apache.flink.runtime.source.coordinator.SourceCoordinator.WatermarkAlignmentParams;
 import org.apache.flink.runtime.source.event.ReaderRegistrationEvent;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.concurrent.ExecutorThreadFactory;
@@ -38,8 +39,8 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
@@ -58,7 +59,7 @@ public abstract class SourceCoordinatorTestBase {
 
     // ---- Mocks for the Source Coordinator Context ----
     protected SourceCoordinatorProvider.CoordinatorExecutorThreadFactory 
coordinatorThreadFactory;
-    protected ExecutorService coordinatorExecutor;
+    protected ScheduledExecutorService coordinatorExecutor;
     protected SplitAssignmentTracker<MockSourceSplit> 
splitSplitAssignmentTracker;
     protected SourceCoordinatorContext<MockSourceSplit> context;
 
@@ -79,7 +80,7 @@ public abstract class SourceCoordinatorTestBase {
                 new SourceCoordinatorProvider.CoordinatorExecutorThreadFactory(
                         coordinatorThreadName, getClass().getClassLoader());
 
-        coordinatorExecutor = 
Executors.newSingleThreadExecutor(coordinatorThreadFactory);
+        coordinatorExecutor = Executors.newScheduledThreadPool(1, 
coordinatorThreadFactory);
         sourceCoordinator = getNewSourceCoordinator();
         context = sourceCoordinator.getContext();
     }
@@ -148,6 +149,11 @@ public abstract class SourceCoordinatorTestBase {
     // ------------------------------------------------------------------------
 
     protected SourceCoordinator<MockSourceSplit, Set<MockSourceSplit>> 
getNewSourceCoordinator() {
+        return 
getNewSourceCoordinator(SourceCoordinator.WATERMARK_ALIGNMENT_DISABLED);
+    }
+
+    protected SourceCoordinator<MockSourceSplit, Set<MockSourceSplit>> 
getNewSourceCoordinator(
+            WatermarkAlignmentParams watermarkAlignmentParams) {
         final Source<Integer, MockSourceSplit, Set<MockSourceSplit>> 
mockSource =
                 TestingSplitEnumerator.factorySource(
                         new MockSourceSplitSerializer(),
@@ -158,7 +164,8 @@ public abstract class SourceCoordinatorTestBase {
                 coordinatorExecutor,
                 mockSource,
                 getNewSourceCoordinatorContext(),
-                new CoordinatorStoreImpl());
+                new CoordinatorStoreImpl(),
+                watermarkAlignmentParams);
     }
 
     protected SourceCoordinatorContext<MockSourceSplit> 
getNewSourceCoordinatorContext() {

Reply via email to