Ran Tao created FLINK-30334:
-------------------------------

             Summary: SourceCoordinator error split check cause HybridSource 
hang
                 Key: FLINK-30334
                 URL: https://issues.apache.org/jira/browse/FLINK-30334
             Project: Flink
          Issue Type: Bug
          Components: Runtime / Coordination
    Affects Versions: 1.16.0, 1.17.0
            Reporter: Ran Tao


Consider a hybrid source made of two filesystem sources: source A reads a.csv 
and source B reads b.csv. It is a very simple case, but the job hangs when it 
switches to the second source, with the following log:

10802 [SourceCoordinator-Source: hybrid_source[1]] INFO  
org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Source 
Source: hybrid_source[1] received split request from parallel task 0 (#0)
10802 [SourceCoordinator-Source: hybrid_source[1]] INFO  
org.apache.flink.connector.file.src.impl.StaticFileSplitEnumerator [] - Subtask 
0 (on host '') is requesting a file source split
10803 [SourceCoordinator-Source: hybrid_source[1]] INFO  
org.apache.flink.connector.file.src.assigners.LocalityAwareSplitAssigner [] - 
Assigning split to non-localized request: Optional[FileSourceSplit: 
file:/Users/xxx/a.csv [0, 49) (no host info) ID=0000000001 position=null]
10808 [SourceCoordinator-Source: hybrid_source[1]] INFO  
org.apache.flink.connector.file.src.impl.StaticFileSplitEnumerator [] - 
Assigned split to subtask 0 : FileSourceSplit: file:/Users/xxx/a.csv [0, 49) 
(no host info) ID=0000000001 position=null
10816 [Source: hybrid_source[1] -> Sink: print_out[2] (1/1)#0] INFO  
org.apache.flink.connector.base.source.hybrid.HybridSourceReader [] - Adding 
splits subtask=0 sourceIndex=0 
currentReader=org.apache.flink.connector.file.src.impl.FileSourceReader@1e8e1971
 [HybridSourceSplit\{sourceIndex=0, splitId=0000000001}]
10817 [Source: hybrid_source[1] -> Sink: print_out[2] (1/1)#0] INFO  
org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Adding 
split(s) to reader: [FileSourceSplit: file:/Users/chucheng/TMP/a.csv [0, 49) 
(no host info) ID=0000000001 position=null]
10822 [Source Data Fetcher for Source: hybrid_source[1] -> Sink: print_out[2] 
(1/1)#0] INFO  
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - 
Starting split fetcher 0
10864 [Source Data Fetcher for Source: hybrid_source[1] -> Sink: print_out[2] 
(1/1)#0] INFO  
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - 
Finished reading from splits [0000000001]
+I[hello_a, flink, 1]
+I[hello_a, hadoop, 2]
+I[hello_a, world, 3]
10866 [Source: hybrid_source[1] -> Sink: print_out[2] (1/1)#0] INFO  
org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Finished 
reading split(s) [0000000001]
10868 [Source: hybrid_source[1] -> Sink: print_out[2] (1/1)#0] INFO  
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager [] - 
Closing splitFetcher 0 because it is idle.
10868 [Source: hybrid_source[1] -> Sink: print_out[2] (1/1)#0] INFO  
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - 
Shutting down split fetcher 0
10868 [Source Data Fetcher for Source: hybrid_source[1] -> Sink: print_out[2] 
(1/1)#0] INFO  
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Split 
fetcher 0 exited.
10869 [SourceCoordinator-Source: hybrid_source[1]] INFO  
org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Source 
Source: hybrid_source[1] received split request from parallel task 0 (#0)
10870 [SourceCoordinator-Source: hybrid_source[1]] INFO  
org.apache.flink.connector.file.src.impl.StaticFileSplitEnumerator [] - Subtask 
0 (on host '') is requesting a file source split
10872 [SourceCoordinator-Source: hybrid_source[1]] INFO  
org.apache.flink.connector.file.src.impl.StaticFileSplitEnumerator [] - No more 
splits available for subtask 0
10872 [Source: hybrid_source[1] -> Sink: print_out[2] (1/1)#0] INFO  
org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Reader 
received NoMoreSplits event.
10872 [Source: hybrid_source[1] -> Sink: print_out[2] (1/1)#0] INFO  
org.apache.flink.connector.base.source.hybrid.HybridSourceReader [] - End of 
input subtask=0 sourceIndex=0 
org.apache.flink.connector.file.src.impl.FileSourceReader@1e8e1971
StaticFileSplitEnumerator:org.apache.flink.connector.file.src.impl.StaticFileSplitEnumerator@69906bb9
10874 [SourceCoordinator-Source: hybrid_source[1]] INFO  
org.apache.flink.connector.base.source.hybrid.HybridSourceSplitEnumerator [] - 
Starting enumerator for sourceIndex=1
10879 [Source: hybrid_source[1] -> Sink: print_out[2] (1/1)#0] INFO  
org.apache.flink.connector.base.source.hybrid.HybridSourceReader [] - Switch 
source event: subtask=0 sourceIndex=1 
source=org.apache.flink.connector.file.src.FileSource@12ef574f
10879 [Source: hybrid_source[1] -> Sink: print_out[2] (1/1)#0] INFO  
org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Closing 
Source Reader.
10882 [SourceCoordinator-Source: hybrid_source[1]] INFO  
org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Source 
Source: hybrid_source[1] received split request from parallel task 0 (#0)

*The job does not read the next source's data and hangs at 'received split 
request from parallel task'.*

 

The reason is that in 1.16 and master, the latest code adds a 
context.hasNoMoreSplits check before calling enumerator.handleSplitRequest. We 
understand the intent stated in the comment, which is to reduce unnecessary 
split requests to the enumerator. But the check does not consider HybridSource. 
When a subtask has no more splits, HybridSource switches to the next source, 
and the check does not account for this. When the first source finishes 
reading, the context marks the subtask as having no more splits, so later 
split requests are dropped by the check and no splits from the next source can 
be assigned. Flink 1.15, however, behaves correctly.
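
The sequence described above can be reproduced with a small stand-alone model. 
This is only a sketch and not Flink code: ToyContext, ToyHybridEnumerator and 
the guardOnNoMoreSplits switch are hypothetical names invented for illustration, 
and the model assumes the per-subtask flag is never cleared when the hybrid 
source moves on to its next child source.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class HybridHangSketch {

    /** Toy stand-in for the coordinator context: one flag per subtask (hypothetical). */
    static final class ToyContext {
        final boolean[] subtaskHasNoMoreSplits;
        final List<String> assigned = new ArrayList<>();

        ToyContext(int parallelism) {
            subtaskHasNoMoreSplits = new boolean[parallelism];
        }

        void assignSplit(String split) {
            assigned.add(split);
        }

        void signalNoMoreSplits(int subtask) {
            subtaskHasNoMoreSplits[subtask] = true; // never reset in this model
        }

        boolean hasNoMoreSplits(int subtask) {
            return subtaskHasNoMoreSplits[subtask];
        }
    }

    /** Toy hybrid enumerator: serves splits of the current child source only (hypothetical). */
    static final class ToyHybridEnumerator {
        final List<Deque<String>> sources = new ArrayList<>();
        final ToyContext context;
        int sourceIndex = 0;

        ToyHybridEnumerator(ToyContext context, List<List<String>> files) {
            this.context = context;
            for (List<String> f : files) {
                sources.add(new ArrayDeque<>(f));
            }
        }

        void handleSplitRequest(int subtask) {
            Deque<String> current = sources.get(sourceIndex);
            if (!current.isEmpty()) {
                context.assignSplit(current.poll());
            } else {
                // The current child source is exhausted.
                context.signalNoMoreSplits(subtask);
            }
        }

        void switchToNextSource() {
            if (sourceIndex < sources.size() - 1) {
                sourceIndex++;
            }
        }
    }

    /** Mimics the coordinator's request handling, with or without the guard. */
    static void request(
            ToyContext context, ToyHybridEnumerator enumerator, int subtask, boolean guard) {
        if (!guard || !context.hasNoMoreSplits(subtask)) {
            enumerator.handleSplitRequest(subtask);
        }
    }

    /** Replays the log: read a.csv, hit end of source 0, switch, request again. */
    static List<String> run(boolean guardOnNoMoreSplits) {
        ToyContext context = new ToyContext(1);
        ToyHybridEnumerator enumerator =
                new ToyHybridEnumerator(context, List.of(List.of("a.csv"), List.of("b.csv")));
        request(context, enumerator, 0, guardOnNoMoreSplits); // assigns a.csv
        request(context, enumerator, 0, guardOnNoMoreSplits); // source 0 empty, flag set
        enumerator.switchToNextSource();
        request(context, enumerator, 0, guardOnNoMoreSplits); // dropped if the guard is on
        return context.assigned;
    }

    public static void main(String[] args) {
        System.out.println("with guard:    " + run(true));
        System.out.println("without guard: " + run(false));
    }
}
```

With the guard enabled the model assigns only a.csv, matching the hang above; 
without it, b.csv is assigned after the switch, matching the 1.15 behavior.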

 

*SourceCoordinator*

 
{code:java}
private void handleRequestSplitEvent(int subtask, int attemptNumber, RequestSplitEvent event) {
    LOG.info(
            "Source {} received split request from parallel task {} (#{})",
            operatorName,
            subtask,
            attemptNumber);
    // request splits from the enumerator only if the enumerator has un-assigned splits
    // this helps to reduce unnecessary split requests to the enumerator
    if (!context.hasNoMoreSplits(subtask)) {
        enumerator.handleSplitRequest(subtask, event.hostName());
    }
}
{code}
Because SourceCoordinator checks `context.hasNoMoreSplits` first, the subtask's 
split requests never reach the enumerators of the hybrid source's other child 
sources.

 

*SourceCoordinatorContext*

 
{code:java}
boolean hasNoMoreSplits(int subtaskIndex) {
    return subtaskHasNoMoreSplits[subtaskIndex];
}

@Override
public void signalNoMoreSplits(int subtask) {
    checkSubtaskIndex(subtask);

    // Ensure the split assignment is done by the coordinator executor.
    callInCoordinatorThread(
            () -> {
                subtaskHasNoMoreSplits[subtask] = true;
                signalNoMoreSplitsToAttempts(subtask);
                return null; // void return value
            },
            "Failed to send 'NoMoreSplits' to reader " + subtask);
}
{code}
The context sets subtaskHasNoMoreSplits to true once the source is done 
(without considering the hybrid situation).
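
One conceivable direction, shown purely as an illustration and not as the actual 
or proposed patch, is to clear the per-subtask flag when the hybrid source 
switches to its next child source. The class FlagResetSketch and the 
onSourceSwitch hook below are hypothetical; no such hook is claimed to exist in 
Flink.

```java
import java.util.Arrays;

public class FlagResetSketch {

    // Mirrors the role of subtaskHasNoMoreSplits in the context shown above.
    private final boolean[] subtaskHasNoMoreSplits;

    public FlagResetSketch(int parallelism) {
        this.subtaskHasNoMoreSplits = new boolean[parallelism];
    }

    public void signalNoMoreSplits(int subtask) {
        subtaskHasNoMoreSplits[subtask] = true;
    }

    public boolean hasNoMoreSplits(int subtask) {
        return subtaskHasNoMoreSplits[subtask];
    }

    /** Hypothetical hook: called when the hybrid source moves to its next child source. */
    public void onSourceSwitch() {
        Arrays.fill(subtaskHasNoMoreSplits, false);
    }

    public static void main(String[] args) {
        FlagResetSketch context = new FlagResetSketch(1);
        context.signalNoMoreSplits(0);
        System.out.println("before switch: " + context.hasNoMoreSplits(0));
        context.onSourceSwitch();
        System.out.println("after switch:  " + context.hasNoMoreSplits(0));
    }
}
```

Under this assumption, a split request arriving after the switch would pass the 
hasNoMoreSplits check again and reach the next child source's enumerator.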

 

 

In 1.15, the split request is always forwarded to the enumerator:

 
{code:java}
public void handleEventFromOperator(int subtask, OperatorEvent event) {
    runInEventLoop(
            () -> {
                if (event instanceof RequestSplitEvent) {
                    LOG.info(
                            "Source {} received split request from parallel task {}",
                            operatorName,
                            subtask);
                    enumerator.handleSplitRequest(
                            subtask, ((RequestSplitEvent) event).hostName());
                }  {code}
 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
