gharris1727 commented on code in PR #15080:
URL: https://github.com/apache/kafka/pull/15080#discussion_r1450714324


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java:
##########
@@ -1001,6 +1001,65 @@ public void testResetConnectorOffsets() throws Exception 
{
         assertEquals(msg, resetOffsetsCallback.get(1000, 
TimeUnit.MILLISECONDS));
     }
 
+    @Test()
+    public void testRequestTaskReconfigurationDoesNotDeadlock() throws 
Exception {
+        connector = mock(BogusSourceConnector.class);
+        expectAdd(SourceSink.SOURCE);
+
+        // Start the connector
+        Map<String, String> config = connectorConfig(SourceSink.SOURCE);
+        Connector connectorMock = mock(SourceConnector.class);
+        expectConfigValidation(connectorMock, true, config);
+
+        herder.putConnectorConfig(CONNECTOR_NAME, config, false, 
createCallback);
+
+        // Wait on connector to start
+        Herder.Created<ConnectorInfo> connectorInfo = 
createCallback.get(1000L, TimeUnit.MILLISECONDS);
+        assertEquals(createdInfo(SourceSink.SOURCE), connectorInfo.result());
+
+        // Prepare for task config update
+        
when(worker.connectorNames()).thenReturn(Collections.singleton(CONNECTOR_NAME));
+        expectStop();
+
+        // Prepare for connector and task config update
+        Map<String, String> newConfig = connectorConfig(SourceSink.SOURCE);
+        newConfig.put("dummy-connector-property", "yes");
+        final ArgumentCaptor<Callback<TargetState>> onStart = 
ArgumentCaptor.forClass(Callback.class);
+        doAnswer(invocation -> {
+            onStart.getValue().onCompletion(null, TargetState.STARTED);
+            return true;
+        }).when(worker).startConnector(eq(CONNECTOR_NAME), eq(newConfig), 
any(HerderConnectorContext.class),
+                eq(herder), eq(TargetState.STARTED), onStart.capture());
+
+        // Common invocations
+        when(worker.startSourceTask(eq(new ConnectorTaskId(CONNECTOR_NAME, 
0)), any(), any(), any(), eq(herder), 
eq(TargetState.STARTED))).thenReturn(true);
+        Map<String, String> updatedTaskConfig1 = taskConfig(SourceSink.SOURCE);
+        updatedTaskConfig1.put("dummy-task-property", "1");
+        Map<String, String> updatedTaskConfig2 = taskConfig(SourceSink.SOURCE);
+        updatedTaskConfig2.put("dummy-task-property", "2");
+        when(worker.connectorTaskConfigs(eq(CONNECTOR_NAME), any()))
+                .thenReturn(
+                        Collections.singletonList(updatedTaskConfig1),
+                        Collections.singletonList(updatedTaskConfig2));
+
+        // Set new config on the connector and tasks
+        FutureCallback<Herder.Created<ConnectorInfo>> reconfigureCallback = 
new FutureCallback<>();
+        expectConfigValidation(connectorMock, false, newConfig);
+        herder.putConnectorConfig(CONNECTOR_NAME, newConfig, true, 
reconfigureCallback);
+
+        Thread.sleep(10);
+

Review Comment:
   Does this make the deadlock more likely on your machine? I can't seem to 
reproduce it with or without this sleep.
   Since it's going to be highly dependent on the machine, I think we should 
just eliminate this sleep.
   ```suggestion
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to