yashmayya commented on code in PR #13424:
URL: https://github.com/apache/kafka/pull/13424#discussion_r1146151628


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java:
##########
@@ -215,6 +215,17 @@ public class DistributedHerderTest {
             Collections.emptyMap(),
             Collections.emptySet(),
             Collections.emptySet());
+    private static final ClusterConfigState SNAPSHOT_STOPPED_CONN1 = new 
ClusterConfigState(
+            1,
+            null,
+            Collections.singletonMap(CONN1, 3),
+            Collections.singletonMap(CONN1, CONN1_CONFIG),
+            Collections.singletonMap(CONN1, TargetState.STOPPED),
+            TASK_CONFIGS_MAP,

Review Comment:
   nit: a stopped connector would be expected to have an empty set of task 
configs



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java:
##########
@@ -2159,6 +2170,66 @@ public void testConnectorResumed() throws Exception {
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testConnectorStopped() throws Exception {
+        // ensure that target state changes are propagated to the worker
+
+        EasyMock.expect(member.memberId()).andStubReturn("member");
+        
EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
+        
EasyMock.expect(worker.connectorNames()).andStubReturn(Collections.singleton(CONN1));
+
+        // join
+        expectRebalance(1, Arrays.asList(CONN1), Collections.emptyList());
+        expectConfigRefreshAndSnapshot(SNAPSHOT);
+        
EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
+        Capture<Callback<TargetState>> onStart = newCapture();
+        worker.startConnector(EasyMock.eq(CONN1), EasyMock.anyObject(), 
EasyMock.anyObject(),
+                EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED), 
capture(onStart));
+        PowerMock.expectLastCall().andAnswer(() -> {
+            onStart.getValue().onCompletion(null, TargetState.STARTED);
+            return true;
+        });
+        member.wakeup();
+        PowerMock.expectLastCall();
+        expectExecuteTaskReconfiguration(true, conn1SinkConfig, () -> 
TASK_CONFIGS);
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        // handle the state change
+        member.wakeup();
+        PowerMock.expectLastCall();
+        member.ensureActive();
+        PowerMock.expectLastCall();
+
+        
EasyMock.expect(configBackingStore.snapshot()).andReturn(SNAPSHOT_STOPPED_CONN1);
+        PowerMock.expectLastCall();
+
+        Capture<Callback<TargetState>> onStop = newCapture();
+        worker.setTargetState(EasyMock.eq(CONN1), 
EasyMock.eq(TargetState.STOPPED), capture(onStop));
+        PowerMock.expectLastCall().andAnswer(() -> {
+            onStart.getValue().onCompletion(null, TargetState.STOPPED);
+            return null;
+        });
+
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        // These will occur just before/during the third tick
+        member.ensureActive();
+        PowerMock.expectLastCall();
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        herder.tick(); // join
+        configUpdateListener.onConnectorTargetStateChange(CONN1); // state 
changes to paused

Review Comment:
   ```suggestion
           configUpdateListener.onConnectorTargetStateChange(CONN1); // state 
changes to stopped
   ```



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java:
##########
@@ -2159,6 +2170,66 @@ public void testConnectorResumed() throws Exception {
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testConnectorStopped() throws Exception {
+        // ensure that target state changes are propagated to the worker
+
+        EasyMock.expect(member.memberId()).andStubReturn("member");
+        
EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
+        
EasyMock.expect(worker.connectorNames()).andStubReturn(Collections.singleton(CONN1));
+
+        // join
+        expectRebalance(1, Arrays.asList(CONN1), Collections.emptyList());
+        expectConfigRefreshAndSnapshot(SNAPSHOT);
+        
EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
+        Capture<Callback<TargetState>> onStart = newCapture();
+        worker.startConnector(EasyMock.eq(CONN1), EasyMock.anyObject(), 
EasyMock.anyObject(),
+                EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED), 
capture(onStart));
+        PowerMock.expectLastCall().andAnswer(() -> {
+            onStart.getValue().onCompletion(null, TargetState.STARTED);
+            return true;
+        });
+        member.wakeup();
+        PowerMock.expectLastCall();
+        expectExecuteTaskReconfiguration(true, conn1SinkConfig, () -> 
TASK_CONFIGS);
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        // handle the state change
+        member.wakeup();
+        PowerMock.expectLastCall();
+        member.ensureActive();
+        PowerMock.expectLastCall();
+
+        
EasyMock.expect(configBackingStore.snapshot()).andReturn(SNAPSHOT_STOPPED_CONN1);
+        PowerMock.expectLastCall();

Review Comment:
   ```suggestion
   ```
   nit: we're already setting up the expectation in the previous statement



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to