Re: [PR] KAFKA-16051: Fixed deadlock in StandaloneHerder [kafka]
gharris1727 merged PR #15080: URL: https://github.com/apache/kafka/pull/15080 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16051: Fixed deadlock in StandaloneHerder [kafka]
gharris1727 commented on PR #15080: URL: https://github.com/apache/kafka/pull/15080#issuecomment-1890150013 Test failures appear unrelated, and the tests pass locally for me.
Re: [PR] KAFKA-16051: Fixed deadlock in StandaloneHerder [kafka]
gharris1727 commented on code in PR #15080:
URL: https://github.com/apache/kafka/pull/15080#discussion_r1450714324

## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java: ##
@@ -1001,6 +1001,65 @@ public void testResetConnectorOffsets() throws Exception {
         assertEquals(msg, resetOffsetsCallback.get(1000, TimeUnit.MILLISECONDS));
     }
 
+    @Test()
+    public void testRequestTaskReconfigurationDoesNotDeadlock() throws Exception {
+        connector = mock(BogusSourceConnector.class);
+        expectAdd(SourceSink.SOURCE);
+
+        // Start the connector
+        Map<String, String> config = connectorConfig(SourceSink.SOURCE);
+        Connector connectorMock = mock(SourceConnector.class);
+        expectConfigValidation(connectorMock, true, config);
+
+        herder.putConnectorConfig(CONNECTOR_NAME, config, false, createCallback);
+
+        // Wait on connector to start
+        Herder.Created<ConnectorInfo> connectorInfo = createCallback.get(1000L, TimeUnit.MILLISECONDS);
+        assertEquals(createdInfo(SourceSink.SOURCE), connectorInfo.result());
+
+        // Prepare for task config update
+        when(worker.connectorNames()).thenReturn(Collections.singleton(CONNECTOR_NAME));
+        expectStop();
+
+        // Prepare for connector and task config update
+        Map<String, String> newConfig = connectorConfig(SourceSink.SOURCE);
+        newConfig.put("dummy-connector-property", "yes");
+        final ArgumentCaptor<Callback<TargetState>> onStart = ArgumentCaptor.forClass(Callback.class);
+        doAnswer(invocation -> {
+            onStart.getValue().onCompletion(null, TargetState.STARTED);
+            return true;
+        }).when(worker).startConnector(eq(CONNECTOR_NAME), eq(newConfig), any(HerderConnectorContext.class),
+                eq(herder), eq(TargetState.STARTED), onStart.capture());
+
+        // Common invocations
+        when(worker.startSourceTask(eq(new ConnectorTaskId(CONNECTOR_NAME, 0)), any(), any(), any(), eq(herder), eq(TargetState.STARTED))).thenReturn(true);
+        Map<String, String> updatedTaskConfig1 = taskConfig(SourceSink.SOURCE);
+        updatedTaskConfig1.put("dummy-task-property", "1");
+        Map<String, String> updatedTaskConfig2 = taskConfig(SourceSink.SOURCE);
+        updatedTaskConfig2.put("dummy-task-property", "2");
+        when(worker.connectorTaskConfigs(eq(CONNECTOR_NAME), any()))
+                .thenReturn(
+                        Collections.singletonList(updatedTaskConfig1),
+                        Collections.singletonList(updatedTaskConfig2));
+
+        // Set new config on the connector and tasks
+        FutureCallback<Herder.Created<ConnectorInfo>> reconfigureCallback = new FutureCallback<>();
+        expectConfigValidation(connectorMock, false, newConfig);
+        herder.putConnectorConfig(CONNECTOR_NAME, newConfig, true, reconfigureCallback);
+
+        Thread.sleep(10);
+

Review Comment:
Does this make the deadlock more likely on your machine? I can't seem to reproduce it with or without this sleep. Since it's going to be highly dependent on the machine, I think we should just eliminate this sleep.

```suggestion
```
Re: [PR] KAFKA-16051: Fixed deadlock in StandaloneHerder [kafka]
developster commented on PR #15080: URL: https://github.com/apache/kafka/pull/15080#issuecomment-117897 I've changed the mocks a bit to accommodate both cases, depending on which executes first: putConnectorConfig or requestTaskReconfiguration. The sleep(10) is the right amount of time to wait on my machine to reproduce the deadlock consistently. It should probably be fine for this test; if not, I can remove it.
Re: [PR] KAFKA-16051: Fixed deadlock in StandaloneHerder [kafka]
developster commented on code in PR #15080:
URL: https://github.com/apache/kafka/pull/15080#discussion_r1450160994

## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java: ##
@@ -1001,6 +1001,48 @@ public void testResetConnectorOffsets() throws Exception {
         assertEquals(msg, resetOffsetsCallback.get(1000, TimeUnit.MILLISECONDS));
     }
 
+    @Test()
+    public void testRequestTaskReconfigurationDoesNotDeadlock() throws Exception {
+        connector = mock(BogusSourceConnector.class);
+        expectAdd(SourceSink.SOURCE);
+
+        // Start the connector
+        Map<String, String> config = connectorConfig(SourceSink.SOURCE);
+        Connector connectorMock = mock(SourceConnector.class);
+        expectConfigValidation(connectorMock, true, config);
+
+        herder.putConnectorConfig(CONNECTOR_NAME, config, false, createCallback);
+
+        // Wait on connector to start
+        Herder.Created<ConnectorInfo> connectorInfo = createCallback.get(1000L, TimeUnit.MILLISECONDS);
+        assertEquals(createdInfo(SourceSink.SOURCE), connectorInfo.result());
+
+        // Updated task with new config
+        Map<String, String> updatedTaskConfig = taskConfig(SourceSink.SOURCE);
+        updatedTaskConfig.put("dummy-task-property", "yes");
+        when(worker.connectorTaskConfigs(CONNECTOR_NAME, new SourceConnectorConfig(plugins, config, true)))
+                .thenReturn(Collections.singletonList(updatedTaskConfig));
+
+        // Expect that tasks will be stopped and started again
+        when(worker.connectorNames()).thenReturn(Collections.singleton(CONNECTOR_NAME));
+        doNothing().when(worker).stopAndAwaitTasks(singletonList(new ConnectorTaskId(CONNECTOR_NAME, 0)));
+        when(worker.startSourceTask(eq(new ConnectorTaskId(CONNECTOR_NAME, 0)), any(), eq(connectorConfig(SourceSink.SOURCE)), eq(updatedTaskConfig), eq(herder), eq(TargetState.STARTED))).thenReturn(true);
+
+        // Set new connector config
+        Map<String, String> newConfig = connectorConfig(SourceSink.SOURCE);
+        newConfig.put("dummy-task-property", "yes");
+        herder.putConnectorConfig(CONNECTOR_NAME, newConfig, true, createCallback);

Review Comment:
You are right. Updated the test to set a new config on connector and task in the second putConnectorConfig and made sure that task config is updated in requestTaskReconfiguration. The idea is that in this case both execution paths should invoke store.putTaskConfigs and this happens only if task config changed.
Re: [PR] KAFKA-16051: Fixed deadlock in StandaloneHerder [kafka]
gharris1727 commented on code in PR #15080:
URL: https://github.com/apache/kafka/pull/15080#discussion_r1442146732

## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java: ##
@@ -1001,6 +1001,48 @@ public void testResetConnectorOffsets() throws Exception {
         assertEquals(msg, resetOffsetsCallback.get(1000, TimeUnit.MILLISECONDS));
     }
 
+    @Test()
+    public void testRequestTaskReconfigurationDoesNotDeadlock() throws Exception {
+        connector = mock(BogusSourceConnector.class);
+        expectAdd(SourceSink.SOURCE);
+
+        // Start the connector
+        Map<String, String> config = connectorConfig(SourceSink.SOURCE);
+        Connector connectorMock = mock(SourceConnector.class);
+        expectConfigValidation(connectorMock, true, config);
+
+        herder.putConnectorConfig(CONNECTOR_NAME, config, false, createCallback);
+
+        // Wait on connector to start
+        Herder.Created<ConnectorInfo> connectorInfo = createCallback.get(1000L, TimeUnit.MILLISECONDS);
+        assertEquals(createdInfo(SourceSink.SOURCE), connectorInfo.result());
+
+        // Updated task with new config
+        Map<String, String> updatedTaskConfig = taskConfig(SourceSink.SOURCE);
+        updatedTaskConfig.put("dummy-task-property", "yes");
+        when(worker.connectorTaskConfigs(CONNECTOR_NAME, new SourceConnectorConfig(plugins, config, true)))
+                .thenReturn(Collections.singletonList(updatedTaskConfig));
+
+        // Expect that tasks will be stopped and started again
+        when(worker.connectorNames()).thenReturn(Collections.singleton(CONNECTOR_NAME));
+        doNothing().when(worker).stopAndAwaitTasks(singletonList(new ConnectorTaskId(CONNECTOR_NAME, 0)));
+        when(worker.startSourceTask(eq(new ConnectorTaskId(CONNECTOR_NAME, 0)), any(), eq(connectorConfig(SourceSink.SOURCE)), eq(updatedTaskConfig), eq(herder), eq(TargetState.STARTED))).thenReturn(true);
+
+        // Set new connector config
+        Map<String, String> newConfig = connectorConfig(SourceSink.SOURCE);
+        newConfig.put("dummy-task-property", "yes");
+        herder.putConnectorConfig(CONNECTOR_NAME, newConfig, true, createCallback);

Review Comment:
This re-uses the createCallback, which should already be resolved from the earlier request. I think we need a new future to wait for, like testPutConnectorConfig. When I made that change, the callback wasn't completing because this test was missing a mock for `Worker#startConnector`. There's a mocking idiom used elsewhere in the test that you can copy: https://github.com/apache/kafka/blob/e6f2624c48ceab032811693e1013b70c6ee16c74/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java#L250-L255

After that is fixed, I think some of the other assertions change slightly, since the second put isn't actually executing to completion.
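The point about re-using an already resolved callback can be illustrated with a small sketch. This is not the Kafka test code: it uses the JDK's `CompletableFuture` as a stand-in for Kafka's `FutureCallback`, and `ReusedFutureSketch`, `completesInTime`, and `demo` are hypothetical names. A future that was completed by the first request returns immediately on a later `get`, so waiting on it says nothing about whether the second request finished.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class ReusedFutureSketch {

    /** Returns true if the future yields a value within the timeout, false if the wait times out. */
    static boolean completesInTime(CompletableFuture<String> future, long timeoutMs) throws Exception {
        try {
            future.get(timeoutMs, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            return false;
        }
    }

    /** Returns {reusedCompleted, freshCompleted} for a second request that never finishes. */
    static boolean[] demo() throws Exception {
        // Stand-in for createCallback: resolved by the first (create) request.
        CompletableFuture<String> createCallback = new CompletableFuture<>();
        createCallback.complete("created");

        // In this sketch the second (reconfigure) request never completes anything.
        // Waiting on the old future still succeeds, which hides the problem.
        boolean reusedCompleted = completesInTime(createCallback, 100);

        // A fresh future for the second request exposes that nothing completed it.
        CompletableFuture<String> reconfigureCallback = new CompletableFuture<>();
        boolean freshCompleted = completesInTime(reconfigureCallback, 100);

        return new boolean[] {reusedCompleted, freshCompleted};
    }

    public static void main(String[] args) throws Exception {
        boolean[] result = demo();
        System.out.println("reused future completed: " + result[0]);
        System.out.println("fresh future completed: " + result[1]);
    }
}
```

Under these assumptions the reused future reports completion while the fresh one times out, which is why a separate future per request is the safer choice in a test.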
Re: [PR] KAFKA-16051: Fixed deadlock in StandaloneHerder [kafka]
developster commented on PR #15080: URL: https://github.com/apache/kafka/pull/15080#issuecomment-1877282686 @gharris1727, I've added the test. It first starts the connector, then updates both the connector and the task at the same time. The reason for this is that task reconfiguration is skipped if the connector is not yet started. Hope it looks right.
Re: [PR] KAFKA-16051: Fixed deadlock in StandaloneHerder [kafka]
gharris1727 commented on PR #15080:
URL: https://github.com/apache/kafka/pull/15080#issuecomment-1875639893

> just invoking methods in order is not enough to trigger the deadlock. I believe it is possible to reliably reproduce the deadlock with two countdown latches, one countdown in the ConfigBackingStore#snapshot and another in ConfigBackingStore#putTaskConfigs. This requires a mock for the config backing store. If you have a better idea I am happy to analyze it.

Yeah I understand. I think the cost of deterministically reproducing the deadlock is too high. I did it in #8259 because I didn't know what synchronization was missing and needed a repro case to debug. I would be satisfied with a test which non-deterministically reproduces the deadlock but is less brittle and includes fewer mocks. Currently we only have two connectors calling task reconfiguration (mirror checkpoint and source) and one test in the DistributedHerder. There is zero coverage in StandaloneHerder, which is part of why we never found this bug :)

> Unrelated to this PR's issue, it may be that the wait operations in StandaloneHerderTest are by mistake 1000 seconds instead of milliseconds. Isn't it?

Yeah that timeout seems a bit absurd, but if there aren't deadlocks in the test it should never incur that timeout. It looks like the test suite is very well behaved in practice, so I'm inclined to keep it as-is: https://ge.apache.org/scans/tests?search.names=Git%20branch&search.relativeStartTime=P90D&search.rootProjectNames=kafka&search.timeZoneId=America%2FLos_Angeles&search.values=trunk&tests.container=*StandaloneHerderTest&tests.sortField=FLAKY
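One way to picture a non-deterministic but low-mock check is a timeout guard: run the two operations on separate threads and fail if either does not finish in time. The sketch below is only an illustration under that assumption, not the test that was merged; `TimeoutGuardSketch` and `bothFinish` are hypothetical names, and the two `Runnable`s stand in for calls such as putConnectorConfig and requestTaskReconfiguration.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TimeoutGuardSketch {

    /**
     * Runs both operations concurrently and reports whether both finished within the timeout.
     * A false result suggests a deadlock or a hang; a true result does not prove its absence.
     */
    static boolean bothFinish(Runnable first, Runnable second, long timeoutMs) throws InterruptedException {
        // Daemon threads so a stuck operation cannot keep the JVM alive.
        ExecutorService pool = Executors.newFixedThreadPool(2, runnable -> {
            Thread thread = new Thread(runnable);
            thread.setDaemon(true);
            return thread;
        });
        try {
            Future<?> firstResult = pool.submit(first);
            Future<?> secondResult = pool.submit(second);
            try {
                firstResult.get(timeoutMs, TimeUnit.MILLISECONDS);
                secondResult.get(timeoutMs, TimeUnit.MILLISECONDS);
                return true;
            } catch (TimeoutException e) {
                return false;
            } catch (ExecutionException e) {
                throw new IllegalStateException(e.getCause());
            }
        } finally {
            // Interrupts any operation that is still blocked.
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Object lock = new Object();
        Runnable quick = () -> {
            synchronized (lock) {
                // Trivial critical section standing in for a herder call.
            }
        };
        System.out.println("both finished: " + bothFinish(quick, quick, 1000));
    }
}
```

Because the interleaving is left to the scheduler, a guard like this can only catch the deadlock some of the time, which matches the trade-off described in the comment.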
Re: [PR] KAFKA-16051: Fixed deadlock in StandaloneHerder [kafka]
developster commented on PR #15080: URL: https://github.com/apache/kafka/pull/15080#issuecomment-1875320392 @gharris1727 , sure. I wrote the test but just invoking methods in order is not enough to trigger the deadlock. The requestTaskReconfiguration method is always executed before putConnectorConfig. I believe it is possible to reliably reproduce the deadlock with two countdown latches, one countdown in the `ConfigBackingStore#snapshot` and another in `ConfigBackingStore#putTaskConfigs`. This requires a mock for the config backing store. If you have a better idea I am happy to analyze it. Unrelated to this PR's issue, it may be that the wait operations in StandaloneHerderTest are by mistake 1000 seconds instead of milliseconds. Isn't it?
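The two-latch idea can be sketched in isolation. The following is a generic illustration, not the proposed Kafka test: `herderLock` and `storeLock` are hypothetical `ReentrantLock`s standing in for the herder and config backing store monitors, and the latches play the role of the countdowns that would sit inside `ConfigBackingStore#snapshot` and `ConfigBackingStore#putTaskConfigs`. Each thread takes its first lock, signals, waits for the other thread's signal, and only then asks for the second lock, so the opposite lock orders always collide.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantLock;

public class LatchDeadlockSketch {

    /** Forces two threads into opposite lock orders and reports whether they deadlocked. */
    static boolean reproducesDeadlock() throws InterruptedException {
        ReentrantLock herderLock = new ReentrantLock();
        ReentrantLock storeLock = new ReentrantLock();
        CountDownLatch herderHeld = new CountDownLatch(1);
        CountDownLatch storeHeld = new CountDownLatch(1);

        // One thread locks herder then store, the other store then herder.
        Thread first = new Thread(() -> lockInOrder(herderLock, storeLock, herderHeld, storeHeld));
        Thread second = new Thread(() -> lockInOrder(storeLock, herderLock, storeHeld, herderHeld));
        first.setDaemon(true);
        second.setDaemon(true);
        first.start();
        second.start();

        // If both threads are still alive after the wait, each is stuck on the other's lock.
        first.join(500);
        second.join(500);
        boolean deadlocked = first.isAlive() && second.isAlive();

        // Interruptible lock acquisition lets the sketch clean up after itself.
        first.interrupt();
        second.interrupt();
        first.join();
        second.join();
        return deadlocked;
    }

    private static void lockInOrder(ReentrantLock outer, ReentrantLock inner,
                                    CountDownLatch mine, CountDownLatch theirs) {
        try {
            outer.lockInterruptibly();
            try {
                mine.countDown();   // announce that the outer lock is held
                theirs.await();     // wait until the other thread holds its outer lock
                inner.lockInterruptibly();
                inner.unlock();
            } finally {
                outer.unlock();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("deadlock reproduced: " + reproducesDeadlock());
    }
}
```

In the real test the latches would have to live in a mocked config backing store, which is the extra mocking cost discussed in this thread.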
Re: [PR] KAFKA-16051: Fixed deadlock in StandaloneHerder [kafka]
developster commented on PR #15080: URL: https://github.com/apache/kafka/pull/15080#issuecomment-1871963622 Sure @vamossagar12, I just tested both unpatched 3.6.1 and patched 3.8.0. The unpatched version reached deadlock in 40% of the cases (6 out of 15), while I was unable to reproduce the deadlock in the patched version (0 out of 20).
[PR] KAFKA-16051: Fixed deadlock in StandaloneHerder [kafka]
developster opened a new pull request, #15080:
URL: https://github.com/apache/kafka/pull/15080

*Description of the change*
Changed StandaloneHerder to always synchronize on itself before invoking any methods on MemoryConfigBackingStore. This fixes the deadlock because locks are now always acquired in the same order: first on the herder, then on the config backing store.

*Testing strategy*
Manually tested StandaloneHerder from Kafka 2.6.3 and Kafka 3.6.1; both have the same issue: 9 times out of 10, startup ends in deadlock. Tested after the fix (3.8.0-SNAPSHOT) and the deadlock no longer occurs.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
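The lock-ordering rule in the description can be sketched as follows. This is a simplified illustration, not the actual patch: `SketchHerder` and `SketchStore` are hypothetical stand-ins for StandaloneHerder and MemoryConfigBackingStore, and the method bodies are invented for the example. The only point is that every path enters the herder's monitor before touching the store, so the store's monitor is never held while waiting for the herder's.

```java
import java.util.HashMap;
import java.util.Map;

public class LockOrderingSketch {

    /** Hypothetical stand-in for the config backing store; each method locks the store. */
    static class SketchStore {
        private final Map<String, String> configs = new HashMap<>();

        synchronized void put(String key, String value) {
            configs.put(key, value);
        }

        synchronized Map<String, String> snapshot() {
            return new HashMap<>(configs);
        }
    }

    /** Hypothetical stand-in for the herder; every store access happens under the herder lock. */
    static class SketchHerder {
        private final SketchStore store = new SketchStore();

        synchronized void putConnectorConfig(String name, String config) {
            store.put(name, config);
        }

        synchronized void requestTaskReconfiguration(String name) {
            String connectorConfig = store.snapshot().get(name);
            store.put(name + "-tasks", "derived-from-" + connectorConfig);
        }

        synchronized Map<String, String> configs() {
            return store.snapshot();
        }
    }

    /** Hammers both paths from two threads; with a single lock order both threads finish. */
    static Map<String, String> runConcurrently(int iterations) throws InterruptedException {
        SketchHerder herder = new SketchHerder();
        Thread writer = new Thread(() -> {
            for (int i = 0; i < iterations; i++) {
                herder.putConnectorConfig("connector", "v" + i);
            }
        });
        Thread reconfigurer = new Thread(() -> {
            for (int i = 0; i < iterations; i++) {
                herder.requestTaskReconfiguration("connector");
            }
        });
        writer.start();
        reconfigurer.start();
        writer.join();
        reconfigurer.join();
        return herder.configs();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runConcurrently(1_000).keySet());
    }
}
```

A thread that already holds the herder monitor may still wait for the store monitor, but no thread holds the store monitor while waiting for the herder, so the circular wait needed for a deadlock cannot form in this sketch.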