This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 10d6d4f [FLINK-24440][source] Announce and combine latest watermarks across SourceOperators 10d6d4f is described below commit 10d6d4f2ef996519bff066ae28b0be702c2f0e16 Author: Piotr Nowojski <piotr.nowoj...@gmail.com> AuthorDate: Tue Feb 8 09:54:53 2022 +0100 [FLINK-24440][source] Announce and combine latest watermarks across SourceOperators --- .../source/coordinator/SourceCoordinator.java | 149 ++++++++++++++++++++- .../coordinator/SourceCoordinatorContext.java | 13 ++ .../coordinator/SourceCoordinatorProvider.java | 6 +- .../source/event/ReportedWatermarkEvent.java | 64 +++++++++ .../source/event/WatermarkAlignmentEvent.java | 60 +++++++++ .../SourceCoordinatorAlignmentTest.java | 115 ++++++++++++++++ .../coordinator/SourceCoordinatorTestBase.java | 15 ++- 7 files changed, 412 insertions(+), 10 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java index ac754e9..44c9ef7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.source.coordinator; import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.eventtime.Watermark; import org.apache.flink.api.connector.source.ReaderInfo; import org.apache.flink.api.connector.source.Source; import org.apache.flink.api.connector.source.SourceEvent; @@ -32,8 +33,10 @@ import org.apache.flink.runtime.operators.coordination.CoordinatorStore; import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; import org.apache.flink.runtime.operators.coordination.OperatorEvent; import 
org.apache.flink.runtime.source.event.ReaderRegistrationEvent; +import org.apache.flink.runtime.source.event.ReportedWatermarkEvent; import org.apache.flink.runtime.source.event.RequestSplitEvent; import org.apache.flink.runtime.source.event.SourceEventWrapper; +import org.apache.flink.runtime.source.event.WatermarkAlignmentEvent; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; import org.apache.flink.util.TemporaryClassLoaderContext; @@ -49,10 +52,15 @@ import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; +import java.util.Comparator; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; -import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import static org.apache.flink.runtime.source.coordinator.SourceCoordinatorSerdeUtils.readAndVerifyCoordinatorSerdeVersion; @@ -76,13 +84,19 @@ import static org.apache.flink.util.Preconditions.checkState; @Internal public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT> implements OperatorCoordinator { + public static final WatermarkAlignmentParams WATERMARK_ALIGNMENT_DISABLED = + new WatermarkAlignmentParams(Long.MAX_VALUE, "", 0); private static final Logger LOG = LoggerFactory.getLogger(SourceCoordinator.class); + private final WatermarkAggregator<Integer> combinedWatermark = new WatermarkAggregator<>(); + + private final WatermarkAlignmentParams watermarkAlignmentParams; + /** The name of the operator this SourceCoordinator is associated with. */ private final String operatorName; /** A single-thread executor to handle all the changes to the coordinator. 
*/ - private final ExecutorService coordinatorExecutor; + private final ScheduledExecutorService coordinatorExecutor; /** The Source that is associated with this SourceCoordinator. */ private final Source<?, SplitT, EnumChkT> source; /** The serializer that handles the serde of the SplitEnumerator checkpoints. */ @@ -101,16 +115,70 @@ public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT> public SourceCoordinator( String operatorName, - ExecutorService coordinatorExecutor, + ScheduledExecutorService coordinatorExecutor, Source<?, SplitT, EnumChkT> source, SourceCoordinatorContext<SplitT> context, CoordinatorStore coordinatorStore) { + this( + operatorName, + coordinatorExecutor, + source, + context, + coordinatorStore, + WATERMARK_ALIGNMENT_DISABLED); + } + + public SourceCoordinator( + String operatorName, + ScheduledExecutorService coordinatorExecutor, + Source<?, SplitT, EnumChkT> source, + SourceCoordinatorContext<SplitT> context, + CoordinatorStore coordinatorStore, + WatermarkAlignmentParams watermarkAlignmentParams) { this.operatorName = operatorName; this.coordinatorExecutor = coordinatorExecutor; this.source = source; this.enumCheckpointSerializer = source.getEnumeratorCheckpointSerializer(); this.context = context; this.coordinatorStore = coordinatorStore; + this.watermarkAlignmentParams = watermarkAlignmentParams; + + if (watermarkAlignmentParams.isEnabled()) { + coordinatorStore.putIfAbsent( + watermarkAlignmentParams.watermarkGroup, new WatermarkAggregator<>()); + coordinatorExecutor.scheduleAtFixedRate( + this::announceCombinedWatermark, + watermarkAlignmentParams.updateInterval, + watermarkAlignmentParams.updateInterval, + TimeUnit.MILLISECONDS); + } + } + + @VisibleForTesting + void announceCombinedWatermark() { + checkState(watermarkAlignmentParams != WATERMARK_ALIGNMENT_DISABLED); + + Watermark globalCombinedWatermark = + coordinatorStore.apply( + watermarkAlignmentParams.watermarkGroup, + (value) -> { + WatermarkAggregator 
aggregator = (WatermarkAggregator) value; + return new Watermark( + aggregator.getAggregatedWatermark().getTimestamp()); + }); + + long maxAllowedWatermark = + globalCombinedWatermark.getTimestamp() + + watermarkAlignmentParams.maxAllowedWatermarkDrift; + Set<Integer> subTaskIds = combinedWatermark.keySet(); + LOG.info( + "Distributing maxAllowedWatermark={} to subTaskIds={}", + maxAllowedWatermark, + subTaskIds); + for (Integer subtaskId : subTaskIds) { + context.sendEventToSourceOperator( + subtaskId, new WatermarkAlignmentEvent(maxAllowedWatermark)); + } } @Override @@ -194,6 +262,10 @@ public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT> subtask, registrationEvent.location()); handleReaderRegistrationEvent(registrationEvent); + } else if (event instanceof ReportedWatermarkEvent) { + handleReportedWatermark( + subtask, + new Watermark(((ReportedWatermarkEvent) event).getWatermark())); } else { throw new FlinkException("Unrecognized Operator Event: " + event); } @@ -440,9 +512,80 @@ public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT> enumerator.addReader(event.subtaskId()); } + private void handleReportedWatermark(int subtask, Watermark watermark) { + LOG.debug("New reported watermark={} from subTaskId={}", watermark, subtask); + + checkState(watermarkAlignmentParams.isEnabled()); + + combinedWatermark + .aggregate(subtask, watermark) + .ifPresent( + newCombinedWatermark -> + coordinatorStore.computeIfPresent( + watermarkAlignmentParams.watermarkGroup, + (key, oldValue) -> { + WatermarkAggregator<String> watermarkAggregator = + (WatermarkAggregator<String>) oldValue; + watermarkAggregator.aggregate( + operatorName, newCombinedWatermark); + return watermarkAggregator; + })); + } + private void ensureStarted() { if (!started) { throw new IllegalStateException("The coordinator has not started yet."); } } + + private static class WatermarkAggregator<T> { + private final Map<T, Watermark> watermarks = new HashMap<>(); + private 
Watermark aggregatedWatermark = new Watermark(Long.MIN_VALUE); + + /** + * Update the {@link Watermark} for the given {@code key}. + * + * @return the new updated combined {@link Watermark} if the value has changed. {@code + * Optional.empty()} otherwise. + */ + public Optional<Watermark> aggregate(T key, Watermark watermark) { + watermarks.put(key, watermark); + Watermark newMinimum = + watermarks.values().stream() + .min(Comparator.comparingLong(Watermark::getTimestamp)) + .orElseThrow(IllegalStateException::new); + if (newMinimum.equals(aggregatedWatermark)) { + return Optional.empty(); + } else { + aggregatedWatermark = newMinimum; + return Optional.of(aggregatedWatermark); + } + } + + public Set<T> keySet() { + return watermarks.keySet(); + } + + public Watermark getAggregatedWatermark() { + return aggregatedWatermark; + } + } + + /** Configuration parameters for watermark alignment. */ + public static class WatermarkAlignmentParams { + private final long maxAllowedWatermarkDrift; + private final String watermarkGroup; + private final long updateInterval; + + public WatermarkAlignmentParams( + long maxAllowedWatermarkDrift, String watermarkGroup, long updateInterval) { + this.maxAllowedWatermarkDrift = maxAllowedWatermarkDrift; + this.watermarkGroup = watermarkGroup; + this.updateInterval = updateInterval; + } + + public boolean isEnabled() { + return maxAllowedWatermarkDrift < Long.MAX_VALUE; + } + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java index 2f9bf5d..83823f7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java @@ -160,6 +160,19 @@ public class SourceCoordinatorContext<SplitT extends SourceSplit> 
String.format("Failed to send event %s to subtask %d", event, subtaskId)); } + void sendEventToSourceOperator(int subtaskId, OperatorEvent event) { + checkSubtaskIndex(subtaskId); + + callInCoordinatorThread( + () -> { + final OperatorCoordinator.SubtaskGateway gateway = + getGatewayAndCheckReady(subtaskId); + gateway.sendEvent(event); + return null; + }, + String.format("Failed to send event %s to subtask %d", event, subtaskId)); + } + @Override public int currentParallelism() { return operatorCoordinatorContext.currentParallelism(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java index c1936c5..a02cbf3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java @@ -30,8 +30,8 @@ import org.apache.flink.util.FatalExitExceptionHandler; import javax.annotation.Nullable; import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; import java.util.function.BiConsumer; @@ -71,8 +71,8 @@ public class SourceCoordinatorProvider<SplitT extends SourceSplit> CoordinatorExecutorThreadFactory coordinatorThreadFactory = new CoordinatorExecutorThreadFactory( coordinatorThreadName, context.getUserCodeClassloader()); - ExecutorService coordinatorExecutor = - Executors.newSingleThreadExecutor(coordinatorThreadFactory); + ScheduledExecutorService coordinatorExecutor = + Executors.newScheduledThreadPool(1, coordinatorThreadFactory); SimpleVersionedSerializer<SplitT> splitSerializer = source.getSplitSerializer(); SourceCoordinatorContext<SplitT> sourceCoordinatorContext = diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/source/event/ReportedWatermarkEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/event/ReportedWatermarkEvent.java new file mode 100644 index 0000000..7ccf8c5 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/event/ReportedWatermarkEvent.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.source.event; + +import org.apache.flink.api.common.eventtime.Watermark; +import org.apache.flink.runtime.operators.coordination.OperatorEvent; + +import java.util.Objects; + +/** + * Reports last emitted {@link Watermark} from a subtask to the {@link + * org.apache.flink.runtime.source.coordinator.SourceCoordinator}. 
+ */ +public class ReportedWatermarkEvent implements OperatorEvent { + + private static final long serialVersionUID = 1L; + + private final long watermark; + + public ReportedWatermarkEvent(long watermark) { + this.watermark = watermark; + } + + public long getWatermark() { + return watermark; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ReportedWatermarkEvent that = (ReportedWatermarkEvent) o; + return watermark == that.watermark; + } + + @Override + public int hashCode() { + return Objects.hash(watermark); + } + + @Override + public String toString() { + return getClass().getSimpleName() + "{" + "watermark=" + watermark + '}'; + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/event/WatermarkAlignmentEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/event/WatermarkAlignmentEvent.java new file mode 100644 index 0000000..0055c66 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/event/WatermarkAlignmentEvent.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.source.event; + +import org.apache.flink.runtime.operators.coordination.OperatorEvent; + +import java.util.Objects; + +/** Signals source operators the maximum watermark that emitted records can have. */ +public class WatermarkAlignmentEvent implements OperatorEvent { + + private static final long serialVersionUID = 1L; + + private final long maxWatermark; + + public WatermarkAlignmentEvent(long maxWatermark) { + this.maxWatermark = maxWatermark; + } + + public long getMaxWatermark() { + return maxWatermark; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + WatermarkAlignmentEvent that = (WatermarkAlignmentEvent) o; + return maxWatermark == that.maxWatermark; + } + + @Override + public int hashCode() { + return Objects.hash(maxWatermark); + } + + @Override + public String toString() { + return "WatermarkAlignmentEvent{" + "maxWatermark=" + maxWatermark + '}'; + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorAlignmentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorAlignmentTest.java new file mode 100644 index 0000000..bf98466 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorAlignmentTest.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.source.coordinator; + +import org.apache.flink.core.fs.AutoCloseableRegistry; +import org.apache.flink.runtime.operators.coordination.OperatorEvent; +import org.apache.flink.runtime.source.coordinator.SourceCoordinator.WatermarkAlignmentParams; +import org.apache.flink.runtime.source.event.ReportedWatermarkEvent; +import org.apache.flink.runtime.source.event.WatermarkAlignmentEvent; + +import org.junit.Test; + +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +/** Unit tests for watermark alignment of the {@link SourceCoordinator}. 
*/ +@SuppressWarnings("serial") +public class SourceCoordinatorAlignmentTest extends SourceCoordinatorTestBase { + + @Test + public void testWatermarkAlignment() throws Exception { + try (AutoCloseableRegistry closeableRegistry = new AutoCloseableRegistry()) { + SourceCoordinator<?, ?> sourceCoordinator1 = + getAndStartNewSourceCoordinator( + new WatermarkAlignmentParams(1000L, "group1", Long.MAX_VALUE), + closeableRegistry); + + int subtask0 = 0; + int subtask1 = 1; + reportWatermarkEvent(sourceCoordinator1, subtask0, 42); + assertLatestWatermarkAlignmentEvent(subtask0, 1042); + + reportWatermarkEvent(sourceCoordinator1, subtask1, 44); + assertLatestWatermarkAlignmentEvent(subtask0, 1042); + assertLatestWatermarkAlignmentEvent(subtask1, 1042); + + reportWatermarkEvent(sourceCoordinator1, subtask0, 5000); + assertLatestWatermarkAlignmentEvent(subtask0, 1044); + assertLatestWatermarkAlignmentEvent(subtask1, 1044); + } + } + + @Test + public void testWatermarkAlignmentWithTwoGroups() throws Exception { + try (AutoCloseableRegistry closeableRegistry = new AutoCloseableRegistry()) { + long maxDrift = 1000L; + SourceCoordinator<?, ?> sourceCoordinator1 = + getAndStartNewSourceCoordinator( + new WatermarkAlignmentParams(maxDrift, "group1", Long.MAX_VALUE), + closeableRegistry); + + SourceCoordinator<?, ?> sourceCoordinator2 = + getAndStartNewSourceCoordinator( + new WatermarkAlignmentParams(maxDrift, "group2", Long.MAX_VALUE), + closeableRegistry); + + int subtask0 = 0; + int subtask1 = 1; + reportWatermarkEvent(sourceCoordinator1, subtask0, 42); + assertLatestWatermarkAlignmentEvent(subtask0, 1042); + + reportWatermarkEvent(sourceCoordinator2, subtask1, 44); + assertLatestWatermarkAlignmentEvent(subtask0, 1042); + assertLatestWatermarkAlignmentEvent(subtask1, 1044); + + reportWatermarkEvent(sourceCoordinator1, subtask0, 5000); + assertLatestWatermarkAlignmentEvent(subtask0, 6000); + assertLatestWatermarkAlignmentEvent(subtask1, 1044); + } + } + + protected 
SourceCoordinator<?, ?> getAndStartNewSourceCoordinator( + WatermarkAlignmentParams watermarkAlignmentParams, + AutoCloseableRegistry closeableRegistry) + throws Exception { + SourceCoordinator<?, ?> sourceCoordinator = + getNewSourceCoordinator(watermarkAlignmentParams); + closeableRegistry.registerCloseable(sourceCoordinator); + sourceCoordinator.start(); + setAllReaderTasksReady(sourceCoordinator); + + return sourceCoordinator; + } + + private void reportWatermarkEvent( + SourceCoordinator<?, ?> sourceCoordinator1, int subtask, long watermark) { + sourceCoordinator1.handleEventFromOperator(subtask, new ReportedWatermarkEvent(watermark)); + waitForCoordinatorToProcessActions(); + sourceCoordinator1.announceCombinedWatermark(); + } + + private void assertLatestWatermarkAlignmentEvent(int subtask, long expectedWatermark) { + List<OperatorEvent> events = receivingTasks.getSentEventsForSubtask(subtask); + assertFalse(events.isEmpty()); + assertEquals(new WatermarkAlignmentEvent(expectedWatermark), events.get(events.size() - 1)); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java index eda26fe..bd81efa 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java @@ -26,6 +26,7 @@ import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.operators.coordination.CoordinatorStoreImpl; import org.apache.flink.runtime.operators.coordination.EventReceivingTasks; import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext; +import org.apache.flink.runtime.source.coordinator.SourceCoordinator.WatermarkAlignmentParams; import org.apache.flink.runtime.source.event.ReaderRegistrationEvent; 
import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.concurrent.ExecutorThreadFactory; @@ -38,8 +39,8 @@ import java.util.List; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -58,7 +59,7 @@ public abstract class SourceCoordinatorTestBase { // ---- Mocks for the Source Coordinator Context ---- protected SourceCoordinatorProvider.CoordinatorExecutorThreadFactory coordinatorThreadFactory; - protected ExecutorService coordinatorExecutor; + protected ScheduledExecutorService coordinatorExecutor; protected SplitAssignmentTracker<MockSourceSplit> splitSplitAssignmentTracker; protected SourceCoordinatorContext<MockSourceSplit> context; @@ -79,7 +80,7 @@ public abstract class SourceCoordinatorTestBase { new SourceCoordinatorProvider.CoordinatorExecutorThreadFactory( coordinatorThreadName, getClass().getClassLoader()); - coordinatorExecutor = Executors.newSingleThreadExecutor(coordinatorThreadFactory); + coordinatorExecutor = Executors.newScheduledThreadPool(1, coordinatorThreadFactory); sourceCoordinator = getNewSourceCoordinator(); context = sourceCoordinator.getContext(); } @@ -148,6 +149,11 @@ public abstract class SourceCoordinatorTestBase { // ------------------------------------------------------------------------ protected SourceCoordinator<MockSourceSplit, Set<MockSourceSplit>> getNewSourceCoordinator() { + return getNewSourceCoordinator(SourceCoordinator.WATERMARK_ALIGNMENT_DISABLED); + } + + protected SourceCoordinator<MockSourceSplit, Set<MockSourceSplit>> getNewSourceCoordinator( + WatermarkAlignmentParams watermarkAlignmentParams) { final Source<Integer, MockSourceSplit, Set<MockSourceSplit>> mockSource = TestingSplitEnumerator.factorySource( new 
MockSourceSplitSerializer(), @@ -158,7 +164,8 @@ public abstract class SourceCoordinatorTestBase { coordinatorExecutor, mockSource, getNewSourceCoordinatorContext(), - new CoordinatorStoreImpl()); + new CoordinatorStoreImpl(), + watermarkAlignmentParams); } protected SourceCoordinatorContext<MockSourceSplit> getNewSourceCoordinatorContext() {