yashmayya commented on code in PR #13276:
URL: https://github.com/apache/kafka/pull/13276#discussion_r1119636919


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java:
##########
@@ -3694,6 +3694,54 @@ public void shouldThrowWhenStartAndStopExecutorThrowsRejectedExecutionExceptionA
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testTaskReconfigurationRetries() {
+        EasyMock.expect(member.memberId()).andStubReturn("leader");
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
+        expectRebalance(1, Collections.emptyList(), Collections.emptyList(), true);
+        expectConfigRefreshAndSnapshot(SNAPSHOT);
+
+        // end of initial tick
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        member.wakeup();
+        PowerMock.expectLastCall();
+
+        // second tick
+        member.ensureActive();
+        PowerMock.expectLastCall();
+
+        EasyMock.expect(worker.isRunning(CONN1)).andReturn(true).anyTimes();
+        EasyMock.expect(worker.getPlugins()).andReturn(plugins).anyTimes();
+
+        SinkConnectorConfig sinkConnectorConfig = new SinkConnectorConfig(plugins, CONN1_CONFIG);
+        EasyMock.expect(worker.connectorTaskConfigs(CONN1, sinkConnectorConfig))
+                .andThrow(new ConnectException("Failed to generate task configs")).anyTimes();
+
+        // task reconfiguration request with initial retry backoff
+        member.poll(EasyMock.eq(250L));
+        PowerMock.expectLastCall();
+
+        member.ensureActive();
+        PowerMock.expectLastCall();
+
+        // task reconfiguration request with double the initial retry backoff
+        member.poll(EasyMock.eq(500L));
+        PowerMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        // initial tick
+        herder.tick();
+        herder.requestTaskReconfiguration(CONN1);
+        // process the task reconfiguration request in this tick
+        herder.tick();
+        // advance the time by 250ms so that the task reconfiguration request with initial retry backoff is processed
+        time.sleep(250);
+        herder.tick();

Review Comment:
   Whoops, thanks 🤦



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java:
##########
@@ -3694,6 +3694,54 @@ public void shouldThrowWhenStartAndStopExecutorThrowsRejectedExecutionExceptionA
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testTaskReconfigurationRetries() {
+        EasyMock.expect(member.memberId()).andStubReturn("leader");
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
+        expectRebalance(1, Collections.emptyList(), Collections.emptyList(), true);
+        expectConfigRefreshAndSnapshot(SNAPSHOT);
+
+        // end of initial tick
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        member.wakeup();
+        PowerMock.expectLastCall();
+
+        // second tick
+        member.ensureActive();
+        PowerMock.expectLastCall();
+
+        EasyMock.expect(worker.isRunning(CONN1)).andReturn(true).anyTimes();
+        EasyMock.expect(worker.getPlugins()).andReturn(plugins).anyTimes();
+
+        SinkConnectorConfig sinkConnectorConfig = new SinkConnectorConfig(plugins, CONN1_CONFIG);
+        EasyMock.expect(worker.connectorTaskConfigs(CONN1, sinkConnectorConfig))
+                .andThrow(new ConnectException("Failed to generate task configs")).anyTimes();

Review Comment:
   Thanks, I've updated the test to add another herder tick which runs a successful task reconfiguration request. (I skipped adding yet another tick after that, because the absence of further retries can be verified by the poll timeout at the end of the previous tick.)
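
For readers following the poll timeouts the test expects (250 ms after the first failure, then 500 ms), the doubling can be sketched roughly as below. This is only an illustration, not the herder's actual code: the `RetryBackoff` class, its method names, and the 60000 ms cap used in the usage note are assumptions made for the example.

```java
// Hypothetical sketch, not Kafka's implementation: a retry delay that starts
// at an initial value, doubles after every failure, and is capped at a maximum.
final class RetryBackoff {
    private final long initialMs;
    private final long maxMs;
    private int failures = 0;

    RetryBackoff(long initialMs, long maxMs) {
        if (initialMs <= 0 || maxMs < initialMs) {
            throw new IllegalArgumentException("require 0 < initialMs <= maxMs");
        }
        this.initialMs = initialMs;
        this.maxMs = maxMs;
    }

    // Record one more failure and return how long to wait before the next retry.
    long nextDelayMs() {
        // Bound the shift so the doubling stays well within a long for small initial values.
        int shift = Math.min(failures, 30);
        failures++;
        return Math.min(initialMs << shift, maxMs);
    }

    // Call after a successful attempt so the next failure starts from the initial delay again.
    void reset() {
        failures = 0;
    }
}
```

With `new RetryBackoff(250, 60000)`, successive failures yield delays of 250, 500 and 1000 ms, and a `reset()` after a successful request brings the next delay back to 250 ms, which mirrors what the extra successful tick in the test is meant to show.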
   
   Regarding the test case for the task reconfiguration REST request to the leader: I did consider that initially, but while trying to add one I ran into some complications (timing-related issues) arising from the use of the `forwardRequestExecutor`, at which point I felt it was more trouble than it was worth. However, your comment made me revisit it, and I've made some changes to drop in a simple mock executor service which runs requests synchronously (on the same thread as the caller). Let me know what you think.
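
To illustrate the idea of an executor that runs requests on the caller's thread, here is a minimal sketch. It is not the mock used in the PR: the class name `SynchronousExecutorService` is hypothetical, and it relies only on the standard `java.util.concurrent.AbstractExecutorService` base class.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not the class from the PR: an ExecutorService that
// runs every submitted task immediately on the calling thread.
class SynchronousExecutorService extends AbstractExecutorService {
    private volatile boolean shutdown = false;

    @Override
    public void execute(Runnable command) {
        if (shutdown) {
            throw new RejectedExecutionException("Executor has been shut down");
        }
        // No queue and no worker thread: the task finishes before execute() returns.
        command.run();
    }

    @Override
    public void shutdown() {
        shutdown = true;
    }

    @Override
    public List<Runnable> shutdownNow() {
        shutdown = true;
        // Tasks never wait in a queue, so there is nothing pending to return.
        return Collections.emptyList();
    }

    @Override
    public boolean isShutdown() {
        return shutdown;
    }

    @Override
    public boolean isTerminated() {
        // Nothing runs in the background, so shut down implies terminated.
        return shutdown;
    }

    @Override
    public boolean awaitTermination(long timeout, TimeUnit unit) {
        return shutdown;
    }

    public static void main(String[] args) {
        SynchronousExecutorService executor = new SynchronousExecutorService();
        executor.execute(() ->
                System.out.println("Ran on thread: " + Thread.currentThread().getName()));
        executor.shutdown();
    }
}
```

Because `submit()` in `AbstractExecutorService` delegates to `execute()`, futures returned by this executor are already complete when `submit()` returns, which removes the timing issues from a test at the cost of no longer exercising real concurrency.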


