[ https://issues.apache.org/jira/browse/FLINK-30334?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ran Tao updated FLINK-30334:
----------------------------
    Description: Consider a hybrid source built from two filesystem sources: source A reads a.csv and source B reads b.csv. Even in this very simple case, the job hangs when it reaches the second source:

{noformat}
10802 [SourceCoordinator-Source: hybrid_source[1]] INFO org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Source Source: hybrid_source[1] received split request from parallel task 0 (#0)
10802 [SourceCoordinator-Source: hybrid_source[1]] INFO org.apache.flink.connector.file.src.impl.StaticFileSplitEnumerator [] - Subtask 0 (on host '') is requesting a file source split
10803 [SourceCoordinator-Source: hybrid_source[1]] INFO org.apache.flink.connector.file.src.assigners.LocalityAwareSplitAssigner [] - Assigning split to non-localized request: Optional[FileSourceSplit: file:/Users/xxx/a.csv [0, 49) (no host info) ID=0000000001 position=null]
10808 [SourceCoordinator-Source: hybrid_source[1]] INFO org.apache.flink.connector.file.src.impl.StaticFileSplitEnumerator [] - Assigned split to subtask 0 : FileSourceSplit: file:/Users/xxx/a.csv [0, 49) (no host info) ID=0000000001 position=null
10816 [Source: hybrid_source[1] -> Sink: print_out[2] (1/1)#0] INFO org.apache.flink.connector.base.source.hybrid.HybridSourceReader [] - Adding splits subtask=0 sourceIndex=0 currentReader=org.apache.flink.connector.file.src.impl.FileSourceReader@1e8e1971 [HybridSourceSplit{sourceIndex=0, splitId=0000000001}]
10817 [Source: hybrid_source[1] -> Sink: print_out[2] (1/1)#0] INFO org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Adding split(s) to reader: [FileSourceSplit: file:/Users/xxx/a.csv [0, 49) (no host info) ID=0000000001 position=null]
10822 [Source Data Fetcher for Source: hybrid_source[1] -> Sink: print_out[2] (1/1)#0] INFO org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Starting split fetcher 0
10864 [Source Data Fetcher for Source: hybrid_source[1] -> Sink: print_out[2] (1/1)#0] INFO org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Finished reading from splits [0000000001]
+I[hello_a, flink, 1]
+I[hello_a, hadoop, 2]
+I[hello_a, world, 3]
10866 [Source: hybrid_source[1] -> Sink: print_out[2] (1/1)#0] INFO org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Finished reading split(s) [0000000001]
10868 [Source: hybrid_source[1] -> Sink: print_out[2] (1/1)#0] INFO org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager [] - Closing splitFetcher 0 because it is idle.
10868 [Source: hybrid_source[1] -> Sink: print_out[2] (1/1)#0] INFO org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Shutting down split fetcher 0
10868 [Source Data Fetcher for Source: hybrid_source[1] -> Sink: print_out[2] (1/1)#0] INFO org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Split fetcher 0 exited.
10869 [SourceCoordinator-Source: hybrid_source[1]] INFO org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Source Source: hybrid_source[1] received split request from parallel task 0 (#0)
10870 [SourceCoordinator-Source: hybrid_source[1]] INFO org.apache.flink.connector.file.src.impl.StaticFileSplitEnumerator [] - Subtask 0 (on host '') is requesting a file source split
10872 [SourceCoordinator-Source: hybrid_source[1]] INFO org.apache.flink.connector.file.src.impl.StaticFileSplitEnumerator [] - No more splits available for subtask 0
10872 [Source: hybrid_source[1] -> Sink: print_out[2] (1/1)#0] INFO org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Reader received NoMoreSplits event.
10872 [Source: hybrid_source[1] -> Sink: print_out[2] (1/1)#0] INFO org.apache.flink.connector.base.source.hybrid.HybridSourceReader [] - End of input subtask=0 sourceIndex=0 org.apache.flink.connector.file.src.impl.FileSourceReader@1e8e1971 StaticFileSplitEnumerator:org.apache.flink.connector.file.src.impl.StaticFileSplitEnumerator@69906bb9
10874 [SourceCoordinator-Source: hybrid_source[1]] INFO org.apache.flink.connector.base.source.hybrid.HybridSourceSplitEnumerator [] - Starting enumerator for sourceIndex=1
10879 [Source: hybrid_source[1] -> Sink: print_out[2] (1/1)#0] INFO org.apache.flink.connector.base.source.hybrid.HybridSourceReader [] - Switch source event: subtask=0 sourceIndex=1 source=org.apache.flink.connector.file.src.FileSource@12ef574f
10879 [Source: hybrid_source[1] -> Sink: print_out[2] (1/1)#0] INFO org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Closing Source Reader.
10882 [SourceCoordinator-Source: hybrid_source[1]] INFO org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Source Source: hybrid_source[1] received split request from parallel task 0 (#0)
{noformat}

*The job never reads data from the next source; it hangs right after 'received split request from parallel task'.*

The cause is that in 1.16 and master, SourceCoordinator checks context.hasNoMoreSplits before calling enumerator.handleSplitRequest. We understand the intent of the check (its comment says it reduces unnecessary split requests to the enumerator), but it does not account for HybridSource. In a HybridSource, a subtask that has no more splits from the current child source switches to the next child source. When the first source finishes, however, the context marks the subtask as having no more splits, and from then on the check prevents splits from the following sources from being assigned to it. Flink 1.15, which has no such check, behaves correctly.
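The sequence in the log can be reproduced without Flink. The following is a minimal, stdlib-only Java model, assuming a single subtask and a hybrid enumerator that switches to the next child source as soon as the current one reports no more splits. HybridHangModel, Context, HybridEnumerator and run are hypothetical stand-ins, not Flink classes; the real HybridSource exchanges reader and switch events that this model collapses into one call.

{code:java}
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical model of the split-request path; NOT Flink code. */
public class HybridHangModel {

    /** Stands in for SourceCoordinatorContext: one "no more splits" flag per subtask. */
    static class Context {
        final boolean[] subtaskHasNoMoreSplits;
        final List<String> assigned = new ArrayList<>();

        Context(int parallelism) {
            subtaskHasNoMoreSplits = new boolean[parallelism];
        }

        boolean hasNoMoreSplits(int subtask) {
            return subtaskHasNoMoreSplits[subtask];
        }

        void signalNoMoreSplits(int subtask) {
            subtaskHasNoMoreSplits[subtask] = true; // never reset, as in the report
        }

        void assignSplit(int subtask, String split) {
            assigned.add(subtask + ":" + split);
        }
    }

    /** Stands in for a hybrid enumerator: child 0 owns a.csv, child 1 owns b.csv. */
    static class HybridEnumerator {
        final Context context;
        final List<Deque<String>> sources = new ArrayList<>();
        int currentSource = 0;

        HybridEnumerator(Context context) {
            this.context = context;
            sources.add(new ArrayDeque<>(List.of("a.csv")));
            sources.add(new ArrayDeque<>(List.of("b.csv")));
        }

        void handleSplitRequest(int subtask) {
            Deque<String> pending = sources.get(currentSource);
            if (!pending.isEmpty()) {
                context.assignSplit(subtask, pending.poll());
            } else {
                // The child source is exhausted: the reader is told "no more splits",
                // which also flips the coordinator-side flag for this subtask.
                context.signalNoMoreSplits(subtask);
                if (currentSource + 1 < sources.size()) {
                    currentSource++; // switch to the next child source
                }
            }
        }
    }

    /** Stands in for handleRequestSplitEvent; gated = 1.16 behaviour, ungated = 1.15. */
    static void handleRequestSplitEvent(Context ctx, HybridEnumerator enumerator, int subtask, boolean gated) {
        if (!gated || !ctx.hasNoMoreSplits(subtask)) {
            enumerator.handleSplitRequest(subtask);
        }
    }

    /** Sends three split requests from subtask 0 and returns what was assigned. */
    static List<String> run(boolean gated) {
        Context ctx = new Context(1);
        HybridEnumerator enumerator = new HybridEnumerator(ctx);
        for (int i = 0; i < 3; i++) {
            handleRequestSplitEvent(ctx, enumerator, 0, gated);
        }
        return ctx.assigned;
    }

    public static void main(String[] args) {
        System.out.println("1.16 (gated):   " + run(true));  // [0:a.csv]
        System.out.println("1.15 (ungated): " + run(false)); // [0:a.csv, 0:b.csv]
    }
}
{code}

In the gated run the third request (the one logged at 10882) is dropped because the flag set while finishing a.csv is still true, so b.csv is never assigned.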

*SourceCoordinator*
{code:java}
private void handleRequestSplitEvent(int subtask, int attemptNumber, RequestSplitEvent event) {
    LOG.info(
            "Source {} received split request from parallel task {} (#{})",
            operatorName,
            subtask,
            attemptNumber);

    // request splits from the enumerator only if the enumerator has un-assigned splits
    // this helps to reduce unnecessary split requests to the enumerator
    if (!context.hasNoMoreSplits(subtask)) {
        enumerator.handleSplitRequest(subtask, event.hostName());
    }
}
{code}
The `context.hasNoMoreSplits` check in SourceCoordinator stops the subtask from receiving splits from the other child sources of the hybrid source.

*SourceCoordinatorContext*
{code:java}
boolean hasNoMoreSplits(int subtaskIndex) {
    return subtaskHasNoMoreSplits[subtaskIndex];
}

@Override
public void signalNoMoreSplits(int subtask) {
    checkSubtaskIndex(subtask);

    // Ensure the split assignment is done by the coordinator executor.
    callInCoordinatorThread(
            () -> {
                subtaskHasNoMoreSplits[subtask] = true;
                signalNoMoreSplitsToAttempts(subtask);
                return null; // void return value
            },
            "Failed to send 'NoMoreSplits' to reader " + subtask);
}
{code}
The context sets subtaskHasNoMoreSplits to true as soon as a source is done, without considering the hybrid case.

*1.15*
{code:java}
public void handleEventFromOperator(int subtask, OperatorEvent event) {
    runInEventLoop(
            () -> {
                if (event instanceof RequestSplitEvent) {
                    LOG.info(
                            "Source {} received split request from parallel task {}",
                            operatorName,
                            subtask);
                    enumerator.handleSplitRequest(
                            subtask, ((RequestSplitEvent) event).hostName());
                }
                // ... (rest of the method elided in this excerpt)
{code}
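One possible direction, sketched purely as a hypothesis and not as the fix adopted for this issue: keep the 1.16 shortcut, but have the hybrid layer clear the per-subtask flag when it switches to the next child source. The reader still receives NoMoreSplits for the finished child (which is what triggers the switch), while later requests reach the new child enumerator. The stdlib-only model below assumes a single subtask; ResetOnSwitchSketch and its members are hypothetical names, and a real HybridSource switches only after all readers have finished the current source.

{code:java}
import java.util.ArrayList;
import java.util.List;

/** Hypothetical reset-on-switch model; NOT Flink code and NOT the committed fix. */
public class ResetOnSwitchSketch {

    private final List<List<String>> childSources;
    private final int[] nextSplit;           // next unassigned split index per child source
    private int currentSource = 0;
    private boolean hasNoMoreSplits = false; // coordinator-side flag for subtask 0
    final List<String> readerEvents = new ArrayList<>();

    ResetOnSwitchSketch(List<List<String>> childSources) {
        this.childSources = childSources;
        this.nextSplit = new int[childSources.size()];
    }

    /** Gated request handling, mirroring SourceCoordinator.handleRequestSplitEvent in 1.16. */
    void handleRequestSplitEvent() {
        if (!hasNoMoreSplits) {
            handleSplitRequest();
        }
    }

    private void handleSplitRequest() {
        List<String> splits = childSources.get(currentSource);
        if (nextSplit[currentSource] < splits.size()) {
            readerEvents.add("split " + splits.get(nextSplit[currentSource]++));
            return;
        }
        // Child exhausted: the reader still gets NoMoreSplits so it can finish this child...
        readerEvents.add("NoMoreSplits(source " + currentSource + ")");
        hasNoMoreSplits = true;
        if (currentSource + 1 < childSources.size()) {
            currentSource++;
            // ...but the hypothetical hybrid layer clears the flag on switch,
            // so the next request reaches the new child enumerator.
            hasNoMoreSplits = false;
        }
    }

    public static void main(String[] args) {
        ResetOnSwitchSketch s =
                new ResetOnSwitchSketch(List.of(List.of("a.csv"), List.of("b.csv")));
        for (int i = 0; i < 5; i++) {
            s.handleRequestSplitEvent();
        }
        // [split a.csv, NoMoreSplits(source 0), split b.csv, NoMoreSplits(source 1)]
        System.out.println(s.readerEvents);
    }
}
{code}

The fifth request is skipped because only the last child source leaves the flag set. Forwarding every request unconditionally, as 1.15 does, is the simpler alternative; the trade-off is that exhausted subtasks keep reaching the enumerator, which is exactly what the 1.16 check was meant to avoid.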

> SourceCoordinator error splitRequest check cause HybridSource hang
> ------------------------------------------------------------------
>
>                 Key: FLINK-30334
>                 URL: https://issues.apache.org/jira/browse/FLINK-30334
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination
>    Affects Versions: 1.16.0, 1.17.0
>            Reporter: Ran Tao
>            Priority: Critical

--
This message was sent by Atlassian Jira
(v8.20.10#820010)