[ 
https://issues.apache.org/jira/browse/FLINK-33360?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Feng Jiajie updated FLINK-33360:
--------------------------------
    Affects Version/s: 1.18.0

> HybridSource fails to clear the previous round's state when switching 
> sources, leading to data loss
> ---------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-33360
>                 URL: https://issues.apache.org/jira/browse/FLINK-33360
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / HybridSource
>    Affects Versions: 1.16.2, 1.18.0, 1.17.1
>            Reporter: Feng Jiajie
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.7.3
>
>
> org.apache.flink.connector.base.source.hybrid.HybridSourceSplitEnumerator:
> {code:java}
>             // track readers that have finished processing for current 
> enumerator
>             finishedReaders.add(subtaskId);
>             if (finishedReaders.size() == context.currentParallelism()) {
>                 LOG.debug("All readers finished, ready to switch 
> enumerator!");
>                 if (currentSourceIndex + 1 < sources.size()) {
>                     switchEnumerator();
>                     // switch all readers prior to sending split assignments
>                     for (int i = 0; i < context.currentParallelism(); i++) {
>                         sendSwitchSourceEvent(i, currentSourceIndex);
>                     }
>                 }
>             } {code}
> I think that *finishedReaders* is used to keep track of all the subTaskIds 
> that have finished reading the current round of the source. Therefore, in the 
> *switchEnumerator* function, *finishedReaders* should be cleared:
> If it's not cleared, then in the next source reading, whenever any 
> SourceReader reports a *SourceReaderFinishedEvent* (while other SourceReaders 
> may not have finished processing in parallel), the condition 
> *finishedReaders.size() == context.currentParallelism()* will be satisfied 
> and it will trigger {*}sendSwitchSourceEvent{*}(i, currentSourceIndex), 
> sending a *SwitchSourceEvent* to all SourceReaders.
> If a SourceReader receives a SwitchSourceEvent before it finishes reading the 
> previous source, it will execute {*}currentReader.close(){*}, and some data 
> may not be fully read, resulting in a partial data loss in the source.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to