C0urante commented on code in PR #6934:
URL: https://github.com/apache/kafka/pull/6934#discussion_r1568959278


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java:
##########
@@ -242,6 +242,19 @@ public Response putConnectorConfig(final 
@PathParam("connector") String connecto
         return response.entity(createdInfo.result()).build();
     }
 
+    @PATCH
+    @Path("/{connector}/config")
+    public Response patchConnectorConfig(final @PathParam("connector") String 
connector,
+                                         final @Context HttpHeaders headers,
+                                         final @QueryParam("forward") Boolean 
forward,
+                                         final Map<String, String> 
connectorConfigPatch) throws Throwable {
+        FutureCallback<Herder.Created<ConnectorInfo>> cb = new 
FutureCallback<>();
+        herder.patchConnectorConfig(connector, connectorConfigPatch, cb);
+        Herder.Created<ConnectorInfo> createdInfo = 
requestHandler.completeOrForwardRequest(cb, "/connectors/" + connector + 
"/config",
+                "PATCH", headers, connectorConfigPatch, new 
TypeReference<ConnectorInfo>() { }, new CreatedConnectorInfoTranslator(), 
forward);
+        return Response.ok().entity(createdInfo.result()).build();

Review Comment:
   Just realizing now that we don't actually specify the status and body of the 
REST response in the KIP. I agree with what's here, especially since it matches 
 the existing `PUT /connectors/{name}/config` endpoint, but it's worth 
specifying in the KIP for completeness.



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java:
##########
@@ -2336,6 +2336,95 @@ public void testPutConnectorConfig() throws Exception {
         verifyNoMoreInteractions(worker, member, configBackingStore, 
statusBackingStore);
     }
 
+    @Test
+    public void testPatchConnectorConfigNotFound() {
+        ClusterConfigState clusterConfigState = new ClusterConfigState(
+                0,
+                null,
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                Collections.emptySet(),
+                Collections.emptySet());
+        expectConfigRefreshAndSnapshot(clusterConfigState);
+
+        Map<String, String> connConfigPatch = new HashMap<>();
+        connConfigPatch.put("foo1", "baz1");
+
+        FutureCallback<Herder.Created<ConnectorInfo>> patchCallback = new 
FutureCallback<>();
+        herder.patchConnectorConfig(CONN2, connConfigPatch, patchCallback);
+        herder.tick();
+        assertTrue(patchCallback.isDone());
+        ExecutionException exception = assertThrows(ExecutionException.class, 
patchCallback::get);
+        assertInstanceOf(NotFoundException.class, exception.getCause());
+    }
+
+    @Test
+    public void testPatchConnectorConfig() throws Exception {
+        when(member.memberId()).thenReturn("leader");
+        expectRebalance(1, Arrays.asList(CONN1), Collections.emptyList(), 
true);
+        
when(statusBackingStore.connectors()).thenReturn(Collections.emptySet());
+
+        Map<String, String> originalConnConfig = new HashMap<>(CONN1_CONFIG);
+        originalConnConfig.put("foo1", "bar1");
+        originalConnConfig.put("foo2", "bar2");

Review Comment:
   Nit: can we add one more key/value pair that should be unchanged after the 
patch is applied? This would catch bugs where the set of post-patch keys is 
derived from the patch instead of the combination of the patch and the prior 
configuration.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java:
##########
@@ -242,6 +242,19 @@ public Response putConnectorConfig(final 
@PathParam("connector") String connecto
         return response.entity(createdInfo.result()).build();
     }
 
+    @PATCH
+    @Path("/{connector}/config")
+    public Response patchConnectorConfig(final @PathParam("connector") String 
connector,
+                                         final @Context HttpHeaders headers,
+                                         final @QueryParam("forward") Boolean 
forward,

Review Comment:
   Nit:
   ```suggestion
                                            final @Parameter(hidden = true) 
@QueryParam("forward") Boolean forward,
   ```



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java:
##########
@@ -751,6 +754,74 @@ public void testPutConnectorConfig() throws Exception {
         verifyNoMoreInteractions(connectorConfigCb);
     }
 
+    @Test
+    public void testPatchConnectorConfigNotFound() {
+        Map<String, String> connConfigPatch = new HashMap<>();
+        connConfigPatch.put("foo1", "baz1");
+
+        Callback<Herder.Created<ConnectorInfo>> patchCallback = 
mock(Callback.class);
+        herder.patchConnectorConfig(CONNECTOR_NAME, connConfigPatch, 
patchCallback);
+
+        ArgumentCaptor<NotFoundException> exceptionCaptor = 
ArgumentCaptor.forClass(NotFoundException.class);
+        verify(patchCallback).onCompletion(exceptionCaptor.capture(), 
isNull());
+        assertEquals(exceptionCaptor.getValue().getMessage(), "Connector " + 
CONNECTOR_NAME + " not found");
+        assertNull(exceptionCaptor.getValue().getCause());
+    }
+
+    @Test
+    public void testPatchConnectorConfig() throws ExecutionException, 
InterruptedException, TimeoutException {
+        // Create the connector.
+        Map<String, String> originalConnConfig = 
connectorConfig(SourceSink.SOURCE);
+        originalConnConfig.put("foo1", "bar1");
+        originalConnConfig.put("foo2", "bar2");
+
+        Map<String, String> connConfigPatch = new HashMap<>();
+        connConfigPatch.put("foo1", "changed");
+        connConfigPatch.put("foo2", null);
+        connConfigPatch.put("foo3", "added");
+
+        Map<String, String> patchedConnConfig = new 
HashMap<>(originalConnConfig);
+        patchedConnConfig.put("foo1", "changed");
+        patchedConnConfig.remove("foo2");
+        patchedConnConfig.put("foo3", "added");
+
+        expectAdd(SourceSink.SOURCE);
+        Connector connectorMock = mock(SourceConnector.class);
+        expectConfigValidation(SourceSink.SOURCE, originalConnConfig, 
patchedConnConfig);
+
+        expectConnectorStartingWithoutTasks(originalConnConfig);
+
+        herder.putConnectorConfig(CONNECTOR_NAME, originalConnConfig, false, 
createCallback);
+        createCallback.get(1000L, TimeUnit.SECONDS);
+
+        expectConnectorStartingWithoutTasks(patchedConnConfig);
+
+        FutureCallback<Herder.Created<ConnectorInfo>> patchCallback = new 
FutureCallback<>();
+        herder.patchConnectorConfig(CONNECTOR_NAME, connConfigPatch, 
patchCallback);
+
+        Map<String, String> returnedConfig = patchCallback.get(1000L, 
TimeUnit.SECONDS).result().config();
+        assertEquals(patchedConnConfig, returnedConfig);
+
+        // Also check the returned config when requested.
+        FutureCallback<Map<String, String>> configCallback = new 
FutureCallback<>();
+        herder.connectorConfig(CONNECTOR_NAME, configCallback);
+
+        Map<String, String> returnedConfig2 = configCallback.get(1000L, 
TimeUnit.SECONDS);
+        assertEquals(patchedConnConfig, returnedConfig2);
+    }
+
+    private void expectConnectorStartingWithoutTasks(Map<String, String> 
config) {

Review Comment:
   Nit: just to avoid confusion if someone leverages this method in the future, 
could we either rename it to `expectSourceConnectorStartingWithoutTasks`, or 
add a `SourceSink connectorType` parameter to it that can be used to control 
which kind of connector is expected?



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java:
##########
@@ -751,6 +754,74 @@ public void testPutConnectorConfig() throws Exception {
         verifyNoMoreInteractions(connectorConfigCb);
     }
 
+    @Test
+    public void testPatchConnectorConfigNotFound() {
+        Map<String, String> connConfigPatch = new HashMap<>();
+        connConfigPatch.put("foo1", "baz1");
+
+        Callback<Herder.Created<ConnectorInfo>> patchCallback = 
mock(Callback.class);
+        herder.patchConnectorConfig(CONNECTOR_NAME, connConfigPatch, 
patchCallback);
+
+        ArgumentCaptor<NotFoundException> exceptionCaptor = 
ArgumentCaptor.forClass(NotFoundException.class);
+        verify(patchCallback).onCompletion(exceptionCaptor.capture(), 
isNull());
+        assertEquals(exceptionCaptor.getValue().getMessage(), "Connector " + 
CONNECTOR_NAME + " not found");
+        assertNull(exceptionCaptor.getValue().getCause());
+    }
+
+    @Test
+    public void testPatchConnectorConfig() throws ExecutionException, 
InterruptedException, TimeoutException {
+        // Create the connector.
+        Map<String, String> originalConnConfig = 
connectorConfig(SourceSink.SOURCE);
+        originalConnConfig.put("foo1", "bar1");
+        originalConnConfig.put("foo2", "bar2");

Review Comment:
   Same suggestion RE adding another k/v pair that stays unchanged before/after 
the patch.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java:
##########
@@ -246,6 +246,31 @@ private synchronized void putConnectorConfig(String 
connName,
         }
     }
 
+    @Override
+    public void patchConnectorConfig(String connName, Map<String, String> 
configPatch, Callback<Created<ConnectorInfo>> callback) {

Review Comment:
   Should we make this method `synchronized` in order to prevent (or at least 
reduce) race conditions?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java:
##########
@@ -992,4 +992,15 @@ public List<String> setWorkerLoggerLevel(String namespace, 
String desiredLevelSt
         return loggers.setLevel(namespace, level);
     }
 
+    protected Map<String, String> applyConnectorConfigPatch(Map<String, 
String> currentConfig, Map<String, String> configPatch) {
+        Map<String, String> patchedConfig = new HashMap<>(currentConfig);
+        configPatch.forEach((k, v) -> {
+            if (v != null) {
+                patchedConfig.put(k, v);
+            } else {
+                patchedConfig.remove(k);
+            }
+        });
+        return patchedConfig;
+    }

Review Comment:
   This feels generic enough that we could probably put it into 
[ConnectUtils](https://github.com/apache/kafka/blob/e63efbc5d771a6e69eb8678db6f88730b4e751e1/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java)
 or even just 
[Utils](https://github.com/apache/kafka/blob/e63efbc5d771a6e69eb8678db6f88730b4e751e1/examples/src/main/java/kafka/examples/Utils.java)
 as something like `patchConfig` (if in the former) or `combineMaps`, 
`mergeMaps`, or `patchMap` (if in the latter).



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##########
@@ -1144,6 +1144,21 @@ public void putConnectorConfig(final String connName, 
final Map<String, String>
         );
     }
 
+    @Override
+    public void patchConnectorConfig(String connName, Map<String, String> 
configPatch, Callback<Created<ConnectorInfo>> callback) {
+        log.trace("Submitting connector config patch request {}", connName);
+        addRequest(() -> {
+            ConnectorInfo connectorInfo = connectorInfo(connName);
+            if (connectorInfo == null) {
+                callback.onCompletion(new NotFoundException("Connector " + 
connName + " not found", null), null);
+            } else {
+                Map<String, String> patchedConfig = 
applyConnectorConfigPatch(connectorInfo.config(), configPatch);
+                putConnectorConfig(connName, patchedConfig, true, callback);

Review Comment:
   This adds another request to the queue instead of executing the follow-up 
logic immediately, which increases the chance for race conditions. Can we fix 
that? We could possibly have a synchronous variant of `putConnectorConfig` 
whose method body is pulled from 
[here](https://github.com/apache/kafka/blob/e63efbc5d771a6e69eb8678db6f88730b4e751e1/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1099-L1140),
 and invoke that variant instead of the asynchronous one at this line in 
`patchConnectorConfig`.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java:
##########
@@ -246,6 +246,31 @@ private synchronized void putConnectorConfig(String 
connName,
         }
     }
 
+    @Override
+    public void patchConnectorConfig(String connName, Map<String, String> 
configPatch, Callback<Created<ConnectorInfo>> callback) {
+        try {
+            ConnectorInfo connectorInfo = connectorInfo(connName);
+            if (connectorInfo == null) {
+                callback.onCompletion(new NotFoundException("Connector " + 
connName + " not found", null), null);
+                return;
+            }
+
+            Map<String, String> patchedConfig = 
applyConnectorConfigPatch(connectorInfo.config(), configPatch);
+            validateConnectorConfig(patchedConfig, (error, configInfos) -> {
+                if (error != null) {
+                    callback.onCompletion(error, null);
+                    return;
+                }
+
+                requestExecutorService.submit(
+                        () -> putConnectorConfig(connName, patchedConfig, 
null, true, callback, configInfos)
+                );
+            });
+        } catch (ConnectException e) {

Review Comment:
   I don't know why we limit the catch clause to `ConnectException` in 
`deleteConnectorConfig`, but here (and probably there as well) it should be 
`Throwable` instead; otherwise, the callback is left dangling.



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java:
##########
@@ -2336,6 +2336,95 @@ public void testPutConnectorConfig() throws Exception {
         verifyNoMoreInteractions(worker, member, configBackingStore, 
statusBackingStore);
     }
 
+    @Test
+    public void testPatchConnectorConfigNotFound() {
+        ClusterConfigState clusterConfigState = new ClusterConfigState(
+                0,
+                null,
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                Collections.emptySet(),
+                Collections.emptySet());
+        expectConfigRefreshAndSnapshot(clusterConfigState);
+
+        Map<String, String> connConfigPatch = new HashMap<>();
+        connConfigPatch.put("foo1", "baz1");
+
+        FutureCallback<Herder.Created<ConnectorInfo>> patchCallback = new 
FutureCallback<>();
+        herder.patchConnectorConfig(CONN2, connConfigPatch, patchCallback);
+        herder.tick();
+        assertTrue(patchCallback.isDone());
+        ExecutionException exception = assertThrows(ExecutionException.class, 
patchCallback::get);
+        assertInstanceOf(NotFoundException.class, exception.getCause());
+    }
+
+    @Test
+    public void testPatchConnectorConfig() throws Exception {
+        when(member.memberId()).thenReturn("leader");
+        expectRebalance(1, Arrays.asList(CONN1), Collections.emptyList(), 
true);
+        
when(statusBackingStore.connectors()).thenReturn(Collections.emptySet());
+
+        Map<String, String> originalConnConfig = new HashMap<>(CONN1_CONFIG);
+        originalConnConfig.put("foo1", "bar1");
+        originalConnConfig.put("foo2", "bar2");
+
+        // The connector is pre-existing due to the mocks.
+
+        ClusterConfigState originalSnapshot = new ClusterConfigState(
+                1,
+                null,
+                Collections.singletonMap(CONN1, 0),
+                Collections.singletonMap(CONN1, originalConnConfig),
+                Collections.singletonMap(CONN1, TargetState.STARTED),
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                Collections.emptySet(),
+                Collections.emptySet());
+        expectConfigRefreshAndSnapshot(originalSnapshot);
+        when(member.currentProtocolVersion()).thenReturn(CONNECT_PROTOCOL_V0);
+
+        expectMemberPoll();
+
+        // Patch the connector config.
+        Map<String, String> connConfigPatch = new HashMap<>();
+        connConfigPatch.put("foo1", "changed");
+        connConfigPatch.put("foo2", null);
+        connConfigPatch.put("foo3", "added");
+
+        Map<String, String> patchedConnConfig = new 
HashMap<>(originalConnConfig);
+        patchedConnConfig.put("foo1", "changed");
+        patchedConnConfig.remove("foo2");
+        patchedConnConfig.put("foo3", "added");
+
+        expectMemberEnsureActive();
+        expectRebalance(1, Arrays.asList(CONN1), Collections.emptyList(), 
true);
+        when(worker.getPlugins()).thenReturn(plugins);

Review Comment:
   This part is unused, which causes the `DistributedHerder` test suite to fail 
when run as a whole thanks to `@RunWith(MockitoJUnitRunner.StrictStubs.class)`. 
We can and should remove it:
   ```suggestion
   ```



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##########
@@ -1144,6 +1144,21 @@ public void putConnectorConfig(final String connName, 
final Map<String, String>
         );
     }
 
+    @Override
+    public void patchConnectorConfig(String connName, Map<String, String> 
configPatch, Callback<Created<ConnectorInfo>> callback) {
+        log.trace("Submitting connector config patch request {}", connName);
+        addRequest(() -> {

Review Comment:
   Can we do a check to make sure we're the leader at the beginning of the 
request body here? This would reduce (but not completely eliminate) the chance 
for race conditions.



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java:
##########
@@ -751,6 +754,74 @@ public void testPutConnectorConfig() throws Exception {
         verifyNoMoreInteractions(connectorConfigCb);
     }
 
+    @Test
+    public void testPatchConnectorConfigNotFound() {
+        Map<String, String> connConfigPatch = new HashMap<>();
+        connConfigPatch.put("foo1", "baz1");
+
+        Callback<Herder.Created<ConnectorInfo>> patchCallback = 
mock(Callback.class);
+        herder.patchConnectorConfig(CONNECTOR_NAME, connConfigPatch, 
patchCallback);
+
+        ArgumentCaptor<NotFoundException> exceptionCaptor = 
ArgumentCaptor.forClass(NotFoundException.class);
+        verify(patchCallback).onCompletion(exceptionCaptor.capture(), 
isNull());
+        assertEquals(exceptionCaptor.getValue().getMessage(), "Connector " + 
CONNECTOR_NAME + " not found");
+        assertNull(exceptionCaptor.getValue().getCause());

Review Comment:
   Just curious, what was the rationale for this assertion?



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java:
##########
@@ -751,6 +754,74 @@ public void testPutConnectorConfig() throws Exception {
         verifyNoMoreInteractions(connectorConfigCb);
     }
 
+    @Test
+    public void testPatchConnectorConfigNotFound() {
+        Map<String, String> connConfigPatch = new HashMap<>();
+        connConfigPatch.put("foo1", "baz1");
+
+        Callback<Herder.Created<ConnectorInfo>> patchCallback = 
mock(Callback.class);
+        herder.patchConnectorConfig(CONNECTOR_NAME, connConfigPatch, 
patchCallback);
+
+        ArgumentCaptor<NotFoundException> exceptionCaptor = 
ArgumentCaptor.forClass(NotFoundException.class);
+        verify(patchCallback).onCompletion(exceptionCaptor.capture(), 
isNull());
+        assertEquals(exceptionCaptor.getValue().getMessage(), "Connector " + 
CONNECTOR_NAME + " not found");
+        assertNull(exceptionCaptor.getValue().getCause());
+    }
+
+    @Test
+    public void testPatchConnectorConfig() throws ExecutionException, 
InterruptedException, TimeoutException {
+        // Create the connector.
+        Map<String, String> originalConnConfig = 
connectorConfig(SourceSink.SOURCE);
+        originalConnConfig.put("foo1", "bar1");
+        originalConnConfig.put("foo2", "bar2");
+
+        Map<String, String> connConfigPatch = new HashMap<>();
+        connConfigPatch.put("foo1", "changed");
+        connConfigPatch.put("foo2", null);
+        connConfigPatch.put("foo3", "added");
+
+        Map<String, String> patchedConnConfig = new 
HashMap<>(originalConnConfig);
+        patchedConnConfig.put("foo1", "changed");
+        patchedConnConfig.remove("foo2");
+        patchedConnConfig.put("foo3", "added");
+
+        expectAdd(SourceSink.SOURCE);
+        Connector connectorMock = mock(SourceConnector.class);

Review Comment:
   Unused variable?



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java:
##########
@@ -2336,6 +2336,95 @@ public void testPutConnectorConfig() throws Exception {
         verifyNoMoreInteractions(worker, member, configBackingStore, 
statusBackingStore);
     }
 
+    @Test
+    public void testPatchConnectorConfigNotFound() {
+        ClusterConfigState clusterConfigState = new ClusterConfigState(
+                0,
+                null,
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                Collections.emptySet(),
+                Collections.emptySet());
+        expectConfigRefreshAndSnapshot(clusterConfigState);
+
+        Map<String, String> connConfigPatch = new HashMap<>();
+        connConfigPatch.put("foo1", "baz1");
+
+        FutureCallback<Herder.Created<ConnectorInfo>> patchCallback = new 
FutureCallback<>();
+        herder.patchConnectorConfig(CONN2, connConfigPatch, patchCallback);
+        herder.tick();
+        assertTrue(patchCallback.isDone());
+        ExecutionException exception = assertThrows(ExecutionException.class, 
patchCallback::get);
+        assertInstanceOf(NotFoundException.class, exception.getCause());
+    }
+
+    @Test
+    public void testPatchConnectorConfig() throws Exception {
+        when(member.memberId()).thenReturn("leader");
+        expectRebalance(1, Arrays.asList(CONN1), Collections.emptyList(), 
true);
+        
when(statusBackingStore.connectors()).thenReturn(Collections.emptySet());
+
+        Map<String, String> originalConnConfig = new HashMap<>(CONN1_CONFIG);
+        originalConnConfig.put("foo1", "bar1");
+        originalConnConfig.put("foo2", "bar2");
+
+        // The connector is pre-existing due to the mocks.
+
+        ClusterConfigState originalSnapshot = new ClusterConfigState(
+                1,
+                null,
+                Collections.singletonMap(CONN1, 0),
+                Collections.singletonMap(CONN1, originalConnConfig),
+                Collections.singletonMap(CONN1, TargetState.STARTED),
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                Collections.emptySet(),
+                Collections.emptySet());
+        expectConfigRefreshAndSnapshot(originalSnapshot);
+        when(member.currentProtocolVersion()).thenReturn(CONNECT_PROTOCOL_V0);
+
+        expectMemberPoll();
+
+        // Patch the connector config.
+        Map<String, String> connConfigPatch = new HashMap<>();
+        connConfigPatch.put("foo1", "changed");
+        connConfigPatch.put("foo2", null);
+        connConfigPatch.put("foo3", "added");
+
+        Map<String, String> patchedConnConfig = new 
HashMap<>(originalConnConfig);
+        patchedConnConfig.put("foo1", "changed");
+        patchedConnConfig.remove("foo2");
+        patchedConnConfig.put("foo3", "added");
+
+        expectMemberEnsureActive();
+        expectRebalance(1, Arrays.asList(CONN1), Collections.emptyList(), 
true);
+        when(worker.getPlugins()).thenReturn(plugins);
+
+        ArgumentCaptor<Callback<ConfigInfos>> validateCallback = 
ArgumentCaptor.forClass(Callback.class);
+        doAnswer(invocation -> {
+            validateCallback.getValue().onCompletion(null, CONN1_CONFIG_INFOS);
+            return null;
+        }).when(herder).validateConnectorConfig(eq(patchedConnConfig), 
validateCallback.capture());
+
+        // This is effectively the main check of this test:
+        // we validate that what's written in the config storage is the 
patched config.
+        doNothing().when(configBackingStore).putConnectorConfig(eq(CONN1), 
eq(patchedConnConfig), isNull());

Review Comment:
   Nit: could we change this to an explicit `verify` invocation?
   
   ```suggestion
           // This is effectively the main check of this test:
           // we validate that what's written in the config storage is the 
patched config.
           verify(configBackingStore).putConnectorConfig(eq(CONN1), 
eq(patchedConnConfig), isNull());
   ```
   
   And, if we want to be extra fancy, we can make sure that no other configs 
were written:
   ```java
           verifyNoMoreInteractions(configBackingStore);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to