gharris1727 commented on code in PR #15080: URL: https://github.com/apache/kafka/pull/15080#discussion_r1442146732
##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java:
##########
@@ -1001,6 +1001,48 @@ public void testResetConnectorOffsets() throws Exception {
         assertEquals(msg, resetOffsetsCallback.get(1000, TimeUnit.MILLISECONDS));
     }
+    @Test()
+    public void testRequestTaskReconfigurationDoesNotDeadlock() throws Exception {
+        connector = mock(BogusSourceConnector.class);
+        expectAdd(SourceSink.SOURCE);
+
+        // Start the connector
+        Map<String, String> config = connectorConfig(SourceSink.SOURCE);
+        Connector connectorMock = mock(SourceConnector.class);
+        expectConfigValidation(connectorMock, true, config);
+
+        herder.putConnectorConfig(CONNECTOR_NAME, config, false, createCallback);
+
+        // Wait on connector to start
+        Herder.Created<ConnectorInfo> connectorInfo = createCallback.get(1000L, TimeUnit.MILLISECONDS);
+        assertEquals(createdInfo(SourceSink.SOURCE), connectorInfo.result());
+
+        // Updated task with new config
+        Map<String, String> updatedTaskConfig = taskConfig(SourceSink.SOURCE);
+        updatedTaskConfig.put("dummy-task-property", "yes");
+        when(worker.connectorTaskConfigs(CONNECTOR_NAME, new SourceConnectorConfig(plugins, config, true)))
+            .thenReturn(Collections.singletonList(updatedTaskConfig));
+
+        // Expect that tasks will be stopped and started again
+        when(worker.connectorNames()).thenReturn(Collections.singleton(CONNECTOR_NAME));
+        doNothing().when(worker).stopAndAwaitTasks(singletonList(new ConnectorTaskId(CONNECTOR_NAME, 0)));
+        when(worker.startSourceTask(eq(new ConnectorTaskId(CONNECTOR_NAME, 0)), any(), eq(connectorConfig(SourceSink.SOURCE)), eq(updatedTaskConfig), eq(herder), eq(TargetState.STARTED))).thenReturn(true);
+
+        // Set new connector config
+        Map<String, String> newConfig = connectorConfig(SourceSink.SOURCE);
+        newConfig.put("dummy-task-property", "yes");
+        herder.putConnectorConfig(CONNECTOR_NAME, newConfig, true, createCallback);

Review Comment:
This re-uses the createCallback, which should
already be resolved from the earlier request. I think we need a new future to wait for, like testPutConnectorConfig.

When I made that change, the callback wasn't completing because this test was missing a mock for `Worker#startConnector`. There's a mocking idiom used elsewhere in the test that you can copy:
https://github.com/apache/kafka/blob/e6f2624c48ceab032811693e1013b70c6ee16c74/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java#L250-L255

After that is fixed, I think some of the other assertions change slightly, since the second put isn't actually executing to completion.
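
To illustrate why re-using a resolved callback hides the problem, here is a minimal standalone sketch. It uses `java.util.concurrent.CompletableFuture` as a stand-in for the test's callback type, which is an assumption for illustration only; the class and method names (`ReusedFutureSketch`, `reuseResolvedFuture`, `freshFutureTimesOut`) are hypothetical and not part of the Kafka code base.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch: CompletableFuture stands in for the test's callback type.
public class ReusedFutureSketch {

    // A future resolved by the first request returns immediately when awaited
    // again, even if the second request never completes it.
    static String reuseResolvedFuture() throws Exception {
        CompletableFuture<String> callback = new CompletableFuture<>();
        callback.complete("first request");
        // Imagine a second request that is handed the same future here but
        // never finishes (for example, because a mock is missing).
        return callback.get(1000, TimeUnit.MILLISECONDS);
    }

    // A fresh future for the second request exposes the problem: the wait
    // times out because nothing ever completes it.
    static boolean freshFutureTimesOut() throws Exception {
        CompletableFuture<String> second = new CompletableFuture<>();
        try {
            second.get(50, TimeUnit.MILLISECONDS);
            return false;
        } catch (TimeoutException e) {
            return true;
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("reused future returned: " + reuseResolvedFuture());
        System.out.println("fresh future timed out: " + freshFutureTimesOut());
    }
}
```

With a fresh future, a test whose second put never runs to completion fails with a timeout instead of passing silently, which is presumably how the missing `Worker#startConnector` mock became visible.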