[jira] [Commented] (FLINK-33360) HybridSource fails to clear the previous round's state when switching sources, leading to data loss

2023-10-30 Thread Leonard Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17780899#comment-17780899
 ] 

Leonard Xu commented on FLINK-33360:


Fixed in master(1.19): 8d21c321dda18ad20022532c861d57ee38b70fda

> HybridSource fails to clear the previous round's state when switching 
> sources, leading to data loss
> ---
>
> Key: FLINK-33360
> URL: https://issues.apache.org/jira/browse/FLINK-33360
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / HybridSource
>Affects Versions: 1.16.2, 1.18.0, 1.17.1
>Reporter: Feng Jiajie
>Assignee: Feng Jiajie
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
>
> org.apache.flink.connector.base.source.hybrid.HybridSourceSplitEnumerator:
> {code:java}
>             // track readers that have finished processing for current 
> enumerator
>             finishedReaders.add(subtaskId);
>             if (finishedReaders.size() == context.currentParallelism()) {
>                 LOG.debug("All readers finished, ready to switch 
> enumerator!");
>                 if (currentSourceIndex + 1 < sources.size()) {
>                     switchEnumerator();
>                     // switch all readers prior to sending split assignments
>                     for (int i = 0; i < context.currentParallelism(); i++) {
>                         sendSwitchSourceEvent(i, currentSourceIndex);
>                     }
>                 }
>             } {code}
> I think that *finishedReaders* is used to keep track of all the subTaskIds 
> that have finished reading the current round of the source. Therefore, in the 
> *switchEnumerator* function, *finishedReaders* should be cleared:
> If it's not cleared, then in the next source reading, whenever any 
> SourceReader reports a *SourceReaderFinishedEvent* (while other SourceReaders 
> may not have finished processing in parallel), the condition 
> *finishedReaders.size() == context.currentParallelism()* will be satisfied 
> and it will trigger {*}sendSwitchSourceEvent{*}(i, currentSourceIndex), 
> sending a *SwitchSourceEvent* to all SourceReaders.
> If a SourceReader receives a SwitchSourceEvent before it finishes reading the 
> previous source, it will execute {*}currentReader.close(){*}, and some data 
> may not be fully read, resulting in a partial data loss in the source.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33360) HybridSource fails to clear the previous round's state when switching sources, leading to data loss

2023-10-26 Thread Leonard Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17779998#comment-17779998
 ] 

Leonard Xu commented on FLINK-33360:


Thanks [~fengjiajie] for report this issue, I assigned this ticket to you as 
you have raised a PR.

> HybridSource fails to clear the previous round's state when switching 
> sources, leading to data loss
> ---
>
> Key: FLINK-33360
> URL: https://issues.apache.org/jira/browse/FLINK-33360
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / HybridSource
>Affects Versions: 1.16.2, 1.18.0, 1.17.1
>Reporter: Feng Jiajie
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.3, 1.18.1
>
>
> org.apache.flink.connector.base.source.hybrid.HybridSourceSplitEnumerator:
> {code:java}
>             // track readers that have finished processing for current 
> enumerator
>             finishedReaders.add(subtaskId);
>             if (finishedReaders.size() == context.currentParallelism()) {
>                 LOG.debug("All readers finished, ready to switch 
> enumerator!");
>                 if (currentSourceIndex + 1 < sources.size()) {
>                     switchEnumerator();
>                     // switch all readers prior to sending split assignments
>                     for (int i = 0; i < context.currentParallelism(); i++) {
>                         sendSwitchSourceEvent(i, currentSourceIndex);
>                     }
>                 }
>             } {code}
> I think that *finishedReaders* is used to keep track of all the subTaskIds 
> that have finished reading the current round of the source. Therefore, in the 
> *switchEnumerator* function, *finishedReaders* should be cleared:
> If it's not cleared, then in the next source reading, whenever any 
> SourceReader reports a *SourceReaderFinishedEvent* (while other SourceReaders 
> may not have finished processing in parallel), the condition 
> *finishedReaders.size() == context.currentParallelism()* will be satisfied 
> and it will trigger {*}sendSwitchSourceEvent{*}(i, currentSourceIndex), 
> sending a *SwitchSourceEvent* to all SourceReaders.
> If a SourceReader receives a SwitchSourceEvent before it finishes reading the 
> previous source, it will execute {*}currentReader.close(){*}, and some data 
> may not be fully read, resulting in a partial data loss in the source.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33360) HybridSource fails to clear the previous round's state when switching sources, leading to data loss

2023-10-25 Thread Feng Jiajie (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17779507#comment-17779507
 ] 

Feng Jiajie commented on FLINK-33360:
-

pr: https://github.com/apache/flink/pull/23593

> HybridSource fails to clear the previous round's state when switching 
> sources, leading to data loss
> ---
>
> Key: FLINK-33360
> URL: https://issues.apache.org/jira/browse/FLINK-33360
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / HybridSource
>Affects Versions: 1.16.2, 1.17.1
>Reporter: Feng Jiajie
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.3
>
>
> org.apache.flink.connector.base.source.hybrid.HybridSourceSplitEnumerator:
> {code:java}
>             // track readers that have finished processing for current 
> enumerator
>             finishedReaders.add(subtaskId);
>             if (finishedReaders.size() == context.currentParallelism()) {
>                 LOG.debug("All readers finished, ready to switch 
> enumerator!");
>                 if (currentSourceIndex + 1 < sources.size()) {
>                     switchEnumerator();
>                     // switch all readers prior to sending split assignments
>                     for (int i = 0; i < context.currentParallelism(); i++) {
>                         sendSwitchSourceEvent(i, currentSourceIndex);
>                     }
>                 }
>             } {code}
> I think that *finishedReaders* is used to keep track of all the subTaskIds 
> that have finished reading the current round of the source. Therefore, in the 
> *switchEnumerator* function, *finishedReaders* should be cleared:
> If it's not cleared, then in the next source reading, whenever any 
> SourceReader reports a *SourceReaderFinishedEvent* (while other SourceReaders 
> may not have finished processing in parallel), the condition 
> *finishedReaders.size() == context.currentParallelism()* will be satisfied 
> and it will trigger {*}sendSwitchSourceEvent{*}(i, currentSourceIndex), 
> sending a *SwitchSourceEvent* to all SourceReaders.
> If a SourceReader receives a SwitchSourceEvent before it finishes reading the 
> previous source, it will execute {*}currentReader.close(){*}, and some data 
> may not be fully read, resulting in a partial data loss in the source.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)