[ 
https://issues.apache.org/jira/browse/FLINK-19717?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17216840#comment-17216840
 ] 

Kezhu Wang commented on FLINK-19717:
------------------------------------

{quote}Maybe the fix would be to always check for errors before returning 
NOTHING_AVAILABLE.
{quote}
[~sewen] I guess you mean checking for errors before returning
*{{END_OF_INPUT}}*? I think that is not enough, because this error check
assumes that error-setting happens before fetcher-removing. All error-setting
operations must have completed before the error check runs; otherwise the
result is nondeterministic. The current implementation does not give us this
guarantee. Currently, we do fetcher-removing in
{{SplitFetcher.shutdownHook}} and error-setting in
{{ThrowableCatchingRunnable.exceptionHandler}}. Both operations are
thread-safe, but fetcher-removing happens before error-setting. This means that
there is no 'happens-before' relationship between error-setting and
error-checking. If we reverse the order so that error-setting happens before
fetcher-removing, then, after detecting {{allFetchersHaveShutdown}} and before
error-checking, we know that error-setting has completed. So, if we are going
to check for errors before returning {{END_OF_INPUT}}, we should also reverse
the 'happens-before' relationship between error-setting and fetcher-removing.
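
To make the ordering argument concrete, here is a minimal sketch that replays the problematic interleaving deterministically. It is a simplified, hypothetical model and not Flink's actual code: {{Manager}}, {{replay}} and {{ShutdownOrderingSketch}} are names made up for illustration, and I assume the error check sits right after the {{allFetchersHaveShutdown}} check. The reader polls once between the fetcher's two shutdown steps and once after both.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

/** Simplified, hypothetical model of the shutdown ordering; not Flink's real classes. */
public class ShutdownOrderingSketch {

    /** Stand-in for the fetcher manager: live fetchers plus the first recorded error. */
    static final class Manager {
        private final Set<Integer> liveFetchers = new HashSet<>();
        private final AtomicReference<Throwable> error = new AtomicReference<>();

        synchronized void addFetcher(int id) { liveFetchers.add(id); }
        synchronized void removeFetcher(int id) { liveFetchers.remove(id); }
        synchronized boolean allFetchersHaveShutdown() { return liveFetchers.isEmpty(); }

        void setError(Throwable t) { error.compareAndSet(null, t); }

        void checkErrors() {
            Throwable t = error.get();
            if (t != null) {
                throw new RuntimeException("One or more fetchers have encountered exception", t);
            }
        }
    }

    /** Reader side: check for errors only after seeing that every fetcher is gone. */
    static String poll(Manager manager) {
        try {
            if (manager.allFetchersHaveShutdown()) {
                manager.checkErrors();
                return "END_OF_INPUT";
            }
            return "NOTHING_AVAILABLE";
        } catch (RuntimeException e) {
            return "EXCEPTION";
        }
    }

    /**
     * Replays one interleaving: the reader polls between the failing fetcher's
     * two shutdown steps, then once more after both steps have finished.
     */
    public static String[] replay(boolean errorBeforeRemoval) {
        Manager manager = new Manager();
        manager.addFetcher(0);
        Throwable failure = new RuntimeException("Testing Exception");

        // First shutdown step of the failing fetcher.
        if (errorBeforeRemoval) { manager.setError(failure); } else { manager.removeFetcher(0); }
        String between = poll(manager);

        // Second shutdown step of the failing fetcher.
        if (errorBeforeRemoval) { manager.removeFetcher(0); } else { manager.setError(failure); }
        String after = poll(manager);

        return new String[] {between, after};
    }

    public static void main(String[] args) {
        // Removal first (current order): the reader can observe END_OF_INPUT.
        System.out.println(String.join(", ", replay(false))); // END_OF_INPUT, EXCEPTION
        // Error first (proposed order): the reader waits, then sees the error.
        System.out.println(String.join(", ", replay(true)));  // NOTHING_AVAILABLE, EXCEPTION
    }
}
```

With removal first, the poll in the gap returns {{END_OF_INPUT}} even though an error check was made, and the error only becomes visible once it is too late. With error-setting first, the worst the reader sees in the gap is {{NOTHING_AVAILABLE}}, and it gets the exception on a later poll.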

[~dianfu] [~rmetzger] [~sewen] [~becket_qin] Should this be a blocker for 1.12?

> SourceReaderBase.pollNext may return END_OF_INPUT if SplitReader.fetch throws
> -----------------------------------------------------------------------------
>
>                 Key: FLINK-19717
>                 URL: https://issues.apache.org/jira/browse/FLINK-19717
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Common
>    Affects Versions: 1.12.0
>            Reporter: Kezhu Wang
>            Priority: Major
>
> Here are the execution flows I imagine:
> 1. In the mailbox thread, we enter {{SourceReaderBase.getNextFetch}}. After
> {{splitFetcherManager.checkErrors()}} executes but before
> {{elementsQueue.poll()}}, {{SplitFetcher}} gets its chance to run.
> 2. {{SplitFetcher}} terminates due to an exception from {{SplitReader.fetch}}.
> {{SplitFetcher.shutdownHook}} removes this failed fetcher from
> {{SplitFetcherManager}}.
> 3. In the mailbox thread, {{elementsQueue.poll()}} executes. If there are no
> elements in the queue, {{elementsQueue}} is reset to unavailable.
> 4. After getting no elements from {{SourceReaderBase.getNextFetch}}, we
> enter {{SourceReaderBase.finishedOrAvailableLater}}. If the failed
> fetcher was the last alive fetcher, then
> {{SourceReaderBase.finishedOrAvailableLater}} may evaluate to
> {{InputStatus.END_OF_INPUT}}.
> 5. {{StreamTask}} terminates itself due to {{InputStatus.END_OF_INPUT}}.
> Here is a revised {{SourceReaderBaseTest.testExceptionInSplitReader}} which
> fails at a rate of about 1/2.
> {code:java}
>       @Test
>       public void testExceptionInSplitReader() throws Exception {
>               expectedException.expect(RuntimeException.class);
>               expectedException.expectMessage("One or more fetchers have encountered exception");
>               final String errMsg = "Testing Exception";
>               FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> elementsQueue =
>                       new FutureCompletingBlockingQueue<>();
>               // We have to handle split changes first, otherwise fetch will not be called.
>               try (MockSourceReader reader = new MockSourceReader(
>                       elementsQueue,
>                       () -> new SplitReader<int[], MockSourceSplit>() {
>                               @Override
>                               public RecordsWithSplitIds<int[]> fetch() {
>                                       throw new RuntimeException(errMsg);
>                               }
>                               @Override
>                               public void handleSplitsChanges(SplitsChange<MockSourceSplit> splitsChanges) {}
>                               @Override
>                               public void wakeUp() {
>                               }
>                       },
>                       getConfig(),
>                       null)) {
>                       ValidatingSourceOutput output = new ValidatingSourceOutput();
>                       reader.addSplits(Collections.singletonList(getSplit(0,
>                               NUM_RECORDS_PER_SPLIT,
>                               Boundedness.CONTINUOUS_UNBOUNDED)));
>                       reader.handleSourceEvents(new NoMoreSplitsEvent());
>                       // This is not a real infinite loop, it is supposed to throw exception after some polls.
>                       while (true) {
>                               InputStatus inputStatus = reader.pollNext(output);
>                               assertNotEquals(InputStatus.END_OF_INPUT, inputStatus);
>                               // Add a sleep to avoid tight loop.
>                               Thread.sleep(0);
>                       }
>               }
>       }
> {code}
> This revised {{SourceReaderBaseTest.testExceptionInSplitReader}} differs from
> the existing one in three places:
>  1. {{reader.handleSourceEvents(new NoMoreSplitsEvent())}} sets
> {{SourceReaderBase.noMoreSplitsAssignment}} to true.
>  2. It adds an assertion that {{reader.pollNext}} does not return
> {{InputStatus.END_OF_INPUT}}.
>  3. It changes {{Thread.sleep(1)}} to {{Thread.sleep(0)}} to increase the
> failure rate from 1/200 to 1/2.
> See [FLINK-19448|https://issues.apache.org/jira/browse/FLINK-19448] for the
> initial discussion.


