This is an automated email from the ASF dual-hosted git repository. sewen pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 91903351ad1c8e13bbe2cfe5d8383ad915b5887f Author: Stephan Ewen <se...@apache.org> AuthorDate: Mon Jun 29 11:00:59 2020 +0200 [backport] Backport some core changes that were part of the FLIP-27 File Source commit --- .../SingleThreadMultiplexSourceReaderBase.java | 63 ++++++++++++++++++---- .../source/reader/mocks/TestingReaderContext.java | 4 +- .../streaming/api/operators/SourceOperator.java | 5 +- 3 files changed, 59 insertions(+), 13 deletions(-) diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.java index ab87db0..f5806d1 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.java @@ -26,24 +26,67 @@ import org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcher import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue; +import java.util.Collection; import java.util.function.Supplier; /** - * A abstract {@link SourceReader} implementation that assign all the splits to a single thread to consume. - * @param <E> - * @param <T> - * @param <SplitT> - * @param <SplitStateT> + * A base for {@link SourceReader}s that read splits with one thread using one {@link SplitReader}. + * The splits can be read either one after the other (like in a file source) or concurrently by changing + * the subscription in the split reader (like in the Kafka Source). + * + * <p>To implement a source reader based on this class, implementors need to supply the following: + * <ul> + * <li>A {@link SplitReader}, which connects to the source and reads/polls data. The split reader + * gets notified whenever there is a new split. The split reader would read files, contain a + * Kafka or other source client, etc.</li> + * <li>A {@link RecordEmitter} that takes a record from the Split Reader and updates the checkpointing + * state and converts it into the final form. For example for Kafka, the Record Emitter takes a + * {@code ConsumerRecord}, puts the offset information into state, transforms the records with the + * deserializers into the final type, and emits the record.</li> + * <li>The class must override the methods to convert back and forth between the immutable splits + * ({@code SplitT}) and the mutable split state representation ({@code SplitStateT}).</li> + * <li>Finally, the reader must decide what to do when it starts ({@link #start()}) or when a split is + * finished ({@link #onSplitFinished(Collection)}).</li> + * </ul> + * + * @param <E> The type of the records (the raw type that typically contains checkpointing information). + * @param <T> The final type of the records emitted by the source. + * @param <SplitT> The type of the splits processed by the source. + * @param <SplitStateT> The type of the mutable state per split. */ public abstract class SingleThreadMultiplexSourceReaderBase<E, T, SplitT extends SourceSplit, SplitStateT> extends SourceReaderBase<E, T, SplitT, SplitStateT> { + /** + * The primary constructor for the source reader. + * + * <p>The reader will use a handover queue sized as configured via + * {@link SourceReaderOptions#ELEMENT_QUEUE_CAPACITY}. + */ + public SingleThreadMultiplexSourceReaderBase( + Supplier<SplitReader<E, SplitT>> splitReaderSupplier, + RecordEmitter<E, T, SplitStateT> recordEmitter, + Configuration config, + SourceReaderContext context) { + this( + new FutureCompletingBlockingQueue<>(config.getInteger(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY)), + splitReaderSupplier, + recordEmitter, + config, + context); + } + + /** + * This constructor behaves like + * {@link #SingleThreadMultiplexSourceReaderBase(Supplier, RecordEmitter, Configuration, SourceReaderContext)}, + * but accepts a specific {@link FutureCompletingBlockingQueue}. + */ public SingleThreadMultiplexSourceReaderBase( - FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue, - Supplier<SplitReader<E, SplitT>> splitReaderSupplier, - RecordEmitter<E, T, SplitStateT> recordEmitter, - Configuration config, - SourceReaderContext context) { + FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue, + Supplier<SplitReader<E, SplitT>> splitReaderSupplier, + RecordEmitter<E, T, SplitStateT> recordEmitter, + Configuration config, + SourceReaderContext context) { super( elementsQueue, new SingleThreadFetcherManager<>(elementsQueue, splitReaderSupplier), diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingReaderContext.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingReaderContext.java index 02faf1f..5bb9b59 100644 --- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingReaderContext.java +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingReaderContext.java @@ -64,7 +64,9 @@ public class TestingReaderContext implements SourceReaderContext { } @Override - public void sendSourceEventToCoordinator(SourceEvent sourceEvent) {} + public void sendSourceEventToCoordinator(SourceEvent sourceEvent) { + sentEvents.add(sourceEvent); + } // ------------------------------------------------------------------------ diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java index 1af572c..05b3216 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java @@ -173,11 +173,12 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> sourceReader.addSplits(splits); } - // Start the reader. - sourceReader.start(); // Register the reader to the coordinator. registerReader(); + // Start the reader after registration, sending messages in start is allowed. + sourceReader.start(); + eventTimeLogic.startPeriodicWatermarkEmits(); }