This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 91903351ad1c8e13bbe2cfe5d8383ad915b5887f
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Mon Jun 29 11:00:59 2020 +0200

    [backport] Backport some core changes that were part of the FLIP-27 File Source commit
---
 .../SingleThreadMultiplexSourceReaderBase.java     | 63 ++++++++++++++++++----
 .../source/reader/mocks/TestingReaderContext.java  |  4 +-
 .../streaming/api/operators/SourceOperator.java    |  5 +-
 3 files changed, 59 insertions(+), 13 deletions(-)

diff --git 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.java
 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.java
index ab87db0..f5806d1 100644
--- 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.java
+++ 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.java
@@ -26,24 +26,67 @@ import 
org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcher
 import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
 import 
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
 
+import java.util.Collection;
 import java.util.function.Supplier;
 
 /**
- * A abstract {@link SourceReader} implementation that assign all the splits 
to a single thread to consume.
- * @param <E>
- * @param <T>
- * @param <SplitT>
- * @param <SplitStateT>
+ * A base for {@link SourceReader}s that read splits with one thread using one 
{@link SplitReader}.
+ * The splits can be read either one after the other (like in a file source) 
or concurrently by changing
+ * the subscription in the split reader (like in the Kafka Source).
+ *
+ * <p>To implement a source reader based on this class, implementors need to 
supply the following:
+ * <ul>
+ *   <li>A {@link SplitReader}, which connects to the source and reads/polls 
data. The split reader
+ *       gets notified whenever there is a new split. The split reader would 
read files, contain a
+ *       Kafka or other source client, etc.</li>
+ *   <li>A {@link RecordEmitter} that takes a record from the Split Reader and 
updates the checkpointing
+ *       state and converts it into the final form. For example for Kafka, the 
Record Emitter takes a
+ *       {@code ConsumerRecord}, puts the offset information into state, 
transforms the records with the
+ *       deserializers into the final type, and emits the record.</li>
+ *   <li>The class must override the methods to convert back and forth between 
the immutable splits
+ *       ({@code SplitT}) and the mutable split state representation ({@code 
SplitStateT}).</li>
+ *   <li>Finally, the reader must decide what to do when it starts ({@link 
#start()}) or when a split is
+ *       finished ({@link #onSplitFinished(Collection)}).</li>
+ * </ul>
+ *
+ * @param <E> The type of the records (the raw type that typically contains 
checkpointing information).
+ * @param <T> The final type of the records emitted by the source.
+ * @param <SplitT> The type of the splits processed by the source.
+ * @param <SplitStateT> The type of the mutable state per split.
  */
 public abstract class SingleThreadMultiplexSourceReaderBase<E, T, SplitT 
extends SourceSplit, SplitStateT>
        extends SourceReaderBase<E, T, SplitT, SplitStateT> {
 
+       /**
+        * The primary constructor for the source reader.
+        *
+        * <p>The reader will use a handover queue sized as configured via
+        * {@link SourceReaderOptions#ELEMENT_QUEUE_CAPACITY}.
+        */
+       public SingleThreadMultiplexSourceReaderBase(
+                       Supplier<SplitReader<E, SplitT>> splitReaderSupplier,
+                       RecordEmitter<E, T, SplitStateT> recordEmitter,
+                       Configuration config,
+                       SourceReaderContext context) {
+               this(
+                       new 
FutureCompletingBlockingQueue<>(config.getInteger(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY)),
+                       splitReaderSupplier,
+                       recordEmitter,
+                       config,
+                       context);
+       }
+
+       /**
+        * This constructor behaves like
+        * {@link #SingleThreadMultiplexSourceReaderBase(Supplier, 
RecordEmitter, Configuration, SourceReaderContext)},
+        * but accepts a specific {@link FutureCompletingBlockingQueue}.
+        */
        public SingleThreadMultiplexSourceReaderBase(
-               FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> 
elementsQueue,
-               Supplier<SplitReader<E, SplitT>> splitReaderSupplier,
-               RecordEmitter<E, T, SplitStateT> recordEmitter,
-               Configuration config,
-               SourceReaderContext context) {
+                       FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> 
elementsQueue,
+                       Supplier<SplitReader<E, SplitT>> splitReaderSupplier,
+                       RecordEmitter<E, T, SplitStateT> recordEmitter,
+                       Configuration config,
+                       SourceReaderContext context) {
                super(
                        elementsQueue,
                        new SingleThreadFetcherManager<>(elementsQueue, 
splitReaderSupplier),
diff --git 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingReaderContext.java
 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingReaderContext.java
index 02faf1f..5bb9b59 100644
--- 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingReaderContext.java
+++ 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingReaderContext.java
@@ -64,7 +64,9 @@ public class TestingReaderContext implements 
SourceReaderContext {
        }
 
        @Override
-       public void sendSourceEventToCoordinator(SourceEvent sourceEvent) {}
+       public void sendSourceEventToCoordinator(SourceEvent sourceEvent) {
+               sentEvents.add(sourceEvent);
+       }
 
        // 
------------------------------------------------------------------------
 
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
index 1af572c..05b3216 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
@@ -173,11 +173,12 @@ public class SourceOperator<OUT, SplitT extends 
SourceSplit>
                        sourceReader.addSplits(splits);
                }
 
-               // Start the reader.
-               sourceReader.start();
                // Register the reader to the coordinator.
                registerReader();
 
+               // Start the reader after registration, sending messages in 
start is allowed.
+               sourceReader.start();
+
                eventTimeLogic.startPeriodicWatermarkEmits();
        }
 

Reply via email to