zhuzhurk commented on code in PR #21464:
URL: https://github.com/apache/flink/pull/21464#discussion_r1057158451


##########
flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumeratorTest.java:
##########
@@ -216,6 +216,43 @@ public void testDefaultMethodDelegation() throws Exception 
{
         Mockito.verify(underlyingEnumeratorSpy).handleSourceEvent(0, se);
     }
 
+    @Test
+    public void testInterceptNoMoreSplitEvent() {
+        context = new MockSplitEnumeratorContext<>(2);
+        source = 
HybridSource.builder(MOCK_SOURCE).addSource(MOCK_SOURCE).build();
+
+        enumerator = (HybridSourceSplitEnumerator) 
source.createEnumerator(context);
+        enumerator.start();
+        // mock enumerator assigns splits once all readers are registered
+        // At this time, hasNoMoreSplit check will call 
context.signalIntermediateNoMoreSplits
+        registerReader(context, enumerator, SUBTASK0);
+        registerReader(context, enumerator, SUBTASK1);
+        enumerator.handleSourceEvent(SUBTASK0, new 
SourceReaderFinishedEvent(-1));
+        enumerator.handleSourceEvent(SUBTASK1, new 
SourceReaderFinishedEvent(-1));
+        assertThat(context.hasNoMoreSplits(0)).isFalse();
+        assertThat(context.hasNoMoreSplits(1)).isFalse();
+        splitFromSource0 =
+                
context.getSplitsAssignmentSequence().get(0).assignment().get(SUBTASK0).get(0);
+
+        // task read finished, hasNoMoreSplit check will call 
context.signalNoMoreSplits, this is
+        // final finished event
+        enumerator.handleSourceEvent(SUBTASK0, new 
SourceReaderFinishedEvent(0));
+        enumerator.handleSourceEvent(SUBTASK1, new 
SourceReaderFinishedEvent(0));
+        assertThat(context.hasNoMoreSplits(0)).isTrue();
+        assertThat(context.hasNoMoreSplits(1)).isTrue();
+
+        // test add splits back, then SUBTASK0 restore splitFromSource0 split
+        // reset splits assignment & previous subtaskHasNoMoreSplits flag.
+        context.getSplitsAssignmentSequence().clear();
+        Whitebox.setInternalState(context, "subtaskHasNoMoreSplits", new 
boolean[] {false, false});

Review Comment:
   It's better to add a method `resetNoMoreSplits(int)` in 
`MockSplitEnumeratorContext`.
   Instead of using reflection to do this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to