vamossagar12 commented on code in PR #15305:
URL: https://github.com/apache/kafka/pull/15305#discussion_r1604767686
##########
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java:
##########
@@ -850,6 +855,46 @@ public void testRequestTimeouts() throws Exception {
         );
     }
+    @Test
+    public void testPollTimeoutExpiry() throws Exception {
+        // This is a fabricated test to ensure that a poll timeout expiry happens. The tick thread awaits on
+        // task#stop method which is blocked. The timeouts have been set accordingly
+        workerProps.put(REBALANCE_TIMEOUT_MS_CONFIG, Long.toString(TimeUnit.SECONDS.toMillis(20)));
+        workerProps.put(TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG, Long.toString(TimeUnit.SECONDS.toMillis(40)));
+        connect = connectBuilder
+            .numBrokers(1)
+            .numWorkers(1)
+            .build();
+
+        connect.start();
+
+        connect.assertions().assertExactlyNumWorkersAreUp(1, "Worker not brought up in time");
+
+        Map<String, String> connectorWithBlockingTaskStopConfig = new HashMap<>();
+        connectorWithBlockingTaskStopConfig.put(CONNECTOR_CLASS_CONFIG, BlockingConnectorTest.BlockingSourceConnector.class.getName());
+        connectorWithBlockingTaskStopConfig.put(TASKS_MAX_CONFIG, "1");
+        connectorWithBlockingTaskStopConfig.put(BlockingConnectorTest.Block.BLOCK_CONFIG, Objects.requireNonNull(TASK_STOP));
+
+        connect.configureConnector(CONNECTOR_NAME, connectorWithBlockingTaskStopConfig);
+
+        connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
+            CONNECTOR_NAME, 1, "connector and tasks did not start in time"
+        );
+
+        try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(WorkerCoordinator.class)) {
+            connect.restartTask(CONNECTOR_NAME, 0);
+            TestUtils.waitForCondition(() -> logCaptureAppender.getEvents().stream().anyMatch(e -> e.getLevel().equals("WARN")) &&

Review Comment:
Yeah, I wanted to add the test to `BlockingConnectorTest` itself, but that would have meant a lot of changes in that class, because that test currently doesn't support setting worker-level properties or changing the number of workers.
Being able to change the worker-level properties was how I could trigger the poll timeout expiry. Moreover, the test I added doesn't actually block for the entire `stop` method; it ends shortly after the task graceful shutdown timeout elapses, because of the reset at the end of the test. Let me know if that makes sense.
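To illustrate the timing relationship described above, here is a minimal standalone sketch. It assumes that the literal keys `rebalance.timeout.ms` and `task.shutdown.graceful.timeout.ms` correspond to `REBALANCE_TIMEOUT_MS_CONFIG` and `TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG`; the class `PollTimeoutProps` and its helper methods are hypothetical and are not part of the PR. The idea, as I understand it, is that the graceful shutdown timeout has to be longer than the rebalance timeout so that a blocked task stop can hold up the tick thread long enough for the poll timeout to expire.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, for illustration only; not part of the Kafka code base.
class PollTimeoutProps {
    // Assumed literal values of the Connect config constants used in the test.
    static final String REBALANCE_TIMEOUT_MS = "rebalance.timeout.ms";
    static final String TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS = "task.shutdown.graceful.timeout.ms";

    // Builds worker-level properties the same way the test does, from values in seconds.
    static Map<String, String> workerProps(long rebalanceSeconds, long gracefulShutdownSeconds) {
        Map<String, String> props = new HashMap<>();
        props.put(REBALANCE_TIMEOUT_MS,
                Long.toString(TimeUnit.SECONDS.toMillis(rebalanceSeconds)));
        props.put(TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS,
                Long.toString(TimeUnit.SECONDS.toMillis(gracefulShutdownSeconds)));
        return props;
    }

    // Returns true when waiting for a blocked task stop can last longer than the
    // rebalance timeout, which is the precondition the test relies on.
    static boolean canOutlastRebalanceTimeout(Map<String, String> props) {
        long rebalanceMs = Long.parseLong(props.get(REBALANCE_TIMEOUT_MS));
        long gracefulMs = Long.parseLong(props.get(TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS));
        return gracefulMs > rebalanceMs;
    }

    public static void main(String[] args) {
        // Same values as in the test: 20 s rebalance timeout, 40 s graceful shutdown timeout.
        Map<String, String> props = workerProps(20, 40);
        System.out.println(props.get(REBALANCE_TIMEOUT_MS));
        System.out.println(props.get(TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS));
        System.out.println(canOutlastRebalanceTimeout(props));
    }
}
```

With the values reversed (40 s rebalance timeout, 20 s graceful shutdown timeout), the check returns false, which is why the test needs control over both worker-level properties.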