[GitHub] [kafka] yashmayya commented on a diff in pull request #13465: KAFKA-14368: Connect offset write REST API

2023-05-25 Thread via GitHub


yashmayya commented on code in PR #13465:
URL: https://github.com/apache/kafka/pull/13465#discussion_r1205023533


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java:
##
@@ -4123,6 +4136,280 @@ public void testConnectorOffsets() throws Exception {
 PowerMock.verifyAll();
 }
 
+@Test
+public void testAlterOffsetsConnectorNotFound() throws Exception {
+// Get the initial assignment
+EasyMock.expect(member.memberId()).andStubReturn("leader");
+
EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
+expectRebalance(1, Collections.emptyList(), Collections.emptyList(), 
true);
+expectConfigRefreshAndSnapshot(SNAPSHOT);
+member.poll(EasyMock.anyInt());
+PowerMock.expectLastCall();
+
+// Now handle the alter connector offsets request
+member.wakeup();
+PowerMock.expectLastCall();
+member.ensureActive();
+PowerMock.expectLastCall();
+expectConfigRefreshAndSnapshot(SNAPSHOT);
+member.poll(EasyMock.anyInt());
+PowerMock.expectLastCall();
+
+PowerMock.replayAll();
+
+herder.tick();
+FutureCallback callback = new FutureCallback<>();
+herder.alterConnectorOffsets("connector-does-not-exist", new 
HashMap<>(), callback);
+herder.tick();
+ExecutionException e = assertThrows(ExecutionException.class, () -> 
callback.get(1000L, TimeUnit.MILLISECONDS));
+assertTrue(e.getCause() instanceof NotFoundException);
+
+PowerMock.verifyAll();
+}
+
+@Test
+public void testAlterOffsetsConnectorNotInStoppedState() throws Exception {
+// Get the initial assignment
+EasyMock.expect(member.memberId()).andStubReturn("leader");
+
EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
+expectRebalance(1, Collections.emptyList(), Collections.emptyList(), 
true);
+expectConfigRefreshAndSnapshot(SNAPSHOT);
+member.poll(EasyMock.anyInt());
+PowerMock.expectLastCall();
+
+// Now handle the alter connector offsets request
+member.wakeup();
+PowerMock.expectLastCall();
+member.ensureActive();
+PowerMock.expectLastCall();
+expectConfigRefreshAndSnapshot(SNAPSHOT);
+member.poll(EasyMock.anyInt());
+PowerMock.expectLastCall();
+
+PowerMock.replayAll();
+
+herder.tick();
+FutureCallback callback = new FutureCallback<>();
+herder.alterConnectorOffsets(CONN1, new HashMap<>(), callback);
+herder.tick();
+ExecutionException e = assertThrows(ExecutionException.class, () -> 
callback.get(1000L, TimeUnit.MILLISECONDS));
+assertTrue(e.getCause() instanceof BadRequestException);
+
+PowerMock.verifyAll();
+}
+
+@Test
+public void testAlterOffsetsNotLeader() throws Exception {
+// Get the initial assignment
+EasyMock.expect(member.memberId()).andStubReturn("member");
+
EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
+expectRebalance(1, Collections.emptyList(), Collections.emptyList(), 
false);
+expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1);
+member.poll(EasyMock.anyInt());
+PowerMock.expectLastCall();
+
+// Now handle the alter connector offsets request
+member.wakeup();
+PowerMock.expectLastCall();
+member.ensureActive();

Review Comment:
   Yeah, it does seem like it is intentional - apparently `expectLastCall` 
(both `EasyMock` and `PowerMock`) is optional for void methods and is mainly 
used for clarity. FWIW, I was able to get the test to pass even after removing 
the last one (after the call to `member.poll()`).



##
connect/runtime/src/test/java/org/apache/kafka/connect/util/SinkUtilsTest.java:
##
@@ -46,4 +52,124 @@ public void testConsumerGroupOffsetsToConnectorOffsets() {
 expectedPartition.put(SinkUtils.KAFKA_PARTITION_KEY, 0);
 assertEquals(expectedPartition, 
connectorOffsets.offsets().get(0).partition());
 }
