[ 
https://issues.apache.org/jira/browse/FLINK-19717?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17221252#comment-17221252
 ] 

Dian Fu commented on FLINK-19717:
---------------------------------

[~kezhuw] Any update on this ticket? It would be great if we could fix this issue 
this week, as we're planning to build the first RC of 1.12 next Monday.

> SourceReaderBase.pollNext may return END_OF_INPUT if SplitReader.fetch throws
> -----------------------------------------------------------------------------
>
>                 Key: FLINK-19717
>                 URL: https://issues.apache.org/jira/browse/FLINK-19717
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Common
>    Affects Versions: 1.12.0
>            Reporter: Kezhu Wang
>            Assignee: Kezhu Wang
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: 1.12.0, 1.11.3
>
>
> Here is the execution flow I have in mind:
> 1. In the mailbox thread, we enter {{SourceReaderBase.getNextFetch}}. After 
> {{splitFetcherManager.checkErrors()}} executes but before 
> {{elementsQueue.poll()}}, the {{SplitFetcher}} gets its chance to run.
> 2. The {{SplitFetcher}} terminates due to an exception from 
> {{SplitReader.fetch}}. {{SplitFetcher.shutdownHook}} removes this failed 
> fetcher from {{SplitFetcherManager}}.
> 3. In the mailbox thread, {{elementsQueue.poll()}} executes. If there are no 
> elements in the queue, {{elementsQueue}} is reset to unavailable.
> 4. After getting no elements from {{SourceReaderBase.getNextFetch}}, we 
> enter {{SourceReaderBase.finishedOrAvailableLater}}. If the failed 
> fetcher was the last alive fetcher, then 
> {{SourceReaderBase.finishedOrAvailableLater}} may evaluate to 
> {{InputStatus.END_OF_INPUT}}.
> 5. {{StreamTask}} terminates itself due to {{InputStatus.END_OF_INPUT}}.
> Here is a revised {{SourceReaderBaseTest.testExceptionInSplitReader}} which 
> fails about half of the time.
> {code:java}
>       @Test
>       public void testExceptionInSplitReader() throws Exception {
>               expectedException.expect(RuntimeException.class);
>               expectedException.expectMessage("One or more fetchers have encountered exception");
>               final String errMsg = "Testing Exception";
>               FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> elementsQueue =
>                       new FutureCompletingBlockingQueue<>();
>               // We have to handle split changes first, otherwise fetch will not be called.
>               try (MockSourceReader reader = new MockSourceReader(
>                       elementsQueue,
>                       () -> new SplitReader<int[], MockSourceSplit>() {
>                               @Override
>                               public RecordsWithSplitIds<int[]> fetch() {
>                                       throw new RuntimeException(errMsg);
>                               }
>                               @Override
>                               public void handleSplitsChanges(SplitsChange<MockSourceSplit> splitsChanges) {}
>                               @Override
>                               public void wakeUp() {
>                               }
>                       },
>                       getConfig(),
>                       null)) {
>                       ValidatingSourceOutput output = new ValidatingSourceOutput();
>                       reader.addSplits(Collections.singletonList(getSplit(0,
>                               NUM_RECORDS_PER_SPLIT,
>                               Boundedness.CONTINUOUS_UNBOUNDED)));
>                       reader.handleSourceEvents(new NoMoreSplitsEvent());
>                       // This is not a real infinite loop, it is supposed to throw exception after some polls.
>                       while (true) {
>                               InputStatus inputStatus = reader.pollNext(output);
>                               assertNotEquals(InputStatus.END_OF_INPUT, inputStatus);
>                               // Add a sleep to avoid tight loop.
>                               Thread.sleep(0);
>                       }
>               }
>       }
> {code}
> This revised {{SourceReaderBaseTest.testExceptionInSplitReader}} differs from 
> the existing one in three places:
>  1. {{reader.handleSourceEvents(new NoMoreSplitsEvent())}} sets 
> {{SourceReaderBase.noMoreSplitsAssignment}} to true.
>  2. An assertion is added to check that {{reader.pollNext}} does not return 
> {{InputStatus.END_OF_INPUT}}.
>  3. {{Thread.sleep(1)}} is changed to {{Thread.sleep(0)}} to increase the 
> failure rate from 1/200 to 1/2.
> See [FLINK-19448|https://issues.apache.org/jira/browse/FLINK-19448] for 
> initial discussion.
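
The interleaving in steps 1 to 5 of the description can be replayed
deterministically with a toy model. The sketch below is not Flink code: the
class PollRaceSketch, its fields, and the Runnable hook are hypothetical
stand-ins named after the ticket, and the END_OF_INPUT condition is an
assumption based on step 4 rather than the real finishedOrAvailableLater
logic. It only illustrates why checking errors before polling leaves a window
in which a failed fetcher can look like a finished one.

```java
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;

// Hypothetical single-threaded model of the race; not the Flink implementation.
final class PollRaceSketch {
    enum Status { MORE_AVAILABLE, NOTHING_AVAILABLE, END_OF_INPUT }

    private final Set<Integer> aliveFetchers = new HashSet<>();
    private final Queue<String> elementsQueue = new ArrayDeque<>();
    private final boolean noMoreSplitsAssignment = true; // as after NoMoreSplitsEvent
    private Throwable fetcherError;                       // recorded by a failed fetcher

    PollRaceSketch() {
        aliveFetchers.add(0); // one running fetcher with id 0
    }

    // Stand-in for splitFetcherManager.checkErrors().
    void checkErrors() {
        if (fetcherError != null) {
            throw new RuntimeException(
                "One or more fetchers have encountered exception", fetcherError);
        }
    }

    // Stand-in for step 2: the fetcher fails and its shutdown hook removes it.
    void fetcherFails(int fetcherId) {
        fetcherError = new RuntimeException("Testing Exception");
        aliveFetchers.remove(fetcherId);
    }

    // The hook runs "the other thread" exactly between checkErrors() and poll().
    Status pollNext(Runnable betweenCheckAndPoll) {
        checkErrors();                          // step 1: no error recorded yet
        betweenCheckAndPoll.run();              // step 2: fetcher fails here
        String element = elementsQueue.poll();  // step 3: queue is empty
        if (element != null) {
            return Status.MORE_AVAILABLE;
        }
        // Step 4 (assumed condition): no splits pending and no fetcher alive.
        if (noMoreSplitsAssignment && aliveFetchers.isEmpty()) {
            return Status.END_OF_INPUT;
        }
        return Status.NOTHING_AVAILABLE;
    }

    public static void main(String[] args) {
        PollRaceSketch reader = new PollRaceSketch();
        Status racy = reader.pollNext(() -> reader.fetcherFails(0));
        System.out.println("racy poll returned: " + racy);
        try {
            reader.pollNext(() -> { });
        } catch (RuntimeException e) {
            System.out.println("next poll threw: " + e.getMessage());
        }
    }
}
```

In this model the racy poll reports END_OF_INPUT and the error only surfaces
on the following poll, which a caller that stops at END_OF_INPUT (step 5)
would never make.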



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
