[ https://issues.apache.org/jira/browse/FLINK-19717?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17216840#comment-17216840 ]
Kezhu Wang commented on FLINK-19717:
------------------------------------

{quote}Maybe the fix would be to always check for errors before returning NOTHING_AVAILABLE.
{quote}
[~sewen] I guess you mean checking for errors before returning *{{END_OF_INPUT}}*?

I think that is not enough, because this error check relies on error-setting happening before fetcher-removing. All error-setting operations must have completed before the error check runs; otherwise the result is nondeterministic. The current implementation does not give us this guarantee.

Currently, we do fetcher-removing in {{SplitFetcher.shutdownHook}} and error-setting in {{ThrowableCatchingRunnable.exceptionHandler}}. Both operations are thread-safe, but fetcher-removing happens before error-setting. This means that there is no happens-before relationship between error-checking and error-setting: the mailbox thread can observe that all fetchers have shut down before the error has been recorded.

If we reverse the happens-before relationship between error-setting and fetcher-removing, then once we detect {{allFetchersHaveShutdown}}, and before we check for errors, we know that error-setting has completed.

So, if we are going to check for errors before returning {{END_OF_INPUT}}, we should also reverse the happens-before relationship between error-setting and fetcher-removing.

[~dianfu] [~rmetzger] [~sewen] [~becket_qin] Should this be a blocker for 1.12?

> SourceReaderBase.pollNext may return END_OF_INPUT if SplitReader.fetch throws
> -----------------------------------------------------------------------------
>
>                 Key: FLINK-19717
>                 URL: https://issues.apache.org/jira/browse/FLINK-19717
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Common
>    Affects Versions: 1.12.0
>            Reporter: Kezhu Wang
>            Priority: Major
>
> Here is the execution flow I have in mind:
> 1. In the mailbox thread, we enter {{SourceReaderBase.getNextFetch}}. After {{splitFetcherManager.checkErrors()}} executes but before {{elementsQueue.poll()}}, {{SplitFetcher}} gets its chance to run.
> 2. {{SplitFetcher}} terminates due to an exception from {{SplitReader.fetch}}. {{SplitFetcher.shutdownHook}} removes this failed fetcher from {{SplitFetcherManager}}.
> 3. In the mailbox thread, {{elementsQueue.poll()}} executes. If there are no elements in the queue, {{elementsQueue}} is reset to unavailable.
> 4. After getting no elements from {{SourceReaderBase.getNextFetch}}, we enter {{SourceReaderBase.finishedOrAvailableLater}}. If the failed fetcher was the last alive fetcher, {{SourceReaderBase.finishedOrAvailableLater}} may evaluate to {{InputStatus.END_OF_INPUT}}.
> 5. {{StreamTask}} terminates itself due to {{InputStatus.END_OF_INPUT}}.
> Here is a revised {{SourceReaderBaseTest.testExceptionInSplitReader}} that fails about half the time.
> {code:java}
> @Test
> public void testExceptionInSplitReader() throws Exception {
>     expectedException.expect(RuntimeException.class);
>     expectedException.expectMessage("One or more fetchers have encountered exception");
>     final String errMsg = "Testing Exception";
>
>     FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> elementsQueue =
>         new FutureCompletingBlockingQueue<>();
>     // We have to handle split changes first, otherwise fetch will not be called.
>     try (MockSourceReader reader = new MockSourceReader(
>         elementsQueue,
>         () -> new SplitReader<int[], MockSourceSplit>() {
>             @Override
>             public RecordsWithSplitIds<int[]> fetch() {
>                 throw new RuntimeException(errMsg);
>             }
>
>             @Override
>             public void handleSplitsChanges(SplitsChange<MockSourceSplit> splitsChanges) {}
>
>             @Override
>             public void wakeUp() {
>             }
>         },
>         getConfig(),
>         null)) {
>         ValidatingSourceOutput output = new ValidatingSourceOutput();
>         reader.addSplits(Collections.singletonList(getSplit(0,
>             NUM_RECORDS_PER_SPLIT,
>             Boundedness.CONTINUOUS_UNBOUNDED)));
>         reader.handleSourceEvents(new NoMoreSplitsEvent());
>         // This is not a real infinite loop, it is supposed to throw exception after some polls.
>         while (true) {
>             InputStatus inputStatus = reader.pollNext(output);
>             assertNotEquals(InputStatus.END_OF_INPUT, inputStatus);
>             // Add a sleep to avoid tight loop.
>             Thread.sleep(0);
>         }
>     }
> }
> {code}
> This revised {{SourceReaderBaseTest.testExceptionInSplitReader}} differs from the existing one in three places:
> 1. {{reader.handleSourceEvents(new NoMoreSplitsEvent())}} sets {{SourceReaderBase.noMoreSplitsAssignment}} to true.
> 2. It adds an assertion that {{reader.pollNext}} never returns {{InputStatus.END_OF_INPUT}}.
> 3. It changes {{Thread.sleep(1)}} to {{Thread.sleep(0)}} to increase the failure rate from 1/200 to 1/2.
> See [FLINK-19448|https://issues.apache.org/jira/browse/FLINK-19448] for the initial discussion.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
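
To make the ordering argument in the comment above concrete, here is a minimal, self-contained sketch. It is not Flink code: {{HappensBeforeSketch}}, {{MiniFetcherManager}} and {{Status}} are hypothetical stand-ins, and a simple counter replaces the real fetcher bookkeeping. It only illustrates the proposed order, assuming the fetcher thread records its error before it deregisters itself and the polling thread checks for errors after it sees that all fetchers have shut down.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical illustration only; none of these types exist in Flink.
class HappensBeforeSketch {

    enum Status { NOTHING_AVAILABLE, END_OF_INPUT }

    /** Hypothetical stand-in for the fetcher manager. */
    static final class MiniFetcherManager {
        private final AtomicReference<Throwable> uncaughtError = new AtomicReference<>();
        private final AtomicInteger liveFetchers = new AtomicInteger();

        void startFetcher(Runnable fetchLoop) {
            // Register before the thread starts, so the count is never too low.
            liveFetchers.incrementAndGet();
            Thread thread = new Thread(() -> {
                try {
                    fetchLoop.run();
                } catch (Throwable t) {
                    // Step 1: error-setting.
                    uncaughtError.compareAndSet(null, t);
                } finally {
                    // Step 2: fetcher-removing, strictly after error-setting.
                    liveFetchers.decrementAndGet();
                }
            });
            thread.setDaemon(true);
            thread.start();
        }

        boolean allFetchersHaveShutdown() {
            return liveFetchers.get() == 0;
        }

        void checkErrors() {
            Throwable t = uncaughtError.get();
            if (t != null) {
                throw new RuntimeException("One or more fetchers have encountered exception", t);
            }
        }
    }

    /** Checks for errors after observing shutdown and before reporting END_OF_INPUT. */
    static Status finishedOrAvailableLater(MiniFetcherManager manager) {
        if (manager.allFetchersHaveShutdown()) {
            // The removal was observed, so any error set before it is visible here.
            manager.checkErrors();
            return Status.END_OF_INPUT;
        }
        return Status.NOTHING_AVAILABLE;
    }

    public static void main(String[] args) {
        MiniFetcherManager manager = new MiniFetcherManager();
        manager.startFetcher(() -> {
            throw new RuntimeException("Testing Exception");
        });
        try {
            while (finishedOrAvailableLater(manager) != Status.END_OF_INPUT) {
                Thread.yield();
            }
            System.out.println("BUG: END_OF_INPUT returned although the fetcher failed");
        } catch (RuntimeException e) {
            System.out.println("Error surfaced: " + e.getMessage());
        }
    }
}
```

In this sketch both the error slot and the counter are accessed through atomics, so a polling thread that reads a count of zero also sees the error written before the decrement. Swapping the two steps in the fetcher thread would reintroduce the window described in the issue, where the polling thread sees no live fetchers and no error.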