+
+@Test
+public void testValidateAndParseEmptyPartitionOffsetMap() {
+// expect no exception to be thrown
+Map parsedOffsets = 
SinkUtils.parseSinkConnectorOffsets(new HashMap<>());
+assertTrue(parsedOffsets.isEmpty());
+}
+
+@Test
+public void testValidateAndParseInvalidPartition() {
+Map partition = new HashMap<>();
+partition.put(SinkUtils.KAFKA_TOPIC_KEY, "topic");
+Map offset = new HashMap<>();
+offset.put(SinkUtils.KAFKA_OFFSET_KEY, 100);
+Map, Map> partitionOffsets = new HashMap<>();
+partitionOffsets.put(partition, offset);
+
+// missing partition key
+ConnectException e = assertThrows(ConnectException.class, () -> 

[GitHub] [kafka] yashmayya commented on a diff in pull request #13465: KAFKA-14368: Connect offset write REST API

2023-05-24 Thread via GitHub


yashmayya commented on code in PR #13465:
URL: https://github.com/apache/kafka/pull/13465#discussion_r1203576413


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##
@@ -1528,6 +1539,80 @@ public void connectorOffsets(String connName, 
Callback cb) {
 );
 }
 
+@Override
+public void alterConnectorOffsets(String connName, Map, 
Map> offsets, Callback callback) {
+log.trace("Submitting alter offsets request for connector '{}'", 
connName);
+
+addRequest(() -> {
+refreshConfigSnapshot(workerSyncTimeoutMs);
+if (!alterConnectorOffsetsChecks(connName, callback)) {
+return null;
+}
+// At this point, we should be the leader (the call to 
alterConnectorOffsetsChecks makes sure of that) and can safely run
+// a zombie fencing request
+if (isSourceConnector(connName) && 
config.exactlyOnceSourceEnabled()) {
+log.debug("Performing a round of zombie fencing before 
altering offsets for source connector {} with exactly-once semantics enabled.", 
connName);
+getFenceZombieSourceTasksCallable(connName, (error, ignored) 
-> {
+if (error != null) {
+log.error("Failed to perform zombie fencing for 
exactly-once source connector prior to altering offsets", error);
+callback.onCompletion(new ConnectException("Failed to 
perform zombie fencing for exactly-once source connector prior to altering 
offsets",
+error), null);
+} else {
+log.debug("Successfully completed zombie fencing for 
source connector {}; proceeding to alter offsets.", connName);
+// We need to ensure that we perform the necessary 
checks again inside alterConnectorOffsetsHerderRequest
+// since it is being run in a separate herder request 
and the conditions could have changed since the
+// previous check
+addRequest(getAlterConnectorOffsetsCallable(connName, 
offsets, callback), forwardErrorCallback(callback));
+}
+}).call();
+} else {
+getAlterConnectorOffsetsCallable(connName, offsets, 
callback).call();

Review Comment:
   Wow, thanks, good catch! I've removed the `getAlterConnectorOffsetsCallable` 
method and refactored the zombie fencing one to be synchronous and named it 
`doFenceZombieSourceTasks` (it definitely sounds a lot better).



##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##
@@ -1235,6 +1244,246 @@ void sourceConnectorOffsets(String connName, 
ConnectorOffsetBackingStore offsetS
 });
 }
 
+/**
+ * Alter a connector's offsets.
+ *
+ * @param connName the name of the connector whose offsets are to be 
altered
+ * @param connectorConfig the connector's configurations
+ * @param offsets a mapping from partitions (either source partitions for 
source connectors, or Kafka topic
+ *partitions for sink connectors) to offsets that need to 
be written; may not be null or empty
+ * @param cb callback to invoke upon completion
+ */
+public void alterConnectorOffsets(String connName, Map 
connectorConfig,
+  Map, Map> 
offsets, Callback cb) {
+
+if (offsets == null || offsets.isEmpty()) {
+throw new ConnectException("The offsets to be altered may not be 
null or empty");
+}
+
+String connectorClassOrAlias = 
connectorConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+ClassLoader connectorLoader = 
plugins.connectorLoader(connectorClassOrAlias);
+Connector connector;
+
+try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) 
{
+connector = plugins.newConnector(connectorClassOrAlias);
+if (ConnectUtils.isSinkConnector(connector)) {
+log.debug("Altering consumer group offsets for sink connector: 
{}", connName);
+alterSinkConnectorOffsets(connName, connector, 
connectorConfig, offsets, connectorLoader, cb);
+} else {
+log.debug("Altering offsets for source connector: {}", 
connName);
+alterSourceConnectorOffsets(connName, connector, 
connectorConfig, offsets, connectorLoader, cb);
+}
+}
+}
+
+/**
+ * Alter a sink connector's consumer group offsets.
+ * 
+ * Visible for testing.
+ *
+ * @param connName the name of the sink connector whose offsets are to be 
altered
+ * @param connector an instance of the sink connector
+ * @param connectorConfig the sink connector's configuration
+ * @param offsets a mapping from topic partitions to offsets that need 

[GitHub] [kafka] yashmayya commented on a diff in pull request #13465: KAFKA-14368: Connect offset write REST API

2023-05-23 Thread via GitHub


yashmayya commented on code in PR #13465:
URL: https://github.com/apache/kafka/pull/13465#discussion_r1200530069


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##
@@ -1528,6 +1539,80 @@ public void connectorOffsets(String connName, 
Callback cb) {
 );
 }
 
+@Override
+public void alterConnectorOffsets(String connName, Map, 
Map> offsets, Callback callback) {
+log.trace("Submitting alter offsets request for connector '{}'", 
connName);
+
+addRequest(() -> {
+refreshConfigSnapshot(workerSyncTimeoutMs);
+if (!alterConnectorOffsetsChecks(connName, callback)) {
+return null;
+}
+// At this point, we should be the leader (the call to 
alterConnectorOffsetsChecks makes sure of that) and can safely run
+// a zombie fencing request
+if (isSourceConnector(connName) && 
config.exactlyOnceSourceEnabled()) {
+log.debug("Performing a round of zombie fencing before 
altering offsets for source connector {} with exactly-once semantics enabled.", 
connName);
+getFenceZombieSourceTasksCallable(connName, (error, ignored) 
-> {
+if (error != null) {
+log.error("Failed to perform zombie fencing for 
exactly-once source connector prior to altering offsets", error);
+callback.onCompletion(new ConnectException("Failed to 
perform zombie fencing for exactly-once source connector prior to altering 
offsets",
+error), null);
+} else {
+log.debug("Successfully completed zombie fencing for 
source connector {}; proceeding to alter offsets.", connName);
+// We need to ensure that we perform the necessary 
checks again inside alterConnectorOffsetsHerderRequest
+// since it is being run in a separate herder request 
and the conditions could have changed since the
+// previous check
+addRequest(getAlterConnectorOffsetsCallable(connName, 
offsets, callback), forwardErrorCallback(callback));
+}
+}).call();
+} else {
+getAlterConnectorOffsetsCallable(connName, offsets, 
callback).call();

Review Comment:
   I think one of the main motivations here (and above) was naming.
   
   We already have `fenceZombieSourceTasks` and `alterConnectorOffsets` 
interface methods but there was a need to break out some of the logic from both 
into separate methods for re-use. I'm happy to take any naming suggestions to 
make them synchronous and wrap them into callables at the relevant call sites.
   
   Maybe something like `fenceZombieSourceTasksSync` / 
`alterConnectorOffsetsSync` or `fenceZombieSourceTasksInternal` / 
`alterConnectorOffsetsInternal`? I don't particularly like either though, 
naming is hard...



##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##
@@ -1528,6 +1539,80 @@ public void connectorOffsets(String connName, 
Callback cb) {
 );
 }
 
+@Override
+public void alterConnectorOffsets(String connName, Map, 
Map> offsets, Callback callback) {
+log.trace("Submitting alter offsets request for connector '{}'", 
connName);
+
+addRequest(() -> {
+refreshConfigSnapshot(workerSyncTimeoutMs);

Review Comment:
   Thanks, both points make sense, done. Btw shouldn't we also handle the read 
timeout case in the 
[stopConnector](https://github.com/apache/kafka/blob/e96a463561ca8974fca37562b8675ae8ae4aff29/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1107)
 method? The `KafkaConfigBackingStore::putTaskConfigs` method does include a 
read to the end of the config topic at the end but the subsequently called 
`KafkaConfigBackingStore::putTargetState` method doesn't.



##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##
@@ -1247,6 +1257,257 @@ void sourceConnectorOffsets(String connName, 
ConnectorOffsetBackingStore offsetS
 });
 }
 
+/**
+ * Alter a connector's offsets.
+ *
+ * @param connName the name of the connector whose offsets are to be 
altered
+ * @param connectorConfig the connector's configurations
+ * @param offsets  a mapping from partitions to offsets that need to be 
overwritten
+ * @param cb callback to invoke upon completion
+ */
+public void alterConnectorOffsets(String connName, Map 
connectorConfig,
+  Map, Map> 
offsets, Callback cb) {
+String connectorClassOrAlias = 
connectorConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+ClassLoader connectorLoader = 

[GitHub] [kafka] yashmayya commented on a diff in pull request #13465: KAFKA-14368: Connect offset write REST API

2023-04-25 Thread via GitHub


yashmayya commented on code in PR #13465:
URL: https://github.com/apache/kafka/pull/13465#discussion_r1167415541


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##
@@ -1247,6 +1257,257 @@ void sourceConnectorOffsets(String connName, 
ConnectorOffsetBackingStore offsetS
 });
 }
 
+/**
+ * Alter a connector's offsets.
+ *
+ * @param connName the name of the connector whose offsets are to be 
altered
+ * @param connectorConfig the connector's configurations
+ * @param offsets  a mapping from partitions to offsets that need to be 
overwritten
+ * @param cb callback to invoke upon completion
+ */
+public void alterConnectorOffsets(String connName, Map 
connectorConfig,
+  Map, Map> 
offsets, Callback cb) {
+String connectorClassOrAlias = 
connectorConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+ClassLoader connectorLoader = 
plugins.connectorLoader(connectorClassOrAlias);
+Connector connector;
+
+try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) 
{
+connector = plugins.newConnector(connectorClassOrAlias);
+if (ConnectUtils.isSinkConnector(connector)) {
+log.debug("Altering consumer group offsets for sink connector: 
{}", connName);
+alterSinkConnectorOffsets(connName, connector, 
connectorConfig, offsets, connectorLoader, cb);
+} else {
+log.debug("Altering offsets for source connector: {}", 
connName);
+alterSourceConnectorOffsets(connName, connector, 
connectorConfig, offsets, connectorLoader, cb);
+}
+}
+}
+
+/**
+ * Alter a sink connector's consumer group offsets.
+ * 
+ * Visible for testing.
+ *
+ * @param connName the name of the sink connector whose offsets are to be 
altered
+ * @param connector an instance of the sink connector
+ * @param connectorConfig the sink connector's configuration
+ * @param offsets a mapping from topic partitions to offsets that need to 
be overwritten
+ * @param connectorLoader the connector plugin's classloader to be used as 
the thread context classloader
+ * @param cb callback to invoke upon completion
+ */
+void alterSinkConnectorOffsets(String connName, Connector connector, 
Map connectorConfig,
+   Map, Map> 
offsets, ClassLoader connectorLoader, Callback cb) {
+executor.submit(plugins.withClassLoader(connectorLoader, () -> {
+try {
+Map parsedOffsets = 
SinkUtils.validateAndParseSinkConnectorOffsets(offsets);
+boolean alterOffsetsResult;
+try {
+alterOffsetsResult = ((SinkConnector) 
connector).alterOffsets(connectorConfig, parsedOffsets);
+} catch (UnsupportedOperationException e) {
+throw new ConnectException("Failed to alter offsets for 
connector " + connName + " because it doesn't support external " +
+"modification of offsets", e);
+}
+
+Class sinkConnectorClass = 
connector.getClass();
+Map adminConfig = adminConfigs(
+connName,
+"connector-worker-adminclient-" + connName,
+config,
+new SinkConnectorConfig(plugins, connectorConfig),
+sinkConnectorClass,
+connectorClientConfigOverridePolicy,
+kafkaClusterId,
+ConnectorType.SINK);
+
+SinkConnectorConfig sinkConnectorConfig = new 
SinkConnectorConfig(plugins, connectorConfig);
+String groupId = (String) baseConsumerConfigs(
+connName, "connector-consumer-", config, 
sinkConnectorConfig,
+sinkConnectorClass, 
connectorClientConfigOverridePolicy, kafkaClusterId, 
ConnectorType.SINK).get(ConsumerConfig.GROUP_ID_CONFIG);
+
+Map offsetsToAlter = 
parsedOffsets.entrySet()
+.stream()
+.filter(entry -> entry.getValue() != null)
+.map(entry -> new 
AbstractMap.SimpleEntry<>(entry.getKey(), new 
OffsetAndMetadata(entry.getValue())))
+.collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue));
+
+Set partitionsToReset = 
parsedOffsets.entrySet()
+.stream()
+.filter(entry -> entry.getValue() == null)
+.map(Map.Entry::getKey)
+.collect(Collectors.toSet());
+
+KafkaFuture adminFuture = 
KafkaFuture.completedFuture(null);
+
+Admin admin = adminFactory.apply(adminConfig);
+
+try {

Review Comment:

[GitHub] [kafka] yashmayya commented on a diff in pull request #13465: KAFKA-14368: Connect offset write REST API

2023-04-15 Thread via GitHub


yashmayya commented on code in PR #13465:
URL: https://github.com/apache/kafka/pull/13465#discussion_r1167415541


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##
@@ -1247,6 +1257,257 @@ void sourceConnectorOffsets(String connName, 
ConnectorOffsetBackingStore offsetS
 });
 }
 
+/**
+ * Alter a connector's offsets.
+ *
+ * @param connName the name of the connector whose offsets are to be 
altered
+ * @param connectorConfig the connector's configurations
+ * @param offsets  a mapping from partitions to offsets that need to be 
overwritten
+ * @param cb callback to invoke upon completion
+ */
+public void alterConnectorOffsets(String connName, Map 
connectorConfig,
+  Map, Map> 
offsets, Callback cb) {
+String connectorClassOrAlias = 
connectorConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+ClassLoader connectorLoader = 
plugins.connectorLoader(connectorClassOrAlias);
+Connector connector;
+
+try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) 
{
+connector = plugins.newConnector(connectorClassOrAlias);
+if (ConnectUtils.isSinkConnector(connector)) {
+log.debug("Altering consumer group offsets for sink connector: 
{}", connName);
+alterSinkConnectorOffsets(connName, connector, 
connectorConfig, offsets, connectorLoader, cb);
+} else {
+log.debug("Altering offsets for source connector: {}", 
connName);
+alterSourceConnectorOffsets(connName, connector, 
connectorConfig, offsets, connectorLoader, cb);
+}
+}
+}
+
+/**
+ * Alter a sink connector's consumer group offsets.
+ * 
+ * Visible for testing.
+ *
+ * @param connName the name of the sink connector whose offsets are to be 
altered
+ * @param connector an instance of the sink connector
+ * @param connectorConfig the sink connector's configuration
+ * @param offsets a mapping from topic partitions to offsets that need to 
be overwritten
+ * @param connectorLoader the connector plugin's classloader to be used as 
the thread context classloader
+ * @param cb callback to invoke upon completion
+ */
+void alterSinkConnectorOffsets(String connName, Connector connector, 
Map connectorConfig,
+   Map, Map> 
offsets, ClassLoader connectorLoader, Callback cb) {
+executor.submit(plugins.withClassLoader(connectorLoader, () -> {
+try {
+Map parsedOffsets = 
SinkUtils.validateAndParseSinkConnectorOffsets(offsets);
+boolean alterOffsetsResult;
+try {
+alterOffsetsResult = ((SinkConnector) 
connector).alterOffsets(connectorConfig, parsedOffsets);
+} catch (UnsupportedOperationException e) {
+throw new ConnectException("Failed to alter offsets for 
connector " + connName + " because it doesn't support external " +
+"modification of offsets", e);
+}
+
+Class sinkConnectorClass = 
connector.getClass();
+Map adminConfig = adminConfigs(
+connName,
+"connector-worker-adminclient-" + connName,
+config,
+new SinkConnectorConfig(plugins, connectorConfig),
+sinkConnectorClass,
+connectorClientConfigOverridePolicy,
+kafkaClusterId,
+ConnectorType.SINK);
+
+SinkConnectorConfig sinkConnectorConfig = new 
SinkConnectorConfig(plugins, connectorConfig);
+String groupId = (String) baseConsumerConfigs(
+connName, "connector-consumer-", config, 
sinkConnectorConfig,
+sinkConnectorClass, 
connectorClientConfigOverridePolicy, kafkaClusterId, 
ConnectorType.SINK).get(ConsumerConfig.GROUP_ID_CONFIG);
+
+Map offsetsToAlter = 
parsedOffsets.entrySet()
+.stream()
+.filter(entry -> entry.getValue() != null)
+.map(entry -> new 
AbstractMap.SimpleEntry<>(entry.getKey(), new 
OffsetAndMetadata(entry.getValue())))
+.collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue));
+
+Set partitionsToReset = 
parsedOffsets.entrySet()
+.stream()
+.filter(entry -> entry.getValue() == null)
+.map(Map.Entry::getKey)
+.collect(Collectors.toSet());
+
+KafkaFuture adminFuture = 
KafkaFuture.completedFuture(null);
+
+Admin admin = adminFactory.apply(adminConfig);
+
+try {

Review Comment:

[GitHub] [kafka] yashmayya commented on a diff in pull request #13465: KAFKA-14368: Connect offset write REST API

2023-04-15 Thread via GitHub


yashmayya commented on code in PR #13465:
URL: https://github.com/apache/kafka/pull/13465#discussion_r1167415436


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##
@@ -1247,6 +1257,257 @@ void sourceConnectorOffsets(String connName, 
ConnectorOffsetBackingStore offsetS
 });
 }
 
+/**
+ * Alter a connector's offsets.
+ *
+ * @param connName the name of the connector whose offsets are to be 
altered
+ * @param connectorConfig the connector's configurations
+ * @param offsets  a mapping from partitions to offsets that need to be 
overwritten
+ * @param cb callback to invoke upon completion
+ */
+public void alterConnectorOffsets(String connName, Map 
connectorConfig,
+  Map, Map> 
offsets, Callback cb) {
+String connectorClassOrAlias = 
connectorConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+ClassLoader connectorLoader = 
plugins.connectorLoader(connectorClassOrAlias);
+Connector connector;
+
+try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) 
{
+connector = plugins.newConnector(connectorClassOrAlias);
+if (ConnectUtils.isSinkConnector(connector)) {
+log.debug("Altering consumer group offsets for sink connector: 
{}", connName);
+alterSinkConnectorOffsets(connName, connector, 
connectorConfig, offsets, connectorLoader, cb);
+} else {
+log.debug("Altering offsets for source connector: {}", 
connName);
+alterSourceConnectorOffsets(connName, connector, 
connectorConfig, offsets, connectorLoader, cb);
+}
+}
+}
+
+/**
+ * Alter a sink connector's consumer group offsets.
+ * 
+ * Visible for testing.
+ *
+ * @param connName the name of the sink connector whose offsets are to be 
altered
+ * @param connector an instance of the sink connector
+ * @param connectorConfig the sink connector's configuration
+ * @param offsets a mapping from topic partitions to offsets that need to 
be overwritten
+ * @param connectorLoader the connector plugin's classloader to be used as 
the thread context classloader
+ * @param cb callback to invoke upon completion
+ */
+void alterSinkConnectorOffsets(String connName, Connector connector, 
Map connectorConfig,
+   Map, Map> 
offsets, ClassLoader connectorLoader, Callback cb) {
+executor.submit(plugins.withClassLoader(connectorLoader, () -> {
+try {
+Map parsedOffsets = 
SinkUtils.validateAndParseSinkConnectorOffsets(offsets);
+boolean alterOffsetsResult;
+try {
+alterOffsetsResult = ((SinkConnector) 
connector).alterOffsets(connectorConfig, parsedOffsets);
+} catch (UnsupportedOperationException e) {
+throw new ConnectException("Failed to alter offsets for 
connector " + connName + " because it doesn't support external " +
+"modification of offsets", e);
+}
+
+Class sinkConnectorClass = 
connector.getClass();
+Map adminConfig = adminConfigs(
+connName,
+"connector-worker-adminclient-" + connName,
+config,
+new SinkConnectorConfig(plugins, connectorConfig),
+sinkConnectorClass,
+connectorClientConfigOverridePolicy,
+kafkaClusterId,
+ConnectorType.SINK);
+
+SinkConnectorConfig sinkConnectorConfig = new 
SinkConnectorConfig(plugins, connectorConfig);
+String groupId = (String) baseConsumerConfigs(
+connName, "connector-consumer-", config, 
sinkConnectorConfig,
+sinkConnectorClass, 
connectorClientConfigOverridePolicy, kafkaClusterId, 
ConnectorType.SINK).get(ConsumerConfig.GROUP_ID_CONFIG);
+
+Map offsetsToAlter = 
parsedOffsets.entrySet()
+.stream()
+.filter(entry -> entry.getValue() != null)
+.map(entry -> new 
AbstractMap.SimpleEntry<>(entry.getKey(), new 
OffsetAndMetadata(entry.getValue())))
+.collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue));
+
+Set partitionsToReset = 
parsedOffsets.entrySet()
+.stream()
+.filter(entry -> entry.getValue() == null)
+.map(Map.Entry::getKey)
+.collect(Collectors.toSet());
+
+KafkaFuture adminFuture = 
KafkaFuture.completedFuture(null);
+
+Admin admin = adminFactory.apply(adminConfig);
+
+try {

Review Comment:

[GitHub] [kafka] yashmayya commented on a diff in pull request #13465: KAFKA-14368: Connect offset write REST API

2023-04-14 Thread via GitHub


yashmayya commented on code in PR #13465:
URL: https://github.com/apache/kafka/pull/13465#discussion_r1167384274


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##
@@ -114,6 +127,7 @@ public class Worker {
 
 public static final long CONNECTOR_GRACEFUL_SHUTDOWN_TIMEOUT_MS = 
TimeUnit.SECONDS.toMillis(5);
 public static final long EXECUTOR_SHUTDOWN_TERMINATION_TIMEOUT_MS = 
TimeUnit.SECONDS.toMillis(1);
+public static final long ALTER_OFFSETS_TIMEOUT_MS = 
TimeUnit.SECONDS.toMillis(5);

Review Comment:
   Thanks for the detailed response and it sounds like we're on the same page 
now. I've refactored the alter offsets worker API to be asynchronous.
   
   > possible additional guard to prevent cancelled source tasks from 
committing offsets
   
   I guess we don't really need to worry too much about cancelled source tasks 
since during regular task stop, we also remove the periodic offset commit task 
in the `SourceTaskOffsetCommitter`?
   
   > one way we could try to decrease the risks of an asynchronous API for 
non-exactly-once source connectors could be to refuse to assign tasks for 
connectors with ongoing offset alterations during rebalance, even if the 
connector is resumed
   
   That's an interesting idea but it does seem to be a pretty invasive change 
w.r.t. the current rebalancing logic, which is agnostic to all ongoing 
operations in the workers. The limitation is also a valid one and yeah the same 
risks apply even with the sync API although I'm not sure I follow why you think 
it's less likely? Isn't it more likely that a synchronous alter offsets request 
hangs and causes the leader to fall out of the group leading to a new leader 
being elected?
   
   > We could also try to add interruption logic that cancels any in-progress 
offset alter/reset requests when a rebalance starts
   
   We would need to be careful about the exact points where we allow 
interruptions. For instance, we wouldn't want to abandon a request midway 
through writing offsets (in the non-EoS source connector case where it isn't an 
atomic operation, or for consumer groups when we're altering offsets for some 
partitions + resetting offsets for some others). Although, this does seem like 
a more appealing option overall and I've filed this Jira as a potential follow 
up item - https://issues.apache.org/jira/browse/KAFKA-14910



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] yashmayya commented on a diff in pull request #13465: KAFKA-14368: Connect offset write REST API

2023-04-13 Thread via GitHub


yashmayya commented on code in PR #13465:
URL: https://github.com/apache/kafka/pull/13465#discussion_r1165441821


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##
@@ -114,6 +127,7 @@ public class Worker {
 
 public static final long CONNECTOR_GRACEFUL_SHUTDOWN_TIMEOUT_MS = 
TimeUnit.SECONDS.toMillis(5);
 public static final long EXECUTOR_SHUTDOWN_TERMINATION_TIMEOUT_MS = 
TimeUnit.SECONDS.toMillis(1);
+public static final long ALTER_OFFSETS_TIMEOUT_MS = 
TimeUnit.SECONDS.toMillis(5);

Review Comment:
   Hm but for non-exactly-once source connectors (which is the default mode), 
this would leave the door open to confusing behavior where users could get 
successful responses from the alter offsets API but the connector could 
completely ignore the overwritten offsets (if the user resumes the connector in 
the interim). I agree that the zombie task case is unhandled for non EoS source 
connectors, but at least that would only typically occur for misbehaving 
connector plugins whereas making the alter offsets API async would allow users 
to shoot themselves in the foot. I don't disagree that the 5 second timeout is 
quite non-ideal and even more concerning is the fact that if a connector's 
`alterOffsets` method hangs, it can disable a worker (something that was a big 
problem with other connector methods until your elegant fix in 
https://github.com/apache/kafka/pull/8069). I'm just trying to weigh the pros 
and cons here but it does seem like doing alter offset operations 
asynchronously in the worker has more benefits than drawbacks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] yashmayya commented on a diff in pull request #13465: KAFKA-14368: Connect offset write REST API

2023-04-11 Thread via GitHub


yashmayya commented on code in PR #13465:
URL: https://github.com/apache/kafka/pull/13465#discussion_r1163225504


##
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java:
##
@@ -19,24 +19,25 @@
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
 import org.apache.kafka.connect.runtime.rest.entities.ActiveTopicsInfo;
 import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
-import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffsets;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffsets;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
 import org.apache.kafka.connect.runtime.rest.entities.ServerInfo;
 import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
+import org.eclipse.jetty.client.HttpClient;

Review Comment:
   Yeah, I did want to avoid doing this, but it was essentially a last resort. 
The `HttpURLConnection` API doesn't support the `PATCH` HTTP method and there 
is a won't fix bug ticket in OpenJDK for this - 
https://bugs.openjdk.org/browse/JDK-7016595. The new `HttpClient` (incubated in 
Java 9 and made official in Java 11) which supersedes `HttpURLConnection` does 
fix this issue, but since we still support Java 8, that wasn't an option 
either. Since the Jetty HTTP client is already being used by the main Connect 
codebase, I figured it'd be the best option here.



##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##
@@ -1247,6 +1254,224 @@ void sourceConnectorOffsets(String connName, 
ConnectorOffsetBackingStore offsetS
 });
 }
 
+/**
+ * Alter a connector's offsets.
+ *
+ * @param connName the name of the connector whose offsets are to be 
altered
+ * @param offsets a mapping from partitions to offsets that need to be 
overwritten
+ * @param connectorConfig the connector's configurations
+ *
+ * @return true if the connector plugin has implemented {@link 
org.apache.kafka.connect.sink.SinkConnector#alterOffsets(Map, Map)}
+ * / {@link 
org.apache.kafka.connect.source.SourceConnector#alterOffsets(Map, Map)} and it 
returns true for the provided offsets,
+ * false otherwise
+ *
+ */
+public boolean alterConnectorOffsets(String connName, Map, 
Map> offsets,
+ Map connectorConfig) {
+String connectorClassOrAlias = 
connectorConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+ClassLoader connectorLoader = 
plugins.connectorLoader(connectorClassOrAlias);
+Connector connector;
+
+try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) 
{
+connector = plugins.newConnector(connectorClassOrAlias);
+if (ConnectUtils.isSinkConnector(connector)) {
+log.debug("Altering consumer group offsets for sink connector: 
{}", connName);
+return alterSinkConnectorOffsets(connName, connector, 
connectorConfig, offsets);
+} else {
+log.debug("Altering offsets for source connector: {}", 
connName);
+return alterSourceConnectorOffsets(connName, connector, 
connectorConfig, offsets);
+}
+}
+}
+
+/**
+ * Alter a sink connector's consumer group offsets.
+ * 
+ * Visible for testing.
+ * @param connName the name of the sink connector whose offsets are to be 
altered
+ * @param connector an instance of the sink connector
+ * @param connectorConfig the sink connector's configuration
+ * @param offsets a mapping from topic partitions to offsets that need to 
be overwritten
+ * @return true if the sink connector has implemented {@link 
org.apache.kafka.connect.sink.SinkConnector#alterOffsets(Map, Map)}
+ * and it returns true for the provided offsets, false otherwise
+ */
+boolean alterSinkConnectorOffsets(String connName, Connector connector, 
Map connectorConfig,
+  Map, Map> 
offsets) {
+Map parsedOffsets = 
SinkUtils.validateAndParseSinkConnectorOffsets(offsets);
+Timer timer = time.timer(ALTER_OFFSETS_TIMEOUT_MS);
+boolean alterOffsetsResult;
+try {
+alterOffsetsResult = ((SinkConnector) 
connector).alterOffsets(connectorConfig, parsedOffsets);
+} catch (UnsupportedOperationException e) {
+throw new ConnectException("Failed to alter offsets for connector 
" + connName + " because it doesn't support external " +
+"modification of offsets", e);
+}
+