This is an automated email from the ASF dual-hosted git repository. sewen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 36498a0b767a53c0782cc4fafeb62bdb3211b0b4 Author: Stephan Ewen <[email protected]> AuthorDate: Wed Nov 4 22:22:30 2020 +0100 [FLINK-19265][core] Add to source coordinator built-in methods to signal "no more splits". This replaces custom events and event handling implemented by many sources. --- .../base/source/reader/SourceReaderBase.java | 17 ++++++++-------- .../base/source/reader/SourceReaderBaseTest.java | 3 +-- .../source/reader/mocks/MockSplitEnumerator.java | 3 +-- .../file/src/impl/StaticFileSplitEnumerator.java | 3 +-- .../file/src/FileSourceHeavyThroughputTest.java | 3 +-- .../source/enumerator/KafkaSourceEnumerator.java | 3 +-- .../flink/api/connector/source/SourceReader.java | 14 ++++++++++++- .../connector/source/SplitEnumeratorContext.java | 8 ++++++++ .../source/lib/util/IteratorSourceEnumerator.java | 3 +-- .../source/lib/util/IteratorSourceReader.java | 23 ++++++++++------------ .../connector/source/mocks/MockSourceReader.java | 20 ++++++++----------- .../source/mocks/MockSplitEnumeratorContext.java | 3 +++ .../coordinator/SourceCoordinatorContext.java | 14 +++++++++++++ .../runtime}/source/event/NoMoreSplitsEvent.java | 6 +++--- .../streaming/api/operators/SourceOperator.java | 3 +++ .../source/SourceOperatorEventTimeTest.java | 3 +-- .../runtime/tasks/MultipleInputStreamTaskTest.java | 6 +++--- .../checkpointing/UnalignedCheckpointITCase.java | 3 +-- 18 files changed, 81 insertions(+), 57 deletions(-) diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java index 5b4d6f7..b8dc9ef 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java @@ -24,7 
+24,6 @@ import org.apache.flink.api.connector.source.SourceOutput; import org.apache.flink.api.connector.source.SourceReader; import org.apache.flink.api.connector.source.SourceReaderContext; import org.apache.flink.api.connector.source.SourceSplit; -import org.apache.flink.api.connector.source.event.NoMoreSplitsEvent; import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager; import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; @@ -220,13 +219,15 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt } @Override + public void notifyNoMoreSplits() { + LOG.info("Reader received NoMoreSplits event."); + noMoreSplitsAssignment = true; + elementsQueue.notifyAvailable(); + } + + @Override public void handleSourceEvents(SourceEvent sourceEvent) { - LOG.trace("Handling source event: {}", sourceEvent); - if (sourceEvent instanceof NoMoreSplitsEvent) { - LOG.info("Reader received NoMoreSplits event."); - noMoreSplitsAssignment = true; - elementsQueue.notifyAvailable(); - } + LOG.info("Received unhandled source event: {}", sourceEvent); } @Override @@ -235,8 +236,6 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt splitFetcherManager.close(options.sourceReaderCloseTimeout); } - - // -------------------- Abstract method to allow different implementations ------------------ /** * Handles the finished splits to clean the state if needed. 
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java index ff38f07..5c8d9d3 100644 --- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java @@ -20,7 +20,6 @@ package org.apache.flink.connector.base.source.reader; import org.apache.flink.api.connector.source.Boundedness; import org.apache.flink.api.connector.source.SourceReader; -import org.apache.flink.api.connector.source.event.NoMoreSplitsEvent; import org.apache.flink.api.connector.source.mocks.MockSourceSplit; import org.apache.flink.api.connector.source.mocks.TestingReaderContext; import org.apache.flink.api.connector.source.mocks.TestingReaderOutput; @@ -90,7 +89,7 @@ public class SourceReaderBaseTest extends SourceReaderTestBase<MockSourceSplit> reader.addSplits(Collections.singletonList(getSplit(0, NUM_RECORDS_PER_SPLIT, Boundedness.CONTINUOUS_UNBOUNDED))); - reader.handleSourceEvents(new NoMoreSplitsEvent()); + reader.notifyNoMoreSplits(); // This is not a real infinite loop, it is supposed to throw exception after two polls. 
while (true) { InputStatus inputStatus = reader.pollNext(output); diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitEnumerator.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitEnumerator.java index 8df3394..77da1ba 100644 --- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitEnumerator.java +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitEnumerator.java @@ -22,7 +22,6 @@ import org.apache.flink.api.connector.source.SourceEvent; import org.apache.flink.api.connector.source.SplitEnumerator; import org.apache.flink.api.connector.source.SplitEnumeratorContext; import org.apache.flink.api.connector.source.SplitsAssignment; -import org.apache.flink.api.connector.source.event.NoMoreSplitsEvent; import org.apache.flink.api.connector.source.mocks.MockSourceSplit; import java.io.IOException; @@ -73,7 +72,7 @@ public class MockSplitEnumerator implements SplitEnumerator<MockSourceSplit, Lis context.assignSplits(new SplitsAssignment<>(assignment)); splits.clear(); for (int i = 0; i < numReaders; i++) { - context.sendEventToSourceReader(i, new NoMoreSplitsEvent()); + context.signalNoMoreSplits(i); } } } diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/StaticFileSplitEnumerator.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/StaticFileSplitEnumerator.java index f194dc4..de10682 100644 --- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/StaticFileSplitEnumerator.java +++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/StaticFileSplitEnumerator.java @@ -22,7 +22,6 @@ import 
org.apache.flink.annotation.Internal; import org.apache.flink.api.connector.source.SourceEvent; import org.apache.flink.api.connector.source.SplitEnumerator; import org.apache.flink.api.connector.source.SplitEnumeratorContext; -import org.apache.flink.api.connector.source.event.NoMoreSplitsEvent; import org.apache.flink.api.connector.source.event.RequestSplitEvent; import org.apache.flink.connector.file.src.FileSource; import org.apache.flink.connector.file.src.FileSourceSplit; @@ -121,7 +120,7 @@ public class StaticFileSplitEnumerator implements SplitEnumerator<FileSourceSpli LOG.info("Assigned split to subtask {} : {}", subtask, split); } else { - context.sendEventToSourceReader(subtask, new NoMoreSplitsEvent()); + context.signalNoMoreSplits(subtask); LOG.info("No more splits available for subtask {}", subtask); } } diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceHeavyThroughputTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceHeavyThroughputTest.java index b54c7f3..a789605 100644 --- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceHeavyThroughputTest.java +++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceHeavyThroughputTest.java @@ -26,7 +26,6 @@ import org.apache.flink.api.connector.source.SourceEvent; import org.apache.flink.api.connector.source.SourceOutput; import org.apache.flink.api.connector.source.SourceReader; import org.apache.flink.api.connector.source.SourceReaderContext; -import org.apache.flink.api.connector.source.event.NoMoreSplitsEvent; import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.file.src.reader.SimpleStreamFormat; import org.apache.flink.connector.file.src.reader.StreamFormat; @@ -81,7 +80,7 @@ public class FileSourceHeavyThroughputTest { final FileSource<byte[]> source = 
FileSource.forRecordStreamFormat(new ArrayReaderFormat(), path).build(); final SourceReader<byte[], FileSourceSplit> reader = source.createReader(new NoOpReaderContext()); reader.addSplits(Collections.singletonList(split)); - reader.handleSourceEvents(new NoMoreSplitsEvent()); + reader.notifyNoMoreSplits(); final ReaderOutput<byte[]> out = new NoOpReaderOutput<>(); diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java index 02164e2..b3c6888 100644 --- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java +++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java @@ -24,7 +24,6 @@ import org.apache.flink.api.connector.source.SourceEvent; import org.apache.flink.api.connector.source.SplitEnumerator; import org.apache.flink.api.connector.source.SplitEnumeratorContext; import org.apache.flink.api.connector.source.SplitsAssignment; -import org.apache.flink.api.connector.source.event.NoMoreSplitsEvent; import org.apache.flink.connector.kafka.source.KafkaSourceOptions; import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; import org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriber; @@ -242,7 +241,7 @@ public class KafkaSourceEnumerator implements SplitEnumerator<KafkaPartitionSpli if (noMoreNewPartitionSplits) { LOG.debug("No more KafkaPartitionSplits to assign. 
Sending NoMoreSplitsEvent to the readers " + "in consumer group {}.", consumerGroupId); - context.sendEventToSourceReader(readerOwner, new NoMoreSplitsEvent()); + context.signalNoMoreSplits(readerOwner); } }); } diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReader.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReader.java index 0d37689..1258357 100644 --- a/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReader.java +++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReader.java @@ -94,13 +94,25 @@ public interface SourceReader<T, SplitT extends SourceSplit> void addSplits(List<SplitT> splits); /** + * This method is called when the reader is notified that it will not + * receive any further splits. + * + * <p>It is triggered when the enumerator calls {@link SplitEnumeratorContext#signalNoMoreSplits(int)} + * with the reader's parallel subtask. + */ + void notifyNoMoreSplits(); + + /** * Handle a custom source event sent by the {@link SplitEnumerator}. * This method is called when the enumerator sends an event via * {@link SplitEnumeratorContext#sendEventToSourceReader(int, SourceEvent)}. * + * <p>This method has a default implementation that does nothing, because + * most sources do not require any custom events. + * * @param sourceEvent the event sent by the {@link SplitEnumerator}. 
*/ - void handleSourceEvents(SourceEvent sourceEvent); + default void handleSourceEvents(SourceEvent sourceEvent) {} /** * We have an empty default implementation here because most source readers do not have diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.java index 535501a..efd3627 100644 --- a/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.java +++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.java @@ -84,6 +84,14 @@ public interface SplitEnumeratorContext<SplitT extends SourceSplit> { } /** + * Signals a subtask that it will not receive any further split. + * + * @param subtask The index of the operator's parallel subtask that shall be + * signaled it will not receive any further split. + */ + void signalNoMoreSplits(int subtask); + + /** * Invoke the callable and handover the return value to the handler which will be executed * by the source coordinator. When this method is invoked multiple times, The <code>Coallble</code>s * may be executed in a thread pool concurrently. 
diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceEnumerator.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceEnumerator.java index 91bc283..61e806d 100644 --- a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceEnumerator.java +++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceEnumerator.java @@ -21,7 +21,6 @@ package org.apache.flink.api.connector.source.lib.util; import org.apache.flink.api.connector.source.SourceEvent; import org.apache.flink.api.connector.source.SplitEnumerator; import org.apache.flink.api.connector.source.SplitEnumeratorContext; -import org.apache.flink.api.connector.source.event.NoMoreSplitsEvent; import org.apache.flink.api.connector.source.event.RequestSplitEvent; import org.apache.flink.util.FlinkRuntimeException; @@ -66,7 +65,7 @@ public class IteratorSourceEnumerator<SplitT extends IteratorSourceSplit<?, ?>> if (nextSplit != null) { context.assignSplit(nextSplit, subtaskId); } else { - context.sendEventToSourceReader(subtaskId, new NoMoreSplitsEvent()); + context.signalNoMoreSplits(subtaskId); } } diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReader.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReader.java index 3af75e1..5c6631e 100644 --- a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReader.java +++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReader.java @@ -19,13 +19,10 @@ package org.apache.flink.api.connector.source.lib.util; import org.apache.flink.api.connector.source.ReaderOutput; -import org.apache.flink.api.connector.source.SourceEvent; import org.apache.flink.api.connector.source.SourceReader; import org.apache.flink.api.connector.source.SourceReaderContext; -import 
org.apache.flink.api.connector.source.event.NoMoreSplitsEvent; import org.apache.flink.api.connector.source.event.RequestSplitEvent; import org.apache.flink.core.io.InputStatus; -import org.apache.flink.util.FlinkRuntimeException; import javax.annotation.Nullable; @@ -122,6 +119,16 @@ public class IteratorSourceReader<E, IterT extends Iterator<E>, SplitT extends I } @Override + public void notifyNoMoreSplits() { + // if we get this after we already had a split, we must have requested more than + // one split, which is not expected here. + checkState(remainingSplits == null, "Unexpected response, requested more than one split."); + + // non-null queue signals splits were assigned, in this case no splits + remainingSplits = new ArrayDeque<>(); + } + + @Override public List<SplitT> snapshotState(long checkpointId) { final ArrayList<SplitT> allSplits = new ArrayList<>(1 + remainingSplits.size()); if (iterator != null) { @@ -134,15 +141,5 @@ public class IteratorSourceReader<E, IterT extends Iterator<E>, SplitT extends I } @Override - public void handleSourceEvents(SourceEvent sourceEvent) { - if (sourceEvent instanceof NoMoreSplitsEvent) { - // non-null queue signals splits were assigned, in this case no splits - remainingSplits = new ArrayDeque<>(); - } else { - throw new FlinkRuntimeException("Unexpected event: " + sourceEvent); - } - } - - @Override public void close() throws Exception {} } diff --git a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java index b0a24dd..8eb4596 100644 --- a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java +++ b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java @@ -108,12 +108,9 @@ public class MockSourceReader implements SourceReader<Integer, MockSourceSplit> } @Override - public void handleSourceEvents(SourceEvent 
sourceEvent) { - if (sourceEvent instanceof MockNoMoreSplitsEvent) { - waitingForMoreSplits = false; - markAvailable(); - } - receivedSourceEvents.add(sourceEvent); + public void notifyNoMoreSplits() { + waitingForMoreSplits = false; + markAvailable(); } @Override @@ -137,6 +134,11 @@ public class MockSourceReader implements SourceReader<Integer, MockSourceSplit> } } + @Override + public void handleSourceEvents(SourceEvent sourceEvent) { + receivedSourceEvents.add(sourceEvent); + } + // --------------- methods for unit tests --------------- public void markAvailable() { @@ -174,10 +176,4 @@ public class MockSourceReader implements SourceReader<Integer, MockSourceSplit> public List<Long> getAbortedCheckpoints() { return abortedCheckpoints; } - - /** - * Simple event allowing {@link MockSourceReader} to finish when requested. - */ - public static class MockNoMoreSplitsEvent implements SourceEvent { - } } diff --git a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSplitEnumeratorContext.java b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSplitEnumeratorContext.java index 6b2ba73..19f9ccc 100644 --- a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSplitEnumeratorContext.java +++ b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSplitEnumeratorContext.java @@ -113,6 +113,9 @@ public class MockSplitEnumeratorContext<SplitT extends SourceSplit> implements S } @Override + public void signalNoMoreSplits(int subtask) {} + + @Override public <T> void callAsync(Callable<T> callable, BiConsumer<T, Throwable> handler) { if (stoppedAcceptAsyncCalls.get()) { return; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java index 50b50bb..b8e78db 100644 --- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java @@ -31,6 +31,7 @@ import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; import org.apache.flink.runtime.operators.coordination.OperatorEvent; import org.apache.flink.runtime.operators.coordination.TaskNotRunningException; import org.apache.flink.runtime.source.event.AddSplitEvent; +import org.apache.flink.runtime.source.event.NoMoreSplitsEvent; import org.apache.flink.runtime.source.event.SourceEventWrapper; import org.apache.flink.util.FlinkRuntimeException; @@ -183,6 +184,19 @@ public class SourceCoordinatorContext<SplitT extends SourceSplit> } @Override + public void signalNoMoreSplits(int subtask) { + // Ensure the 'NoMoreSplits' event is sent by the coordinator executor. + callInCoordinatorThread(() -> { + try { + operatorCoordinatorContext.sendEvent(new NoMoreSplitsEvent(), subtask); + return null; // void return value + } catch (TaskNotRunningException e) { + throw new FlinkRuntimeException("Failed to send 'NoMoreSplits' to reader " + subtask, e); + } + }, "Failed to send 'NoMoreSplits' to reader " + subtask); + } + + @Override public <T> void callAsync( Callable<T> callable, BiConsumer<T, Throwable> handler, diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/event/NoMoreSplitsEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/event/NoMoreSplitsEvent.java similarity index 87% rename from flink-core/src/main/java/org/apache/flink/api/connector/source/event/NoMoreSplitsEvent.java rename to flink-runtime/src/main/java/org/apache/flink/runtime/source/event/NoMoreSplitsEvent.java index aa26808..18d8433 100644 --- a/flink-core/src/main/java/org/apache/flink/api/connector/source/event/NoMoreSplitsEvent.java +++
b/flink-runtime/src/main/java/org/apache/flink/runtime/source/event/NoMoreSplitsEvent.java @@ -16,16 +16,16 @@ * limitations under the License. */ -package org.apache.flink.api.connector.source.event; +package org.apache.flink.runtime.source.event; -import org.apache.flink.api.connector.source.SourceEvent; +import org.apache.flink.runtime.operators.coordination.OperatorEvent; /** * A source event sent from the SplitEnumerator to the SourceReader to indicate that no more * splits will be assigned to the source reader anymore. So once the SplitReader finishes * reading the currently assigned splits, they can exit. */ -public class NoMoreSplitsEvent implements SourceEvent { +public class NoMoreSplitsEvent implements OperatorEvent { private static final long serialVersionUID = 1L; diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java index 5ffb930..80043cf 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java @@ -36,6 +36,7 @@ import org.apache.flink.runtime.operators.coordination.OperatorEvent; import org.apache.flink.runtime.operators.coordination.OperatorEventGateway; import org.apache.flink.runtime.operators.coordination.OperatorEventHandler; import org.apache.flink.runtime.source.event.AddSplitEvent; +import org.apache.flink.runtime.source.event.NoMoreSplitsEvent; import org.apache.flink.runtime.source.event.ReaderRegistrationEvent; import org.apache.flink.runtime.source.event.SourceEventWrapper; import org.apache.flink.runtime.state.StateInitializationContext; @@ -277,6 +278,8 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> } } else if (event instanceof SourceEventWrapper) { sourceReader.handleSourceEvents(((SourceEventWrapper) 
event).getSourceEvent()); + } else if (event instanceof NoMoreSplitsEvent) { + sourceReader.notifyNoMoreSplits(); } else { throw new IllegalStateException("Received unexpected operator event " + event); } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/SourceOperatorEventTimeTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/SourceOperatorEventTimeTest.java index d60934c..925c498 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/SourceOperatorEventTimeTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/SourceOperatorEventTimeTest.java @@ -22,7 +22,6 @@ import org.apache.flink.api.common.eventtime.Watermark; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.state.OperatorStateStore; import org.apache.flink.api.connector.source.ReaderOutput; -import org.apache.flink.api.connector.source.SourceEvent; import org.apache.flink.api.connector.source.SourceReader; import org.apache.flink.api.connector.source.mocks.MockSourceSplit; import org.apache.flink.core.fs.CloseableRegistry; @@ -293,7 +292,7 @@ public class SourceOperatorEventTimeTest { public void addSplits(List<MockSourceSplit> splits) {} @Override - public void handleSourceEvents(SourceEvent sourceEvent) {} + public void notifyNoMoreSplits() {} @Override public void close() {} diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java index ecaeb89..f3b963a 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java @@ -29,7 +29,6 @@ import 
org.apache.flink.api.connector.source.SourceReader; import org.apache.flink.api.connector.source.SourceReaderContext; import org.apache.flink.api.connector.source.mocks.MockSource; import org.apache.flink.api.connector.source.mocks.MockSourceReader; -import org.apache.flink.api.connector.source.mocks.MockSourceReader.MockNoMoreSplitsEvent; import org.apache.flink.api.connector.source.mocks.MockSourceSplit; import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer; import org.apache.flink.metrics.Counter; @@ -48,7 +47,7 @@ import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.metrics.util.InterceptingOperatorMetricGroup; import org.apache.flink.runtime.metrics.util.InterceptingTaskMetricGroup; import org.apache.flink.runtime.source.event.AddSplitEvent; -import org.apache.flink.runtime.source.event.SourceEventWrapper; +import org.apache.flink.runtime.source.event.NoMoreSplitsEvent; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.AbstractInput; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; @@ -98,6 +97,7 @@ import static org.junit.Assert.assertTrue; * Tests for {@link MultipleInputStreamTask}. Theses tests implicitly also test the * {@link StreamMultipleInputProcessor}. 
*/ +@SuppressWarnings("serial") public class MultipleInputStreamTaskTest { private static final List<String> LIFE_CYCLE_EVENTS = new ArrayList<>(); @@ -869,7 +869,7 @@ public class MultipleInputStreamTaskTest { private void finishAddingRecords(StreamTaskMailboxTestHarness<String> testHarness, int sourceId) throws Exception { testHarness.getStreamTask().dispatchOperatorEvent( getSourceOperatorID(testHarness, sourceId), - new SerializedValue<>(new SourceEventWrapper(new MockNoMoreSplitsEvent()))); + new SerializedValue<>(new NoMoreSplitsEvent())); } static class LifeCycleTrackingMapToStringMultipleInputOperator diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java index 8741478..f2a9fa9 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java @@ -328,8 +328,7 @@ public class UnalignedCheckpointITCase extends TestLogger { } @Override - public void handleSourceEvents(SourceEvent sourceEvent) { - } + public void notifyNoMoreSplits() {} @Override public void close() throws Exception {
