Re: [PR] KAFKA-16051: Fixed deadlock in StandaloneHerder [kafka]

2024-01-12 Thread via GitHub


gharris1727 merged PR #15080:
URL: https://github.com/apache/kafka/pull/15080


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16051: Fixed deadlock in StandaloneHerder [kafka]

2024-01-12 Thread via GitHub


gharris1727 commented on PR #15080:
URL: https://github.com/apache/kafka/pull/15080#issuecomment-1890150013

   Test failures appear unrelated, and the tests pass locally for me.





Re: [PR] KAFKA-16051: Fixed deadlock in StandaloneHerder [kafka]

2024-01-12 Thread via GitHub


gharris1727 commented on code in PR #15080:
URL: https://github.com/apache/kafka/pull/15080#discussion_r1450714324


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java:
##
@@ -1001,6 +1001,65 @@ public void testResetConnectorOffsets() throws Exception {
         assertEquals(msg, resetOffsetsCallback.get(1000, TimeUnit.MILLISECONDS));
     }
 
+    @Test()
+    public void testRequestTaskReconfigurationDoesNotDeadlock() throws Exception {
+        connector = mock(BogusSourceConnector.class);
+        expectAdd(SourceSink.SOURCE);
+
+        // Start the connector
+        Map<String, String> config = connectorConfig(SourceSink.SOURCE);
+        Connector connectorMock = mock(SourceConnector.class);
+        expectConfigValidation(connectorMock, true, config);
+
+        herder.putConnectorConfig(CONNECTOR_NAME, config, false, createCallback);
+
+        // Wait on connector to start
+        Herder.Created<ConnectorInfo> connectorInfo = createCallback.get(1000L, TimeUnit.MILLISECONDS);
+        assertEquals(createdInfo(SourceSink.SOURCE), connectorInfo.result());
+
+        // Prepare for task config update
+        when(worker.connectorNames()).thenReturn(Collections.singleton(CONNECTOR_NAME));
+        expectStop();
+
+        // Prepare for connector and task config update
+        Map<String, String> newConfig = connectorConfig(SourceSink.SOURCE);
+        newConfig.put("dummy-connector-property", "yes");
+        final ArgumentCaptor<Callback<TargetState>> onStart = ArgumentCaptor.forClass(Callback.class);
+        doAnswer(invocation -> {
+            onStart.getValue().onCompletion(null, TargetState.STARTED);
+            return true;
+        }).when(worker).startConnector(eq(CONNECTOR_NAME), eq(newConfig), any(HerderConnectorContext.class),
+                eq(herder), eq(TargetState.STARTED), onStart.capture());
+
+        // Common invocations
+        when(worker.startSourceTask(eq(new ConnectorTaskId(CONNECTOR_NAME, 0)), any(), any(), any(), eq(herder), eq(TargetState.STARTED))).thenReturn(true);
+        Map<String, String> updatedTaskConfig1 = taskConfig(SourceSink.SOURCE);
+        updatedTaskConfig1.put("dummy-task-property", "1");
+        Map<String, String> updatedTaskConfig2 = taskConfig(SourceSink.SOURCE);
+        updatedTaskConfig2.put("dummy-task-property", "2");
+        when(worker.connectorTaskConfigs(eq(CONNECTOR_NAME), any()))
+                .thenReturn(
+                        Collections.singletonList(updatedTaskConfig1),
+                        Collections.singletonList(updatedTaskConfig2));
+
+        // Set new config on the connector and tasks
+        FutureCallback<Herder.Created<ConnectorInfo>> reconfigureCallback = new FutureCallback<>();
+        expectConfigValidation(connectorMock, false, newConfig);
+        herder.putConnectorConfig(CONNECTOR_NAME, newConfig, true, reconfigureCallback);
+
+        Thread.sleep(10);
+

Review Comment:
   Does this make the deadlock more likely on your machine? I can't seem to 
reproduce it with or without this sleep.
   Since it's going to be highly dependent on the machine, I think we should 
just eliminate this sleep.
   ```suggestion
   ```






Re: [PR] KAFKA-16051: Fixed deadlock in StandaloneHerder [kafka]

2024-01-12 Thread via GitHub


developster commented on PR #15080:
URL: https://github.com/apache/kafka/pull/15080#issuecomment-117897

   I've changed the mocks a bit to accommodate both cases, depending on which 
executes first: putConnectorConfig or requestTaskReconfiguration. On my machine, 
sleep(10) is the right amount of time to wait to reproduce the deadlock 
consistently. It should probably be fine for this test; if not, I can remove it.





Re: [PR] KAFKA-16051: Fixed deadlock in StandaloneHerder [kafka]

2024-01-12 Thread via GitHub


developster commented on code in PR #15080:
URL: https://github.com/apache/kafka/pull/15080#discussion_r1450160994


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java:
##
@@ -1001,6 +1001,48 @@ public void testResetConnectorOffsets() throws Exception {
         assertEquals(msg, resetOffsetsCallback.get(1000, TimeUnit.MILLISECONDS));
     }
 
+    @Test()
+    public void testRequestTaskReconfigurationDoesNotDeadlock() throws Exception {
+        connector = mock(BogusSourceConnector.class);
+        expectAdd(SourceSink.SOURCE);
+
+        // Start the connector
+        Map<String, String> config = connectorConfig(SourceSink.SOURCE);
+        Connector connectorMock = mock(SourceConnector.class);
+        expectConfigValidation(connectorMock, true, config);
+
+        herder.putConnectorConfig(CONNECTOR_NAME, config, false, createCallback);
+
+        // Wait on connector to start
+        Herder.Created<ConnectorInfo> connectorInfo = createCallback.get(1000L, TimeUnit.MILLISECONDS);
+        assertEquals(createdInfo(SourceSink.SOURCE), connectorInfo.result());
+
+        // Updated task with new config
+        Map<String, String> updatedTaskConfig = taskConfig(SourceSink.SOURCE);
+        updatedTaskConfig.put("dummy-task-property", "yes");
+        when(worker.connectorTaskConfigs(CONNECTOR_NAME, new SourceConnectorConfig(plugins, config, true)))
+                .thenReturn(Collections.singletonList(updatedTaskConfig));
+
+        // Expect that tasks will be stopped and started again
+        when(worker.connectorNames()).thenReturn(Collections.singleton(CONNECTOR_NAME));
+        doNothing().when(worker).stopAndAwaitTasks(singletonList(new ConnectorTaskId(CONNECTOR_NAME, 0)));
+        when(worker.startSourceTask(eq(new ConnectorTaskId(CONNECTOR_NAME, 0)), any(), eq(connectorConfig(SourceSink.SOURCE)), eq(updatedTaskConfig), eq(herder), eq(TargetState.STARTED))).thenReturn(true);
+
+        // Set new connector config
+        Map<String, String> newConfig = connectorConfig(SourceSink.SOURCE);
+        newConfig.put("dummy-task-property", "yes");
+        herder.putConnectorConfig(CONNECTOR_NAME, newConfig, true, createCallback);

Review Comment:
   You are right. Updated the test to set a new config on connector and task in 
the second putConnectorConfig and made sure that task config is updated in 
requestTaskReconfiguration. The idea is that in this case both execution paths 
should invoke store.putTaskConfigs and this happens only if task config changed.
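
The "write only if the task config changed" condition mentioned above can be sketched as follows. This is a minimal illustration, not Kafka's implementation: `TaskConfigStoreSketch` and `putTaskConfigsIfChanged` are hypothetical names, and the real herder and config backing store work differently in detail.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a task config store; not a Kafka class.
class TaskConfigStoreSketch {
    private List<Map<String, String>> current = Collections.emptyList();
    private int writes = 0;

    /** Stores the task configs only when they differ from what is already stored. */
    synchronized boolean putTaskConfigsIfChanged(List<Map<String, String>> updated) {
        if (updated.equals(current)) {
            return false; // unchanged: the store is not touched
        }
        current = new ArrayList<>(updated);
        writes++;
        return true;
    }

    /** Number of writes that actually reached the store. */
    synchronized int writes() {
        return writes;
    }

    public static void main(String[] args) {
        TaskConfigStoreSketch store = new TaskConfigStoreSketch();
        List<Map<String, String>> v1 =
                Collections.singletonList(Collections.singletonMap("dummy-task-property", "1"));
        List<Map<String, String>> v2 =
                Collections.singletonList(Collections.singletonMap("dummy-task-property", "2"));
        System.out.println(store.putTaskConfigsIfChanged(v1)); // changed, so it is written
        System.out.println(store.putTaskConfigsIfChanged(v1)); // same config, so it is skipped
        System.out.println(store.putTaskConfigsIfChanged(v2)); // changed again, so it is written
    }
}
```

Under this assumption, a test that wants both execution paths to reach the store has to hand each path a task config that differs from the one currently stored.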






Re: [PR] KAFKA-16051: Fixed deadlock in StandaloneHerder [kafka]

2024-01-04 Thread via GitHub


gharris1727 commented on code in PR #15080:
URL: https://github.com/apache/kafka/pull/15080#discussion_r1442146732


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java:
##
@@ -1001,6 +1001,48 @@ public void testResetConnectorOffsets() throws Exception {
         assertEquals(msg, resetOffsetsCallback.get(1000, TimeUnit.MILLISECONDS));
     }
 
+    @Test()
+    public void testRequestTaskReconfigurationDoesNotDeadlock() throws Exception {
+        connector = mock(BogusSourceConnector.class);
+        expectAdd(SourceSink.SOURCE);
+
+        // Start the connector
+        Map<String, String> config = connectorConfig(SourceSink.SOURCE);
+        Connector connectorMock = mock(SourceConnector.class);
+        expectConfigValidation(connectorMock, true, config);
+
+        herder.putConnectorConfig(CONNECTOR_NAME, config, false, createCallback);
+
+        // Wait on connector to start
+        Herder.Created<ConnectorInfo> connectorInfo = createCallback.get(1000L, TimeUnit.MILLISECONDS);
+        assertEquals(createdInfo(SourceSink.SOURCE), connectorInfo.result());
+
+        // Updated task with new config
+        Map<String, String> updatedTaskConfig = taskConfig(SourceSink.SOURCE);
+        updatedTaskConfig.put("dummy-task-property", "yes");
+        when(worker.connectorTaskConfigs(CONNECTOR_NAME, new SourceConnectorConfig(plugins, config, true)))
+                .thenReturn(Collections.singletonList(updatedTaskConfig));
+
+        // Expect that tasks will be stopped and started again
+        when(worker.connectorNames()).thenReturn(Collections.singleton(CONNECTOR_NAME));
+        doNothing().when(worker).stopAndAwaitTasks(singletonList(new ConnectorTaskId(CONNECTOR_NAME, 0)));
+        when(worker.startSourceTask(eq(new ConnectorTaskId(CONNECTOR_NAME, 0)), any(), eq(connectorConfig(SourceSink.SOURCE)), eq(updatedTaskConfig), eq(herder), eq(TargetState.STARTED))).thenReturn(true);
+
+        // Set new connector config
+        Map<String, String> newConfig = connectorConfig(SourceSink.SOURCE);
+        newConfig.put("dummy-task-property", "yes");
+        herder.putConnectorConfig(CONNECTOR_NAME, newConfig, true, createCallback);

Review Comment:
   This re-uses the createCallback, which should already be resolved from the 
earlier request. I think we need a new future to wait for, like 
testPutConnectorConfig. 
   
   When I made that change, the callback wasn't completing because this test was 
missing a mock for `Worker#startConnector`. There's a mocking idiom used 
elsewhere in the test that you can copy: 
https://github.com/apache/kafka/blob/e6f2624c48ceab032811693e1013b70c6ee16c74/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java#L250-L255
 
   
   After that is fixed, I think some of the other assertions change slightly, 
since the second put isn't actually executing to completion.
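
To illustrate why re-using an already resolved callback hides the problem, here is a minimal sketch. It uses `java.util.concurrent.CompletableFuture` as a stand-in for Kafka's `FutureCallback`, which is an assumption made for self-containment; `ReusedCallbackSketch` and `completesWithin` are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical illustration; CompletableFuture stands in for FutureCallback.
class ReusedCallbackSketch {
    /** Waits on the future and reports whether it completed within the timeout. */
    static boolean completesWithin(CompletableFuture<String> future, long millis) {
        try {
            future.get(millis, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    public static void main(String[] args) {
        // The first request resolves the callback.
        CompletableFuture<String> createCallback = new CompletableFuture<>();
        createCallback.complete("created");

        // Suppose a second request never completes (for example, a mock is missing).
        // Waiting on the re-used, already resolved future still returns immediately.
        System.out.println("re-used callback completes: " + completesWithin(createCallback, 100));

        // A fresh future for the second request exposes the missing completion.
        CompletableFuture<String> reconfigureCallback = new CompletableFuture<>();
        System.out.println("fresh callback completes: " + completesWithin(reconfigureCallback, 100));
    }
}
```

With a fresh future per request, the wait only succeeds if the second request actually ran to completion.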






Re: [PR] KAFKA-16051: Fixed deadlock in StandaloneHerder [kafka]

2024-01-04 Thread via GitHub


developster commented on PR #15080:
URL: https://github.com/apache/kafka/pull/15080#issuecomment-1877282686

   @gharris1727 , I've added the test. It first starts the connector, then 
updates both the connector and task at the same time. The reason for this is 
that task reconfiguration is skipped if the connector has not yet started. Hope 
it looks right.





Re: [PR] KAFKA-16051: Fixed deadlock in StandaloneHerder [kafka]

2024-01-03 Thread via GitHub


gharris1727 commented on PR #15080:
URL: https://github.com/apache/kafka/pull/15080#issuecomment-1875639893

   > just invoking methods in order is not enough to trigger the deadlock. I 
believe it is possible to reliably reproduce the deadlock with two countdown 
latches, one countdown in the ConfigBackingStore#snapshot and another in 
ConfigBackingStore#putTaskConfigs. This requires a mock for the config backing 
store. If you have a better idea I am happy to analyze it.
   
   Yeah I understand. I think the cost of deterministically reproducing the 
deadlock is too high. I did it in #8259 because I didn't know what 
synchronization was missing and needed a repro case to debug.
   
   I would be satisfied with a test which non-deterministically reproduces the 
deadlock but is less brittle and includes fewer mocks. Currently we only have 
two connectors calling task reconfiguration (mirror checkpoint and source) and 
one test in the DistributedHerder. There is zero coverage in StandaloneHerder, 
which is part of why we never found this bug :)
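
A non-deterministic test of this kind could look roughly like the sketch below: two calls race on separate threads, optionally staggered by a short sleep, and the check fails if either does not finish within a bounded wait. All names here (`RacingCallsSketch`, `bothFinish`, `staggerMillis`) are hypothetical; the real test drives the herder rather than plain `Runnable`s.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper; not part of StandaloneHerderTest.
class RacingCallsSketch {
    /** Runs both actions concurrently and reports whether each finished within the timeout. */
    static boolean bothFinish(Runnable first, Runnable second, long staggerMillis, long timeoutMillis) {
        // Daemon threads so that a genuinely stuck action cannot keep the JVM alive.
        ExecutorService pool = Executors.newFixedThreadPool(2, r -> {
            Thread t = new Thread(r);
            t.setDaemon(true);
            return t;
        });
        try {
            Future<?> f1 = pool.submit(first);
            Thread.sleep(staggerMillis); // shifts the timing; the right value is machine dependent
            Future<?> f2 = pool.submit(second);
            f1.get(timeoutMillis, TimeUnit.MILLISECONDS);
            f2.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            return false; // at least one action is stuck, possibly deadlocked
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("no-op actions finish: " + bothFinish(() -> { }, () -> { }, 10, 1000));

        CountDownLatch never = new CountDownLatch(1);
        Runnable stuck = () -> {
            try {
                never.await(); // simulates an action that never returns
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        System.out.println("stuck action finishes: " + bothFinish(stuck, () -> { }, 10, 200));
    }
}
```

The trade-off is the one discussed in this thread: such a check cannot prove the absence of a deadlock, but it needs few mocks and fails quickly when one occurs.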
   
   > Unrelated to this PR's issue, it may be that the wait operations in 
StandaloneHerderTest are mistakenly 1000 seconds instead of milliseconds. Is 
that right?
   
   Yeah that timeout seems a bit absurd, but if there aren't deadlocks in the 
test it should never incur that timeout. It looks like the test suite is very 
well behaved in practice, so I'm inclined to keep it as-is: 
https://ge.apache.org/scans/tests?search.names=Git%20branch=P90D=kafka=America%2FLos_Angeles=trunk=*StandaloneHerderTest=FLAKY





Re: [PR] KAFKA-16051: Fixed deadlock in StandaloneHerder [kafka]

2024-01-03 Thread via GitHub


developster commented on PR #15080:
URL: https://github.com/apache/kafka/pull/15080#issuecomment-1875320392

   @gharris1727 , sure. I wrote the test but just invoking methods in order is 
not enough to trigger the deadlock. The requestTaskReconfiguration method is 
always executed before putConnectorConfig.
   
   I believe it is possible to reliably reproduce the deadlock with two 
countdown latches, one countdown in the `ConfigBackingStore#snapshot` and 
another in `ConfigBackingStore#putTaskConfigs`. This requires a mock for the 
config backing store. If you have a better idea I am happy to analyze it.
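
The latch idea can be sketched in isolation as follows. This is not the Kafka test: `herderLock` and `storeLock` are hypothetical stand-ins for the two locks taken in opposite order, and `tryLock` with a timeout replaces a plain blocking acquire so that the sketch reports the deadlock interleaving instead of hanging. The first latch forces both threads to hold their first lock before either tries the second; the second latch keeps both first locks held until both attempts have finished.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical illustration of forcing a lock-ordering deadlock with latches.
class LatchDeadlockSketch {
    static final ReentrantLock herderLock = new ReentrantLock();
    static final ReentrantLock storeLock = new ReentrantLock();

    /** Takes `first`, waits until the peer holds its own first lock, then tries `second`. */
    static boolean lockInOrder(ReentrantLock first, ReentrantLock second,
                               CountDownLatch bothHoldFirst, CountDownLatch bothAttempted) {
        first.lock();
        try {
            bothHoldFirst.countDown();
            bothHoldFirst.await();
            // Bounded wait: a real deadlock would block here forever.
            boolean acquired = second.tryLock(200, TimeUnit.MILLISECONDS);
            if (acquired) {
                second.unlock();
            }
            // Keep holding `first` until the peer has also finished its attempt.
            bothAttempted.countDown();
            bothAttempted.await();
            return acquired;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            first.unlock();
        }
    }

    /** Returns true when neither thread could take its second lock. */
    static boolean reproducesDeadlock() {
        CountDownLatch bothHoldFirst = new CountDownLatch(2);
        CountDownLatch bothAttempted = new CountDownLatch(2);
        boolean[] acquired = new boolean[2];
        Thread a = new Thread(() -> acquired[0] = lockInOrder(herderLock, storeLock, bothHoldFirst, bothAttempted));
        Thread b = new Thread(() -> acquired[1] = lockInOrder(storeLock, herderLock, bothHoldFirst, bothAttempted));
        a.start();
        b.start();
        try {
            a.join();
            b.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return !acquired[0] && !acquired[1];
    }

    public static void main(String[] args) {
        System.out.println("deadlock interleaving reproduced: " + reproducesDeadlock());
    }
}
```

In the real test the latches would have to be counted down from inside a mocked config backing store, which is where the extra mocking cost comes from.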
   
   Unrelated to this PR's issue, it may be that the wait operations in 
StandaloneHerderTest are mistakenly 1000 seconds instead of milliseconds. Is 
that right?





Re: [PR] KAFKA-16051: Fixed deadlock in StandaloneHerder [kafka]

2023-12-29 Thread via GitHub


developster commented on PR #15080:
URL: https://github.com/apache/kafka/pull/15080#issuecomment-1871963622

   Sure @vamossagar12 , I just tested both unpatched 3.6.1 and patched 3.8.0. 
The unpatched version deadlocked in 40% of the runs (6 out of 15), whereas I 
was unable to reproduce the deadlock in the patched version (0 out of 20).

