vamossagar12 commented on code in PR #15305:
URL: https://github.com/apache/kafka/pull/15305#discussion_r1604767686


##########
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java:
##########
@@ -850,6 +855,46 @@ public void testRequestTimeouts() throws Exception {
         );
     }
 
+    @Test
+    public void testPollTimeoutExpiry() throws Exception {
+        // This is a fabricated test to ensure that a poll timeout expiry happens. The tick thread awaits on
+        // task#stop method which is blocked. The timeouts have been set accordingly
+        workerProps.put(REBALANCE_TIMEOUT_MS_CONFIG, Long.toString(TimeUnit.SECONDS.toMillis(20)));
+        workerProps.put(TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG, Long.toString(TimeUnit.SECONDS.toMillis(40)));
+        connect = connectBuilder
+            .numBrokers(1)
+            .numWorkers(1)
+            .build();
+
+        connect.start();
+
+        connect.assertions().assertExactlyNumWorkersAreUp(1, "Worker not brought up in time");
+
+        Map<String, String> connectorWithBlockingTaskStopConfig = new HashMap<>();
+        connectorWithBlockingTaskStopConfig.put(CONNECTOR_CLASS_CONFIG, BlockingConnectorTest.BlockingSourceConnector.class.getName());
+        connectorWithBlockingTaskStopConfig.put(TASKS_MAX_CONFIG, "1");
+        connectorWithBlockingTaskStopConfig.put(BlockingConnectorTest.Block.BLOCK_CONFIG, Objects.requireNonNull(TASK_STOP));
+
+        connect.configureConnector(CONNECTOR_NAME, connectorWithBlockingTaskStopConfig);
+
+        connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
+            CONNECTOR_NAME, 1, "connector and tasks did not start in time"
+        );
+
+        try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(WorkerCoordinator.class)) {
+            connect.restartTask(CONNECTOR_NAME, 0);
+            TestUtils.waitForCondition(() -> logCaptureAppender.getEvents().stream().anyMatch(e -> e.getLevel().equals("WARN")) &&

Review Comment:
   Yeah, I wanted to add the test to `BlockingConnectorTest` itself, but that 
would have meant a lot of changes in that class, because it currently doesn't 
support setting worker-level properties or changing the number of workers. 
Changing the worker-level properties was the way I could trigger the poll 
timeout expiry. 
   Moreover, the test I have added doesn't block for the entire stop method. 
It ends shortly after the task graceful shutdown timeout elapses, because of 
the reset at the end of the test. Let me know if that makes sense.
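
   To illustrate why the two timeouts in the test (20s rebalance timeout, 40s 
task graceful shutdown timeout) should lead to a poll timeout expiry, here is a 
minimal, self-contained sketch. It is not Connect code: `PollTracker` and 
`expiresWhileBlocked` are hypothetical names, and the sketch assumes the poll 
deadline is measured against the rebalance timeout and that the tick thread 
cannot poll while it waits on the blocked `stop`.

```java
// Simplified model only; none of these types exist in Kafka Connect.
class PollTimeoutSketch {

    /** Hypothetical stand-in for the coordinator's poll-deadline bookkeeping. */
    static final class PollTracker {
        private final long pollTimeoutMs;
        private long lastPollMs;

        PollTracker(long pollTimeoutMs, long nowMs) {
            this.pollTimeoutMs = pollTimeoutMs;
            this.lastPollMs = nowMs;
        }

        /** Records that the tick thread polled at the given time. */
        void poll(long nowMs) {
            lastPollMs = nowMs;
        }

        /** True once more than pollTimeoutMs has passed since the last poll. */
        boolean pollTimeoutExpired(long nowMs) {
            return nowMs - lastPollMs > pollTimeoutMs;
        }
    }

    /**
     * Simulates a tick thread that polls at time 0 and is then blocked
     * (for example on a task stop) for blockedMs without polling again.
     */
    static boolean expiresWhileBlocked(long pollTimeoutMs, long blockedMs) {
        PollTracker tracker = new PollTracker(pollTimeoutMs, 0L);
        tracker.poll(0L);
        long nowMs = blockedMs; // simulated clock after the blocking wait
        return tracker.pollTimeoutExpired(nowMs);
    }

    public static void main(String[] args) {
        // Values from the test: blocked up to 40s against a 20s deadline.
        System.out.println(expiresWhileBlocked(20_000L, 40_000L)); // true
        // A wait shorter than the deadline does not expire it.
        System.out.println(expiresWhileBlocked(20_000L, 5_000L));  // false
    }
}
```

   The sketch uses a simulated clock rather than real sleeps, so it only shows 
the ordering of the two timeouts, not the actual threading in the worker.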



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

