developster commented on code in PR #15080:
URL: https://github.com/apache/kafka/pull/15080#discussion_r1450160994


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java:
##########
@@ -1001,6 +1001,48 @@ public void testResetConnectorOffsets() throws Exception {
         assertEquals(msg, resetOffsetsCallback.get(1000, TimeUnit.MILLISECONDS));
     }
 
+    @Test()
+    public void testRequestTaskReconfigurationDoesNotDeadlock() throws Exception {
+        connector = mock(BogusSourceConnector.class);
+        expectAdd(SourceSink.SOURCE);
+
+        // Start the connector
+        Map<String, String> config = connectorConfig(SourceSink.SOURCE);
+        Connector connectorMock = mock(SourceConnector.class);
+        expectConfigValidation(connectorMock, true, config);
+
+        herder.putConnectorConfig(CONNECTOR_NAME, config, false, createCallback);
+
+        // Wait on connector to start
+        Herder.Created<ConnectorInfo> connectorInfo = createCallback.get(1000L, TimeUnit.MILLISECONDS);
+        assertEquals(createdInfo(SourceSink.SOURCE), connectorInfo.result());
+
+        // Updated task with new config
+        Map<String, String> updatedTaskConfig = taskConfig(SourceSink.SOURCE);
+        updatedTaskConfig.put("dummy-task-property", "yes");
+        when(worker.connectorTaskConfigs(CONNECTOR_NAME, new SourceConnectorConfig(plugins, config, true)))
+                .thenReturn(Collections.singletonList(updatedTaskConfig));
+
+        // Expect that tasks will be stopped and started again
+        when(worker.connectorNames()).thenReturn(Collections.singleton(CONNECTOR_NAME));
+        doNothing().when(worker).stopAndAwaitTasks(singletonList(new ConnectorTaskId(CONNECTOR_NAME, 0)));
+        when(worker.startSourceTask(eq(new ConnectorTaskId(CONNECTOR_NAME, 0)), any(), eq(connectorConfig(SourceSink.SOURCE)), eq(updatedTaskConfig), eq(herder), eq(TargetState.STARTED))).thenReturn(true);
+
+        // Set new connector config
+        Map<String, String> newConfig = connectorConfig(SourceSink.SOURCE);
+        newConfig.put("dummy-task-property", "yes");
+        herder.putConnectorConfig(CONNECTOR_NAME, newConfig, true, createCallback);

Review Comment:
   You are right. I updated the test so that the second putConnectorConfig call
   sets a new config on both the connector and the task, and made sure that the
   task config is updated in requestTaskReconfiguration. The idea is that both
   execution paths should then invoke store.putTaskConfigs, which happens only
   if the task config has changed.
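
   To illustrate the "write only if changed" guard the test relies on, here is
   a minimal sketch. It is not the actual StandaloneHerder code: the class
   `TaskConfigStore` and the methods `updateTaskConfigs` and `writes` are
   hypothetical names made up for this example, and the real store has a
   different interface.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a task config store; not a Kafka Connect class.
class TaskConfigStore {
    private List<Map<String, String>> current = new ArrayList<>();
    private int writes = 0;

    // Stores the new task configs only when they differ from the current ones.
    // Returns true if a write happened, false if the update was skipped.
    boolean updateTaskConfigs(List<Map<String, String>> updated) {
        if (updated.equals(current)) {
            return false;
        }
        current = new ArrayList<>(updated);
        writes++;
        return true;
    }

    // Number of writes performed so far.
    int writes() {
        return writes;
    }
}
```

   With a guard like this, a test that reuses the original task config never
   reaches the write path, which is why the test has to change the task config
   before it can exercise that path.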


