[jira] [Commented] (FLINK-19717) SourceReaderBase.pollNext may return END_OF_INPUT if SplitReader.fetch throws
    [ https://issues.apache.org/jira/browse/FLINK-19717?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17230689#comment-17230689 ]

Stephan Ewen commented on FLINK-19717:
--------------------------------------

Fixed in 1.12.0 via 2cce1aced0d6a311ff0803b773f1565e7f9d76fc

> SourceReaderBase.pollNext may return END_OF_INPUT if SplitReader.fetch throws
> -----------------------------------------------------------------------------
>
>                 Key: FLINK-19717
>                 URL: https://issues.apache.org/jira/browse/FLINK-19717
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Common
>    Affects Versions: 1.12.0
>            Reporter: Kezhu Wang
>            Assignee: Kezhu Wang
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: 1.12.0, 1.11.3
>
>
> Here is the execution flow I have in mind:
> 1. In the mailbox thread, we enter {{SourceReaderBase.getNextFetch}}. After it
> executes {{splitFetcherManager.checkErrors()}} but before
> {{elementsQueue.poll()}}, the {{SplitFetcher}} gets its chance to run.
> 2. The {{SplitFetcher}} terminates due to an exception from {{SplitReader.fetch}}.
> {{SplitFetcher.shutdownHook}} removes this failed fetcher from
> {{SplitFetcherManager}}.
> 3. In the mailbox thread, {{elementsQueue.poll()}} executes. If there are no
> elements in the queue, {{elementsQueue}} is reset to unavailable.
> 4. After getting no elements from {{SourceReaderBase.getNextFetch}}, we
> enter {{SourceReaderBase.finishedOrAvailableLater}}. If the failed
> fetcher was the last alive fetcher, then
> {{SourceReaderBase.finishedOrAvailableLater}} may evaluate to
> {{InputStatus.END_OF_INPUT}}.
> 5. {{StreamTask}} terminates itself due to {{InputStatus.END_OF_INPUT}}.
> Here is a revised {{SourceReaderBaseTest.testExceptionInSplitReader}} that
> fails about half of the time.
> {code:java}
> @Test
> public void testExceptionInSplitReader() throws Exception {
>     expectedException.expect(RuntimeException.class);
>     expectedException.expectMessage("One or more fetchers have encountered exception");
>     final String errMsg = "Testing Exception";
>     FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> elementsQueue =
>         new FutureCompletingBlockingQueue<>();
>     // We have to handle split changes first, otherwise fetch will not be called.
>     try (MockSourceReader reader = new MockSourceReader(
>         elementsQueue,
>         () -> new SplitReader<int[], MockSourceSplit>() {
>             @Override
>             public RecordsWithSplitIds<int[]> fetch() {
>                 throw new RuntimeException(errMsg);
>             }
>             @Override
>             public void handleSplitsChanges(SplitsChange<MockSourceSplit> splitsChanges) {}
>             @Override
>             public void wakeUp() {
>             }
>         },
>         getConfig(),
>         null)) {
>         ValidatingSourceOutput output = new ValidatingSourceOutput();
>         reader.addSplits(Collections.singletonList(getSplit(0,
>             NUM_RECORDS_PER_SPLIT,
>             Boundedness.CONTINUOUS_UNBOUNDED)));
>         reader.handleSourceEvents(new NoMoreSplitsEvent());
>         // This is not a real infinite loop; it is supposed to throw an
>         // exception after some polls.
>         while (true) {
>             InputStatus inputStatus = reader.pollNext(output);
>             assertNotEquals(InputStatus.END_OF_INPUT, inputStatus);
>             // Add a sleep to avoid a tight loop.
>             Thread.sleep(0);
>         }
>     }
> }
> {code}
> This revised {{SourceReaderBaseTest.testExceptionInSplitReader}} differs from
> the existing one in three places:
> 1. {{reader.handleSourceEvents(new NoMoreSplitsEvent())}} sets
> {{SourceReaderBase.noMoreSplitsAssignment}} to true.
> 2. It adds an assertion that {{reader.pollNext}} does not return
> {{InputStatus.END_OF_INPUT}}.
> 3. It changes {{Thread.sleep(1)}} to {{Thread.sleep(0)}} to increase the failure
> rate from 1/200 to 1/2.
> See [FLINK-19448|https://issues.apache.org/jira/browse/FLINK-19448] for the
> initial discussion.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
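The five-step interleaving in the issue description can be replayed deterministically in a single thread. The following is only a minimal sketch under stated assumptions, not Flink code: `PollNextRaceSketch`, `FetcherManager`, `fetcherFailed`, `allFetchersShutDown`, and the `recheckErrors` flag are hypothetical names, `noMoreSplitsAssignment` is assumed to be true already, and a `Runnable` hook stands in for the moment the fetcher thread gets scheduled between `checkErrors()` and `poll()`. The re-check before returning `END_OF_INPUT` illustrates one possible way to close the window; it is not claimed to be what the referenced commit does.

```java
import java.util.ArrayDeque;
import java.util.Queue;

public class PollNextRaceSketch {

    enum InputStatus { MORE_AVAILABLE, NOTHING_AVAILABLE, END_OF_INPUT }

    /** Hypothetical stand-in for SplitFetcherManager: one fetcher, one error slot. */
    static final class FetcherManager {
        private Throwable uncaughtError; // set when the fetcher dies
        private int aliveFetchers = 1;

        /** Rethrows a recorded fetcher failure, if there is one. */
        void checkErrors() {
            if (uncaughtError != null) {
                throw new RuntimeException(
                        "One or more fetchers have encountered exception", uncaughtError);
            }
        }

        /** Step 2: the fetcher fails and its shutdown hook unregisters it. */
        void fetcherFailed(Throwable cause) {
            uncaughtError = cause;
            aliveFetchers--;
        }

        boolean allFetchersShutDown() {
            return aliveFetchers == 0;
        }
    }

    /**
     * Simplified model of pollNext. The fetcherRunsHere hook is invoked exactly
     * where the issue says the fetcher thread may be scheduled.
     */
    static InputStatus pollNext(
            FetcherManager manager,
            Queue<String> elementsQueue,
            Runnable fetcherRunsHere,
            boolean recheckErrors) {
        manager.checkErrors();             // step 1: no error recorded yet
        fetcherRunsHere.run();             // step 2: fetcher fails in this window
        if (elementsQueue.poll() != null) { // step 3: queue is empty
            return InputStatus.MORE_AVAILABLE;
        }
        // Step 4: noMoreSplitsAssignment is assumed true in this sketch.
        if (!manager.allFetchersShutDown()) {
            return InputStatus.NOTHING_AVAILABLE;
        }
        if (recheckErrors) {
            manager.checkErrors();         // surfaces the failure instead of ending input
        }
        return InputStatus.END_OF_INPUT;   // step 5 would follow from this
    }

    public static void main(String[] args) {
        FetcherManager racy = new FetcherManager();
        InputStatus status = pollNext(
                racy,
                new ArrayDeque<>(),
                () -> racy.fetcherFailed(new RuntimeException("Testing Exception")),
                false);
        System.out.println("without re-check: " + status);

        FetcherManager guarded = new FetcherManager();
        try {
            pollNext(
                    guarded,
                    new ArrayDeque<>(),
                    () -> guarded.fetcherFailed(new RuntimeException("Testing Exception")),
                    true);
        } catch (RuntimeException e) {
            System.out.println("with re-check: " + e.getMessage());
        }
    }
}
```

Without the re-check, the sketch reports `END_OF_INPUT` even though the fetcher failed; with it, the recorded failure is rethrown with the message the revised test expects.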
[jira] [Commented] (FLINK-19717) SourceReaderBase.pollNext may return END_OF_INPUT if SplitReader.fetch throws
    [ https://issues.apache.org/jira/browse/FLINK-19717?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17228550#comment-17228550 ]

Stephan Ewen commented on FLINK-19717:
--------------------------------------

Yes, we need to pick back all changes from master to release-1.11 in order. Otherwise it will be impossible. Let's coordinate between the two of us how we do this...
[jira] [Commented] (FLINK-19717) SourceReaderBase.pollNext may return END_OF_INPUT if SplitReader.fetch throws
    [ https://issues.apache.org/jira/browse/FLINK-19717?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17228328#comment-17228328 ]

Jiangjie Qin commented on FLINK-19717:
--------------------------------------

[~sewen] I have merged the patch to master. But cherry-picking the patch to release-1.11 has some conflicts. Do you want to sequentialize the backporting of this patch with other Source patches? Otherwise I can also rebase the patch on release-1.11.
[jira] [Commented] (FLINK-19717) SourceReaderBase.pollNext may return END_OF_INPUT if SplitReader.fetch throws
    [ https://issues.apache.org/jira/browse/FLINK-19717?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17228016#comment-17228016 ]

Stephan Ewen commented on FLINK-19717:
--------------------------------------

We are about to merge this. There is consensus on problem and solution, so it'll be in very soon.

Given that this is not a feature, but a critical bug fix, I think the feature freeze deadline does not apply here.
[jira] [Commented] (FLINK-19717) SourceReaderBase.pollNext may return END_OF_INPUT if SplitReader.fetch throws
    [ https://issues.apache.org/jira/browse/FLINK-19717?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17226786#comment-17226786 ]

Robert Metzger commented on FLINK-19717:
----------------------------------------

Is this blocker expected to be merged before the feature freeze on Sunday? I guess we could also fix this bug after the feature freeze in the stabilization phase.
[jira] [Commented] (FLINK-19717) SourceReaderBase.pollNext may return END_OF_INPUT if SplitReader.fetch throws
    [ https://issues.apache.org/jira/browse/FLINK-19717?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17221318#comment-17221318 ]

Dian Fu commented on FLINK-19717:
---------------------------------

[~rmetzger] You are right. I missed the PR.
[jira] [Commented] (FLINK-19717) SourceReaderBase.pollNext may return END_OF_INPUT if SplitReader.fetch throws
    [ https://issues.apache.org/jira/browse/FLINK-19717?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17221272#comment-17221272 ]

Robert Metzger commented on FLINK-19717:
----------------------------------------

[~dian.fu] I believe the issue here is that [~kezhuw] is waiting for a reviewer of the PR. It has been open for 4 days.
[jira] [Commented] (FLINK-19717) SourceReaderBase.pollNext may return END_OF_INPUT if SplitReader.fetch throws
    [ https://issues.apache.org/jira/browse/FLINK-19717?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17221252#comment-17221252 ]

Dian Fu commented on FLINK-19717:
---------------------------------

[~kezhuw] Any update on this ticket? It would be great if we could fix this issue this week, as we're planning to build the first RC of 1.12 next Monday.
[jira] [Commented] (FLINK-19717) SourceReaderBase.pollNext may return END_OF_INPUT if SplitReader.fetch throws
    [ https://issues.apache.org/jira/browse/FLINK-19717?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17220783#comment-17220783 ]

Robert Metzger commented on FLINK-19717:
----------------------------------------

Thank you. I assigned you to the ticket.
[jira] [Commented] (FLINK-19717) SourceReaderBase.pollNext may return END_OF_INPUT if SplitReader.fetch throws
[ https://issues.apache.org/jira/browse/FLINK-19717?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17219695#comment-17219695 ]
Kezhu Wang commented on FLINK-19717:

[~sewen] I am willing to take over this.
[jira] [Commented] (FLINK-19717) SourceReaderBase.pollNext may return END_OF_INPUT if SplitReader.fetch throws
[ https://issues.apache.org/jira/browse/FLINK-19717?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17219692#comment-17219692 ]
Stephan Ewen commented on FLINK-19717:

[~kezhuw] I agree with your analysis and with the suggested fix. Good work! Do you want to prepare a PR for this? Otherwise I could work on this in a few days.
[jira] [Commented] (FLINK-19717) SourceReaderBase.pollNext may return END_OF_INPUT if SplitReader.fetch throws
[ https://issues.apache.org/jira/browse/FLINK-19717?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17218676#comment-17218676 ]
Kezhu Wang commented on FLINK-19717:

[~rmetzger] Thanks for the response. Let's wait for their decision.

[~sewen] [~becket_qin] I think we can solve this by moving {{exceptionHandler}} from {{ThrowableCatchingRunnable}} to {{SplitFetcher}}. That way we can rearrange the 'happens-before' relationship between error-setting and fetcher-removing inside {{SplitFetcher}}, with no interference from an extra structure.

I have created [a branch in my clone|https://github.com/kezhuw/flink/tree/test-case-source-reader-end-of-input-caused-by-split-reader-exception] to demonstrate the test case and a possible fix. Commit [3c65b2d|https://github.com/kezhuw/flink/commit/3c65b2d83c18c787fd590ece17304110e3fc] modifies {{SourceReaderBaseTest.testExceptionInSplitReader}} so that it fails, and [e2e383|https://github.com/kezhuw/flink/commit/e2e3832c7a4c726405111fb9198e6737417877d5] fixes it.

Do this and the above analysis make sense to you? If so, would you mind assigning this issue to me?
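The ordering argument behind this proposal can be replayed deterministically. The following is only a minimal sketch under stated assumptions, not Flink's implementation: {{ShutdownOrderReplay}}, {{State}}, {{observe}} and {{replay}} are hypothetical names, and the mailbox thread's observation is modeled as a single atomic step, which the real code does not guarantee. It runs the failing fetcher's two shutdown steps in either order and records what the mailbox thread would conclude after each step.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class ShutdownOrderReplay {
    // Hypothetical stand-in for the state shared by the fetcher and the mailbox thread.
    static final class State {
        final Set<Integer> liveFetchers = new HashSet<>();
        Throwable error; // set by the failing fetcher
    }

    // Mailbox-side decision once the queue is empty: read liveness first,
    // then check for errors, and only then decide. Returns a label instead
    // of throwing so that every observation can be recorded.
    static String observe(State s) {
        boolean allShutdown = s.liveFetchers.isEmpty();
        if (s.error != null) {
            return "ERROR";
        }
        return allShutdown ? "END_OF_INPUT" : "NOTHING_AVAILABLE";
    }

    // Replays the two shutdown steps of a failing fetcher in the given order
    // and lets the mailbox thread observe the state before and after each step.
    static List<String> replay(boolean errorBeforeRemoval) {
        State s = new State();
        s.liveFetchers.add(0);

        Runnable setError = () -> s.error = new RuntimeException("Testing Exception");
        Runnable removeFetcher = () -> s.liveFetchers.remove(0);
        List<Runnable> steps = errorBeforeRemoval
                ? Arrays.asList(setError, removeFetcher)
                : Arrays.asList(removeFetcher, setError);

        List<String> seen = new ArrayList<>();
        seen.add(observe(s)); // before the fetcher fails
        for (Runnable step : steps) {
            step.run();
            seen.add(observe(s)); // mailbox thread runs between the two steps
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println("remove, then set error: " + replay(false));
        System.out.println("set error, then remove: " + replay(true));
    }
}
```

With removal first, one of the recorded observations is {{END_OF_INPUT}} even though errors are checked before returning. With error-setting first, {{END_OF_INPUT}} is never observed in this replay.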
[jira] [Commented] (FLINK-19717) SourceReaderBase.pollNext may return END_OF_INPUT if SplitReader.fetch throws
[ https://issues.apache.org/jira/browse/FLINK-19717?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17218181#comment-17218181 ]
Robert Metzger commented on FLINK-19717:

Thanks a lot for your investigation and for opening this ticket. I can not comment on the priority of the ticket. Let's wait for Stephan or Jiangjie.
[jira] [Commented] (FLINK-19717) SourceReaderBase.pollNext may return END_OF_INPUT if SplitReader.fetch throws
[ https://issues.apache.org/jira/browse/FLINK-19717?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17216840#comment-17216840 ]
Kezhu Wang commented on FLINK-19717:

{quote}Maybe the fix would be to always check for errors before returning NOTHING_AVAILABLE. {quote}

[~sewen] I guess you mean checking for errors before returning *{{END_OF_INPUT}}*? I think that is not enough, because this error check relies on error-setting happening before fetcher-removing. Before the error check runs, all error-setting operations must have completed; otherwise the result is undetermined. The current implementation does not give us this guarantee.

Currently, we do fetcher-removing in {{SplitFetcher.shutdownHook}} and error-setting in {{ThrowableCatchingRunnable.exceptionHandler}}. Both operations are thread-safe, but fetcher-removing happens before error-setting. This means that there is no 'happens-before' relationship between error-checking and error-setting.

If we reverse the 'happens-before' relationship between error-setting and fetcher-removing, then, after detecting {{allFetchersHaveShutdown}} and before error-checking, we know that error-setting has completed. So, if we are going to check for errors before returning {{END_OF_INPUT}}, we should also reverse the 'happens-before' relationship between error-setting and fetcher-removing.

[~dianfu] [~rmetzger] [~sewen] [~becket_qin] Should this be a blocker for 1.12?