gharris1727 commented on code in PR #15080:
URL: https://github.com/apache/kafka/pull/15080#discussion_r1442146732


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java:
##########
@@ -1001,6 +1001,48 @@ public void testResetConnectorOffsets() throws Exception {
         assertEquals(msg, resetOffsetsCallback.get(1000, TimeUnit.MILLISECONDS));
     }
 
+    @Test()
+    public void testRequestTaskReconfigurationDoesNotDeadlock() throws Exception {
+        connector = mock(BogusSourceConnector.class);
+        expectAdd(SourceSink.SOURCE);
+
+        // Start the connector
+        Map<String, String> config = connectorConfig(SourceSink.SOURCE);
+        Connector connectorMock = mock(SourceConnector.class);
+        expectConfigValidation(connectorMock, true, config);
+
+        herder.putConnectorConfig(CONNECTOR_NAME, config, false, createCallback);
+
+        // Wait on connector to start
+        Herder.Created<ConnectorInfo> connectorInfo = createCallback.get(1000L, TimeUnit.MILLISECONDS);
+        assertEquals(createdInfo(SourceSink.SOURCE), connectorInfo.result());
+
+        // Updated task with new config
+        Map<String, String> updatedTaskConfig = taskConfig(SourceSink.SOURCE);
+        updatedTaskConfig.put("dummy-task-property", "yes");
+        when(worker.connectorTaskConfigs(CONNECTOR_NAME, new SourceConnectorConfig(plugins, config, true)))
+                .thenReturn(Collections.singletonList(updatedTaskConfig));
+
+        // Expect that tasks will be stopped and started again
+        when(worker.connectorNames()).thenReturn(Collections.singleton(CONNECTOR_NAME));
+        doNothing().when(worker).stopAndAwaitTasks(singletonList(new ConnectorTaskId(CONNECTOR_NAME, 0)));
+        when(worker.startSourceTask(eq(new ConnectorTaskId(CONNECTOR_NAME, 0)), any(), eq(connectorConfig(SourceSink.SOURCE)), eq(updatedTaskConfig), eq(herder), eq(TargetState.STARTED))).thenReturn(true);
+
+        // Set new connector config
+        Map<String, String> newConfig = connectorConfig(SourceSink.SOURCE);
+        newConfig.put("dummy-task-property", "yes");
+        herder.putConnectorConfig(CONNECTOR_NAME, newConfig, true, createCallback);

Review Comment:
   This reuses `createCallback`, which should already have been resolved by the earlier request. I think we need a new future to wait on, as in testPutConnectorConfig.

   When I made that change, the callback wasn't completing because this test was missing a mock for `Worker#startConnector`. There's a mocking idiom used elsewhere in the test that you can copy: https://github.com/apache/kafka/blob/e6f2624c48ceab032811693e1013b70c6ee16c74/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java#L250-L255

   After that is fixed, I think some of the other assertions change slightly, since the second put currently isn't executing to completion.
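
   To illustrate why reusing an already-resolved callback can hide a request that never completes, here is a minimal standalone sketch. It assumes `java.util.concurrent.CompletableFuture` as a stand-in for the callback type used in the test; `StaleCallbackDemo` and its `request` helper are hypothetical and not part of Kafka Connect.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class; CompletableFuture stands in for the test's callback type.
class StaleCallbackDemo {

    // Hypothetical stand-in for an asynchronous request: it completes the
    // supplied callback only when `completes` is true.
    public static void request(String result, boolean completes, CompletableFuture<String> callback) {
        if (completes) {
            callback.complete(result);
        }
    }

    // Reuses one callback for both requests. The second request never completes,
    // yet get() returns immediately with the result of the first request.
    public static String reusedCallback() throws Exception {
        CompletableFuture<String> callback = new CompletableFuture<>();
        request("created", true, callback);
        callback.get(1000, TimeUnit.MILLISECONDS);
        request("updated", false, callback);
        return callback.get(100, TimeUnit.MILLISECONDS);
    }

    // Uses a fresh callback for the second request, so the missing completion
    // surfaces as a TimeoutException instead of being masked.
    public static boolean freshCallbackTimesOut() throws Exception {
        CompletableFuture<String> create = new CompletableFuture<>();
        request("created", true, create);
        create.get(1000, TimeUnit.MILLISECONDS);

        CompletableFuture<String> reconfigure = new CompletableFuture<>();
        request("updated", false, reconfigure);
        try {
            reconfigure.get(100, TimeUnit.MILLISECONDS);
            return false;
        } catch (TimeoutException e) {
            return true;
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(reusedCallback());        // prints "created"
        System.out.println(freshCallbackTimesOut()); // prints "true"
    }
}
```

   With the reused callback the wait succeeds even though the second request did nothing, which is the same shape as the second put not executing to completion; with a fresh callback the wait times out and exposes the missing step.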



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
