[ 
https://issues.apache.org/jira/browse/FLINK-36146?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18017160#comment-18017160
 ] 

Kim Gräsman commented on FLINK-36146:
-------------------------------------

The PR was closed due to inactivity; I would appreciate it if it could be reopened. 
This is still an issue that hits us occasionally, and it would be nice to get 
it out of the way for future releases.

> NoSuchElement exception from SingleThreadFetcherManager
> -------------------------------------------------------
>
>                 Key: FLINK-36146
>                 URL: https://issues.apache.org/jira/browse/FLINK-36146
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Core
>         Environment: AWS EMR/Yarn
>            Reporter: Kim Gräsman
>            Priority: Minor
>              Labels: pull-request-available
>
> We're running Flink 1.14.2, but this appears to be an issue still on 
> mainline, so I thought I'd report it.
> When running with high parallelism we've noticed a spurious error triggered 
> by a FileSource reader reading from S3:
> {code:java}
> 2024-08-19 15:23:07,044 INFO  
> org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Finished 
> reading split(s) [0000543131]
> 2024-08-19 15:23:07,044 INFO  
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - 
> Finished reading from splits [0000543131]
> 2024-08-19 15:23:07,044 INFO  
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager [] 
> - Closing splitFetcher 157 because it is idle.
> 2024-08-19 15:23:07,045 INFO  
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - 
> Shutting down split fetcher 157
> 2024-08-19 15:23:07,045 INFO  
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Split 
> fetcher 157 exited.
> 2024-08-19 15:23:07,048 INFO  
> org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Adding 
> split(s) to reader: [FileSourceSplit: ... [0, 21679984)  hosts=[localhost] 
> ID=0000201373 position=null]
> 2024-08-19 15:23:07,064 INFO  
> org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Closing 
> Source Reader.
> 2024-08-19 15:23:07,069 WARN  org.apache.flink.runtime.taskmanager.Task       
>              [] - Source: ... -> ... (114/1602)#0 (...) switched from RUNNING 
> to FAILED with failure cause: java.util.NoSuchElementException
>         at 
> java.base/java.util.concurrent.ConcurrentHashMap$ValueIterator.next(ConcurrentHashMap.java:3471)
>         at 
> org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager.getRunningFetcher(SingleThreadFetcherManager.java:94)
>         at 
> org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager.addSplits(SingleThreadFetcherManager.java:82)
>         at 
> org.apache.flink.connector.base.source.reader.SourceReaderBase.addSplits(SourceReaderBase.java:242)
>         at 
> org.apache.flink.streaming.api.operators.SourceOperator.handleOperatorEvent(SourceOperator.java:428)
>         at 
> org.apache.flink.streaming.runtime.tasks.OperatorEventDispatcherImpl.dispatchEventToHandlers(OperatorEventDispatcherImpl.java:70)
>         at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.dispatchOperatorEvent(RegularOperatorChain.java:83)
>         at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$dispatchOperatorEvent$19(StreamTask.java:1473)
>         at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
>         at 
> org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
>         at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:353)
>         at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
>         at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
>         at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
>         at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
>         at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
>         at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
>         at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
>         at java.base/java.lang.Thread.run(Thread.java:829) {code}
> I believe this may be caused by a tiny TOCTOU race in 
> {{SingleThreadFetcherManager}}. I'll admit that I don't fully 
> understand what the execution flows through that code look like, but the use 
> of atomic and synchronized indicates that it's used by multiple threads. If 
> that's not the case, this report can be safely ignored.
> The backtrace points to 
> https://github.com/apache/flink/blob/4faf0966766e3734792f80ed66e512aa3033cacd/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SingleThreadFetcherManager.java#L165
> And it looks like the concurrent hash map might be modified between the check 
> for {{isEmpty()}} and the call to {{fetchers.values().iterator().next()}}.
> I would suggest a Python-style try/catch:
> {code:java}
> try {
>   return fetchers.values().iterator().next();
> } catch (NoSuchElementException e) {
>   return null;
> }{code}
> here instead, which should let {{ConcurrentHashMap}} handle its 
> synchronization internally.
> For some reason we were able to reproduce consistently with 2 Task Managers 
> and 2 slots per node, but not with 1 Task Manager and 4 slots, if that helps 
> construct a repro test case (presumably more interlocking from 
> {{synchronized}} in a single-TM environment, but not sure).
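
To make the suspected race concrete, here is a minimal sketch of the check-then-act pattern described in the report and two ways to avoid it. The class and method names (AnyValue, racy, viaCatch, viaIterator) are hypothetical and not part of Flink. The second alternative assumes that a ConcurrentHashMap iterator has already fetched its next element by the time hasNext() returns true, which is my understanding of the JDK implementation but is worth verifying against the version in use.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper, not part of Flink: returns an arbitrary value from a
// map that other threads may be modifying concurrently, or null if none.
final class AnyValue {
    private AnyValue() {}

    // Check-then-act: another thread can remove the last entry after
    // isEmpty() returns false and before next() runs, so next() may throw
    // NoSuchElementException.
    static <K, V> V racy(Map<K, V> map) {
        return map.isEmpty() ? null : map.values().iterator().next();
    }

    // The approach suggested in the report: skip the separate check and
    // treat the exception as "the map was empty".
    static <K, V> V viaCatch(Map<K, V> map) {
        try {
            return map.values().iterator().next();
        } catch (NoSuchElementException e) {
            return null;
        }
    }

    // Alternative without exceptions: ask one iterator whether it has an
    // element and take it from that same iterator (assumption: see lead-in).
    static <K, V> V viaIterator(ConcurrentHashMap<K, V> map) {
        Iterator<V> it = map.values().iterator();
        return it.hasNext() ? it.next() : null;
    }
}
```

Both alternatives return null for an empty map, so callers must handle a null result just as they would after the original isEmpty() check.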



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
