This is an automated email from the ASF dual-hosted git repository.

jqin pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new 78236cb  [hotfix] Fix the SingleThreadFetcherManager to get the 
running fetchers correctly.
78236cb is described below

commit 78236cb12a78ef128a55ce14a0eb5eefb73cc735
Author: Jiangjie (Becket) Qin <[email protected]>
AuthorDate: Fri Nov 20 00:24:10 2020 +0800

    [hotfix] Fix the SingleThreadFetcherManager to get the running fetchers 
correctly.
---
 .../base/source/reader/fetcher/SingleThreadFetcherManager.java      | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SingleThreadFetcherManager.java
 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SingleThreadFetcherManager.java
index de50e6a..8c22bf5 100644
--- 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SingleThreadFetcherManager.java
+++ 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SingleThreadFetcherManager.java
@@ -54,7 +54,7 @@ public class SingleThreadFetcherManager<E, SplitT extends 
SourceSplit>
 
        @Override
        public void addSplits(List<SplitT> splitsToAdd) {
-               SplitFetcher<E, SplitT> fetcher = fetchers.get(0);
+               SplitFetcher<E, SplitT> fetcher = getRunningFetcher();
                if (fetcher == null) {
                        fetcher = createSplitFetcher();
                        // Add the splits to the fetchers.
@@ -64,4 +64,8 @@ public class SingleThreadFetcherManager<E, SplitT extends 
SourceSplit>
                        fetcher.addSplits(splitsToAdd);
                }
        }
+
+       protected SplitFetcher<E, SplitT> getRunningFetcher() {
+               return fetchers.isEmpty() ? null : 
fetchers.values().iterator().next();
+       }
 }

Reply via email to