Re: [PR] KAFKA-16445: Add PATCH method for connector config [kafka]

2024-05-09 Thread via GitHub


C0urante merged PR #6934:
URL: https://github.com/apache/kafka/pull/6934


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16445: Add PATCH method for connector config [kafka]

2024-05-09 Thread via GitHub


ivanyu commented on PR #6934:
URL: https://github.com/apache/kafka/pull/6934#issuecomment-2102553283

   Thank you @C0urante. I updated the description (please tell me if I need to 
add more).
   Do you want me to squash the fixups, or will you squash when merging?





Re: [PR] KAFKA-16445: Add PATCH method for connector config [kafka]

2024-05-09 Thread via GitHub


ivanyu commented on code in PR #6934:
URL: https://github.com/apache/kafka/pull/6934#discussion_r1595365475


##
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java:
##
@@ -773,6 +773,41 @@ public void testDeleteConnectorCreatedWithPausedOrStoppedInitialState() throws E
         connect.assertions().assertConnectorDoesNotExist(CONNECTOR_NAME, "Connector wasn't deleted in time");
     }
 
+    @Test
+    public void testPatchConnectorConfig() throws Exception {
+        connect = connectBuilder.build();
+        // start the clusters
+        connect.start();
+
+        connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
+                "Initial group of workers did not start in time.");
+
+        connect.kafka().createTopic(TOPIC_NAME);
+
+        Map<String, String> props = defaultSinkConnectorProps(TOPIC_NAME);
+        props.put("unaffected-key", "unaffected-value");
+        props.put("to-be-deleted-key", "value");
+        props.put(TASKS_MAX_CONFIG, "1");
+
+        Map<String, String> patch = new HashMap<>();
+        patch.put(TASKS_MAX_CONFIG, "2");  // this plays as a value to be changed
+        patch.put("to-be-added-key", "value");
+        patch.put("to-be-deleted-key", null);
+
+        connect.configureConnector(CONNECTOR_NAME, props);
+        connect.patchConnectorConfig(CONNECTOR_NAME, patch);
+
+        connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME, 2,
+                "connector and tasks did not start in time");
+
+        Map<String, String> expectedConfig = new HashMap<>(props);
+        expectedConfig.put("name", CONNECTOR_NAME);
+        expectedConfig.put("to-be-added-key", "value");
+        expectedConfig.put(TASKS_MAX_CONFIG, "2");
+        expectedConfig.remove("to-be-deleted-key");
+        assertEquals(expectedConfig, connect.connectorInfo(CONNECTOR_NAME).config());

Review Comment:
   Thanks, great suggestion. Done






Re: [PR] KAFKA-16445: Add PATCH method for connector config [kafka]

2024-05-09 Thread via GitHub


ivanyu commented on code in PR #6934:
URL: https://github.com/apache/kafka/pull/6934#discussion_r1595357027


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java:
##
@@ -242,6 +242,19 @@ public Response putConnectorConfig(final @PathParam("connector") String connecto
         return response.entity(createdInfo.result()).build();
     }
 
+    @PATCH
+    @Path("/{connector}/config")
+    public Response patchConnectorConfig(final @PathParam("connector") String connector,
+                                         final @Context HttpHeaders headers,
+                                         final @QueryParam("forward") Boolean forward,
+                                         final Map<String, String> connectorConfigPatch) throws Throwable {
+        FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>();
+        herder.patchConnectorConfig(connector, connectorConfigPatch, cb);
+        Herder.Created<ConnectorInfo> createdInfo = requestHandler.completeOrForwardRequest(cb, "/connectors/" + connector + "/config",
+                "PATCH", headers, connectorConfigPatch, new TypeReference<ConnectorInfo>() { }, new CreatedConnectorInfoTranslator(), forward);
+        return Response.ok().entity(createdInfo.result()).build();

Review Comment:
   Yes, done
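
   For illustration, a client call to the endpoint in this hunk might look like the sketch below. It only builds the request and does not send it. The class name, the host `localhost:8083`, the connector name `my-connector`, and the config keys are assumptions made up for the example; the JSON `null` follows the deletion convention exercised by the tests in this PR.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical client-side sketch; not part of the PR.
final class PatchRequestSketch {

    // Builds (but does not send) a PATCH request against /connectors/{connector}/config.
    static HttpRequest buildPatchRequest(String baseUrl, String connector, String jsonPatch) {
        return HttpRequest.newBuilder()
                .uri(URI.create(baseUrl + "/connectors/" + connector + "/config"))
                .header("Content-Type", "application/json")
                .method("PATCH", HttpRequest.BodyPublishers.ofString(jsonPatch))
                .build();
    }

    public static void main(String[] args) {
        // Assumed example values: change one key, add one key, delete one key via null.
        String jsonPatch = "{\"tasks.max\": \"2\", \"to-be-added-key\": \"value\", \"to-be-deleted-key\": null}";
        HttpRequest request = buildPatchRequest("http://localhost:8083", "my-connector", jsonPatch);
        System.out.println(request.method() + " " + request.uri());
    }
}
```

   Sending the request would additionally need a `java.net.http.HttpClient` and a running Connect worker, which the sketch leaves out.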






Re: [PR] KAFKA-16445: Add PATCH method for connector config [kafka]

2024-05-07 Thread via GitHub


C0urante commented on code in PR #6934:
URL: https://github.com/apache/kafka/pull/6934#discussion_r1592860284


##
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java:
##
@@ -773,6 +773,41 @@ public void testDeleteConnectorCreatedWithPausedOrStoppedInitialState() throws E
         connect.assertions().assertConnectorDoesNotExist(CONNECTOR_NAME, "Connector wasn't deleted in time");
     }
 
+    @Test
+    public void testPatchConnectorConfig() throws Exception {
+        connect = connectBuilder.build();
+        // start the clusters
+        connect.start();
+
+        connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
+                "Initial group of workers did not start in time.");
+
+        connect.kafka().createTopic(TOPIC_NAME);
+
+        Map<String, String> props = defaultSinkConnectorProps(TOPIC_NAME);
+        props.put("unaffected-key", "unaffected-value");
+        props.put("to-be-deleted-key", "value");
+        props.put(TASKS_MAX_CONFIG, "1");
+
+        Map<String, String> patch = new HashMap<>();
+        patch.put(TASKS_MAX_CONFIG, "2");  // this plays as a value to be changed
+        patch.put("to-be-added-key", "value");
+        patch.put("to-be-deleted-key", null);
+
+        connect.configureConnector(CONNECTOR_NAME, props);
+        connect.patchConnectorConfig(CONNECTOR_NAME, patch);
+
+        connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME, 2,
+                "connector and tasks did not start in time");
+
+        Map<String, String> expectedConfig = new HashMap<>(props);
+        expectedConfig.put("name", CONNECTOR_NAME);
+        expectedConfig.put("to-be-added-key", "value");
+        expectedConfig.put(TASKS_MAX_CONFIG, "2");
+        expectedConfig.remove("to-be-deleted-key");
+        assertEquals(expectedConfig, connect.connectorInfo(CONNECTOR_NAME).config());

Review Comment:
   I think it's possible for poor timing (which Jenkins is notorious for...) to 
create flakiness here. The connector and both of its tasks may be started, but 
it's possible that the worker we hit with this request won't have read the 
patched connector config from the config topic yet if it's not the leader of 
the cluster.
   
   As a quick hack, we could tweak the order of operations and rely on existing 
retry logic in `ConnectAssertions::assertConnectorAndExactlyNumTasksAreRunning` 
to prevent this:
   
   1. Configure connector with `tasks.max = 2`
   2. Ensure connector is started and 2 tasks are running
   3. Patch connector, including changing `tasks.max` to `3`
   4. Ensure connector is started and 3 tasks are running
   5. Perform the assertion on this line (i.e., that the connector config as 
reported by an arbitrary worker in the cluster matches the expected patch 
config)
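
   The ordering above leans on an assertion that retries until the cluster converges. A minimal, self-contained sketch of that idea follows. It is not Kafka's `ConnectAssertions` code: the `waitFor` helper, the `LaggingWorker` class, and the number of stale reads are hypothetical stand-ins for a worker that has not yet caught up with the config topic.

```java
import java.util.function.BooleanSupplier;

// Hypothetical sketch of retry-until-converged; not the Kafka test utilities.
final class RetrySketch {

    // Polls the condition until it holds or the attempts run out.
    // Returns true if the condition held on some attempt, false otherwise.
    static boolean waitFor(BooleanSupplier condition, int maxAttempts, long sleepMs) {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            if (condition.getAsBoolean()) {
                return true;
            }
            try {
                Thread.sleep(sleepMs);
            } catch (InterruptedException e) {
                // Preserve the interrupt flag and give up early.
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return false;
    }

    // Simulates a worker that serves the old tasks.max for its first few reads.
    static final class LaggingWorker {
        private int staleReadsLeft;

        LaggingWorker(int staleReads) {
            this.staleReadsLeft = staleReads;
        }

        String tasksMax() {
            if (staleReadsLeft > 0) {
                staleReadsLeft--;
                return "2"; // value before the patch
            }
            return "3"; // value after the patch
        }
    }

    public static void main(String[] args) {
        LaggingWorker worker = new LaggingWorker(3);
        // A single read right after the patch can still see the old value.
        System.out.println("first read: " + worker.tasksMax());
        // Retrying tolerates the lag.
        boolean converged = waitFor(() -> "3".equals(worker.tasksMax()), 10, 10L);
        System.out.println("converged: " + converged);
    }
}
```

   A one-shot `assertEquals` corresponds to the single read in `main`, which is why the suggestion moves the config check behind a retried assertion.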






Re: [PR] KAFKA-16445: Add PATCH method for connector config [kafka]

2024-05-07 Thread via GitHub


C0urante commented on PR #6934:
URL: https://github.com/apache/kafka/pull/6934#issuecomment-2098991457

   Thanks @ivanyu, this is really close. One other thing--can you update the 
description with a brief overview of the PR (probably enough to just mention 
the new endpoint and its behavior), and remove the italicized template?





Re: [PR] KAFKA-16445: Add PATCH method for connector config [kafka]

2024-05-07 Thread via GitHub


C0urante commented on code in PR #6934:
URL: https://github.com/apache/kafka/pull/6934#discussion_r1592847527


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java:
##
@@ -242,6 +242,19 @@ public Response putConnectorConfig(final @PathParam("connector") String connecto
         return response.entity(createdInfo.result()).build();
     }
 
+    @PATCH
+    @Path("/{connector}/config")
+    public Response patchConnectorConfig(final @PathParam("connector") String connector,
+                                         final @Context HttpHeaders headers,
+                                         final @QueryParam("forward") Boolean forward,
+                                         final Map<String, String> connectorConfigPatch) throws Throwable {
+        FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>();
+        herder.patchConnectorConfig(connector, connectorConfigPatch, cb);
+        Herder.Created<ConnectorInfo> createdInfo = requestHandler.completeOrForwardRequest(cb, "/connectors/" + connector + "/config",
+                "PATCH", headers, connectorConfigPatch, new TypeReference<ConnectorInfo>() { }, new CreatedConnectorInfoTranslator(), forward);
+        return Response.ok().entity(createdInfo.result()).build();

Review Comment:
   Yep, LGTM 
   
   Can you add that to the KIP?



##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java:
##
@@ -2336,6 +2336,133 @@ public void testPutConnectorConfig() throws Exception {
         verifyNoMoreInteractions(worker, member, configBackingStore, statusBackingStore);
     }
 
+    @Test
+    public void testPatchConnectorConfigNotFound() {
+        when(member.memberId()).thenReturn("leader");
+        expectRebalance(0, Collections.emptyList(), Collections.emptyList(), true);
+        when(statusBackingStore.connectors()).thenReturn(Collections.emptySet());
+
+        ClusterConfigState clusterConfigState = new ClusterConfigState(
+                0,
+                null,
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                Collections.emptySet(),
+                Collections.emptySet());
+        expectConfigRefreshAndSnapshot(clusterConfigState);
+
+        Map<String, String> connConfigPatch = new HashMap<>();
+        connConfigPatch.put("foo1", "baz1");
+
+        FutureCallback<Herder.Created<ConnectorInfo>> patchCallback = new FutureCallback<>();
+        herder.patchConnectorConfig(CONN2, connConfigPatch, patchCallback);
+        herder.tick();
+        assertTrue(patchCallback.isDone());
+        ExecutionException exception = assertThrows(ExecutionException.class, patchCallback::get);
+        assertInstanceOf(NotFoundException.class, exception.getCause());
+    }
+
+    @Test
+    public void testPatchConnectorConfigNotALeader() {
+        when(member.memberId()).thenReturn("not-leader");
+
+        // The connector is pre-existing due to the mocks.
+        ClusterConfigState originalSnapshot = new ClusterConfigState(
+                1,
+                null,
+                Collections.singletonMap(CONN1, 0),
+                Collections.singletonMap(CONN1, CONN1_CONFIG),
+                Collections.singletonMap(CONN1, TargetState.STARTED),
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                Collections.emptySet(),
+                Collections.emptySet());
+        expectConfigRefreshAndSnapshot(originalSnapshot);
+        when(member.currentProtocolVersion()).thenReturn(CONNECT_PROTOCOL_V0);
+
+        // Patch the connector config.
+
+        expectMemberEnsureActive();
+        expectRebalance(1, Arrays.asList(CONN1), Collections.emptyList(), false);
+
+        FutureCallback<Herder.Created<ConnectorInfo>> patchCallback = new FutureCallback<>();
+        herder.patchConnectorConfig(CONN1, new HashMap<>(), patchCallback);
+        herder.tick();
+        assertTrue(patchCallback.isDone());
+        ExecutionException fencingException = assertThrows(ExecutionException.class, patchCallback::get);
+        assertInstanceOf(ConnectException.class, fencingException.getCause());
+    }
+
+    @Test
+    public void testPatchConnectorConfig() throws Exception {
+        when(member.memberId()).thenReturn("leader");
+        expectRebalance(1, Arrays.asList(CONN1), Collections.emptyList(), true);
+        when(statusBackingStore.connectors()).thenReturn(Collections.emptySet());
+
+        Map<String, String> originalConnConfig = new HashMap<>(CONN1_CONFIG);
+        originalConnConfig.put("foo0", "unaffected");
+        originalConnConfig.put("foo1", "will-be-changed");
+        originalConnConfig.put("foo2", "will-be-removed");
+
+        // The connector is pre-existing due to the mocks.
+
+        ClusterConfigState originalSnapshot = new

Re: [PR] KAFKA-16445: Add PATCH method for connector config [kafka]

2024-05-07 Thread via GitHub


C0urante commented on code in PR #6934:
URL: https://github.com/apache/kafka/pull/6934#discussion_r1592838802


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##
@@ -1144,6 +1144,21 @@ public void putConnectorConfig(final String connName, final Map
         );
     }
 
+    @Override
+    public void patchConnectorConfig(String connName, Map<String, String> configPatch, Callback<Created<ConnectorInfo>> callback) {
+        log.trace("Submitting connector config patch request {}", connName);
+        addRequest(() -> {

Review Comment:
   Yep, looks great!






Re: [PR] KAFKA-16445: Add PATCH method for connector config [kafka]

2024-05-02 Thread via GitHub


ivanyu commented on PR #6934:
URL: https://github.com/apache/kafka/pull/6934#issuecomment-2092245514

   I also added the integration test as you proposed @C0urante





Re: [PR] KAFKA-16445: Add PATCH method for connector config [kafka]

2024-05-02 Thread via GitHub


ivanyu commented on code in PR #6934:
URL: https://github.com/apache/kafka/pull/6934#discussion_r1588022076


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##
@@ -1144,6 +1144,21 @@ public void putConnectorConfig(final String connName, final Map
         );
     }
 
+    @Override
+    public void patchConnectorConfig(String connName, Map<String, String> configPatch, Callback<Created<ConnectorInfo>> callback) {
+        log.trace("Submitting connector config patch request {}", connName);
+        addRequest(() -> {
+            ConnectorInfo connectorInfo = connectorInfo(connName);
+            if (connectorInfo == null) {
+                callback.onCompletion(new NotFoundException("Connector " + connName + " not found", null), null);
+            } else {
+                Map<String, String> patchedConfig = applyConnectorConfigPatch(connectorInfo.config(), configPatch);
+                putConnectorConfig(connName, patchedConfig, true, callback);

Review Comment:
   Yes, makes sense, done.
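
   For readers following the thread, the merge semantics that the tests in this PR exercise (a non-null value adds or overwrites a key, a `null` value removes it, untouched keys survive) can be sketched as below. This is a minimal illustration and not the PR's actual `applyConnectorConfigPatch`; the class name `ConfigPatchSketch` and the method name `applyPatch` are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the patch semantics; not the implementation from the PR.
final class ConfigPatchSketch {

    // Returns a new map holding the original config with the patch applied.
    // A null patch value removes the key; any other value adds or overwrites it.
    // Neither input map is modified.
    static Map<String, String> applyPatch(Map<String, String> config, Map<String, String> patch) {
        Map<String, String> result = new HashMap<>(config);
        patch.forEach((key, value) -> {
            if (value == null) {
                result.remove(key);
            } else {
                result.put(key, value);
            }
        });
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put("tasks.max", "1");
        config.put("unaffected-key", "unaffected-value");
        config.put("to-be-deleted-key", "value");

        Map<String, String> patch = new HashMap<>();
        patch.put("tasks.max", "2");            // changed
        patch.put("to-be-added-key", "value");  // added
        patch.put("to-be-deleted-key", null);   // deleted

        System.out.println(applyPatch(config, patch));
    }
}
```

   Copying the original map first keeps the stored config untouched if the patched result is later rejected by validation, which is one reason to build a new map rather than mutate in place.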






Re: [PR] KAFKA-16445: Add PATCH method for connector config [kafka]

2024-05-02 Thread via GitHub


ivanyu commented on code in PR #6934:
URL: https://github.com/apache/kafka/pull/6934#discussion_r1587989524


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##
@@ -1144,6 +1144,21 @@ public void putConnectorConfig(final String connName, final Map
         );
     }
 
+    @Override
+    public void patchConnectorConfig(String connName, Map<String, String> configPatch, Callback<Created<ConnectorInfo>> callback) {
+        log.trace("Submitting connector config patch request {}", connName);
+        addRequest(() -> {

Review Comment:
   I added this check + a test for this (does it make sense to have it?)






Re: [PR] KAFKA-16445: Add PATCH method for connector config [kafka]

2024-05-02 Thread via GitHub


ivanyu commented on code in PR #6934:
URL: https://github.com/apache/kafka/pull/6934#discussion_r1587970461


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java:
##
@@ -751,6 +754,74 @@ public void testPutConnectorConfig() throws Exception {
         verifyNoMoreInteractions(connectorConfigCb);
     }
 
+    @Test
+    public void testPatchConnectorConfigNotFound() {
+        Map<String, String> connConfigPatch = new HashMap<>();
+        connConfigPatch.put("foo1", "baz1");
+
+        Callback<Herder.Created<ConnectorInfo>> patchCallback = mock(Callback.class);
+        herder.patchConnectorConfig(CONNECTOR_NAME, connConfigPatch, patchCallback);
+
+        ArgumentCaptor<NotFoundException> exceptionCaptor = ArgumentCaptor.forClass(NotFoundException.class);
+        verify(patchCallback).onCompletion(exceptionCaptor.capture(), isNull());
+        assertEquals(exceptionCaptor.getValue().getMessage(), "Connector " + CONNECTOR_NAME + " not found");
+        assertNull(exceptionCaptor.getValue().getCause());
+    }
+
+    @Test
+    public void testPatchConnectorConfig() throws ExecutionException, InterruptedException, TimeoutException {
+        // Create the connector.
+        Map<String, String> originalConnConfig = connectorConfig(SourceSink.SOURCE);
+        originalConnConfig.put("foo1", "bar1");
+        originalConnConfig.put("foo2", "bar2");
+
+        Map<String, String> connConfigPatch = new HashMap<>();
+        connConfigPatch.put("foo1", "changed");
+        connConfigPatch.put("foo2", null);
+        connConfigPatch.put("foo3", "added");
+
+        Map<String, String> patchedConnConfig = new HashMap<>(originalConnConfig);
+        patchedConnConfig.put("foo1", "changed");
+        patchedConnConfig.remove("foo2");
+        patchedConnConfig.put("foo3", "added");
+
+        expectAdd(SourceSink.SOURCE);
+        Connector connectorMock = mock(SourceConnector.class);
+        expectConfigValidation(SourceSink.SOURCE, originalConnConfig, patchedConnConfig);
+
+        expectConnectorStartingWithoutTasks(originalConnConfig);
+
+        herder.putConnectorConfig(CONNECTOR_NAME, originalConnConfig, false, createCallback);
+        createCallback.get(1000L, TimeUnit.SECONDS);
+
+        expectConnectorStartingWithoutTasks(patchedConnConfig);
+
+        FutureCallback<Herder.Created<ConnectorInfo>> patchCallback = new FutureCallback<>();
+        herder.patchConnectorConfig(CONNECTOR_NAME, connConfigPatch, patchCallback);
+
+        Map<String, String> returnedConfig = patchCallback.get(1000L, TimeUnit.SECONDS).result().config();
+        assertEquals(patchedConnConfig, returnedConfig);
+
+        // Also check the returned config when requested.
+        FutureCallback<Map<String, String>> configCallback = new FutureCallback<>();
+        herder.connectorConfig(CONNECTOR_NAME, configCallback);
+
+        Map<String, String> returnedConfig2 = configCallback.get(1000L, TimeUnit.SECONDS);
+        assertEquals(patchedConnConfig, returnedConfig2);
+    }
+
+    private void expectConnectorStartingWithoutTasks(Map<String, String> config) {

Review Comment:
   Thanks, did the latter






Re: [PR] KAFKA-16445: Add PATCH method for connector config [kafka]

2024-05-02 Thread via GitHub


ivanyu commented on code in PR #6934:
URL: https://github.com/apache/kafka/pull/6934#discussion_r1587966298


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java:
##
@@ -751,6 +754,74 @@ public void testPutConnectorConfig() throws Exception {
         verifyNoMoreInteractions(connectorConfigCb);
     }
 
+    @Test
+    public void testPatchConnectorConfigNotFound() {
+        Map<String, String> connConfigPatch = new HashMap<>();
+        connConfigPatch.put("foo1", "baz1");
+
+        Callback<Herder.Created<ConnectorInfo>> patchCallback = mock(Callback.class);
+        herder.patchConnectorConfig(CONNECTOR_NAME, connConfigPatch, patchCallback);
+
+        ArgumentCaptor<NotFoundException> exceptionCaptor = ArgumentCaptor.forClass(NotFoundException.class);
+        verify(patchCallback).onCompletion(exceptionCaptor.capture(), isNull());
+        assertEquals(exceptionCaptor.getValue().getMessage(), "Connector " + CONNECTOR_NAME + " not found");
+        assertNull(exceptionCaptor.getValue().getCause());
+    }
+
+    @Test
+    public void testPatchConnectorConfig() throws ExecutionException, InterruptedException, TimeoutException {
+        // Create the connector.
+        Map<String, String> originalConnConfig = connectorConfig(SourceSink.SOURCE);
+        originalConnConfig.put("foo1", "bar1");
+        originalConnConfig.put("foo2", "bar2");
+
+        Map<String, String> connConfigPatch = new HashMap<>();
+        connConfigPatch.put("foo1", "changed");
+        connConfigPatch.put("foo2", null);
+        connConfigPatch.put("foo3", "added");
+
+        Map<String, String> patchedConnConfig = new HashMap<>(originalConnConfig);
+        patchedConnConfig.put("foo1", "changed");
+        patchedConnConfig.remove("foo2");
+        patchedConnConfig.put("foo3", "added");
+
+        expectAdd(SourceSink.SOURCE);
+        Connector connectorMock = mock(SourceConnector.class);

Review Comment:
   Yeah, a rebase artifact, deleted.






Re: [PR] KAFKA-16445: Add PATCH method for connector config [kafka]

2024-05-02 Thread via GitHub


ivanyu commented on code in PR #6934:
URL: https://github.com/apache/kafka/pull/6934#discussion_r1587963241


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java:
##
@@ -751,6 +754,74 @@ public void testPutConnectorConfig() throws Exception {
         verifyNoMoreInteractions(connectorConfigCb);
     }
 
+    @Test
+    public void testPatchConnectorConfigNotFound() {
+        Map<String, String> connConfigPatch = new HashMap<>();
+        connConfigPatch.put("foo1", "baz1");
+
+        Callback<Herder.Created<ConnectorInfo>> patchCallback = mock(Callback.class);
+        herder.patchConnectorConfig(CONNECTOR_NAME, connConfigPatch, patchCallback);
+
+        ArgumentCaptor<NotFoundException> exceptionCaptor = ArgumentCaptor.forClass(NotFoundException.class);
+        verify(patchCallback).onCompletion(exceptionCaptor.capture(), isNull());
+        assertEquals(exceptionCaptor.getValue().getMessage(), "Connector " + CONNECTOR_NAME + " not found");
+        assertNull(exceptionCaptor.getValue().getCause());

Review Comment:
   None, I think. Removed.






Re: [PR] KAFKA-16445: Add PATCH method for connector config [kafka]

2024-05-02 Thread via GitHub


ivanyu commented on code in PR #6934:
URL: https://github.com/apache/kafka/pull/6934#discussion_r1587958141


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java:
##
@@ -2336,6 +2336,95 @@ public void testPutConnectorConfig() throws Exception {
         verifyNoMoreInteractions(worker, member, configBackingStore, statusBackingStore);
     }
 
+    @Test
+    public void testPatchConnectorConfigNotFound() {
+        ClusterConfigState clusterConfigState = new ClusterConfigState(
+                0,
+                null,
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                Collections.emptySet(),
+                Collections.emptySet());
+        expectConfigRefreshAndSnapshot(clusterConfigState);
+
+        Map<String, String> connConfigPatch = new HashMap<>();
+        connConfigPatch.put("foo1", "baz1");
+
+        FutureCallback<Herder.Created<ConnectorInfo>> patchCallback = new FutureCallback<>();
+        herder.patchConnectorConfig(CONN2, connConfigPatch, patchCallback);
+        herder.tick();
+        assertTrue(patchCallback.isDone());
+        ExecutionException exception = assertThrows(ExecutionException.class, patchCallback::get);
+        assertInstanceOf(NotFoundException.class, exception.getCause());
+    }
+
+    @Test
+    public void testPatchConnectorConfig() throws Exception {
+        when(member.memberId()).thenReturn("leader");
+        expectRebalance(1, Arrays.asList(CONN1), Collections.emptyList(), true);
+        when(statusBackingStore.connectors()).thenReturn(Collections.emptySet());
+
+        Map<String, String> originalConnConfig = new HashMap<>(CONN1_CONFIG);
+        originalConnConfig.put("foo1", "bar1");
+        originalConnConfig.put("foo2", "bar2");
+
+        // The connector is pre-existing due to the mocks.
+
+        ClusterConfigState originalSnapshot = new ClusterConfigState(
+                1,
+                null,
+                Collections.singletonMap(CONN1, 0),
+                Collections.singletonMap(CONN1, originalConnConfig),
+                Collections.singletonMap(CONN1, TargetState.STARTED),
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                Collections.emptySet(),
+                Collections.emptySet());
+        expectConfigRefreshAndSnapshot(originalSnapshot);
+        when(member.currentProtocolVersion()).thenReturn(CONNECT_PROTOCOL_V0);
+
+        expectMemberPoll();
+
+        // Patch the connector config.
+        Map<String, String> connConfigPatch = new HashMap<>();
+        connConfigPatch.put("foo1", "changed");
+        connConfigPatch.put("foo2", null);
+        connConfigPatch.put("foo3", "added");
+
+        Map<String, String> patchedConnConfig = new HashMap<>(originalConnConfig);
+        patchedConnConfig.put("foo1", "changed");
+        patchedConnConfig.remove("foo2");
+        patchedConnConfig.put("foo3", "added");
+
+        expectMemberEnsureActive();
+        expectRebalance(1, Arrays.asList(CONN1), Collections.emptyList(), true);
+        when(worker.getPlugins()).thenReturn(plugins);

Review Comment:
   Thanks, fixed



Re: [PR] KAFKA-16445: Add PATCH method for connector config [kafka]

2024-05-02 Thread via GitHub


ivanyu commented on code in PR #6934:
URL: https://github.com/apache/kafka/pull/6934#discussion_r1587955412


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java:
##
@@ -2336,6 +2336,95 @@ public void testPutConnectorConfig() throws Exception {
 verifyNoMoreInteractions(worker, member, configBackingStore, 
statusBackingStore);
 }
 
+    @Test
+    public void testPatchConnectorConfigNotFound() {
+        ClusterConfigState clusterConfigState = new ClusterConfigState(
+                0,
+                null,
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                Collections.emptySet(),
+                Collections.emptySet());
+        expectConfigRefreshAndSnapshot(clusterConfigState);
+
+        Map<String, String> connConfigPatch = new HashMap<>();
+        connConfigPatch.put("foo1", "baz1");
+
+        FutureCallback<Herder.Created<ConnectorInfo>> patchCallback = new FutureCallback<>();
+        herder.patchConnectorConfig(CONN2, connConfigPatch, patchCallback);
+        herder.tick();
+        assertTrue(patchCallback.isDone());
+        ExecutionException exception = assertThrows(ExecutionException.class, patchCallback::get);
+        assertInstanceOf(NotFoundException.class, exception.getCause());
+    }
+
+    @Test
+    public void testPatchConnectorConfig() throws Exception {
+        when(member.memberId()).thenReturn("leader");
+        expectRebalance(1, Arrays.asList(CONN1), Collections.emptyList(), true);
+        when(statusBackingStore.connectors()).thenReturn(Collections.emptySet());
+
+        Map<String, String> originalConnConfig = new HashMap<>(CONN1_CONFIG);
+        originalConnConfig.put("foo1", "bar1");
+        originalConnConfig.put("foo2", "bar2");
+
+        // The connector is pre-existing due to the mocks.
+
+        ClusterConfigState originalSnapshot = new ClusterConfigState(
+                1,
+                null,
+                Collections.singletonMap(CONN1, 0),
+                Collections.singletonMap(CONN1, originalConnConfig),
+                Collections.singletonMap(CONN1, TargetState.STARTED),
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                Collections.emptySet(),
+                Collections.emptySet());
+        expectConfigRefreshAndSnapshot(originalSnapshot);
+        when(member.currentProtocolVersion()).thenReturn(CONNECT_PROTOCOL_V0);
+
+        expectMemberPoll();
+
+        // Patch the connector config.
+        Map<String, String> connConfigPatch = new HashMap<>();
+        connConfigPatch.put("foo1", "changed");
+        connConfigPatch.put("foo2", null);
+        connConfigPatch.put("foo3", "added");
+
+        Map<String, String> patchedConnConfig = new HashMap<>(originalConnConfig);
+        patchedConnConfig.put("foo1", "changed");
+        patchedConnConfig.remove("foo2");
+        patchedConnConfig.put("foo3", "added");
+
+        expectMemberEnsureActive();
+        expectRebalance(1, Arrays.asList(CONN1), Collections.emptyList(), true);
+        when(worker.getPlugins()).thenReturn(plugins);
+
+        ArgumentCaptor<Callback<ConfigInfos>> validateCallback = ArgumentCaptor.forClass(Callback.class);
+        doAnswer(invocation -> {
+            validateCallback.getValue().onCompletion(null, CONN1_CONFIG_INFOS);
+            return null;
+        }).when(herder).validateConnectorConfig(eq(patchedConnConfig), validateCallback.capture());
+
+        // This is effectively the main check of this test:
+        // we validate that what's written in the config storage is the patched config.
+        doNothing().when(configBackingStore).putConnectorConfig(eq(CONN1), eq(patchedConnConfig), isNull());

Review Comment:
   Yes, makes sense, applied



Re: [PR] KAFKA-16445: Add PATCH method for connector config [kafka]

2024-05-02 Thread via GitHub


ivanyu commented on code in PR #6934:
URL: https://github.com/apache/kafka/pull/6934#discussion_r1587938094


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java:
##
@@ -2336,6 +2336,95 @@ public void testPutConnectorConfig() throws Exception {
 verifyNoMoreInteractions(worker, member, configBackingStore, 
statusBackingStore);
 }
 
+@Test
+public void testPatchConnectorConfigNotFound() {
+ClusterConfigState clusterConfigState = new ClusterConfigState(
+0,
+null,
+Collections.emptyMap(),
+Collections.emptyMap(),
+Collections.emptyMap(),
+Collections.emptyMap(),
+Collections.emptyMap(),
+Collections.emptyMap(),
+Collections.emptySet(),
+Collections.emptySet());
+expectConfigRefreshAndSnapshot(clusterConfigState);
+
+Map connConfigPatch = new HashMap<>();
+connConfigPatch.put("foo1", "baz1");
+
+FutureCallback> patchCallback = new 
FutureCallback<>();
+herder.patchConnectorConfig(CONN2, connConfigPatch, patchCallback);
+herder.tick();
+assertTrue(patchCallback.isDone());
+ExecutionException exception = assertThrows(ExecutionException.class, 
patchCallback::get);
+assertInstanceOf(NotFoundException.class, exception.getCause());
+}
+
+@Test
+public void testPatchConnectorConfig() throws Exception {
+when(member.memberId()).thenReturn("leader");
+expectRebalance(1, Arrays.asList(CONN1), Collections.emptyList(), 
true);
+
when(statusBackingStore.connectors()).thenReturn(Collections.emptySet());
+
+Map originalConnConfig = new HashMap<>(CONN1_CONFIG);
+originalConnConfig.put("foo1", "bar1");
+originalConnConfig.put("foo2", "bar2");

Review Comment:
   Yes, good idea. Done here and in `StandaloneHerderTest` too.



##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java:
##
@@ -751,6 +754,74 @@ public void testPutConnectorConfig() throws Exception {
 verifyNoMoreInteractions(connectorConfigCb);
 }
 
+    @Test
+    public void testPatchConnectorConfigNotFound() {
+        Map<String, String> connConfigPatch = new HashMap<>();
+        connConfigPatch.put("foo1", "baz1");
+
+        Callback<Herder.Created<ConnectorInfo>> patchCallback = mock(Callback.class);
+        herder.patchConnectorConfig(CONNECTOR_NAME, connConfigPatch, patchCallback);
+
+        ArgumentCaptor<NotFoundException> exceptionCaptor = ArgumentCaptor.forClass(NotFoundException.class);
+        verify(patchCallback).onCompletion(exceptionCaptor.capture(), isNull());
+        assertEquals(exceptionCaptor.getValue().getMessage(), "Connector " + CONNECTOR_NAME + " not found");
+        assertNull(exceptionCaptor.getValue().getCause());
+    }
+
+    @Test
+    public void testPatchConnectorConfig() throws ExecutionException, InterruptedException, TimeoutException {
+        // Create the connector.
+        Map<String, String> originalConnConfig = connectorConfig(SourceSink.SOURCE);
+        originalConnConfig.put("foo1", "bar1");
+        originalConnConfig.put("foo2", "bar2");

Review Comment:
   Done



Re: [PR] KAFKA-16445: Add PATCH method for connector config [kafka]

2024-05-02 Thread via GitHub


ivanyu commented on code in PR #6934:
URL: https://github.com/apache/kafka/pull/6934#discussion_r1587929296


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java:
##
@@ -246,6 +246,31 @@ private synchronized void putConnectorConfig(String 
connName,
 }
 }
 
+    @Override
+    public void patchConnectorConfig(String connName, Map<String, String> configPatch, Callback<Created<ConnectorInfo>> callback) {
+        try {
+            ConnectorInfo connectorInfo = connectorInfo(connName);
+            if (connectorInfo == null) {
+                callback.onCompletion(new NotFoundException("Connector " + connName + " not found", null), null);
+                return;
+            }
+
+            Map<String, String> patchedConfig = applyConnectorConfigPatch(connectorInfo.config(), configPatch);
+            validateConnectorConfig(patchedConfig, (error, configInfos) -> {
+                if (error != null) {
+                    callback.onCompletion(error, null);
+                    return;
+                }
+
+                requestExecutorService.submit(
+                    () -> putConnectorConfig(connName, patchedConfig, null, true, callback, configInfos)
+                );
+            });
+        } catch (ConnectException e) {

Review Comment:
   Ack, fixed



Re: [PR] KAFKA-16445: Add PATCH method for connector config [kafka]

2024-05-02 Thread via GitHub


ivanyu commented on code in PR #6934:
URL: https://github.com/apache/kafka/pull/6934#discussion_r1587926661


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java:
##
@@ -246,6 +246,31 @@ private synchronized void putConnectorConfig(String 
connName,
 }
 }
 
+@Override
+public void patchConnectorConfig(String connName, Map 
configPatch, Callback> callback) {

Review Comment:
   Yes, makes sense. Done



Re: [PR] KAFKA-16445: Add PATCH method for connector config [kafka]

2024-05-02 Thread via GitHub


ivanyu commented on code in PR #6934:
URL: https://github.com/apache/kafka/pull/6934#discussion_r1587918212


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java:
##
@@ -242,6 +242,19 @@ public Response putConnectorConfig(final 
@PathParam("connector") String connecto
 return response.entity(createdInfo.result()).build();
 }
 
+@PATCH
+@Path("/{connector}/config")
+public Response patchConnectorConfig(final @PathParam("connector") String 
connector,
+ final @Context HttpHeaders headers,
+ final @QueryParam("forward") Boolean 
forward,

Review Comment:
   Thanks, fixed



Re: [PR] KAFKA-16445: Add PATCH method for connector config [kafka]

2024-05-02 Thread via GitHub


ivanyu commented on code in PR #6934:
URL: https://github.com/apache/kafka/pull/6934#discussion_r1587904583


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java:
##
@@ -242,6 +242,19 @@ public Response putConnectorConfig(final 
@PathParam("connector") String connecto
 return response.entity(createdInfo.result()).build();
 }
 
+    @PATCH
+    @Path("/{connector}/config")
+    public Response patchConnectorConfig(final @PathParam("connector") String connector,
+                                         final @Context HttpHeaders headers,
+                                         final @QueryParam("forward") Boolean forward,
+                                         final Map<String, String> connectorConfigPatch) throws Throwable {
+        FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>();
+        herder.patchConnectorConfig(connector, connectorConfigPatch, cb);
+        Herder.Created<ConnectorInfo> createdInfo = requestHandler.completeOrForwardRequest(cb, "/connectors/" + connector + "/config",
+                "PATCH", headers, connectorConfigPatch, new TypeReference<ConnectorInfo>() { }, new CreatedConnectorInfoTranslator(), forward);
+        return Response.ok().entity(createdInfo.result()).build();

Review Comment:
   I didn't find the current `PUT` response documentation, so I propose
   something like this:
   ```
   Responses follow the model of the configuration PUT endpoint:
   1. If the patch was successfully applied, the response code is 200 and the
      body is a JSON object with the updated connector information (`name`,
      `type`, `config`, and `tasks`), for example:
   {
       "name": "my-connector",
       "config": {
           "name": "my-connector",
           "sample_config_2": "test_config_2",
           "sample_config": "test_config_new"
       },
       "tasks": [
           {
               "connector": "my-connector",
               "task": 0
           },
           {
               "connector": "my-connector",
               "task": 1
           }
       ],
       "type": "sink"
   }

   2. In case of errors, the response code matches the error type (e.g. 400 in
      case of a config validation error; 404 if the connector is not found; 500
      in case of other server-side errors) and the body is a JSON object with
      the error details:
   {
       "error_code": 400,
       "message": "Connector configuration is invalid and contains the following 1 error(s):\n...\nYou can also find the above list of errors at the endpoint `/{connectorType}/config/validate`"
   }
   ```
   WDYT?
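
For readers who want to try the endpoint described in this proposal, here is a minimal sketch of how a client might build the `PATCH /connectors/{connector}/config` request with the JDK's `java.net.http` API. The class and method names (`PatchRequestSketch`, `buildPatchRequest`), the base URL `http://localhost:8083`, the `application/json` content type, and the `obsolete_config` key are assumptions made for illustration; none of them come from the PR itself.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;

class PatchRequestSketch {
    // Hypothetical helper: builds (but does not send) a PATCH request for a
    // connector's config. The base URL and content type are assumptions.
    static HttpRequest buildPatchRequest(String baseUrl, String connector, String jsonPatch) {
        return HttpRequest.newBuilder()
                .uri(URI.create(baseUrl + "/connectors/" + connector + "/config"))
                .header("Content-Type", "application/json")
                // HttpRequest.Builder has no patch() shortcut, so the generic method(...) is used.
                .method("PATCH", HttpRequest.BodyPublishers.ofString(jsonPatch, StandardCharsets.UTF_8))
                .build();
    }

    public static void main(String[] args) {
        // A null value asks the server to remove the key; other values set it.
        String patch = "{\"sample_config\": \"test_config_new\", \"obsolete_config\": null}";
        HttpRequest request = buildPatchRequest("http://localhost:8083", "my-connector", patch);
        System.out.println(request.method() + " " + request.uri());
    }
}
```

Sending the request with `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())` against a running worker would then be expected to return one of the two response shapes outlined above.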



Re: [PR] KAFKA-16445: Add PATCH method for connector config [kafka]

2024-05-02 Thread via GitHub


ivanyu commented on code in PR #6934:
URL: https://github.com/apache/kafka/pull/6934#discussion_r1587717243


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java:
##
@@ -992,4 +992,15 @@ public List setWorkerLoggerLevel(String namespace, 
String desiredLevelSt
 return loggers.setLevel(namespace, level);
 }
 
+    protected Map<String, String> applyConnectorConfigPatch(Map<String, String> currentConfig, Map<String, String> configPatch) {
+        Map<String, String> patchedConfig = new HashMap<>(currentConfig);
+        configPatch.forEach((k, v) -> {
+            if (v != null) {
+                patchedConfig.put(k, v);
+            } else {
+                patchedConfig.remove(k);
+            }
+        });
+        return patchedConfig;
+    }

Review Comment:
   Done: I extracted this into `ConnectUtils` (because I think the `null`
   handling logic may be Connect-specific) and added a unit test for it.
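
As a reading aid, the `null` handling discussed here can be sketched as a small standalone program. This is not the actual `ConnectUtils` code; the class name `ConfigPatchSketch` and the method name `applyPatch` are hypothetical, and the logic simply mirrors the diff quoted above: a non-null value sets a key, and a `null` value removes it.

```java
import java.util.HashMap;
import java.util.Map;

class ConfigPatchSketch {
    // Hypothetical helper mirroring the quoted diff: copies the current config,
    // then applies each patch entry (null value = remove the key).
    static Map<String, String> applyPatch(Map<String, String> current, Map<String, String> patch) {
        Map<String, String> patched = new HashMap<>(current);
        patch.forEach((key, value) -> {
            if (value != null) {
                patched.put(key, value);
            } else {
                patched.remove(key);
            }
        });
        return patched;
    }

    public static void main(String[] args) {
        Map<String, String> current = new HashMap<>();
        current.put("foo1", "bar1");
        current.put("foo2", "bar2");
        current.put("foo3", "bar3");

        // HashMap permits null values, which is what lets a patch express removal.
        Map<String, String> patch = new HashMap<>();
        patch.put("foo1", "changed");
        patch.put("foo2", null);
        patch.put("foo4", "added");

        // foo1 is overwritten, foo2 is removed, foo3 is untouched, foo4 is added.
        System.out.println(applyPatch(current, patch));
    }
}
```

Because the helper copies the input map before modifying it, the caller's current config is left unchanged.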



Re: [PR] KAFKA-16445: Add PATCH method for connector config [kafka]

2024-04-17 Thread via GitHub


C0urante commented on code in PR #6934:
URL: https://github.com/apache/kafka/pull/6934#discussion_r1568959278


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java:
##
@@ -242,6 +242,19 @@ public Response putConnectorConfig(final 
@PathParam("connector") String connecto
 return response.entity(createdInfo.result()).build();
 }
 
+@PATCH
+@Path("/{connector}/config")
+public Response patchConnectorConfig(final @PathParam("connector") String 
connector,
+ final @Context HttpHeaders headers,
+ final @QueryParam("forward") Boolean 
forward,
+ final Map 
connectorConfigPatch) throws Throwable {
+FutureCallback> cb = new 
FutureCallback<>();
+herder.patchConnectorConfig(connector, connectorConfigPatch, cb);
+Herder.Created createdInfo = 
requestHandler.completeOrForwardRequest(cb, "/connectors/" + connector + 
"/config",
+"PATCH", headers, connectorConfigPatch, new 
TypeReference() { }, new CreatedConnectorInfoTranslator(), 
forward);
+return Response.ok().entity(createdInfo.result()).build();

Review Comment:
   Just realizing now that we don't actually specify the status and body of the
   REST response in the KIP. I agree with what's here, especially since it
   matches the existing `PUT /connectors/{name}/config` endpoint, but it's worth
   specifying in the KIP for completeness.



##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java:
##
@@ -2336,6 +2336,95 @@ public void testPutConnectorConfig() throws Exception {
 verifyNoMoreInteractions(worker, member, configBackingStore, 
statusBackingStore);
 }
 
+@Test
+public void testPatchConnectorConfigNotFound() {
+ClusterConfigState clusterConfigState = new ClusterConfigState(
+0,
+null,
+Collections.emptyMap(),
+Collections.emptyMap(),
+Collections.emptyMap(),
+Collections.emptyMap(),
+Collections.emptyMap(),
+Collections.emptyMap(),
+Collections.emptySet(),
+Collections.emptySet());
+expectConfigRefreshAndSnapshot(clusterConfigState);
+
+Map connConfigPatch = new HashMap<>();
+connConfigPatch.put("foo1", "baz1");
+
+FutureCallback> patchCallback = new 
FutureCallback<>();
+herder.patchConnectorConfig(CONN2, connConfigPatch, patchCallback);
+herder.tick();
+assertTrue(patchCallback.isDone());
+ExecutionException exception = assertThrows(ExecutionException.class, 
patchCallback::get);
+assertInstanceOf(NotFoundException.class, exception.getCause());
+}
+
+@Test
+public void testPatchConnectorConfig() throws Exception {
+when(member.memberId()).thenReturn("leader");
+expectRebalance(1, Arrays.asList(CONN1), Collections.emptyList(), 
true);
+
when(statusBackingStore.connectors()).thenReturn(Collections.emptySet());
+
+Map originalConnConfig = new HashMap<>(CONN1_CONFIG);
+originalConnConfig.put("foo1", "bar1");
+originalConnConfig.put("foo2", "bar2");

Review Comment:
   Nit: can we add one more key/value pair that should be unchanged after the
   patch is applied? This would catch bugs where the set of post-patch keys is
   derived from the patch instead of the combination of the patch and the prior
   configuration.
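
To make this concern concrete, here is a small illustrative sketch (not code from the PR) comparing a correct merge with a deliberately buggy variant that builds the result from the patch alone. All names (`UnchangedKeyCheckSketch`, `mergePatch`, `buggyPatchOnly`, `foo0`) are hypothetical. With only `foo1` and `foo2` in the prior config, both variants produce the same map, so the test could not tell them apart. Adding an untouched key such as `foo0` exposes the difference.

```java
import java.util.HashMap;
import java.util.Map;

class UnchangedKeyCheckSketch {
    // Correct merge: start from the prior config, then apply the patch.
    static Map<String, String> mergePatch(Map<String, String> prior, Map<String, String> patch) {
        Map<String, String> result = new HashMap<>(prior);
        patch.forEach((key, value) -> {
            if (value == null) {
                result.remove(key);
            } else {
                result.put(key, value);
            }
        });
        return result;
    }

    // Deliberately buggy variant: ignores the prior config entirely.
    static Map<String, String> buggyPatchOnly(Map<String, String> prior, Map<String, String> patch) {
        Map<String, String> result = new HashMap<>();
        patch.forEach((key, value) -> {
            if (value != null) {
                result.put(key, value);
            }
        });
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> prior = new HashMap<>();
        prior.put("foo0", "unchanged"); // the extra key suggested in the review
        prior.put("foo1", "bar1");
        prior.put("foo2", "bar2");

        Map<String, String> patch = new HashMap<>();
        patch.put("foo1", "changed");
        patch.put("foo2", null);
        patch.put("foo3", "added");

        System.out.println("correct keeps foo0: " + mergePatch(prior, patch).containsKey("foo0"));
        System.out.println("buggy keeps foo0: " + buggyPatchOnly(prior, patch).containsKey("foo0"));
    }
}
```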



##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java:
##
@@ -242,6 +242,19 @@ public Response putConnectorConfig(final 
@PathParam("connector") String connecto
 return response.entity(createdInfo.result()).build();
 }
 
+@PATCH
+@Path("/{connector}/config")
+public Response patchConnectorConfig(final @PathParam("connector") String 
connector,
+ final @Context HttpHeaders headers,
+ final @QueryParam("forward") Boolean 
forward,

Review Comment:
   Nit:
   ```suggestion
                                            final @Parameter(hidden = true) @QueryParam("forward") Boolean forward,
   ```



##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java:
##
@@ -751,6 +754,74 @@ public void testPutConnectorConfig() throws Exception {
 verifyNoMoreInteractions(connectorConfigCb);
 }
 
+@Test
+public void testPatchConnectorConfigNotFound() {
+Map connConfigPatch = new HashMap<>();
+connConfigPatch.put("foo1", "baz1");
+
+Callback> patchCallback = 
mock(Callback.class);
+herder.patchConnectorConfig(CONNECTOR_NAME, connConfigPatch,