[ https://issues.apache.org/jira/browse/FLINK-33360?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Leonard Xu reassigned FLINK-33360:
----------------------------------

    Assignee: Feng Jiajie

> HybridSource fails to clear the previous round's state when switching sources, leading to data loss
> ---------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-33360
>                 URL: https://issues.apache.org/jira/browse/FLINK-33360
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / HybridSource
>    Affects Versions: 1.16.2, 1.18.0, 1.17.1
>            Reporter: Feng Jiajie
>            Assignee: Feng Jiajie
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.7.3, 1.18.1
>
>
> org.apache.flink.connector.base.source.hybrid.HybridSourceSplitEnumerator:
> {code:java}
>             // track readers that have finished processing for current enumerator
>             finishedReaders.add(subtaskId);
>             if (finishedReaders.size() == context.currentParallelism()) {
>                 LOG.debug("All readers finished, ready to switch enumerator!");
>                 if (currentSourceIndex + 1 < sources.size()) {
>                     switchEnumerator();
>                     // switch all readers prior to sending split assignments
>                     for (int i = 0; i < context.currentParallelism(); i++) {
>                         sendSwitchSourceEvent(i, currentSourceIndex);
>                     }
>                 }
>             } {code}
> I think that *finishedReaders* is used to keep track of all the subTaskIds
> that have finished reading the current round of the source. Therefore, in the
> *switchEnumerator* function, *finishedReaders* should be cleared:
> If it's not cleared, then in the next source reading, whenever any
> SourceReader reports a *SourceReaderFinishedEvent* (while other SourceReaders
> may not have finished processing in parallel), the condition
> *finishedReaders.size() == context.currentParallelism()* will be satisfied
> and it will trigger {*}sendSwitchSourceEvent{*}(i, currentSourceIndex),
> sending a *SwitchSourceEvent* to all SourceReaders.
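The bookkeeping described above can be reproduced with a small standalone sketch. It is not Flink code: ToyEnumerator, its clearOnSwitch flag, and sourceIndexAfterOneEarlyFinish are hypothetical names made up for illustration, and only the finishedReaders check is modelled on the quoted snippet. Clearing the set inside switchEnumerator is the change proposed in this report, shown here as an assumption rather than as the merged fix.

```java
import java.util.HashSet;
import java.util.Set;

class FinishedReadersSketch {

    /** Hypothetical stand-in for the enumerator's bookkeeping; not Flink's class. */
    static final class ToyEnumerator {
        private final Set<Integer> finishedReaders = new HashSet<>();
        private final int parallelism;
        private final int sourceCount;
        private final boolean clearOnSwitch;
        private int currentSourceIndex = 0;

        ToyEnumerator(int parallelism, int sourceCount, boolean clearOnSwitch) {
            this.parallelism = parallelism;
            this.sourceCount = sourceCount;
            this.clearOnSwitch = clearOnSwitch;
        }

        /** Mirrors the quoted check; returns true if this event caused a switch. */
        boolean onReaderFinished(int subtaskId) {
            finishedReaders.add(subtaskId);
            if (finishedReaders.size() == parallelism
                    && currentSourceIndex + 1 < sourceCount) {
                switchEnumerator();
                return true;
            }
            return false;
        }

        private void switchEnumerator() {
            currentSourceIndex++;
            if (clearOnSwitch) {
                // Proposed change: forget the previous round's finished readers.
                finishedReaders.clear();
            }
        }

        int currentSourceIndex() {
            return currentSourceIndex;
        }
    }

    /**
     * Two readers, three sources. Both readers finish source 0, then only
     * reader 0 finishes source 1. Returns the source index afterwards.
     */
    static int sourceIndexAfterOneEarlyFinish(boolean clearOnSwitch) {
        ToyEnumerator enumerator = new ToyEnumerator(2, 3, clearOnSwitch);
        enumerator.onReaderFinished(0);
        enumerator.onReaderFinished(1); // legitimate switch to source 1
        enumerator.onReaderFinished(0); // reader 1 is still reading source 1
        return enumerator.currentSourceIndex();
    }

    public static void main(String[] args) {
        System.out.println("without clear: " + sourceIndexAfterOneEarlyFinish(false));
        System.out.println("with clear:    " + sourceIndexAfterOneEarlyFinish(true));
    }
}
```

Without the clear, the stale entries from the first round make the size check pass as soon as a single reader finishes, so the sketch moves on to source 2 while reader 1 is still on source 1. With the clear, it stays on source 1 until both readers have reported.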
> If a SourceReader receives a SwitchSourceEvent before it finishes reading the
> previous source, it will execute {*}currentReader.close(){*}, and some data
> may not be fully read, resulting in a partial data loss in the source.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)