developster commented on code in PR #15080:
URL: https://github.com/apache/kafka/pull/15080#discussion_r1450160994
##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java:
##########
@@ -1001,6 +1001,48 @@ public void testResetConnectorOffsets() throws Exception {
         assertEquals(msg, resetOffsetsCallback.get(1000, TimeUnit.MILLISECONDS));
     }
 
+    @Test()
+    public void testRequestTaskReconfigurationDoesNotDeadlock() throws Exception {
+        connector = mock(BogusSourceConnector.class);
+        expectAdd(SourceSink.SOURCE);
+
+        // Start the connector
+        Map<String, String> config = connectorConfig(SourceSink.SOURCE);
+        Connector connectorMock = mock(SourceConnector.class);
+        expectConfigValidation(connectorMock, true, config);
+
+        herder.putConnectorConfig(CONNECTOR_NAME, config, false, createCallback);
+
+        // Wait on connector to start
+        Herder.Created<ConnectorInfo> connectorInfo = createCallback.get(1000L, TimeUnit.MILLISECONDS);
+        assertEquals(createdInfo(SourceSink.SOURCE), connectorInfo.result());
+
+        // Updated task with new config
+        Map<String, String> updatedTaskConfig = taskConfig(SourceSink.SOURCE);
+        updatedTaskConfig.put("dummy-task-property", "yes");
+        when(worker.connectorTaskConfigs(CONNECTOR_NAME, new SourceConnectorConfig(plugins, config, true)))
+                .thenReturn(Collections.singletonList(updatedTaskConfig));
+
+        // Expect that tasks will be stopped and started again
+        when(worker.connectorNames()).thenReturn(Collections.singleton(CONNECTOR_NAME));
+        doNothing().when(worker).stopAndAwaitTasks(singletonList(new ConnectorTaskId(CONNECTOR_NAME, 0)));
+        when(worker.startSourceTask(eq(new ConnectorTaskId(CONNECTOR_NAME, 0)), any(), eq(connectorConfig(SourceSink.SOURCE)), eq(updatedTaskConfig), eq(herder), eq(TargetState.STARTED))).thenReturn(true);
+
+        // Set new connector config
+        Map<String, String> newConfig = connectorConfig(SourceSink.SOURCE);
+        newConfig.put("dummy-task-property", "yes");
+        herder.putConnectorConfig(CONNECTOR_NAME, newConfig, true, createCallback);

Review Comment:
You are right.
Updated the test to set a new config on both the connector and the task in the second putConnectorConfig call, and made sure that the task config is also updated in requestTaskReconfiguration. The idea is that, in this case, both execution paths should invoke store.putTaskConfigs, which happens only if the task config has changed.
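To illustrate why the test has to change the task config for both paths to reach store.putTaskConfigs, here is a minimal, self-contained sketch of a "write only on change" guard. It is not the StandaloneHerder implementation: `TaskConfigUpdateSketch`, `CountingStore`, and `updateTaskConfigs` are hypothetical names invented for this illustration, and the real herder logic may differ.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TaskConfigUpdateSketch {

    // Hypothetical stand-in for a config store; it counts writes so a caller can assert on them.
    static class CountingStore {
        private final Map<String, List<Map<String, String>>> taskConfigs = new HashMap<>();
        int putTaskConfigsCalls = 0;

        List<Map<String, String>> current(String connector) {
            return taskConfigs.getOrDefault(connector, Collections.emptyList());
        }

        void putTaskConfigs(String connector, List<Map<String, String>> configs) {
            putTaskConfigsCalls++;
            taskConfigs.put(connector, new ArrayList<>(configs));
        }
    }

    // Assumed guard: write to the store only when the new task configs differ from the stored ones.
    static boolean updateTaskConfigs(CountingStore store, String connector,
                                     List<Map<String, String>> newConfigs) {
        if (store.current(connector).equals(newConfigs)) {
            return false; // unchanged, so putTaskConfigs is never reached
        }
        store.putTaskConfigs(connector, newConfigs);
        return true;
    }

    public static void main(String[] args) {
        CountingStore store = new CountingStore();

        Map<String, String> original = new HashMap<>();
        original.put("task.class", "ExampleTask");

        Map<String, String> updated = new HashMap<>(original);
        updated.put("dummy-task-property", "yes");

        // First write stores the original config.
        updateTaskConfigs(store, "connector", Collections.singletonList(original));
        // Same config again: the guard skips the store.
        updateTaskConfigs(store, "connector", Collections.singletonList(original));
        // Changed config: the store is written a second time.
        updateTaskConfigs(store, "connector", Collections.singletonList(updated));

        System.out.println("putTaskConfigs calls: " + store.putTaskConfigsCalls);
    }
}
```

Under this assumption, a test that reuses the same task config in the second call would never exercise the store write, which is why the updated test adds "dummy-task-property" before calling putConnectorConfig again.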