[ https://issues.apache.org/jira/browse/FLINK-30334?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17644683#comment-17644683 ]

Ran Tao commented on FLINK-30334:
---------------------------------

[~martijnvisser] yes, my second source is a bounded CSV source. But the second 
source of a HybridSource does not have to be unbounded: if the second source is 
bounded, the HybridSource is bounded; if it is unbounded, the HybridSource is 
unbounded. A hybrid source works in both batch and streaming mode. In batch 
mode it reads both sources and then finishes.
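
For reference, here is a minimal sketch of the setup I mean (class name, paths and the text format are placeholders; it assumes the standard FileSource and HybridSource builder APIs):

{code:java}
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.base.source.hybrid.HybridSource;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class HybridSourceRepro {
    public static void main(String[] args) throws Exception {
        // Two bounded file sources (placeholder paths).
        FileSource<String> sourceA =
                FileSource.forRecordStreamFormat(new TextLineInputFormat(), new Path("/tmp/a.csv"))
                        .build();
        FileSource<String> sourceB =
                FileSource.forRecordStreamFormat(new TextLineInputFormat(), new Path("/tmp/b.csv"))
                        .build();

        // The hybrid source's boundedness follows its last child source:
        // a bounded second source gives a bounded hybrid source,
        // an unbounded second source gives an unbounded one.
        HybridSource<String> hybridSource =
                HybridSource.builder(sourceA).addSource(sourceB).build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromSource(hybridSource, WatermarkStrategy.noWatermarks(), "hybrid_source").print();
        env.execute();
    }
}
{code}

With the 1.16 coordinator check described below, this job prints the rows of a.csv and then hangs instead of continuing with b.csv.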


And it doesn't matter anyway: I changed the second source to a streaming data 
source and it hits the same bug. The key point is that the current child source 
does not run at all (it blocks at the split request).

> SourceCoordinator splitRequest check causes HybridSource data loss and hang
> -----------------------------------------------------------------------------------
>
>                 Key: FLINK-30334
>                 URL: https://issues.apache.org/jira/browse/FLINK-30334
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination
>    Affects Versions: 1.16.0, 1.17.0
>            Reporter: Ran Tao
>            Priority: Critical
>              Labels: pull-request-available
>
> If we use a hybrid source, for example filesystem source A reading a.csv and 
> filesystem source B reading b.csv (a very simple case), it hangs in the second 
> source with:
> 10802 [SourceCoordinator-Source: hybrid_source[1]] INFO  
> org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Source 
> Source: hybrid_source[1] received split request from parallel task 0 (#0)
> 10802 [SourceCoordinator-Source: hybrid_source[1]] INFO  
> org.apache.flink.connector.file.src.impl.StaticFileSplitEnumerator [] - 
> Subtask 0 (on host '') is requesting a file source split
> 10803 [SourceCoordinator-Source: hybrid_source[1]] INFO  
> org.apache.flink.connector.file.src.assigners.LocalityAwareSplitAssigner [] - 
> Assigning split to non-localized request: Optional[FileSourceSplit: 
> file:/Users/xxx/a.csv [0, 49) (no host info) ID=0000000001 position=null]
> 10808 [SourceCoordinator-Source: hybrid_source[1]] INFO  
> org.apache.flink.connector.file.src.impl.StaticFileSplitEnumerator [] - 
> Assigned split to subtask 0 : FileSourceSplit: 
> file:/Users/xxx/a.csv [0, 49) (no host info) ID=0000000001 position=null
> 10816 [Source: hybrid_source[1] -> Sink: print_out[2] (1/1)#0] INFO  
> org.apache.flink.connector.base.source.hybrid.HybridSourceReader [] - Adding 
> splits subtask=0 sourceIndex=0 
> currentReader=org.apache.flink.connector.file.src.impl.FileSourceReader@1e8e1971
>  [HybridSourceSplit
> {sourceIndex=0, splitId=0000000001}
> ]
> 10817 [Source: hybrid_source[1] -> Sink: print_out[2] (1/1)#0] INFO  
> org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Adding 
> split(s) to reader: [FileSourceSplit: 
> file:/Users/xxx/a.csv [0, 49) (no host info) ID=0000000001 position=null]
> 10822 [Source Data Fetcher for Source: hybrid_source[1] -> Sink: print_out[2] 
> (1/1)#0] INFO  
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - 
> Starting split fetcher 0
> 10864 [Source Data Fetcher for Source: hybrid_source[1] -> Sink: print_out[2] 
> (1/1)#0] INFO  
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - 
> Finished reading from splits [0000000001]
> +I[hello_a, flink, 1]
> +I[hello_a, hadoop, 2]
> +I[hello_a, world, 3]
> 10866 [Source: hybrid_source[1] -> Sink: print_out[2] (1/1)#0] INFO  
> org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Finished 
> reading split(s) [0000000001]
> 10868 [Source: hybrid_source[1] -> Sink: print_out[2] (1/1)#0] INFO  
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager [] 
> - Closing splitFetcher 0 because it is idle.
> 10868 [Source: hybrid_source[1] -> Sink: print_out[2] (1/1)#0] INFO  
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - 
> Shutting down split fetcher 0
> 10868 [Source Data Fetcher for Source: hybrid_source[1] -> Sink: print_out[2] 
> (1/1)#0] INFO  
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Split 
> fetcher 0 exited.
> 10869 [SourceCoordinator-Source: hybrid_source[1]] INFO  
> org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Source 
> Source: hybrid_source[1] received split request from parallel task 0 (#0)
> 10870 [SourceCoordinator-Source: hybrid_source[1]] INFO  
> org.apache.flink.connector.file.src.impl.StaticFileSplitEnumerator [] - 
> Subtask 0 (on host '') is requesting a file source split
> 10872 [SourceCoordinator-Source: hybrid_source[1]] INFO  
> org.apache.flink.connector.file.src.impl.StaticFileSplitEnumerator [] - No 
> more splits available for subtask 0
> 10872 [Source: hybrid_source[1] -> Sink: print_out[2] (1/1)#0] INFO  
> org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Reader 
> received NoMoreSplits event.
> 10872 [Source: hybrid_source[1] -> Sink: print_out[2] (1/1)#0] INFO  
> org.apache.flink.connector.base.source.hybrid.HybridSourceReader [] - End of 
> input subtask=0 sourceIndex=0 
> org.apache.flink.connector.file.src.impl.FileSourceReader@1e8e1971
> StaticFileSplitEnumerator:org.apache.flink.connector.file.src.impl.StaticFileSplitEnumerator@69906bb9
> 10874 [SourceCoordinator-Source: hybrid_source[1]] INFO  
> org.apache.flink.connector.base.source.hybrid.HybridSourceSplitEnumerator [] 
> - Starting enumerator for sourceIndex=1
> 10879 [Source: hybrid_source[1] -> Sink: print_out[2] (1/1)#0] INFO  
> org.apache.flink.connector.base.source.hybrid.HybridSourceReader [] - Switch 
> source event: subtask=0 sourceIndex=1 
> source=org.apache.flink.connector.file.src.FileSource@12ef574f
> 10879 [Source: hybrid_source[1] -> Sink: print_out[2] (1/1)#0] INFO  
> org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Closing 
> Source Reader.
> 10882 [SourceCoordinator-Source: hybrid_source[1]] INFO  
> org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Source 
> Source: hybrid_source[1] received split request from parallel task 0 (#0)
> *it does not read the next source's data and hangs at 'received split request 
> from parallel task'*
>  
> The reason is that in 1.16 & master the latest code adds a 
> context.hasNoMoreSplits check before calling enumerator.handleSplitRequest. We 
> understand the comment about reducing unnecessary split requests to the 
> enumerator, but the check does not consider the HybridSource situation: when a 
> subtask has no more splits from the current source, HybridSource switches to 
> the next source. Once the first source finishes reading, the context 
> permanently marks the subtask with the noMoreSplits status, so the later check 
> never lets splits of the next sources be assigned. Flink 1.15 behaves 
> correctly.
>  
> *SourceCoordinator*
>  
> {code:java}
> private void handleRequestSplitEvent(int subtask, int attemptNumber, 
> RequestSplitEvent event) {
>     LOG.info(
>             "Source {} received split request from parallel task {} (#{})",
>             operatorName,
>             subtask,
>             attemptNumber);
>     // request splits from the enumerator only if the enumerator has 
> un-assigned splits
>     // this helps to reduce unnecessary split requests to the enumerator
>     if (!context.hasNoMoreSplits(subtask)) {
>         enumerator.handleSplitRequest(subtask, event.hostName());
>     }
> } {code}
> The `context.hasNoMoreSplits` check in SourceCoordinator prevents the subtask 
> from ever reading the other child sources of the hybrid source.
>  
> SourceCoordinatorContext
>  
> {code:java}
> boolean hasNoMoreSplits(int subtaskIndex) {
>     return subtaskHasNoMoreSplits[subtaskIndex];
> }
> @Override
> public void signalNoMoreSplits(int subtask) {
>     checkSubtaskIndex(subtask);
>     // Ensure the split assignment is done by the coordinator executor.
>     callInCoordinatorThread(
>             () -> {
>                 subtaskHasNoMoreSplits[subtask] = true;
>                 signalNoMoreSplitsToAttempts(subtask);
>                 return null; // void return value
>             },
>             "Failed to send 'NoMoreSplits' to reader " + subtask);
> }
> {code}
> The context sets the subtask's noMoreSplits flag to true once the source is 
> done, without considering the hybrid situation.
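> 
> Just to illustrate the missing piece (this is a hypothetical sketch, not the 
> actual patch; the method name and its wiring are made up here): the context 
> would need some way to clear the flag again when the hybrid enumerator 
> switches to the next underlying source, e.g.:
> 
> {code:java}
> // Hypothetical addition to SourceCoordinatorContext, for illustration only.
> void resetNoMoreSplits(int subtaskIndex) {
>     checkSubtaskIndex(subtaskIndex);
>     // Ensure the flag update is done by the coordinator executor.
>     callInCoordinatorThread(
>             () -> {
>                 // Allow later split requests from this subtask to reach the
>                 // (new) enumerator again after the source switch.
>                 subtaskHasNoMoreSplits[subtaskIndex] = false;
>                 return null; // void return value
>             },
>             "Failed to reset 'NoMoreSplits' for reader " + subtaskIndex);
> }
> {code}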
>  
>  
> 1.15
>  
> {code:java}
> public void handleEventFromOperator(int subtask, OperatorEvent event) {
>     runInEventLoop(
>             () -> {
>                 if (event instanceof RequestSplitEvent) {
>                     LOG.info(
>                             "Source {} received split request from parallel task {}",
>                             operatorName,
>                             subtask);
>                     enumerator.handleSplitRequest(
>                             subtask, ((RequestSplitEvent) event).hostName());
>                 }
>                 // ...
> {code}
>  


