[GitHub] [kafka] C0urante commented on a diff in pull request #13465: KAFKA-14368: Connect offset write REST API

2023-05-25 Thread via GitHub


C0urante commented on code in PR #13465:
URL: https://github.com/apache/kafka/pull/13465#discussion_r1205535710


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java:
##
@@ -4123,6 +4136,280 @@ public void testConnectorOffsets() throws Exception {
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testAlterOffsetsConnectorNotFound() throws Exception {
+        // Get the initial assignment
+        EasyMock.expect(member.memberId()).andStubReturn("leader");
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
+        expectRebalance(1, Collections.emptyList(), Collections.emptyList(), true);
+        expectConfigRefreshAndSnapshot(SNAPSHOT);
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        // Now handle the alter connector offsets request
+        member.wakeup();
+        PowerMock.expectLastCall();
+        member.ensureActive();
+        PowerMock.expectLastCall();
+        expectConfigRefreshAndSnapshot(SNAPSHOT);
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        herder.tick();
+        FutureCallback<Message> callback = new FutureCallback<>();
+        herder.alterConnectorOffsets("connector-does-not-exist", new HashMap<>(), callback);
+        herder.tick();
+        ExecutionException e = assertThrows(ExecutionException.class, () -> callback.get(1000L, TimeUnit.MILLISECONDS));
+        assertTrue(e.getCause() instanceof NotFoundException);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testAlterOffsetsConnectorNotInStoppedState() throws Exception {
+        // Get the initial assignment
+        EasyMock.expect(member.memberId()).andStubReturn("leader");
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
+        expectRebalance(1, Collections.emptyList(), Collections.emptyList(), true);
+        expectConfigRefreshAndSnapshot(SNAPSHOT);
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        // Now handle the alter connector offsets request
+        member.wakeup();
+        PowerMock.expectLastCall();
+        member.ensureActive();
+        PowerMock.expectLastCall();
+        expectConfigRefreshAndSnapshot(SNAPSHOT);
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        herder.tick();
+        FutureCallback<Message> callback = new FutureCallback<>();
+        herder.alterConnectorOffsets(CONN1, new HashMap<>(), callback);
+        herder.tick();
+        ExecutionException e = assertThrows(ExecutionException.class, () -> callback.get(1000L, TimeUnit.MILLISECONDS));
+        assertTrue(e.getCause() instanceof BadRequestException);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testAlterOffsetsNotLeader() throws Exception {
+        // Get the initial assignment
+        EasyMock.expect(member.memberId()).andStubReturn("member");
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
+        expectRebalance(1, Collections.emptyList(), Collections.emptyList(), false);
+        expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1);
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        // Now handle the alter connector offsets request
+        member.wakeup();
+        PowerMock.expectLastCall();
+        member.ensureActive();
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        herder.tick();
+        FutureCallback<Message> callback = new FutureCallback<>();
+        herder.alterConnectorOffsets(CONN1, new HashMap<>(), callback);
+        herder.tick();
+        ExecutionException e = assertThrows(ExecutionException.class, () -> callback.get(1000L, TimeUnit.MILLISECONDS));
+        assertTrue(e.getCause() instanceof NotLeaderException);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testAlterOffsetsSourceConnectorExactlyOnceDisabled() throws Exception {
+        // Get the initial assignment
+        EasyMock.expect(member.memberId()).andStubReturn("leader");
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
+        expectRebalance(1, Collections.emptyList(), Collections.emptyList(), true);
+        expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1);
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        // Now handle the alter connector offsets request
+        Map<Map<String, ?>, Map<String, ?>> offsets = new HashMap<>();
+        member.wakeup();
+        PowerMock.expectLastCall();
+        member.ensureActive();
+        PowerMock.expectLastCall();
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+

[GitHub] [kafka] C0urante commented on a diff in pull request #13465: KAFKA-14368: Connect offset write REST API

2023-05-25 Thread via GitHub


C0urante commented on code in PR #13465:
URL: https://github.com/apache/kafka/pull/13465#discussion_r1205533269


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java:
##
@@ -4123,6 +4136,280 @@ public void testConnectorOffsets() throws Exception {
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testAlterOffsetsConnectorNotFound() throws Exception {
+        // Get the initial assignment
+        EasyMock.expect(member.memberId()).andStubReturn("leader");
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
+        expectRebalance(1, Collections.emptyList(), Collections.emptyList(), true);
+        expectConfigRefreshAndSnapshot(SNAPSHOT);
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        // Now handle the alter connector offsets request
+        member.wakeup();
+        PowerMock.expectLastCall();
+        member.ensureActive();
+        PowerMock.expectLastCall();
+        expectConfigRefreshAndSnapshot(SNAPSHOT);
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        herder.tick();
+        FutureCallback<Message> callback = new FutureCallback<>();
+        herder.alterConnectorOffsets("connector-does-not-exist", new HashMap<>(), callback);
+        herder.tick();
+        ExecutionException e = assertThrows(ExecutionException.class, () -> callback.get(1000L, TimeUnit.MILLISECONDS));
+        assertTrue(e.getCause() instanceof NotFoundException);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testAlterOffsetsConnectorNotInStoppedState() throws Exception {
+        // Get the initial assignment
+        EasyMock.expect(member.memberId()).andStubReturn("leader");
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
+        expectRebalance(1, Collections.emptyList(), Collections.emptyList(), true);
+        expectConfigRefreshAndSnapshot(SNAPSHOT);
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        // Now handle the alter connector offsets request
+        member.wakeup();
+        PowerMock.expectLastCall();
+        member.ensureActive();
+        PowerMock.expectLastCall();
+        expectConfigRefreshAndSnapshot(SNAPSHOT);
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        herder.tick();
+        FutureCallback<Message> callback = new FutureCallback<>();
+        herder.alterConnectorOffsets(CONN1, new HashMap<>(), callback);
+        herder.tick();
+        ExecutionException e = assertThrows(ExecutionException.class, () -> callback.get(1000L, TimeUnit.MILLISECONDS));
+        assertTrue(e.getCause() instanceof BadRequestException);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testAlterOffsetsNotLeader() throws Exception {
+        // Get the initial assignment
+        EasyMock.expect(member.memberId()).andStubReturn("member");
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
+        expectRebalance(1, Collections.emptyList(), Collections.emptyList(), false);
+        expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1);
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        // Now handle the alter connector offsets request
+        member.wakeup();
+        PowerMock.expectLastCall();
+        member.ensureActive();

Review Comment:
   Huh, TIL!






[GitHub] [kafka] C0urante commented on a diff in pull request #13465: KAFKA-14368: Connect offset write REST API

2023-05-24 Thread via GitHub


C0urante commented on code in PR #13465:
URL: https://github.com/apache/kafka/pull/13465#discussion_r1204681794


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java:
##
@@ -4123,6 +4136,280 @@ public void testConnectorOffsets() throws Exception {
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testAlterOffsetsConnectorNotFound() throws Exception {

Review Comment:
   Nit: it's a little strange that we're using "not found" in the name here, but elsewhere in this suite and in `StandaloneHerderTest` we use "unknown".



##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorOffsetsTest.java:
##
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.rest.entities;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+public class ConnectorOffsetsTest {
+
+    @Test
+    public void testConnectorOffsetsToMap() {
+        // Using arbitrary partition and offset formats here to demonstrate that source connector offsets don't
+        // follow a standard pattern
+        Map<String, Object> partition1 = new HashMap<>();
+        partition1.put("partitionKey1", "partitionValue");
+        partition1.put("k", 123);
+        Map<String, Object> offset1 = new HashMap<>();
+        offset1.put("offset", 3.14);
Review Comment:
   Yum!



##
connect/runtime/src/test/java/org/apache/kafka/connect/util/SinkUtilsTest.java:
##
@@ -46,4 +52,124 @@ public void testConsumerGroupOffsetsToConnectorOffsets() {
         expectedPartition.put(SinkUtils.KAFKA_PARTITION_KEY, 0);
         assertEquals(expectedPartition, connectorOffsets.offsets().get(0).partition());
     }
 
+    @Test
+    public void testValidateAndParseEmptyPartitionOffsetMap() {
+        // expect no exception to be thrown
+        Map<TopicPartition, Long> parsedOffsets = SinkUtils.parseSinkConnectorOffsets(new HashMap<>());
+        assertTrue(parsedOffsets.isEmpty());
+    }
+
+    @Test
+    public void testValidateAndParseInvalidPartition() {
+        Map<String, Object> partition = new HashMap<>();
+        partition.put(SinkUtils.KAFKA_TOPIC_KEY, "topic");
+        Map<String, Object> offset = new HashMap<>();
+        offset.put(SinkUtils.KAFKA_OFFSET_KEY, 100);
+        Map<Map<String, ?>, Map<String, ?>> partitionOffsets = new HashMap<>();
+        partitionOffsets.put(partition, offset);
+
+        // missing partition key
+        ConnectException e = assertThrows(ConnectException.class, () -> SinkUtils.parseSinkConnectorOffsets(partitionOffsets));
+        assertThat(e.getMessage(), containsString("The partition for a sink connector offset must contain the keys 'kafka_topic' and 'kafka_partition'"));
+
+        partition.put(SinkUtils.KAFKA_PARTITION_KEY, "not a number");
+        // bad partition key
+        e = assertThrows(ConnectException.class, () -> SinkUtils.parseSinkConnectorOffsets(partitionOffsets));
+        assertThat(e.getMessage(), containsString("Failed to parse the following Kafka partition value in the provided offsets: 'not a number'"));
+
+        partition.remove(SinkUtils.KAFKA_TOPIC_KEY);
+        partition.put(SinkUtils.KAFKA_PARTITION_KEY, "5");
+        // missing topic key
+        e = assertThrows(ConnectException.class, () -> SinkUtils.parseSinkConnectorOffsets(partitionOffsets));
+        assertThat(e.getMessage(), containsString("The partition for a sink connector offset must contain the keys 'kafka_topic' and 'kafka_partition'"));
+    }
+
+    @Test
+    public void testValidateAndParseInvalidOffset() {
+        Map<String, Object> partition = new HashMap<>();
+        partition.put(SinkUtils.KAFKA_TOPIC_KEY, "topic");
+        partition.put(SinkUtils.KAFKA_PARTITION_KEY, 10);
+        Map<String, Object> offset = new HashMap<>();
+        Map<Map<String, ?>, Map<String, ?>> partitionOffsets = new HashMap<>();
+        partitionOffsets.put(partition, offset);
+
+        // missing offset key
+        ConnectException e = assertThrows(ConnectException.class, () -> SinkUtils.parseSinkConnectorOffsets(partitionOffsets));
+        assertThat(e.getMessage(), containsString("The offset for a sink connector should either

[GitHub] [kafka] C0urante commented on a diff in pull request #13465: KAFKA-14368: Connect offset write REST API

2023-05-24 Thread via GitHub


C0urante commented on code in PR #13465:
URL: https://github.com/apache/kafka/pull/13465#discussion_r1204612735


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##
@@ -1247,6 +1257,257 @@ void sourceConnectorOffsets(String connName, ConnectorOffsetBackingStore offsetS
         });
     }
 
+    /**
+     * Alter a connector's offsets.
+     *
+     * @param connName the name of the connector whose offsets are to be altered
+     * @param connectorConfig the connector's configurations
+     * @param offsets a mapping from partitions to offsets that need to be overwritten
+     * @param cb callback to invoke upon completion
+     */
+    public void alterConnectorOffsets(String connName, Map<String, String> connectorConfig,
+                                      Map<Map<String, ?>, Map<String, ?>> offsets, Callback<Message> cb) {
+        String connectorClassOrAlias = connectorConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+        ClassLoader connectorLoader = plugins.connectorLoader(connectorClassOrAlias);
+        Connector connector;
+
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) {
+            connector = plugins.newConnector(connectorClassOrAlias);
+            if (ConnectUtils.isSinkConnector(connector)) {
+                log.debug("Altering consumer group offsets for sink connector: {}", connName);
+                alterSinkConnectorOffsets(connName, connector, connectorConfig, offsets, connectorLoader, cb);
+            } else {
+                log.debug("Altering offsets for source connector: {}", connName);
+                alterSourceConnectorOffsets(connName, connector, connectorConfig, offsets, connectorLoader, cb);
+            }
+        }
+    }
+
+    /**
+     * Alter a sink connector's consumer group offsets.
+     * <p>
+     * Visible for testing.
+     *
+     * @param connName the name of the sink connector whose offsets are to be altered
+     * @param connector an instance of the sink connector
+     * @param connectorConfig the sink connector's configuration
+     * @param offsets a mapping from topic partitions to offsets that need to be overwritten
+     * @param connectorLoader the connector plugin's classloader to be used as the thread context classloader
+     * @param cb callback to invoke upon completion
+     */
+    void alterSinkConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig,
+                                   Map<Map<String, ?>, Map<String, ?>> offsets, ClassLoader connectorLoader, Callback<Message> cb) {
+        executor.submit(plugins.withClassLoader(connectorLoader, () -> {
+            try {
+                Map<TopicPartition, Long> parsedOffsets = SinkUtils.validateAndParseSinkConnectorOffsets(offsets);
+                boolean alterOffsetsResult;
+                try {
+                    alterOffsetsResult = ((SinkConnector) connector).alterOffsets(connectorConfig, parsedOffsets);
+                } catch (UnsupportedOperationException e) {
+                    throw new ConnectException("Failed to alter offsets for connector " + connName + " because it doesn't support external " +
+                            "modification of offsets", e);
+                }
+
+                Class<? extends Connector> sinkConnectorClass = connector.getClass();
+                Map<String, Object> adminConfig = adminConfigs(
+                        connName,
+                        "connector-worker-adminclient-" + connName,
+                        config,
+                        new SinkConnectorConfig(plugins, connectorConfig),
+                        sinkConnectorClass,
+                        connectorClientConfigOverridePolicy,
+                        kafkaClusterId,
+                        ConnectorType.SINK);
+
+                SinkConnectorConfig sinkConnectorConfig = new SinkConnectorConfig(plugins, connectorConfig);
+                String groupId = (String) baseConsumerConfigs(
+                        connName, "connector-consumer-", config, sinkConnectorConfig,
+                        sinkConnectorClass, connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SINK).get(ConsumerConfig.GROUP_ID_CONFIG);
+
+                Map<TopicPartition, OffsetAndMetadata> offsetsToAlter = parsedOffsets.entrySet()
+                        .stream()
+                        .filter(entry -> entry.getValue() != null)
+                        .map(entry -> new AbstractMap.SimpleEntry<>(entry.getKey(), new OffsetAndMetadata(entry.getValue())))
+                        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+                Set<TopicPartition> partitionsToReset = parsedOffsets.entrySet()
+                        .stream()
+                        .filter(entry -> entry.getValue() == null)
+                        .map(Map.Entry::getKey)
+                        .collect(Collectors.toSet());
+
+                KafkaFuture<Void> adminFuture = KafkaFuture.completedFuture(null);
+
+                Admin admin = adminFactory.apply(adminConfig);
+
+                try {

[GitHub] [kafka] C0urante commented on a diff in pull request #13465: KAFKA-14368: Connect offset write REST API

2023-05-24 Thread via GitHub


C0urante commented on code in PR #13465:
URL: https://github.com/apache/kafka/pull/13465#discussion_r1204604996


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##
@@ -1528,6 +1539,80 @@ public void connectorOffsets(String connName, 
Callback cb) {
 );
 }
 
+@Override
+public void alterConnectorOffsets(String connName, Map, 
Map> offsets, Callback callback) {
+log.trace("Submitting alter offsets request for connector '{}'", 
connName);
+
+addRequest(() -> {
+refreshConfigSnapshot(workerSyncTimeoutMs);
+if (!alterConnectorOffsetsChecks(connName, callback)) {
+return null;
+}
+// At this point, we should be the leader (the call to 
alterConnectorOffsetsChecks makes sure of that) and can safely run
+// a zombie fencing request
+if (isSourceConnector(connName) && 
config.exactlyOnceSourceEnabled()) {
+log.debug("Performing a round of zombie fencing before 
altering offsets for source connector {} with exactly-once semantics enabled.", 
connName);
+getFenceZombieSourceTasksCallable(connName, (error, ignored) 
-> {
+if (error != null) {
+log.error("Failed to perform zombie fencing for 
exactly-once source connector prior to altering offsets", error);
+callback.onCompletion(new ConnectException("Failed to 
perform zombie fencing for exactly-once source connector prior to altering 
offsets",
+error), null);
+} else {
+log.debug("Successfully completed zombie fencing for 
source connector {}; proceeding to alter offsets.", connName);
+// We need to ensure that we perform the necessary 
checks again inside alterConnectorOffsetsHerderRequest
+// since it is being run in a separate herder request 
and the conditions could have changed since the
+// previous check
+addRequest(getAlterConnectorOffsetsCallable(connName, 
offsets, callback), forwardErrorCallback(callback));
+}
+}).call();
+} else {
+getAlterConnectorOffsetsCallable(connName, offsets, 
callback).call();

Review Comment:
   Nice, LGTM 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #13465: KAFKA-14368: Connect offset write REST API

2023-05-23 Thread via GitHub


C0urante commented on code in PR #13465:
URL: https://github.com/apache/kafka/pull/13465#discussion_r1202694716


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##
@@ -1235,6 +1244,246 @@ void sourceConnectorOffsets(String connName, ConnectorOffsetBackingStore offsetS
         });
     }
 
+    /**
+     * Alter a connector's offsets.
+     *
+     * @param connName the name of the connector whose offsets are to be altered
+     * @param connectorConfig the connector's configurations
+     * @param offsets a mapping from partitions (either source partitions for source connectors, or Kafka topic
+     *                partitions for sink connectors) to offsets that need to be written; may not be null or empty
+     * @param cb callback to invoke upon completion
+     */
+    public void alterConnectorOffsets(String connName, Map<String, String> connectorConfig,
+                                      Map<Map<String, ?>, Map<String, ?>> offsets, Callback<Message> cb) {
+
+        if (offsets == null || offsets.isEmpty()) {
+            throw new ConnectException("The offsets to be altered may not be null or empty");
+        }
+
+        String connectorClassOrAlias = connectorConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+        ClassLoader connectorLoader = plugins.connectorLoader(connectorClassOrAlias);
+        Connector connector;
+
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) {
+            connector = plugins.newConnector(connectorClassOrAlias);
+            if (ConnectUtils.isSinkConnector(connector)) {
+                log.debug("Altering consumer group offsets for sink connector: {}", connName);
+                alterSinkConnectorOffsets(connName, connector, connectorConfig, offsets, connectorLoader, cb);
+            } else {
+                log.debug("Altering offsets for source connector: {}", connName);
+                alterSourceConnectorOffsets(connName, connector, connectorConfig, offsets, connectorLoader, cb);
+            }
+        }
+    }
+
+    /**
+     * Alter a sink connector's consumer group offsets.
+     * <p>
+     * Visible for testing.
+     *
+     * @param connName the name of the sink connector whose offsets are to be altered
+     * @param connector an instance of the sink connector
+     * @param connectorConfig the sink connector's configuration
+     * @param offsets a mapping from topic partitions to offsets that need to be written; may not be null or empty
+     * @param connectorLoader the connector plugin's classloader to be used as the thread context classloader
+     * @param cb callback to invoke upon completion
+     */
+    void alterSinkConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig,
+                                   Map<Map<String, ?>, Map<String, ?>> offsets, ClassLoader connectorLoader, Callback<Message> cb) {
+        executor.submit(plugins.withClassLoader(connectorLoader, () -> {
+            try {
+                Map<TopicPartition, Long> parsedOffsets = SinkUtils.parseSinkConnectorOffsets(offsets);
+                boolean alterOffsetsResult;
+                try {
+                    alterOffsetsResult = ((SinkConnector) connector).alterOffsets(connectorConfig, parsedOffsets);
+                } catch (UnsupportedOperationException e) {
+                    throw new ConnectException("Failed to alter offsets for connector " + connName + " because it doesn't support external " +
+                            "modification of offsets", e);
+                }
+
+                SinkConnectorConfig sinkConnectorConfig = new SinkConnectorConfig(plugins, connectorConfig);
+                Class<? extends Connector> sinkConnectorClass = connector.getClass();
+                Map<String, Object> adminConfig = adminConfigs(
+                        connName,
+                        "connector-worker-adminclient-" + connName,
+                        config,
+                        sinkConnectorConfig,
+                        sinkConnectorClass,
+                        connectorClientConfigOverridePolicy,
+                        kafkaClusterId,
+                        ConnectorType.SINK);
+
+                String groupId = (String) baseConsumerConfigs(
+                        connName, "connector-consumer-", config, sinkConnectorConfig,
+                        sinkConnectorClass, connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SINK).get(ConsumerConfig.GROUP_ID_CONFIG);
+
+                Admin admin = adminFactory.apply(adminConfig);
+
+                try {
+                    KafkaFuture<Void> adminFuture = KafkaFuture.completedFuture(null);

Review Comment:
   Can we construct a `List<KafkaFuture<Void>>` here and add elements to it as necessary (i.e., depending on whether the sets of to-be-removed and to-be-altered partitions are empty), then combine them with `KafkaFuture<Void> adminFuture = KafkaFuture.allOf(futures.toArray(new KafkaFuture[0]));`?
   
   The use of a no-op future is a little hard to read.
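   For reference, a minimal sketch of the combination being suggested. Variable names (`admin`, `groupId`, `offsetsToAlter`, `partitionsToReset`) are taken from the quoted diff; this is illustrative, not the PR's final code:
   
   ```java
   // Collect only the admin operations that are actually needed...
   List<KafkaFuture<Void>> futures = new ArrayList<>();
   if (!offsetsToAlter.isEmpty()) {
       futures.add(admin.alterConsumerGroupOffsets(groupId, offsetsToAlter).all());
   }
   if (!partitionsToReset.isEmpty()) {
       futures.add(admin.deleteConsumerGroupOffsets(groupId, partitionsToReset).all());
   }
   // ...then combine them into one future instead of starting from a no-op completedFuture
   KafkaFuture<Void> adminFuture = KafkaFuture.allOf(futures.toArray(new KafkaFuture[0]));
   ```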




[GitHub] [kafka] C0urante commented on a diff in pull request #13465: KAFKA-14368: Connect offset write REST API

2023-05-23 Thread via GitHub


C0urante commented on code in PR #13465:
URL: https://github.com/apache/kafka/pull/13465#discussion_r1202684769


##
connect/runtime/src/main/java/org/apache/kafka/connect/util/SinkUtils.java:
##
@@ -52,4 +53,82 @@ public static ConnectorOffsets consumerGroupOffsetsToConnectorOffsets(Map<TopicPartition, OffsetAndMetadata>
+    /**
+     * Validate that the provided partitions (keys in the {@code partitionOffsets} map) look like:
+     * <pre>
+     * {
+     *   "kafka_topic": "topic"
+     *   "kafka_partition": 3
+     * }
+     * </pre>
+     *
+     * and that the provided offsets (values in the {@code partitionOffsets} map) look like:
+     * <pre>
+     * {
+     *   "kafka_offset": 1000
+     * }
+     * </pre>
+     *
+     * This method then parses them into a mapping from {@link TopicPartition}s to their corresponding {@link Long}
+     * valued offsets.
+     *
+     * @param partitionOffsets the partitions to offset map that needs to be validated and parsed.
+     * @return the parsed mapping from {@link TopicPartition} to its corresponding {@link Long} valued offset.
+     *
+     * @throws BadRequestException if the provided offsets aren't in the expected format
+     */
+    public static Map<TopicPartition, Long> validateAndParseSinkConnectorOffsets(Map<Map<String, ?>, Map<String, ?>> partitionOffsets) {

Review Comment:
   Well done, looks great 
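   For readers following along, a small usage sketch of the format this method accepts, based on the javadoc above (the class name and values are invented for illustration):
   
   ```java
   import java.util.HashMap;
   import java.util.Map;
   import org.apache.kafka.common.TopicPartition;
   import org.apache.kafka.connect.util.SinkUtils;
   
   public class SinkOffsetParseExample {
       public static void main(String[] args) {
           // Partition map uses the "kafka_topic" / "kafka_partition" keys
           Map<String, Object> partition = new HashMap<>();
           partition.put(SinkUtils.KAFKA_TOPIC_KEY, "topic");
           partition.put(SinkUtils.KAFKA_PARTITION_KEY, 3);
           // Offset map uses the "kafka_offset" key
           Map<String, Object> offset = new HashMap<>();
           offset.put(SinkUtils.KAFKA_OFFSET_KEY, 1000);
   
           Map<Map<String, ?>, Map<String, ?>> partitionOffsets = new HashMap<>();
           partitionOffsets.put(partition, offset);
   
           // Parses to {topic-3=1000}; malformed input throws instead
           Map<TopicPartition, Long> parsed = SinkUtils.validateAndParseSinkConnectorOffsets(partitionOffsets);
           System.out.println(parsed);
       }
   }
   ```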






[GitHub] [kafka] C0urante commented on a diff in pull request #13465: KAFKA-14368: Connect offset write REST API

2023-05-23 Thread via GitHub


C0urante commented on code in PR #13465:
URL: https://github.com/apache/kafka/pull/13465#discussion_r1202678391


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##
@@ -1528,6 +1539,80 @@ public void connectorOffsets(String connName, Callback<ConnectorOffsets> cb) {
         );
     }
 
+    @Override
+    public void alterConnectorOffsets(String connName, Map<Map<String, ?>, Map<String, ?>> offsets, Callback<Message> callback) {
+        log.trace("Submitting alter offsets request for connector '{}'", connName);
+
+        addRequest(() -> {
+            refreshConfigSnapshot(workerSyncTimeoutMs);
+            if (!alterConnectorOffsetsChecks(connName, callback)) {
+                return null;
+            }
+            // At this point, we should be the leader (the call to alterConnectorOffsetsChecks makes sure of that) and can safely run
+            // a zombie fencing request
+            if (isSourceConnector(connName) && config.exactlyOnceSourceEnabled()) {
+                log.debug("Performing a round of zombie fencing before altering offsets for source connector {} with exactly-once semantics enabled.", connName);
+                getFenceZombieSourceTasksCallable(connName, (error, ignored) -> {
+                    if (error != null) {
+                        log.error("Failed to perform zombie fencing for exactly-once source connector prior to altering offsets", error);
+                        callback.onCompletion(new ConnectException("Failed to perform zombie fencing for exactly-once source connector prior to altering offsets",
+                                error), null);
+                    } else {
+                        log.debug("Successfully completed zombie fencing for source connector {}; proceeding to alter offsets.", connName);
+                        // We need to ensure that we perform the necessary checks again inside alterConnectorOffsetsHerderRequest
+                        // since it is being run in a separate herder request and the conditions could have changed since the
+                        // previous check
+                        addRequest(getAlterConnectorOffsetsCallable(connName, offsets, callback), forwardErrorCallback(callback));
+                    }
+                }).call();
+            } else {
+                getAlterConnectorOffsetsCallable(connName, offsets, callback).call();

Review Comment:
   One convention I've seen used is `foo` (public) and `doFoo` (internal). I'm not a huge fan of it but in some cases it can be useful.
   
   Also, are we issuing unnecessary calls to `alterConnectorOffsetsChecks`? We do a check in the beginning of `alterConnectorOffsets` and then, if the connector is a sink (or exactly-once source support is disabled), we immediately invoke the `Callable` returned by `getAlterConnectorOffsetsCallable`, which calls `alterConnectorOffsetsChecks` a second time.
   
   We might just get rid of `getAlterConnectorOffsetsCallable` altogether and inline the logic for it?
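   For what it's worth, a bare-bones sketch of that naming convention (hypothetical names, not code from this PR):
   
   ```java
   // Public entry point: performs the validation/leadership checks once, then delegates.
   public void alterConnectorOffsets(String connName, Map<Map<String, ?>, Map<String, ?>> offsets, Callback<Message> cb) {
       if (!alterConnectorOffsetsChecks(connName, cb)) {
           return;
       }
       doAlterConnectorOffsets(connName, offsets, cb);
   }
   
   // Internal worker: assumes the checks above have already passed, so it never re-runs them.
   private void doAlterConnectorOffsets(String connName, Map<Map<String, ?>, Map<String, ?>> offsets, Callback<Message> cb) {
       // ... actual offset-altering logic ...
   }
   ```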






[GitHub] [kafka] C0urante commented on a diff in pull request #13465: KAFKA-14368: Connect offset write REST API

2023-05-23 Thread via GitHub


C0urante commented on code in PR #13465:
URL: https://github.com/apache/kafka/pull/13465#discussion_r1202644196


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##
@@ -1528,6 +1539,80 @@ public void connectorOffsets(String connName, Callback<ConnectorOffsets> cb) {
         );
     }
 
+    @Override
+    public void alterConnectorOffsets(String connName, Map<Map<String, ?>, Map<String, ?>> offsets, Callback<Message> callback) {
+        log.trace("Submitting alter offsets request for connector '{}'", connName);
+
+        addRequest(() -> {
+            refreshConfigSnapshot(workerSyncTimeoutMs);

Review Comment:
   Good point--yes, we should handle that in `stopConnector` as well (although we don't have to do that in this PR).






[GitHub] [kafka] C0urante commented on a diff in pull request #13465: KAFKA-14368: Connect offset write REST API

2023-04-13 Thread via GitHub


C0urante commented on code in PR #13465:
URL: https://github.com/apache/kafka/pull/13465#discussion_r1166062323


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##
@@ -114,6 +127,7 @@ public class Worker {
 
     public static final long CONNECTOR_GRACEFUL_SHUTDOWN_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(5);
     public static final long EXECUTOR_SHUTDOWN_TERMINATION_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(1);
+    public static final long ALTER_OFFSETS_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(5);

Review Comment:
   > users could get successful responses from the alter offsets API but the connector could completely ignore the overwritten offsets (if the user resumes the connector in the interim).
   
   That's a fair point. I was thinking that with the `STOPPED` state and a possible additional guard to prevent cancelled source tasks from committing offsets, we would have reasonable protection against zombie tasks overwriting recently-altered offsets. However, I wasn't thinking of the other scenario, where the offset alter request is initiated and left ongoing while the connector is resumed.
   
   I think the risk of blocking the herder thread that you've brought up is perhaps the most convincing argument still in favor of making this operation asynchronous. There are risks not just with calls to the `alterOffsets` method, but also with reading to the end of offsets topics and contacting the transaction coordinator (if altering offsets for an exactly-once source connector), to name a few.
   
   If we really want to get fancy, one way we could try to decrease the risks of an asynchronous API for non-exactly-once source connectors would be to refuse to assign tasks for connectors with ongoing offset alterations during rebalance, even if the connector is resumed. The task configs for that connector could continue to live in the config topic; we'd just hold off on assigning them until the operation succeeds. Of course, this doesn't work if the leader of the cluster changes while an offset alter request is being serviced, but then (correct me if I'm wrong?) the same risks apply even with a synchronous API (although they're probably less likely). We could also try to add interruption logic that cancels any in-progress offset alter/reset requests when a rebalance starts. Either of these would be fine as a follow-up ticket, if they sound reasonable at all.






[GitHub] [kafka] C0urante commented on a diff in pull request #13465: KAFKA-14368: Connect offset write REST API

2023-04-12 Thread via GitHub


C0urante commented on code in PR #13465:
URL: https://github.com/apache/kafka/pull/13465#discussion_r1164465874


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##
@@ -114,6 +127,7 @@ public class Worker {
 
     public static final long CONNECTOR_GRACEFUL_SHUTDOWN_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(5);
     public static final long EXECUTOR_SHUTDOWN_TERMINATION_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(1);
+    public static final long ALTER_OFFSETS_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(5);

Review Comment:
   Hmm... I think this may be too short. With sink connectors it's fairly straightforward to alter consumer group offsets, but for source connectors we have to start and complete a read-to-end of the offsets topic, then write the new offsets to it. And in both cases, we have the `alterOffsets` connector method to worry about as well.
   
   Can we make the `Worker` API for altering offsets asynchronous, similar to what we do for reading offsets?
   
   I know that there's concern about tasks being brought up for the connector while the request is being handled, but I think this might be alright.
   
   If the connector is a sink connector, the requests to alter its consumer group's offsets will be rejected by the broker if any tasks are active.
   
   If the connector is a source connector and exactly-once support is enabled, zombie fencing will take place and we won't be able to complete our write to the offsets topic.
   
   Unless I'm mistaken, the only case left is non-exactly-once source connectors, which IMO is acceptable for us to ignore since we can't guarantee that there aren't zombie tasks running around writing their own offsets anyway.
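   
   To sketch the asynchronous shape being suggested (names like `executor`, `Callback`, and `Message` are assumed from the surrounding Worker code; this is not the PR's final implementation):
   
   ```java
   // Instead of blocking the herder thread for up to ALTER_OFFSETS_TIMEOUT_MS, hand the
   // slow work (offsets-topic read-to-end, connector.alterOffsets(), the final write)
   // to the worker's executor and report the outcome through the callback.
   public void alterConnectorOffsets(String connName, Map<String, String> connectorConfig,
                                     Map<Map<String, ?>, Map<String, ?>> offsets, Callback<Message> cb) {
       executor.submit(() -> {
           try {
               // ... potentially slow offset-altering work goes here ...
               cb.onCompletion(null, new Message("The offsets for this connector have been altered successfully"));
           } catch (Throwable t) {
               cb.onCompletion(t, null);
           }
       });
   }
   ```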



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org