C0urante commented on code in PR #13276:
URL: https://github.com/apache/kafka/pull/13276#discussion_r1119036590


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java:
##########
@@ -3694,6 +3694,54 @@ public void shouldThrowWhenStartAndStopExecutorThrowsRejectedExecutionExceptionA
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testTaskReconfigurationRetries() {
+        EasyMock.expect(member.memberId()).andStubReturn("leader");
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
+        expectRebalance(1, Collections.emptyList(), Collections.emptyList(), true);
+        expectConfigRefreshAndSnapshot(SNAPSHOT);
+
+        // end of initial tick
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        member.wakeup();
+        PowerMock.expectLastCall();

Review Comment:
   Wakeups don't really matter in these tests; if it's easier, feel free to 
append `.anyTimes()` to this expectation.



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java:
##########
@@ -3694,6 +3694,54 @@ public void shouldThrowWhenStartAndStopExecutorThrowsRejectedExecutionExceptionA
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testTaskReconfigurationRetries() {
+        EasyMock.expect(member.memberId()).andStubReturn("leader");
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
+        expectRebalance(1, Collections.emptyList(), Collections.emptyList(), true);
+        expectConfigRefreshAndSnapshot(SNAPSHOT);
+
+        // end of initial tick
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        member.wakeup();
+        PowerMock.expectLastCall();
+
+        // second tick
+        member.ensureActive();
+        PowerMock.expectLastCall();
+
+        EasyMock.expect(worker.isRunning(CONN1)).andReturn(true).anyTimes();
+        EasyMock.expect(worker.getPlugins()).andReturn(plugins).anyTimes();
+
+        SinkConnectorConfig sinkConnectorConfig = new SinkConnectorConfig(plugins, CONN1_CONFIG);
+        EasyMock.expect(worker.connectorTaskConfigs(CONN1, sinkConnectorConfig))
+                .andThrow(new ConnectException("Failed to generate task configs")).anyTimes();

Review Comment:
   This test is great. I think it'd be worth it to perform a third and fourth 
tick. The third can be used to simulate successfully generating task configs 
after the two failed attempts, and the fourth can be used to ensure that we 
don't retry any further.
   
   It's also worth noting that we're only testing the case where 
`Connector::taskConfigs` (or really, `Worker::connectorTaskConfigs`) fails, but 
the logic that's being added here applies if intra-cluster communication fails 
as well (which may happen if the leader of the cluster is temporarily 
unavailable, for example). It'd be nice if we could have test coverage for that 
too, but I won't block this PR on that.
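
To make the four-tick sequence concrete, here is a minimal, self-contained sketch of the retry behavior the extended test would assert. It is not the herder's actual code: `RetryBackoffSketch`, its fields, and the 60-second cap are hypothetical stand-ins, and only the 250 ms and 500 ms backoffs come from the test above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BooleanSupplier;

// Hypothetical stand-in for the herder's retry bookkeeping; not Kafka code.
class RetryBackoffSketch {
    static final long INITIAL_BACKOFF_MS = 250L; // matches the first poll timeout in the test
    static final long MAX_BACKOFF_MS = 60_000L;  // assumed cap, for illustration only

    private final BooleanSupplier generateTaskConfigs; // returns true on success
    private boolean pending = false;
    private long retryAtMs = 0L;
    private long nextBackoffMs = INITIAL_BACKOFF_MS;

    final List<Long> backoffsUsed = new ArrayList<>();
    int attempts = 0;

    RetryBackoffSketch(BooleanSupplier generateTaskConfigs) {
        this.generateTaskConfigs = generateTaskConfigs;
    }

    // Queue a reconfiguration and reset the backoff to its initial value.
    void requestTaskReconfiguration(long nowMs) {
        pending = true;
        retryAtMs = nowMs;
        nextBackoffMs = INITIAL_BACKOFF_MS;
    }

    // One pass of the loop: attempt only if a request is pending and due.
    void tick(long nowMs) {
        if (!pending || nowMs < retryAtMs) {
            return;
        }
        attempts++;
        if (generateTaskConfigs.getAsBoolean()) {
            pending = false; // success: no further retries
            return;
        }
        // Failure: schedule a retry, then double the backoff up to the cap.
        backoffsUsed.add(nextBackoffMs);
        retryAtMs = nowMs + nextBackoffMs;
        nextBackoffMs = Math.min(nextBackoffMs * 2, MAX_BACKOFF_MS);
    }

    // Two failures, then a success, then one extra tick.
    static RetryBackoffSketch simulate() {
        int[] calls = {0};
        RetryBackoffSketch herder = new RetryBackoffSketch(() -> ++calls[0] > 2);
        long now = 0L;
        herder.requestTaskReconfiguration(now);
        herder.tick(now);  // first attempt fails, retry scheduled in 250 ms
        now += 250;
        herder.tick(now);  // second attempt fails, retry scheduled in 500 ms
        now += 500;
        herder.tick(now);  // third attempt succeeds
        now += 1000;
        herder.tick(now);  // nothing pending, so no further attempt
        return herder;
    }

    public static void main(String[] args) {
        RetryBackoffSketch h = simulate();
        System.out.println("attempts=" + h.attempts + " backoffs=" + h.backoffsUsed);
    }
}
```

In the real test, the equivalent checks would presumably be a `connectorTaskConfigs` expectation that throws twice and then returns, followed by a final tick whose `member.poll` timeout is no longer a retry backoff.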



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java:
##########
@@ -3694,6 +3694,54 @@ public void shouldThrowWhenStartAndStopExecutorThrowsRejectedExecutionExceptionA
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testTaskReconfigurationRetries() {
+        EasyMock.expect(member.memberId()).andStubReturn("leader");
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
+        expectRebalance(1, Collections.emptyList(), Collections.emptyList(), true);
+        expectConfigRefreshAndSnapshot(SNAPSHOT);
+
+        // end of initial tick
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        member.wakeup();
+        PowerMock.expectLastCall();
+
+        // second tick
+        member.ensureActive();
+        PowerMock.expectLastCall();
+
+        EasyMock.expect(worker.isRunning(CONN1)).andReturn(true).anyTimes();
+        EasyMock.expect(worker.getPlugins()).andReturn(plugins).anyTimes();
+
+        SinkConnectorConfig sinkConnectorConfig = new SinkConnectorConfig(plugins, CONN1_CONFIG);
+        EasyMock.expect(worker.connectorTaskConfigs(CONN1, sinkConnectorConfig))
+                .andThrow(new ConnectException("Failed to generate task configs")).anyTimes();
+
+        // task reconfiguration request with initial retry backoff
+        member.poll(EasyMock.eq(250L));
+        PowerMock.expectLastCall();
+
+        member.ensureActive();
+        PowerMock.expectLastCall();
+
+        // task reconfiguration request with double the initial retry backoff
+        member.poll(EasyMock.eq(500L));
+        PowerMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        // initial tick
+        herder.tick();
+        herder.requestTaskReconfiguration(CONN1);
+        // process the task reconfiguration request in this tick
+        herder.tick();
+        // advance the time by 250ms so that the task reconfiguration request with initial retry backoff is processed
+        time.sleep(250);
+        herder.tick();

Review Comment:
   Can't forget this part:
   
   ```suggestion
           herder.tick();
   
           PowerMock.verifyAll();
   ```


