[ https://issues.apache.org/jira/browse/FLINK-19717?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17220783#comment-17220783 ]
Robert Metzger commented on FLINK-19717:
----------------------------------------

Thank you. I assigned you to the ticket.

> SourceReaderBase.pollNext may return END_OF_INPUT if SplitReader.fetch throws
> -----------------------------------------------------------------------------
>
>                 Key: FLINK-19717
>                 URL: https://issues.apache.org/jira/browse/FLINK-19717
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Common
>    Affects Versions: 1.12.0
>            Reporter: Kezhu Wang
>            Assignee: Kezhu Wang
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: 1.12.0, 1.11.3
>
>
> Here is the execution flow I have in mind:
> 1. In the mailbox thread, we enter {{SourceReaderBase.getNextFetch}}. After
> {{splitFetcherManager.checkErrors()}} executes but before
> {{elementsQueue.poll()}} does, the {{SplitFetcher}} gets its chance to run.
> 2. The {{SplitFetcher}} terminates due to an exception from
> {{SplitReader.fetch}}. {{SplitFetcher.shutdownHook}} removes this failed
> fetcher from {{SplitFetcherManager}}.
> 3. In the mailbox thread, {{elementsQueue.poll()}} executes. If there are no
> elements in the queue, {{elementsQueue}} is reset to unavailable.
> 4. After getting no elements from {{SourceReaderBase.getNextFetch}}, we
> enter {{SourceReaderBase.finishedOrAvailableLater}}. If the failed
> fetcher was the last alive fetcher, then
> {{SourceReaderBase.finishedOrAvailableLater}} may evaluate to
> {{InputStatus.END_OF_INPUT}}.
> 5. {{StreamTask}} will terminate itself due to {{InputStatus.END_OF_INPUT}}.
> Here is a revised {{SourceReaderBaseTest.testExceptionInSplitReader}} which
> fails about half of the time.
> {code:java}
> @Test
> public void testExceptionInSplitReader() throws Exception {
>     expectedException.expect(RuntimeException.class);
>     expectedException.expectMessage("One or more fetchers have encountered exception");
>     final String errMsg = "Testing Exception";
>
>     FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> elementsQueue =
>         new FutureCompletingBlockingQueue<>();
>     // We have to handle split changes first, otherwise fetch will not be called.
>     try (MockSourceReader reader = new MockSourceReader(
>         elementsQueue,
>         () -> new SplitReader<int[], MockSourceSplit>() {
>             @Override
>             public RecordsWithSplitIds<int[]> fetch() {
>                 // Every fetch fails, so the fetcher thread terminates with this error.
>                 throw new RuntimeException(errMsg);
>             }
>
>             @Override
>             public void handleSplitsChanges(SplitsChange<MockSourceSplit> splitsChanges) {}
>
>             @Override
>             public void wakeUp() {
>             }
>         },
>         getConfig(),
>         null)) {
>         ValidatingSourceOutput output = new ValidatingSourceOutput();
>         reader.addSplits(Collections.singletonList(getSplit(0,
>             NUM_RECORDS_PER_SPLIT,
>             Boundedness.CONTINUOUS_UNBOUNDED)));
>         // Mark split assignment as finished so that END_OF_INPUT becomes reachable.
>         reader.handleSourceEvents(new NoMoreSplitsEvent());
>         // This is not a real infinite loop; it is supposed to throw an exception after some polls.
>         while (true) {
>             InputStatus inputStatus = reader.pollNext(output);
>             // The fetcher error must surface as an exception, never as END_OF_INPUT.
>             assertNotEquals(InputStatus.END_OF_INPUT, inputStatus);
>             // Yield without really sleeping, which widens the race window.
>             Thread.sleep(0);
>         }
>     }
> }
> {code}
> This revised {{SourceReaderBaseTest.testExceptionInSplitReader}} differs from
> the existing one in three places:
> 1. {{reader.handleSourceEvents(new NoMoreSplitsEvent())}} sets
> {{SourceReaderBase.noMoreSplitsAssignment}} to true.
> 2. It adds an assertion that {{reader.pollNext}} does not return
> {{InputStatus.END_OF_INPUT}}.
> 3. It changes {{Thread.sleep(1)}} to {{Thread.sleep(0)}} to increase the
> failure rate from 1/200 to 1/2.
> See [FLINK-19448|https://issues.apache.org/jira/browse/FLINK-19448] for
> the initial discussion.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
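
The five-step flow in the description can be replayed deterministically with a small single-threaded model. This is only a sketch under stated assumptions: RaceSketch, fetcherFails, and the recheckErrors flag are hypothetical names invented for illustration, not Flink classes or APIs, and the re-check shown is one possible remedy rather than a statement of what the merged fix does.

```java
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical model of the check-then-poll race; not Flink code. */
class RaceSketch {
    enum Status { MORE_AVAILABLE, NOTHING_AVAILABLE, END_OF_INPUT }

    final Set<Integer> aliveFetchers = new HashSet<>();
    final Queue<int[]> elementsQueue = new ArrayDeque<>();
    final AtomicReference<Throwable> fetcherError = new AtomicReference<>();
    boolean noMoreSplitsAssignment;

    /** Stands in for splitFetcherManager.checkErrors(). */
    void checkErrors() {
        Throwable t = fetcherError.get();
        if (t != null) {
            throw new RuntimeException("One or more fetchers have encountered exception", t);
        }
    }

    /** Stands in for a fetcher dying: record the error, then deregister (step 2). */
    void fetcherFails(int id, Throwable cause) {
        fetcherError.compareAndSet(null, cause);
        aliveFetchers.remove(id);
    }

    /** The Runnable is executed exactly where the fetcher thread is assumed to be scheduled. */
    Status pollNext(Runnable interleavedFetcherStep, boolean recheckErrors) {
        checkErrors();                        // step 1: no error recorded yet
        interleavedFetcherStep.run();         // step 2: fetcher fails and removes itself
        int[] batch = elementsQueue.poll();   // step 3: queue is empty
        if (batch != null) {
            return Status.MORE_AVAILABLE;
        }
        // step 4: no alive fetchers and no more splits looks like a clean finish
        if (noMoreSplitsAssignment && aliveFetchers.isEmpty()) {
            if (recheckErrors) {
                checkErrors();                // possible remedy: look at errors once more
            }
            return Status.END_OF_INPUT;       // step 5: the task would terminate here
        }
        return Status.NOTHING_AVAILABLE;
    }

    /** Replays the interleaving once and reports the outcome as text. */
    static String run(boolean recheckErrors) {
        RaceSketch r = new RaceSketch();
        r.aliveFetchers.add(0);
        r.noMoreSplitsAssignment = true;
        try {
            Runnable failure = () -> r.fetcherFails(0, new RuntimeException("Testing Exception"));
            return r.pollNext(failure, recheckErrors).name();
        } catch (RuntimeException e) {
            return "error: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // prints "END_OF_INPUT"
        System.out.println(run(true));  // prints "error: One or more fetchers have encountered exception"
    }
}
```

Without the second check, the model reports END_OF_INPUT even though a fetcher error was recorded. With it, the same interleaving surfaces the exception that the revised test expects.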