zentol commented on code in PR #20757:
URL: https://github.com/apache/flink/pull/20757#discussion_r992198837


##########
flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReader.java:
##########
@@ -52,128 +40,19 @@
 @Public
 public class IteratorSourceReader<
                 E, IterT extends Iterator<E>, SplitT extends 
IteratorSourceSplit<E, IterT>>
-        implements SourceReader<E, SplitT> {
-
-    /** The context for this reader, to communicate with the enumerator. */
-    private final SourceReaderContext context;
-
-    /** The availability future. This reader is available as soon as a split 
is assigned. */
-    private CompletableFuture<Void> availability;
-
-    /**
-     * The iterator producing data. Non-null after a split has been assigned. 
This field is null or
-     * non-null always together with the {@link #currentSplit} field.
-     */
-    @Nullable private IterT iterator;
-
-    /**
-     * The split whose data we return. Non-null after a split has been 
assigned. This field is null
-     * or non-null always together with the {@link #iterator} field.
-     */
-    @Nullable private SplitT currentSplit;
-
-    /** The remaining splits that were assigned but not yet processed. */
-    private final Queue<SplitT> remainingSplits;
-
-    private boolean noMoreSplits;
+        extends IteratorSourceReaderBase<E, E, IterT, SplitT> {
 
     public IteratorSourceReader(SourceReaderContext context) {
-        this.context = checkNotNull(context);
-        this.availability = new CompletableFuture<>();
-        this.remainingSplits = new ArrayDeque<>();
-    }
-
-    // ------------------------------------------------------------------------
-
-    @Override
-    public void start() {
-        // request a split if we don't have one
-        if (remainingSplits.isEmpty()) {
-            context.sendSplitRequest();
-        }
-    }
-
-    @Override
-    public InputStatus pollNext(ReaderOutput<E> output) {
-        if (iterator != null) {
-            if (iterator.hasNext()) {
-                output.collect(iterator.next());
-                return InputStatus.MORE_AVAILABLE;
-            } else {
-                finishSplit();
-            }
-        }
-
-        return tryMoveToNextSplit();
-    }
-
-    private void finishSplit() {
-        iterator = null;
-        currentSplit = null;
-
-        // request another split if no other is left
-        // we do this only here in the finishSplit part to avoid requesting a 
split
-        // whenever the reader is polled and doesn't currently have a split
-        if (remainingSplits.isEmpty() && !noMoreSplits) {
-            context.sendSplitRequest();
-        }
-    }
-
-    private InputStatus tryMoveToNextSplit() {
-        currentSplit = remainingSplits.poll();
-        if (currentSplit != null) {
-            iterator = currentSplit.getIterator();
-            return InputStatus.MORE_AVAILABLE;
-        } else if (noMoreSplits) {
-            return InputStatus.END_OF_INPUT;
-        } else {
-            // ensure we are not called in a loop by resetting the 
availability future
-            if (availability.isDone()) {
-                availability = new CompletableFuture<>();
-            }
-
-            return InputStatus.NOTHING_AVAILABLE;
-        }
+        super(context);
     }
 
     @Override
-    public CompletableFuture<Void> isAvailable() {
-        return availability;
-    }
+    protected void start(SourceReaderContext context) {}

Review Comment:
   I was confused; I thought. the IteratorSourceReader was the parent class.
   
   This is fine as is imo.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to