Kezhu Wang created FLINK-19717:
----------------------------------

             Summary: SourceReaderBase.pollNext may return END_OF_INPUT if 
SplitReader.fetch throws
                 Key: FLINK-19717
                 URL: https://issues.apache.org/jira/browse/FLINK-19717
             Project: Flink
          Issue Type: Bug
          Components: Connectors / Common
    Affects Versions: 1.12.0
            Reporter: Kezhu Wang


Here is the execution flow I have in mind:
1. In the mailbox thread, we enter {{SourceReaderBase.getNextFetch}}. After
{{splitFetcherManager.checkErrors()}} executes but before
{{elementsQueue.poll()}}, the {{SplitFetcher}} gets its chance to run.
2. The {{SplitFetcher}} terminates due to an exception from
{{SplitReader.fetch}}. {{SplitFetcher.shutdownHook}} removes this failed
fetcher from {{SplitFetcherManager}}.
3. In the mailbox thread, {{elementsQueue.poll()}} executes. If there are no
elements in the queue, {{elementsQueue}} is reset to unavailable.
4. After getting no elements from {{SourceReaderBase.getNextFetch}}, we
enter {{SourceReaderBase.finishedOrAvailableLater}}. If the failed fetcher
was the last alive fetcher, {{SourceReaderBase.finishedOrAvailableLater}} may
evaluate to {{InputStatus.END_OF_INPUT}}.
5. {{StreamTask}} then terminates itself due to {{InputStatus.END_OF_INPUT}}
instead of failing with the fetcher's exception.
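
The interleaving above can be replayed deterministically in a small, single-threaded model. This is only a sketch: the class {{PollNextRaceSketch}}, its fields, and its methods are hypothetical stand-ins for the state held by {{SourceReaderBase}} and {{SplitFetcherManager}}, not Flink code, and the real {{finishedOrAvailableLater}} is more involved.

```java
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;

/** Simplified, single-threaded model of the suspected race; not Flink code. */
class PollNextRaceSketch {
    enum Status { MORE_AVAILABLE, NOTHING_AVAILABLE, END_OF_INPUT }

    // Hypothetical stand-ins for reader and fetcher-manager state.
    private final Queue<String> elementsQueue = new ArrayDeque<>();
    private final Set<Integer> aliveFetchers = new HashSet<>();
    private Throwable uncaughtFetcherError;
    private boolean noMoreSplitsAssignment;

    /** Models splitFetcherManager.checkErrors(): rethrows a recorded fetcher failure. */
    void checkErrors() {
        if (uncaughtFetcherError != null) {
            throw new RuntimeException(
                    "One or more fetchers have encountered exception", uncaughtFetcherError);
        }
    }

    /** Models a fetcher dying: the error is recorded and the fetcher is removed. */
    void fetcherFails(int fetcherId, Throwable cause) {
        uncaughtFetcherError = cause;
        aliveFetchers.remove(fetcherId);
    }

    /** Models the end-of-input decision, which does not look at recorded errors. */
    Status finishedOrAvailableLater() {
        boolean finished =
                noMoreSplitsAssignment && aliveFetchers.isEmpty() && elementsQueue.isEmpty();
        return finished ? Status.END_OF_INPUT : Status.NOTHING_AVAILABLE;
    }

    /** Replays steps 1 to 4 in the order described above. */
    static Status simulate() {
        PollNextRaceSketch reader = new PollNextRaceSketch();
        reader.aliveFetchers.add(0);
        reader.noMoreSplitsAssignment = true;

        reader.checkErrors();                                // step 1: no error recorded yet
        reader.fetcherFails(0, new RuntimeException("Testing Exception")); // step 2
        String next = reader.elementsQueue.poll();           // step 3: queue is empty
        if (next != null) {
            return Status.MORE_AVAILABLE;
        }
        return reader.finishedOrAvailableLater();            // step 4: error is not rechecked
    }

    public static void main(String[] args) {
        System.out.println(simulate());
    }
}
```

In this model the recorded error is never observed, because {{checkErrors()}} ran before the fetcher failed and nothing checks it again before the end-of-input decision.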

Here is a revised {{SourceReaderBaseTest.testExceptionInSplitReader}}, which
fails about half of the time.
{code:java}
@Test
public void testExceptionInSplitReader() throws Exception {
    expectedException.expect(RuntimeException.class);
    expectedException.expectMessage("One or more fetchers have encountered exception");
    final String errMsg = "Testing Exception";

    FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> elementsQueue =
            new FutureCompletingBlockingQueue<>();
    // We have to handle split changes first, otherwise fetch will not be called.
    try (MockSourceReader reader = new MockSourceReader(
            elementsQueue,
            () -> new SplitReader<int[], MockSourceSplit>() {
                @Override
                public RecordsWithSplitIds<int[]> fetch() {
                    throw new RuntimeException(errMsg);
                }

                @Override
                public void handleSplitsChanges(SplitsChange<MockSourceSplit> splitsChanges) {}

                @Override
                public void wakeUp() {}
            },
            getConfig(),
            null)) {
        ValidatingSourceOutput output = new ValidatingSourceOutput();
        reader.addSplits(Collections.singletonList(getSplit(
                0, NUM_RECORDS_PER_SPLIT, Boundedness.CONTINUOUS_UNBOUNDED)));
        reader.handleSourceEvents(new NoMoreSplitsEvent());
        // This is not a real infinite loop, it is supposed to throw an exception after some polls.
        while (true) {
            InputStatus inputStatus = reader.pollNext(output);
            assertNotEquals(InputStatus.END_OF_INPUT, inputStatus);
            // Add a sleep to avoid tight loop.
            Thread.sleep(0);
        }
    }
}
{code}

This revised {{SourceReaderBaseTest.testExceptionInSplitReader}} differs from
the existing one in three places:
 1. {{reader.handleSourceEvents(new NoMoreSplitsEvent())}} sets
{{SourceReaderBase.noMoreSplitsAssignment}} to true.
 2. It adds an assertion that {{reader.pollNext}} does not return
{{InputStatus.END_OF_INPUT}}.
 3. It changes {{Thread.sleep(1)}} to {{Thread.sleep(0)}} to increase the
failure rate from about 1/200 to about 1/2.

See [FLINK-19448|https://issues.apache.org/jira/browse/FLINK-19448] for the
initial discussion.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
