C0urante commented on code in PR #6934: URL: https://github.com/apache/kafka/pull/6934#discussion_r1568959278
########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java: ########## @@ -242,6 +242,19 @@ public Response putConnectorConfig(final @PathParam("connector") String connecto return response.entity(createdInfo.result()).build(); } + @PATCH + @Path("/{connector}/config") + public Response patchConnectorConfig(final @PathParam("connector") String connector, + final @Context HttpHeaders headers, + final @QueryParam("forward") Boolean forward, + final Map<String, String> connectorConfigPatch) throws Throwable { + FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>(); + herder.patchConnectorConfig(connector, connectorConfigPatch, cb); + Herder.Created<ConnectorInfo> createdInfo = requestHandler.completeOrForwardRequest(cb, "/connectors/" + connector + "/config", + "PATCH", headers, connectorConfigPatch, new TypeReference<ConnectorInfo>() { }, new CreatedConnectorInfoTranslator(), forward); + return Response.ok().entity(createdInfo.result()).build(); Review Comment: Just realizing now that we don't actually specify the status and body of the REST response in the KIP. I agree with what's here, especially since it matches the existing `PUT /connectors/{name}/config` endpoint, but it's worth specifying in the KIP for completeness. 
########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java: ########## @@ -2336,6 +2336,95 @@ public void testPutConnectorConfig() throws Exception { verifyNoMoreInteractions(worker, member, configBackingStore, statusBackingStore); } + @Test + public void testPatchConnectorConfigNotFound() { + ClusterConfigState clusterConfigState = new ClusterConfigState( + 0, + null, + Collections.emptyMap(), + Collections.emptyMap(), + Collections.emptyMap(), + Collections.emptyMap(), + Collections.emptyMap(), + Collections.emptyMap(), + Collections.emptySet(), + Collections.emptySet()); + expectConfigRefreshAndSnapshot(clusterConfigState); + + Map<String, String> connConfigPatch = new HashMap<>(); + connConfigPatch.put("foo1", "baz1"); + + FutureCallback<Herder.Created<ConnectorInfo>> patchCallback = new FutureCallback<>(); + herder.patchConnectorConfig(CONN2, connConfigPatch, patchCallback); + herder.tick(); + assertTrue(patchCallback.isDone()); + ExecutionException exception = assertThrows(ExecutionException.class, patchCallback::get); + assertInstanceOf(NotFoundException.class, exception.getCause()); + } + + @Test + public void testPatchConnectorConfig() throws Exception { + when(member.memberId()).thenReturn("leader"); + expectRebalance(1, Arrays.asList(CONN1), Collections.emptyList(), true); + when(statusBackingStore.connectors()).thenReturn(Collections.emptySet()); + + Map<String, String> originalConnConfig = new HashMap<>(CONN1_CONFIG); + originalConnConfig.put("foo1", "bar1"); + originalConnConfig.put("foo2", "bar2"); Review Comment: Nit: can we add one more key/value pair that should be unchanged after the patch is applied? This would catch bugs where the set of post-patch keys is derived from the patch instead of the combination of the patch and the prior configuration. 
########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java: ########## @@ -242,6 +242,19 @@ public Response putConnectorConfig(final @PathParam("connector") String connecto return response.entity(createdInfo.result()).build(); } + @PATCH + @Path("/{connector}/config") + public Response patchConnectorConfig(final @PathParam("connector") String connector, + final @Context HttpHeaders headers, + final @QueryParam("forward") Boolean forward, Review Comment: Nit: ```suggestion final @Parameter(hidden = true) @QueryParam("forward") Boolean forward, ``` ########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java: ########## @@ -751,6 +754,74 @@ public void testPutConnectorConfig() throws Exception { verifyNoMoreInteractions(connectorConfigCb); } + @Test + public void testPatchConnectorConfigNotFound() { + Map<String, String> connConfigPatch = new HashMap<>(); + connConfigPatch.put("foo1", "baz1"); + + Callback<Herder.Created<ConnectorInfo>> patchCallback = mock(Callback.class); + herder.patchConnectorConfig(CONNECTOR_NAME, connConfigPatch, patchCallback); + + ArgumentCaptor<NotFoundException> exceptionCaptor = ArgumentCaptor.forClass(NotFoundException.class); + verify(patchCallback).onCompletion(exceptionCaptor.capture(), isNull()); + assertEquals(exceptionCaptor.getValue().getMessage(), "Connector " + CONNECTOR_NAME + " not found"); + assertNull(exceptionCaptor.getValue().getCause()); + } + + @Test + public void testPatchConnectorConfig() throws ExecutionException, InterruptedException, TimeoutException { + // Create the connector. 
+ Map<String, String> originalConnConfig = connectorConfig(SourceSink.SOURCE); + originalConnConfig.put("foo1", "bar1"); + originalConnConfig.put("foo2", "bar2"); + + Map<String, String> connConfigPatch = new HashMap<>(); + connConfigPatch.put("foo1", "changed"); + connConfigPatch.put("foo2", null); + connConfigPatch.put("foo3", "added"); + + Map<String, String> patchedConnConfig = new HashMap<>(originalConnConfig); + patchedConnConfig.put("foo1", "changed"); + patchedConnConfig.remove("foo2"); + patchedConnConfig.put("foo3", "added"); + + expectAdd(SourceSink.SOURCE); + Connector connectorMock = mock(SourceConnector.class); + expectConfigValidation(SourceSink.SOURCE, originalConnConfig, patchedConnConfig); + + expectConnectorStartingWithoutTasks(originalConnConfig); + + herder.putConnectorConfig(CONNECTOR_NAME, originalConnConfig, false, createCallback); + createCallback.get(1000L, TimeUnit.SECONDS); + + expectConnectorStartingWithoutTasks(patchedConnConfig); + + FutureCallback<Herder.Created<ConnectorInfo>> patchCallback = new FutureCallback<>(); + herder.patchConnectorConfig(CONNECTOR_NAME, connConfigPatch, patchCallback); + + Map<String, String> returnedConfig = patchCallback.get(1000L, TimeUnit.SECONDS).result().config(); + assertEquals(patchedConnConfig, returnedConfig); + + // Also check the returned config when requested. + FutureCallback<Map<String, String>> configCallback = new FutureCallback<>(); + herder.connectorConfig(CONNECTOR_NAME, configCallback); + + Map<String, String> returnedConfig2 = configCallback.get(1000L, TimeUnit.SECONDS); + assertEquals(patchedConnConfig, returnedConfig2); + } + + private void expectConnectorStartingWithoutTasks(Map<String, String> config) { Review Comment: Nit: just to avoid confusion if someone leverages this method in the future, could we either rename it to `expectSourceConnectorStartingWithoutTasks`, or add a `SourceSink connectorType` parameter to it that can be used to control which kind of connector is expected? 
########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java: ########## @@ -751,6 +754,74 @@ public void testPutConnectorConfig() throws Exception { verifyNoMoreInteractions(connectorConfigCb); } + @Test + public void testPatchConnectorConfigNotFound() { + Map<String, String> connConfigPatch = new HashMap<>(); + connConfigPatch.put("foo1", "baz1"); + + Callback<Herder.Created<ConnectorInfo>> patchCallback = mock(Callback.class); + herder.patchConnectorConfig(CONNECTOR_NAME, connConfigPatch, patchCallback); + + ArgumentCaptor<NotFoundException> exceptionCaptor = ArgumentCaptor.forClass(NotFoundException.class); + verify(patchCallback).onCompletion(exceptionCaptor.capture(), isNull()); + assertEquals(exceptionCaptor.getValue().getMessage(), "Connector " + CONNECTOR_NAME + " not found"); + assertNull(exceptionCaptor.getValue().getCause()); + } + + @Test + public void testPatchConnectorConfig() throws ExecutionException, InterruptedException, TimeoutException { + // Create the connector. + Map<String, String> originalConnConfig = connectorConfig(SourceSink.SOURCE); + originalConnConfig.put("foo1", "bar1"); + originalConnConfig.put("foo2", "bar2"); Review Comment: Same suggestion RE adding another k/v pair that stays unchanged before/after the patch. ########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java: ########## @@ -246,6 +246,31 @@ private synchronized void putConnectorConfig(String connName, } } + @Override + public void patchConnectorConfig(String connName, Map<String, String> configPatch, Callback<Created<ConnectorInfo>> callback) { Review Comment: Should we make this method `synchronized` in order to prevent (or at least reduce) race conditions? 
########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java: ########## @@ -992,4 +992,15 @@ public List<String> setWorkerLoggerLevel(String namespace, String desiredLevelSt return loggers.setLevel(namespace, level); } + protected Map<String, String> applyConnectorConfigPatch(Map<String, String> currentConfig, Map<String, String> configPatch) { + Map<String, String> patchedConfig = new HashMap<>(currentConfig); + configPatch.forEach((k, v) -> { + if (v != null) { + patchedConfig.put(k, v); + } else { + patchedConfig.remove(k); + } + }); + return patchedConfig; + } Review Comment: This feels generic enough that we could probably put it into [ConnectUtils](https://github.com/apache/kafka/blob/e63efbc5d771a6e69eb8678db6f88730b4e751e1/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java) or even just [Utils](https://github.com/apache/kafka/blob/e63efbc5d771a6e69eb8678db6f88730b4e751e1/examples/src/main/java/kafka/examples/Utils.java) as something like `patchConfig` (if in the former) or `combineMaps`, `mergeMaps`, or `patchMap` (if in the latter). 
########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java: ########## @@ -1144,6 +1144,21 @@ public void putConnectorConfig(final String connName, final Map<String, String> ); } + @Override + public void patchConnectorConfig(String connName, Map<String, String> configPatch, Callback<Created<ConnectorInfo>> callback) { + log.trace("Submitting connector config patch request {}", connName); + addRequest(() -> { + ConnectorInfo connectorInfo = connectorInfo(connName); + if (connectorInfo == null) { + callback.onCompletion(new NotFoundException("Connector " + connName + " not found", null), null); + } else { + Map<String, String> patchedConfig = applyConnectorConfigPatch(connectorInfo.config(), configPatch); + putConnectorConfig(connName, patchedConfig, true, callback); Review Comment: This adds another request to the queue instead of executing the follow-up logic immediately, which increases the chance for race conditions. Can we fix that? We could possibly have a synchronous variant of `putConnectorConfig` whose method body is pulled from [here](https://github.com/apache/kafka/blob/e63efbc5d771a6e69eb8678db6f88730b4e751e1/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1099-L1140), and invoke that variant instead of the asynchronous one at this line in `patchConnectorConfig`. 
########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java: ########## @@ -246,6 +246,31 @@ private synchronized void putConnectorConfig(String connName, } } + @Override + public void patchConnectorConfig(String connName, Map<String, String> configPatch, Callback<Created<ConnectorInfo>> callback) { + try { + ConnectorInfo connectorInfo = connectorInfo(connName); + if (connectorInfo == null) { + callback.onCompletion(new NotFoundException("Connector " + connName + " not found", null), null); + return; + } + + Map<String, String> patchedConfig = applyConnectorConfigPatch(connectorInfo.config(), configPatch); + validateConnectorConfig(patchedConfig, (error, configInfos) -> { + if (error != null) { + callback.onCompletion(error, null); + return; + } + + requestExecutorService.submit( + () -> putConnectorConfig(connName, patchedConfig, null, true, callback, configInfos) + ); + }); + } catch (ConnectException e) { Review Comment: I don't know why we limit the catch clause to `ConnectException` in `deleteConnectorConfig`, but here (and probably there as well) it should be `Throwable` instead; otherwise, the callback is left dangling. 
########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java: ########## @@ -2336,6 +2336,95 @@ public void testPutConnectorConfig() throws Exception { verifyNoMoreInteractions(worker, member, configBackingStore, statusBackingStore); } + @Test + public void testPatchConnectorConfigNotFound() { + ClusterConfigState clusterConfigState = new ClusterConfigState( + 0, + null, + Collections.emptyMap(), + Collections.emptyMap(), + Collections.emptyMap(), + Collections.emptyMap(), + Collections.emptyMap(), + Collections.emptyMap(), + Collections.emptySet(), + Collections.emptySet()); + expectConfigRefreshAndSnapshot(clusterConfigState); + + Map<String, String> connConfigPatch = new HashMap<>(); + connConfigPatch.put("foo1", "baz1"); + + FutureCallback<Herder.Created<ConnectorInfo>> patchCallback = new FutureCallback<>(); + herder.patchConnectorConfig(CONN2, connConfigPatch, patchCallback); + herder.tick(); + assertTrue(patchCallback.isDone()); + ExecutionException exception = assertThrows(ExecutionException.class, patchCallback::get); + assertInstanceOf(NotFoundException.class, exception.getCause()); + } + + @Test + public void testPatchConnectorConfig() throws Exception { + when(member.memberId()).thenReturn("leader"); + expectRebalance(1, Arrays.asList(CONN1), Collections.emptyList(), true); + when(statusBackingStore.connectors()).thenReturn(Collections.emptySet()); + + Map<String, String> originalConnConfig = new HashMap<>(CONN1_CONFIG); + originalConnConfig.put("foo1", "bar1"); + originalConnConfig.put("foo2", "bar2"); + + // The connector is pre-existing due to the mocks. 
+ + ClusterConfigState originalSnapshot = new ClusterConfigState( + 1, + null, + Collections.singletonMap(CONN1, 0), + Collections.singletonMap(CONN1, originalConnConfig), + Collections.singletonMap(CONN1, TargetState.STARTED), + Collections.emptyMap(), + Collections.emptyMap(), + Collections.emptyMap(), + Collections.emptySet(), + Collections.emptySet()); + expectConfigRefreshAndSnapshot(originalSnapshot); + when(member.currentProtocolVersion()).thenReturn(CONNECT_PROTOCOL_V0); + + expectMemberPoll(); + + // Patch the connector config. + Map<String, String> connConfigPatch = new HashMap<>(); + connConfigPatch.put("foo1", "changed"); + connConfigPatch.put("foo2", null); + connConfigPatch.put("foo3", "added"); + + Map<String, String> patchedConnConfig = new HashMap<>(originalConnConfig); + patchedConnConfig.put("foo1", "changed"); + patchedConnConfig.remove("foo2"); + patchedConnConfig.put("foo3", "added"); + + expectMemberEnsureActive(); + expectRebalance(1, Arrays.asList(CONN1), Collections.emptyList(), true); + when(worker.getPlugins()).thenReturn(plugins); Review Comment: This part is unused, which causes the `DistributedHerder` test suite to fail when run as a whole thanks to `@RunWith(MockitoJUnitRunner.StrictStubs.class)`. We can and should remove it: ```suggestion ``` ########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java: ########## @@ -1144,6 +1144,21 @@ public void putConnectorConfig(final String connName, final Map<String, String> ); } + @Override + public void patchConnectorConfig(String connName, Map<String, String> configPatch, Callback<Created<ConnectorInfo>> callback) { + log.trace("Submitting connector config patch request {}", connName); + addRequest(() -> { Review Comment: Can we do a check to make sure we're the leader at the beginning of the request body here? This would reduce (but not completely eliminate) the chance for race conditions. 
########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java: ########## @@ -751,6 +754,74 @@ public void testPutConnectorConfig() throws Exception { verifyNoMoreInteractions(connectorConfigCb); } + @Test + public void testPatchConnectorConfigNotFound() { + Map<String, String> connConfigPatch = new HashMap<>(); + connConfigPatch.put("foo1", "baz1"); + + Callback<Herder.Created<ConnectorInfo>> patchCallback = mock(Callback.class); + herder.patchConnectorConfig(CONNECTOR_NAME, connConfigPatch, patchCallback); + + ArgumentCaptor<NotFoundException> exceptionCaptor = ArgumentCaptor.forClass(NotFoundException.class); + verify(patchCallback).onCompletion(exceptionCaptor.capture(), isNull()); + assertEquals(exceptionCaptor.getValue().getMessage(), "Connector " + CONNECTOR_NAME + " not found"); + assertNull(exceptionCaptor.getValue().getCause()); Review Comment: Just curious, what was the rationale for this assertion? ########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java: ########## @@ -751,6 +754,74 @@ public void testPutConnectorConfig() throws Exception { verifyNoMoreInteractions(connectorConfigCb); } + @Test + public void testPatchConnectorConfigNotFound() { + Map<String, String> connConfigPatch = new HashMap<>(); + connConfigPatch.put("foo1", "baz1"); + + Callback<Herder.Created<ConnectorInfo>> patchCallback = mock(Callback.class); + herder.patchConnectorConfig(CONNECTOR_NAME, connConfigPatch, patchCallback); + + ArgumentCaptor<NotFoundException> exceptionCaptor = ArgumentCaptor.forClass(NotFoundException.class); + verify(patchCallback).onCompletion(exceptionCaptor.capture(), isNull()); + assertEquals(exceptionCaptor.getValue().getMessage(), "Connector " + CONNECTOR_NAME + " not found"); + assertNull(exceptionCaptor.getValue().getCause()); + } + + @Test + public void testPatchConnectorConfig() throws ExecutionException, InterruptedException, TimeoutException { + 
// Create the connector. + Map<String, String> originalConnConfig = connectorConfig(SourceSink.SOURCE); + originalConnConfig.put("foo1", "bar1"); + originalConnConfig.put("foo2", "bar2"); + + Map<String, String> connConfigPatch = new HashMap<>(); + connConfigPatch.put("foo1", "changed"); + connConfigPatch.put("foo2", null); + connConfigPatch.put("foo3", "added"); + + Map<String, String> patchedConnConfig = new HashMap<>(originalConnConfig); + patchedConnConfig.put("foo1", "changed"); + patchedConnConfig.remove("foo2"); + patchedConnConfig.put("foo3", "added"); + + expectAdd(SourceSink.SOURCE); + Connector connectorMock = mock(SourceConnector.class); Review Comment: Unused variable? ########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java: ########## @@ -2336,6 +2336,95 @@ public void testPutConnectorConfig() throws Exception { verifyNoMoreInteractions(worker, member, configBackingStore, statusBackingStore); } + @Test + public void testPatchConnectorConfigNotFound() { + ClusterConfigState clusterConfigState = new ClusterConfigState( + 0, + null, + Collections.emptyMap(), + Collections.emptyMap(), + Collections.emptyMap(), + Collections.emptyMap(), + Collections.emptyMap(), + Collections.emptyMap(), + Collections.emptySet(), + Collections.emptySet()); + expectConfigRefreshAndSnapshot(clusterConfigState); + + Map<String, String> connConfigPatch = new HashMap<>(); + connConfigPatch.put("foo1", "baz1"); + + FutureCallback<Herder.Created<ConnectorInfo>> patchCallback = new FutureCallback<>(); + herder.patchConnectorConfig(CONN2, connConfigPatch, patchCallback); + herder.tick(); + assertTrue(patchCallback.isDone()); + ExecutionException exception = assertThrows(ExecutionException.class, patchCallback::get); + assertInstanceOf(NotFoundException.class, exception.getCause()); + } + + @Test + public void testPatchConnectorConfig() throws Exception { + when(member.memberId()).thenReturn("leader"); + expectRebalance(1, 
Arrays.asList(CONN1), Collections.emptyList(), true); + when(statusBackingStore.connectors()).thenReturn(Collections.emptySet()); + + Map<String, String> originalConnConfig = new HashMap<>(CONN1_CONFIG); + originalConnConfig.put("foo1", "bar1"); + originalConnConfig.put("foo2", "bar2"); + + // The connector is pre-existing due to the mocks. + + ClusterConfigState originalSnapshot = new ClusterConfigState( + 1, + null, + Collections.singletonMap(CONN1, 0), + Collections.singletonMap(CONN1, originalConnConfig), + Collections.singletonMap(CONN1, TargetState.STARTED), + Collections.emptyMap(), + Collections.emptyMap(), + Collections.emptyMap(), + Collections.emptySet(), + Collections.emptySet()); + expectConfigRefreshAndSnapshot(originalSnapshot); + when(member.currentProtocolVersion()).thenReturn(CONNECT_PROTOCOL_V0); + + expectMemberPoll(); + + // Patch the connector config. + Map<String, String> connConfigPatch = new HashMap<>(); + connConfigPatch.put("foo1", "changed"); + connConfigPatch.put("foo2", null); + connConfigPatch.put("foo3", "added"); + + Map<String, String> patchedConnConfig = new HashMap<>(originalConnConfig); + patchedConnConfig.put("foo1", "changed"); + patchedConnConfig.remove("foo2"); + patchedConnConfig.put("foo3", "added"); + + expectMemberEnsureActive(); + expectRebalance(1, Arrays.asList(CONN1), Collections.emptyList(), true); + when(worker.getPlugins()).thenReturn(plugins); + + ArgumentCaptor<Callback<ConfigInfos>> validateCallback = ArgumentCaptor.forClass(Callback.class); + doAnswer(invocation -> { + validateCallback.getValue().onCompletion(null, CONN1_CONFIG_INFOS); + return null; + }).when(herder).validateConnectorConfig(eq(patchedConnConfig), validateCallback.capture()); + + // This is effectively the main check of this test: + // we validate that what's written in the config storage is the patched config. 
+ doNothing().when(configBackingStore).putConnectorConfig(eq(CONN1), eq(patchedConnConfig), isNull()); Review Comment: Nit: could we change this to an explicit `verify` invocation? ```suggestion // This is effectively the main check of this test: // we validate that what's written in the config storage is the patched config. verify(configBackingStore).putConnectorConfig(eq(CONN1), eq(patchedConnConfig), isNull()); ``` And, if we want to be extra fancy, we can make sure that no other configs were written: ```java verifyNoMoreInteractions(configBackingStore); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org