[ https://issues.apache.org/jira/browse/FLINK-19717?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17220783#comment-17220783 ]

Robert Metzger commented on FLINK-19717:
----------------------------------------

Thank you. I assigned you to the ticket.

> SourceReaderBase.pollNext may return END_OF_INPUT if SplitReader.fetch throws
> -----------------------------------------------------------------------------
>
>                 Key: FLINK-19717
>                 URL: https://issues.apache.org/jira/browse/FLINK-19717
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Common
>    Affects Versions: 1.12.0
>            Reporter: Kezhu Wang
>            Assignee: Kezhu Wang
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: 1.12.0, 1.11.3
>
>
> Here is the execution flow I have in mind:
> 1. In the mailbox thread, we enter {{SourceReaderBase.getNextFetch}}. After 
> it executes {{splitFetcherManager.checkErrors()}} but before 
> {{elementsQueue.poll()}}, {{SplitFetcher}} gets its chance to run.
> 2. {{SplitFetcher}} terminates due to an exception from {{SplitReader.fetch}}. 
> {{SplitFetcher.shutdownHook}} then removes this failed fetcher from 
> {{SplitFetcherManager}}.
> 3. In the mailbox thread, {{elementsQueue.poll()}} executes. If there are no 
> elements in the queue, {{elementsQueue}} is reset to unavailable.
> 4. After getting no elements from {{SourceReaderBase.getNextFetch}}, we 
> enter {{SourceReaderBase.finishedOrAvailableLater}}. If the failed 
> fetcher was the last alive fetcher, then 
> {{SourceReaderBase.finishedOrAvailableLater}} may evaluate to 
> {{InputStatus.END_OF_INPUT}}.
> 5. {{StreamTask}} terminates itself due to {{InputStatus.END_OF_INPUT}}.
> Here is a revised {{SourceReaderBaseTest.testExceptionInSplitReader}} which 
> fails about half of the time.
> {code:java}
>       @Test
>       public void testExceptionInSplitReader() throws Exception {
>               expectedException.expect(RuntimeException.class);
>               expectedException.expectMessage("One or more fetchers have encountered exception");
>               final String errMsg = "Testing Exception";
>               FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> elementsQueue =
>                       new FutureCompletingBlockingQueue<>();
>               // We have to handle split changes first, otherwise fetch will not be called.
>               try (MockSourceReader reader = new MockSourceReader(
>                       elementsQueue,
>                       () -> new SplitReader<int[], MockSourceSplit>() {
>                               @Override
>                               public RecordsWithSplitIds<int[]> fetch() {
>                                       throw new RuntimeException(errMsg);
>                               }
>                               @Override
>                               public void handleSplitsChanges(SplitsChange<MockSourceSplit> splitsChanges) {}
>                               @Override
>                               public void wakeUp() {
>                               }
>                       },
>                       getConfig(),
>                       null)) {
>                       ValidatingSourceOutput output = new ValidatingSourceOutput();
>                       reader.addSplits(Collections.singletonList(getSplit(0,
>                               NUM_RECORDS_PER_SPLIT,
>                               Boundedness.CONTINUOUS_UNBOUNDED)));
>                       reader.handleSourceEvents(new NoMoreSplitsEvent());
>                       // This is not a real infinite loop, it is supposed to throw exception after some polls.
>                       while (true) {
>                               InputStatus inputStatus = reader.pollNext(output);
>                               assertNotEquals(InputStatus.END_OF_INPUT, inputStatus);
>                               // Add a sleep to avoid tight loop.
>                               Thread.sleep(0);
>                       }
>               }
>       }
> {code}
> This revised {{SourceReaderBaseTest.testExceptionInSplitReader}} differs from 
> the existing one in three places:
>  1. {{reader.handleSourceEvents(new NoMoreSplitsEvent())}} sets 
> {{SourceReaderBase.noMoreSplitsAssignment}} to true.
>  2. It adds an assertion that {{reader.pollNext}} does not return 
> {{InputStatus.END_OF_INPUT}}.
>  3. It changes {{Thread.sleep(1)}} to {{Thread.sleep(0)}}, which raises the 
> failure rate from roughly 1/200 to roughly 1/2.
> See [FLINK-19448|https://issues.apache.org/jira/browse/FLINK-19448] for 
> initial discussion.
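
The interleaving in steps 1 to 5 of the quoted description can be replayed deterministically in a single thread. The following is only a minimal sketch under stated assumptions: `PollRaceSketch`, `State`, `Status`, `failFetcher` and `pollNextRacy` are hypothetical stand-ins invented for illustration, not Flink classes or methods, and the fetcher thread is modeled as a `Runnable` that is invoked between the error check and the queue poll.

```java
import java.util.ArrayDeque;
import java.util.Queue;

public class PollRaceSketch {

    /** Simplified stand-in for InputStatus (hypothetical, not the Flink enum). */
    public enum Status { MORE_AVAILABLE, NOTHING_AVAILABLE, END_OF_INPUT }

    /** Hypothetical, heavily simplified model of the reader's shared state. */
    public static final class State {
        final Queue<Integer> elementsQueue = new ArrayDeque<>();
        Throwable fetcherError;        // set when the fetcher fails
        int aliveFetchers = 1;         // the failing fetcher is the last one alive
        boolean noMoreSplits = true;   // as after NoMoreSplitsEvent
    }

    /** Models checkErrors(): rethrows a recorded fetcher failure, if any. */
    static void checkErrors(State s) {
        if (s.fetcherError != null) {
            throw new RuntimeException(
                    "One or more fetchers have encountered exception", s.fetcherError);
        }
    }

    /** Models step 2: the fetcher fails and its shutdown hook removes it. */
    public static void failFetcher(State s) {
        s.fetcherError = new RuntimeException("Testing Exception");
        s.aliveFetchers--;
    }

    /**
     * Models the check-then-poll order from the description. The
     * {@code interleaved} action stands for whatever the fetcher thread does
     * between the error check and the queue poll.
     */
    public static Status pollNextRacy(State s, Runnable interleaved) {
        checkErrors(s);                          // step 1: no error recorded yet
        interleaved.run();                       // step 2: fetcher may die here
        Integer next = s.elementsQueue.poll();   // step 3: queue is empty
        if (next != null) {
            return Status.MORE_AVAILABLE;
        }
        // step 4: no alive fetchers and no more splits looks like "finished"
        if (s.aliveFetchers == 0 && s.noMoreSplits) {
            return Status.END_OF_INPUT;
        }
        return Status.NOTHING_AVAILABLE;
    }

    public static void main(String[] args) {
        // Fetcher fails inside the window: the error is never observed.
        State racy = new State();
        System.out.println("failure inside window: "
                + pollNextRacy(racy, () -> failFetcher(racy)));

        // Fetcher fails before the check: the error is surfaced as expected.
        State early = new State();
        failFetcher(early);
        try {
            pollNextRacy(early, () -> { });
        } catch (RuntimeException e) {
            System.out.println("failure before check: " + e.getMessage());
        }
    }
}
```

In this model the first call returns `END_OF_INPUT` even though an error was recorded, which corresponds to the failing branch of the revised test, while the second call throws the expected exception.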



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
