This is an automated email from the ASF dual-hosted git repository.
jqin pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new 78236cb [hotfix] Fix the SingleThreadFetcherManager to get the
running fetchers correctly.
78236cb is described below
commit 78236cb12a78ef128a55ce14a0eb5eefb73cc735
Author: Jiangjie (Becket) Qin <[email protected]>
AuthorDate: Fri Nov 20 00:24:10 2020 +0800
[hotfix] Fix the SingleThreadFetcherManager to get the running fetchers
correctly.
---
.../base/source/reader/fetcher/SingleThreadFetcherManager.java | 6 +++++-
1 file changed, 5 insertions(+), 1 deletion(-)
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SingleThreadFetcherManager.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SingleThreadFetcherManager.java
index de50e6a..8c22bf5 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SingleThreadFetcherManager.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SingleThreadFetcherManager.java
@@ -54,7 +54,7 @@ public class SingleThreadFetcherManager<E, SplitT extends SourceSplit>
@Override
public void addSplits(List<SplitT> splitsToAdd) {
- SplitFetcher<E, SplitT> fetcher = fetchers.get(0);
+ SplitFetcher<E, SplitT> fetcher = getRunningFetcher();
if (fetcher == null) {
fetcher = createSplitFetcher();
// Add the splits to the fetchers.
@@ -64,4 +64,8 @@ public class SingleThreadFetcherManager<E, SplitT extends SourceSplit>
fetcher.addSplits(splitsToAdd);
}
}
+
+ protected SplitFetcher<E, SplitT> getRunningFetcher() {
+ return fetchers.isEmpty() ? null : fetchers.values().iterator().next();
+ }
}