[jira] [Commented] (FLINK-19717) SourceReaderBase.pollNext may return END_OF_INPUT if SplitReader.fetch throws

2020-11-12 Thread Stephan Ewen (Jira)


[ https://issues.apache.org/jira/browse/FLINK-19717?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17230689#comment-17230689 ]

Stephan Ewen commented on FLINK-19717:
--

Fixed in 1.12.0 via 2cce1aced0d6a311ff0803b773f1565e7f9d76fc
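
The commit itself is not reproduced here. As a hedged sketch of one way to close the race, a reader can re-check fetcher errors right before it reports {{END_OF_INPUT}}, so that a fetcher which failed after the first check still surfaces as an exception. The class, fields and methods below ({{RecheckBeforeEndOfInput}}, {{aliveFetchers}}, {{fetcherError}}) are hypothetical simplifications, not Flink's actual {{SourceReaderBase}} or {{SplitFetcherManager}}:

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, simplified reader state; not Flink's SourceReaderBase.
public class RecheckBeforeEndOfInput {

    enum InputStatus { MORE_AVAILABLE, NOTHING_AVAILABLE, END_OF_INPUT }

    final Queue<Integer> elementsQueue = new ArrayDeque<>();
    // Set by a failing fetcher (stand-in for the fetcher manager's error slot).
    final AtomicReference<Throwable> fetcherError = new AtomicReference<>();
    int aliveFetchers = 1;
    boolean noMoreSplitsAssignment = true;

    void checkErrors() {
        Throwable t = fetcherError.get();
        if (t != null) {
            throw new RuntimeException("One or more fetchers have encountered exception", t);
        }
    }

    // Called after poll() returned nothing.
    InputStatus finishedOrAvailableLater() {
        if (!(noMoreSplitsAssignment && aliveFetchers == 0)) {
            return InputStatus.NOTHING_AVAILABLE;
        }
        if (elementsQueue.isEmpty()) {
            // The last fetcher may have failed after the earlier checkErrors()
            // call and been removed by its shutdown hook. Check again so the
            // failure surfaces instead of a clean END_OF_INPUT.
            checkErrors();
            return InputStatus.END_OF_INPUT;
        }
        // A fetcher enqueued data concurrently; let the caller poll again.
        return InputStatus.MORE_AVAILABLE;
    }

    public static void main(String[] args) {
        RecheckBeforeEndOfInput reader = new RecheckBeforeEndOfInput();
        // State right after steps 2-3 of the report: the only fetcher failed
        // and was removed, and the queue is empty.
        reader.fetcherError.set(new RuntimeException("Testing Exception"));
        reader.aliveFetchers = 0;
        try {
            System.out.println(reader.finishedOrAvailableLater());
        } catch (RuntimeException e) {
            // prints "failed: One or more fetchers have encountered exception"
            System.out.println("failed: " + e.getMessage());
        }
    }
}
```

In this sketch the extra check costs one atomic read, and only on the path where the reader is about to finish, so the common {{NOTHING_AVAILABLE}} path is unchanged.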

> SourceReaderBase.pollNext may return END_OF_INPUT if SplitReader.fetch throws
> -
>
> Key: FLINK-19717
> URL: https://issues.apache.org/jira/browse/FLINK-19717
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Affects Versions: 1.12.0
>Reporter: Kezhu Wang
>Assignee: Kezhu Wang
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.12.0, 1.11.3
>
>
> Here is my imagined execution flow:
> 1. In the mailbox thread, we enter {{SourceReaderBase.getNextFetch}}. After
> {{splitFetcherManager.checkErrors()}} executes but before
> {{elementsQueue.poll()}} does, the {{SplitFetcher}} gets its chance to run.
> 2. The {{SplitFetcher}} terminates due to an exception from {{SplitReader.fetch}}.
> {{SplitFetcher.shutdownHook}} removes this failed fetcher from
> {{SplitFetcherManager}}.
> 3. In the mailbox thread, {{elementsQueue.poll()}} executes. If there are no
> elements in the queue, {{elementsQueue}} is reset to unavailable.
> 4. After getting no elements from {{SourceReaderBase.getNextFetch}}, we
> enter {{SourceReaderBase.finishedOrAvailableLater}}. If the failed
> fetcher was the last alive fetcher, then
> {{SourceReaderBase.finishedOrAvailableLater}} may evaluate to
> {{InputStatus.END_OF_INPUT}}.
> 5. {{StreamTask}} then terminates itself due to {{InputStatus.END_OF_INPUT}},
> as if the input had ended normally, instead of failing with the fetcher's exception.
> Here is a revised {{SourceReaderBaseTest.testExceptionInSplitReader}} that
> fails at a rate of about 1/2.
> {code:java}
> @Test
> public void testExceptionInSplitReader() throws Exception {
>     expectedException.expect(RuntimeException.class);
>     expectedException.expectMessage("One or more fetchers have encountered exception");
>     final String errMsg = "Testing Exception";
>     FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> elementsQueue =
>             new FutureCompletingBlockingQueue<>();
>     // We have to handle split changes first, otherwise fetch will not be called.
>     try (MockSourceReader reader = new MockSourceReader(
>             elementsQueue,
>             () -> new SplitReader<int[], MockSourceSplit>() {
>                 @Override
>                 public RecordsWithSplitIds<int[]> fetch() {
>                     throw new RuntimeException(errMsg);
>                 }
>                 @Override
>                 public void handleSplitsChanges(SplitsChange<MockSourceSplit> splitsChanges) {}
>                 @Override
>                 public void wakeUp() {}
>             },
>             getConfig(),
>             null)) {
>         ValidatingSourceOutput output = new ValidatingSourceOutput();
>         reader.addSplits(Collections.singletonList(
>                 getSplit(0, NUM_RECORDS_PER_SPLIT, Boundedness.CONTINUOUS_UNBOUNDED)));
>         reader.handleSourceEvents(new NoMoreSplitsEvent());
>         // This is not a real infinite loop; it is expected to throw after some polls.
>         while (true) {
>             InputStatus inputStatus = reader.pollNext(output);
>             assertNotEquals(InputStatus.END_OF_INPUT, inputStatus);
>             // Sleep briefly to avoid a tight loop.
>             Thread.sleep(0);
>         }
>     }
> }
> {code}
> This revised {{SourceReaderBaseTest.testExceptionInSplitReader}} differs from
> the existing one in three places:
>  1. {{reader.handleSourceEvents(new NoMoreSplitsEvent())}} sets
> {{SourceReaderBase.noMoreSplitsAssignment}} to true.
>  2. An assertion checks that {{reader.pollNext}} never returns
> {{InputStatus.END_OF_INPUT}}.
>  3. {{Thread.sleep(1)}} is changed to {{Thread.sleep(0)}}, which raises the failure
> rate from roughly 1/200 to roughly 1/2.
> See [FLINK-19448|https://issues.apache.org/jira/browse/FLINK-19448] for the
> initial discussion.
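
To make the quoted interleaving concrete, the following sketch replays steps 1 to 5 with two threads, using a pair of {{CountDownLatch}} objects to force the fetcher to fail exactly between the error check and {{poll()}}. The class and its members ({{EndOfInputRace}}, {{aliveFetchers}}, {{fetcherError}}, {{replay()}}) are hypothetical stand-ins for {{SourceReaderBase}}, {{SplitFetcherManager}} and the elements queue, not Flink APIs, and the reader logic mirrors only the pre-fix ordering described in the report:

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical model of the FLINK-19717 interleaving. None of these names are
// Flink APIs; they only mirror the order of calls described in the report.
public class EndOfInputRace {

    enum InputStatus { MORE_AVAILABLE, NOTHING_AVAILABLE, END_OF_INPUT }

    final ConcurrentLinkedQueue<Integer> elementsQueue = new ConcurrentLinkedQueue<>();
    final AtomicReference<Throwable> fetcherError = new AtomicReference<>();
    final AtomicInteger aliveFetchers = new AtomicInteger(1);
    final boolean noMoreSplitsAssignment = true; // as after NoMoreSplitsEvent

    void checkErrors() {
        Throwable t = fetcherError.get();
        if (t != null) {
            throw new RuntimeException("One or more fetchers have encountered exception", t);
        }
    }

    // Mailbox-thread side with the pre-fix ordering: errors are checked only
    // before poll(). The latches force the fetcher to fail inside that gap.
    InputStatus pollNext(CountDownLatch checked, CountDownLatch fetcherDone)
            throws InterruptedException {
        checkErrors();                          // step 1: nothing recorded yet
        checked.countDown();
        fetcherDone.await();                    // step 2 runs on the fetcher thread
        Integer record = elementsQueue.poll();  // step 3: the queue is empty
        if (record != null) {
            return InputStatus.MORE_AVAILABLE;
        }
        // step 4: finishedOrAvailableLater() without a second error check
        if (noMoreSplitsAssignment && aliveFetchers.get() == 0) {
            return InputStatus.END_OF_INPUT;    // step 5: the task would finish
        }
        return InputStatus.NOTHING_AVAILABLE;
    }

    // Runs one forced interleaving and returns what pollNext reported.
    static InputStatus replay() {
        EndOfInputRace reader = new EndOfInputRace();
        CountDownLatch checked = new CountDownLatch(1);
        CountDownLatch fetcherDone = new CountDownLatch(1);
        Thread fetcher = new Thread(() -> {
            try {
                checked.await();
                // fetch() throws; the shutdown hook records the error and
                // removes the fetcher from the manager.
                reader.fetcherError.set(new RuntimeException("Testing Exception"));
                reader.aliveFetchers.decrementAndGet();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                fetcherDone.countDown();
            }
        });
        fetcher.start();
        try {
            InputStatus status = reader.pollNext(checked, fetcherDone);
            fetcher.join();
            return status;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(replay()); // END_OF_INPUT, although the fetcher failed
    }
}
```

Because the latches fix the ordering, {{replay()}} reports {{END_OF_INPUT}} on every run even though an error was recorded, whereas the test in the report relies on scheduling luck and fails only about half the time.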



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-19717) SourceReaderBase.pollNext may return END_OF_INPUT if SplitReader.fetch throws

2020-11-09 Thread Stephan Ewen (Jira)


[ https://issues.apache.org/jira/browse/FLINK-19717?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17228550#comment-17228550 ]

Stephan Ewen commented on FLINK-19717:
--

Yes, we need to cherry-pick all changes from master back to release-1.11 in order;
otherwise, it will be impossible.
Let's coordinate between the two of us on how we do this.



[jira] [Commented] (FLINK-19717) SourceReaderBase.pollNext may return END_OF_INPUT if SplitReader.fetch throws

2020-11-08 Thread Jiangjie Qin (Jira)


[ https://issues.apache.org/jira/browse/FLINK-19717?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17228328#comment-17228328 ]

Jiangjie Qin commented on FLINK-19717:
--

[~sewen] I have merged the patch to master, but cherry-picking it to
release-1.11 causes some conflicts. Do you want to sequence the backport
of this patch with the other Source patches? Otherwise, I can rebase the patch
onto release-1.11.



[jira] [Commented] (FLINK-19717) SourceReaderBase.pollNext may return END_OF_INPUT if SplitReader.fetch throws

2020-11-08 Thread Stephan Ewen (Jira)


[ https://issues.apache.org/jira/browse/FLINK-19717?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17228016#comment-17228016 ]

Stephan Ewen commented on FLINK-19717:
--

We are about to merge this. There is consensus on the problem and the solution, so
it will be in very soon.

Given that this is a critical bug fix rather than a feature, I think the feature
freeze deadline does not apply here.



[jira] [Commented] (FLINK-19717) SourceReaderBase.pollNext may return END_OF_INPUT if SplitReader.fetch throws

2020-11-05 Thread Robert Metzger (Jira)


[ https://issues.apache.org/jira/browse/FLINK-19717?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17226786#comment-17226786 ]

Robert Metzger commented on FLINK-19717:


Is this blocker expected to be merged before the feature freeze on Sunday?
I guess we could also fix this bug after the feature freeze, during the
stabilization phase.



[jira] [Commented] (FLINK-19717) SourceReaderBase.pollNext may return END_OF_INPUT if SplitReader.fetch throws

2020-10-27 Thread Dian Fu (Jira)


[ https://issues.apache.org/jira/browse/FLINK-19717?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17221318#comment-17221318 ]

Dian Fu commented on FLINK-19717:
-

[~rmetzger] You are right. I missed the PR.



[jira] [Commented] (FLINK-19717) SourceReaderBase.pollNext may return END_OF_INPUT if SplitReader.fetch throws

2020-10-27 Thread Robert Metzger (Jira)


[ https://issues.apache.org/jira/browse/FLINK-19717?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17221272#comment-17221272 ]

Robert Metzger commented on FLINK-19717:


[~dian.fu] I believe the issue here is that [~kezhuw] is waiting for a reviewer 
of the PR. It has been open for 4 days.



[jira] [Commented] (FLINK-19717) SourceReaderBase.pollNext may return END_OF_INPUT if SplitReader.fetch throws

2020-10-27 Thread Dian Fu (Jira)


[ https://issues.apache.org/jira/browse/FLINK-19717?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17221252#comment-17221252 ]

Dian Fu commented on FLINK-19717:
-

[~kezhuw] Any update on this ticket? It would be great if we could fix this issue
this week, as we're planning to build the first RC of 1.12 next Monday.



[jira] [Commented] (FLINK-19717) SourceReaderBase.pollNext may return END_OF_INPUT if SplitReader.fetch throws

2020-10-26 Thread Robert Metzger (Jira)


[ https://issues.apache.org/jira/browse/FLINK-19717?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17220783#comment-17220783 ]

Robert Metzger commented on FLINK-19717:


Thank you. I assigned you to the ticket.

> SourceReaderBase.pollNext may return END_OF_INPUT if SplitReader.fetch throws
> -
>
> Key: FLINK-19717
> URL: https://issues.apache.org/jira/browse/FLINK-19717
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Affects Versions: 1.12.0
>Reporter: Kezhu Wang
>Assignee: Kezhu Wang
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.12.0, 1.11.3


[jira] [Commented] (FLINK-19717) SourceReaderBase.pollNext may return END_OF_INPUT if SplitReader.fetch throws

2020-10-23 Thread Kezhu Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19717?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17219695#comment-17219695
 ] 

Kezhu Wang commented on FLINK-19717:


[~sewen] I am willing to take this over.

> SourceReaderBase.pollNext may return END_OF_INPUT if SplitReader.fetch throws
> -
>
> Key: FLINK-19717
> URL: https://issues.apache.org/jira/browse/FLINK-19717
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Affects Versions: 1.12.0
>Reporter: Kezhu Wang
>Priority: Blocker
> Fix For: 1.12.0, 1.11.3


[jira] [Commented] (FLINK-19717) SourceReaderBase.pollNext may return END_OF_INPUT if SplitReader.fetch throws

2020-10-23 Thread Stephan Ewen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19717?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17219692#comment-17219692
 ] 

Stephan Ewen commented on FLINK-19717:
--

[~kezhuw] I agree with your analysis and with the suggested fix. Good work!

Do you want to prepare a PR for this? Otherwise I could work on this in a few 
days.

> SourceReaderBase.pollNext may return END_OF_INPUT if SplitReader.fetch throws
> -
>
> Key: FLINK-19717
> URL: https://issues.apache.org/jira/browse/FLINK-19717
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Affects Versions: 1.12.0
>Reporter: Kezhu Wang
>Priority: Major


[jira] [Commented] (FLINK-19717) SourceReaderBase.pollNext may return END_OF_INPUT if SplitReader.fetch throws

2020-10-21 Thread Kezhu Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19717?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17218676#comment-17218676
 ] 

Kezhu Wang commented on FLINK-19717:


[~rmetzger] Thanks for the response. Let's wait for their decision.

[~sewen] [~becket_qin] I think we can solve this by moving {{exceptionHandler}} 
from {{ThrowableCatchingRunnable}} to {{SplitFetcher}}. This way, we can 
rearrange the 'happens-before' relationship between error-setting and 
fetcher-removing inside {{SplitFetcher}}, with no interference from extra 
structure. I have created [a branch in my clone|https://github.com/kezhuw/flink/tree/test-case-source-reader-end-of-input-caused-by-split-reader-exception]
 to demonstrate the test case and a possible fix. Commit 
[3c65b2d|https://github.com/kezhuw/flink/commit/3c65b2d83c18c787fd590ece17304110e3fc]
 modifies {{SourceReaderBaseTest.testExceptionInSplitReader}} so that it fails, and 
[e2e383|https://github.com/kezhuw/flink/commit/e2e3832c7a4c726405111fb9198e6737417877d5]
 fixes it. Do this and the analysis above make sense to you? If so, could you 
assign this issue to me?
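The proposal can be sketched as follows. This is a hypothetical, simplified model. It is not the code in the linked branch or commits, and the class {{OrderedFetcherSketch}}, the {{Fetcher}} constructor, and the {{AtomicInteger}} counter that stands in for the fetcher registry are all assumptions. The sketch illustrates one point. Once the fetcher catches the exception itself, it can record the error before its shutdown hook deregisters it. Any thread that then observes "no fetchers left" is therefore guaranteed to also observe the error.

{code:java}
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// Hypothetical sketch of the proposal; NOT the actual Flink SplitFetcher.
// The fetcher owns its exception handler, so it can record the error
// strictly before its shutdown hook deregisters it.
public class OrderedFetcherSketch {

    static final class Fetcher implements Runnable {
        private final Runnable fetch;                 // stand-in for SplitReader.fetch()
        private final Consumer<Throwable> errorHandler;
        private final Runnable shutdownHook;

        Fetcher(Runnable fetch, Consumer<Throwable> errorHandler, Runnable shutdownHook) {
            this.fetch = fetch;
            this.errorHandler = errorHandler;
            this.shutdownHook = shutdownHook;
        }

        @Override
        public void run() {
            try {
                fetch.run();
            } catch (Throwable t) {
                errorHandler.accept(t); // 1) record the error ...
            } finally {
                shutdownHook.run();     // 2) ... then remove the fetcher
            }
        }
    }

    // Returns whether the mailbox side saw the error once all fetchers were gone.
    static boolean errorVisibleOnceFetchersAreGone() {
        AtomicReference<Throwable> uncaughtFetcherException = new AtomicReference<>();
        AtomicInteger aliveFetchers = new AtomicInteger(1);

        Fetcher fetcher = new Fetcher(
                () -> { throw new RuntimeException("Testing Exception"); },
                t -> uncaughtFetcherException.compareAndSet(null, t),
                aliveFetchers::decrementAndGet);
        Thread fetcherThread = new Thread(fetcher, "split-fetcher");
        fetcherThread.start();

        // Mailbox side: wait until "all fetchers have shut down" would be true.
        while (aliveFetchers.get() > 0) {
            Thread.onSpinWait();
        }
        // The atomic decrement happens-before the read that sees 0, and the error
        // was set earlier in the fetcher thread, so the error is visible here.
        boolean seen = uncaughtFetcherException.get() != null;
        try {
            fetcherThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(errorVisibleOnceFetchersAreGone()); // prints true
    }
}
{code}

The atomic counter makes the ordering argument explicit under the Java memory model. With the opposite order (deregister first, record the error afterwards), that guarantee would not hold.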

> SourceReaderBase.pollNext may return END_OF_INPUT if SplitReader.fetch throws
> -
>
> Key: FLINK-19717
> URL: https://issues.apache.org/jira/browse/FLINK-19717
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Affects Versions: 1.12.0
>Reporter: Kezhu Wang
>Priority: Major

[jira] [Commented] (FLINK-19717) SourceReaderBase.pollNext may return END_OF_INPUT if SplitReader.fetch throws

2020-10-21 Thread Robert Metzger (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19717?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17218181#comment-17218181
 ] 

Robert Metzger commented on FLINK-19717:


Thanks a lot for your investigation and for opening this ticket.
I cannot comment on the priority of the ticket. Let's wait for Stephan or 
Jiangjie.

> SourceReaderBase.pollNext may return END_OF_INPUT if SplitReader.fetch throws
> -
>
> Key: FLINK-19717
> URL: https://issues.apache.org/jira/browse/FLINK-19717
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Affects Versions: 1.12.0
>Reporter: Kezhu Wang
>Priority: Major


[jira] [Commented] (FLINK-19717) SourceReaderBase.pollNext may return END_OF_INPUT if SplitReader.fetch throws

2020-10-19 Thread Kezhu Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19717?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17216840#comment-17216840
 ] 

Kezhu Wang commented on FLINK-19717:


{quote}Maybe the fix would be to always check for errors before returning 
NOTHING_AVAILABLE.
{quote}
[~sewen] I guess you mean checking for errors before returning 
*{{END_OF_INPUT}}*? I don't think that is enough, because this error check 
assumes that error-setting happens before fetcher-removing. All error-setting 
operations must have completed before the error check runs; otherwise, the 
result is nondeterministic. The current implementation does not give us this 
guarantee. Currently, fetcher-removing happens in {{SplitFetcher.shutdownHook}} 
and error-setting happens in {{ThrowableCatchingRunnable.exceptionHandler}}. Both 
operations are thread-safe, but fetcher-removing happens before error-setting. 
This means there is no 'happens-before' relationship between error-setting and 
error-checking. Suppose we reverse the 'happens-before' relationship between 
error-setting and fetcher-removing. Then, once we detect 
{{allFetchersHaveShutdown}}, we know that error-setting has already completed by 
the time we check for errors. So, if we are going to check for errors before 
returning {{END_OF_INPUT}}, we should also reverse the 'happens-before' 
relationship between error-setting and fetcher-removing.
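The ordering argument can be checked exhaustively on a small, hypothetical model that is not Flink code. In the model, the failing fetcher performs two side effects, error-setting and fetcher-removing, in a configurable order. The mailbox thread performs two reads: first {{allFetchersHaveShutdown}}, then the error check before returning {{END_OF_INPUT}}. The sketch enumerates all six interleavings of these four actions. It assumes {{noMoreSplitsAssignment}} is already true and ignores the element queue; the names {{HappensBeforeCheck}} and {{Outcome}} are illustrative.

{code:java}
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the ordering argument in FLINK-19717; NOT Flink code.
public class HappensBeforeCheck {

    enum Outcome { NOTHING_AVAILABLE, ERROR_REPORTED, END_OF_INPUT }

    // Enumerates every interleaving of the fetcher's two side effects with the
    // mailbox thread's two reads and collects the resulting outcomes.
    // errorFirst == true models "set the error, then remove the fetcher".
    static List<Outcome> allOutcomes(boolean errorFirst) {
        List<Outcome> outcomes = new ArrayList<>();
        // Positions a < b (out of 4) belong to the fetcher; the others to the mailbox.
        for (int a = 0; a < 4; a++) {
            for (int b = a + 1; b < 4; b++) {
                int aliveFetchers = 1;
                boolean errorSet = false;
                boolean sawAllShutdown = false;
                int fetcherStep = 0;
                int mailboxStep = 0;
                Outcome outcome = null;
                for (int pos = 0; pos < 4; pos++) {
                    if (pos == a || pos == b) {
                        boolean setErrorNow = (fetcherStep == 0) == errorFirst;
                        if (setErrorNow) {
                            errorSet = true;
                        } else {
                            aliveFetchers--;
                        }
                        fetcherStep++;
                    } else if (mailboxStep++ == 0) {
                        // First read: allFetchersHaveShutdown.
                        sawAllShutdown = aliveFetchers == 0;
                    } else if (!sawAllShutdown) {
                        // Not finished yet; the next pollNext() checks errors again.
                        outcome = Outcome.NOTHING_AVAILABLE;
                    } else {
                        // Second read: check errors before returning END_OF_INPUT.
                        outcome = errorSet ? Outcome.ERROR_REPORTED : Outcome.END_OF_INPUT;
                    }
                }
                outcomes.add(outcome);
            }
        }
        return outcomes;
    }

    public static void main(String[] args) {
        System.out.println(allOutcomes(false)); // fetcher-removing first
        System.out.println(allOutcomes(true));  // error-setting first
    }
}
{code}

With fetcher-removing first, at least one interleaving still yields {{END_OF_INPUT}} despite the added error check. With error-setting first, no interleaving does, because seeing zero fetchers implies the error was already set.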

[~dianfu] [~rmetzger] [~sewen] [~becket_qin] Should this be a blocker for 1.12?

> SourceReaderBase.pollNext may return END_OF_INPUT if SplitReader.fetch throws
> -
>
> Key: FLINK-19717
> URL: https://issues.apache.org/jira/browse/FLINK-19717
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Affects Versions: 1.12.0
>Reporter: Kezhu Wang
>Priority: Major