[GitHub] [kafka] yashmayya commented on a diff in pull request #13818: KAFKA-14784: Connect offset reset REST API

2023-06-22 Thread via GitHub


yashmayya commented on code in PR #13818:
URL: https://github.com/apache/kafka/pull/13818#discussion_r1238244318


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##
@@ -1320,89 +1317,192 @@ void alterSinkConnectorOffsets(String connName, 
Connector connector, Map> adminFutures = new ArrayList<>();
-
-Map offsetsToAlter = 
parsedOffsets.entrySet()
-.stream()
-.filter(entry -> entry.getValue() != null)
-.collect(Collectors.toMap(Map.Entry::getKey, e -> 
new OffsetAndMetadata(e.getValue(;
-
-if (!offsetsToAlter.isEmpty()) {
-log.debug("Committing the following consumer group 
offsets using an admin client for sink connector {}: {}.",
-connName, offsetsToAlter);
-AlterConsumerGroupOffsetsOptions 
alterConsumerGroupOffsetsOptions = new 
AlterConsumerGroupOffsetsOptions().timeoutMs(
-(int) 
ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
-AlterConsumerGroupOffsetsResult 
alterConsumerGroupOffsetsResult = admin.alterConsumerGroupOffsets(groupId, 
offsetsToAlter,
-alterConsumerGroupOffsetsOptions);
-
-
adminFutures.add(alterConsumerGroupOffsetsResult.all());
+Map offsetsToWrite;
+if (isReset) {
+offsetsToWrite = new HashMap<>();
+ListConsumerGroupOffsetsOptions 
listConsumerGroupOffsetsOptions = new ListConsumerGroupOffsetsOptions()
+.timeoutMs((int) timer.remainingMs());
+try {
+admin.listConsumerGroupOffsets(groupId, 
listConsumerGroupOffsetsOptions)
+.partitionsToOffsetAndMetadata()
+.get(timer.remainingMs(), 
TimeUnit.MILLISECONDS)
+.forEach((topicPartition, 
offsetAndMetadata) -> offsetsToWrite.put(topicPartition, null));
+
+timer.update();
+log.debug("Found the following topic partitions 
(to reset offsets) for sink connector {} and consumer group ID {}: {}",
+connName, groupId, 
offsetsToWrite.keySet());
+} catch (Exception e) {
+Utils.closeQuietly(admin, "Offset reset admin for 
sink connector " + connName);
+log.error("Failed to list offsets prior to 
resetting offsets for sink connector {}", connName, e);
+cb.onCompletion(new ConnectException("Failed to 
list offsets prior to resetting offsets for sink connector " + connName, e), 
null);
+return;
+}
+} else {
+offsetsToWrite = 
SinkUtils.parseSinkConnectorOffsets(offsets);
 }
 
-Set partitionsToReset = 
parsedOffsets.entrySet()
-.stream()
-.filter(entry -> entry.getValue() == null)
-.map(Map.Entry::getKey)
-.collect(Collectors.toSet());
-
-if (!partitionsToReset.isEmpty()) {
-log.debug("Deleting the consumer group offsets for the 
following topic partitions using an admin client for sink connector {}: {}.",
-connName, partitionsToReset);
-DeleteConsumerGroupOffsetsOptions 
deleteConsumerGroupOffsetsOptions = new 
DeleteConsumerGroupOffsetsOptions().timeoutMs(
-(int) 
ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
-DeleteConsumerGroupOffsetsResult 
deleteConsumerGroupOffsetsResult = admin.deleteConsumerGroupOffsets(groupId, 
partitionsToReset,
-deleteConsumerGroupOffsetsOptions);
-
-
adminFutures.add(deleteConsumerGroupOffsetsResult.all());
+boolean alterOffsetsResult;
+try {
+alterOffsetsResult = ((SinkConnector) 
connector).alterOffsets(connectorConfig, offsetsToWrite);
+} catch (UnsupportedOperationException e) {
+log.error("Failed to modify offsets for connector {} 
because it doesn't support external modification of offsets",
+connName, e);
+throw new ConnectException("Failed to modify offsets 
for connector " + connName + " because it doesn't support external " +
+"modification of offsets", e);
 }
+

[GitHub] [kafka] yashmayya commented on a diff in pull request #13818: KAFKA-14784: Connect offset reset REST API

2023-06-10 Thread via GitHub


yashmayya commented on code in PR #13818:
URL: https://github.com/apache/kafka/pull/13818#discussion_r1224138055


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##
@@ -1320,89 +1338,188 @@ void alterSinkConnectorOffsets(String connName, 
Connector connector, Map> adminFutures = new ArrayList<>();
-
-Map offsetsToAlter = 
parsedOffsets.entrySet()
-.stream()
-.filter(entry -> entry.getValue() != null)
-.collect(Collectors.toMap(Map.Entry::getKey, e -> 
new OffsetAndMetadata(e.getValue(;
-
-if (!offsetsToAlter.isEmpty()) {
-log.debug("Committing the following consumer group 
offsets using an admin client for sink connector {}: {}.",
-connName, offsetsToAlter);
-AlterConsumerGroupOffsetsOptions 
alterConsumerGroupOffsetsOptions = new 
AlterConsumerGroupOffsetsOptions().timeoutMs(
+Map offsetsToWrite;
+if (isReset) {
+offsetsToWrite = new HashMap<>();
+ListConsumerGroupOffsetsOptions 
listConsumerGroupOffsetsOptions = new 
ListConsumerGroupOffsetsOptions().timeoutMs(
 (int) 
ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
-AlterConsumerGroupOffsetsResult 
alterConsumerGroupOffsetsResult = admin.alterConsumerGroupOffsets(groupId, 
offsetsToAlter,
-alterConsumerGroupOffsetsOptions);
-
-
adminFutures.add(alterConsumerGroupOffsetsResult.all());
+try {
+admin.listConsumerGroupOffsets(groupId, 
listConsumerGroupOffsetsOptions)
+.partitionsToOffsetAndMetadata()
+
.get(ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS)
+.forEach((topicPartition, 
offsetAndMetadata) -> offsetsToWrite.put(topicPartition, null));
+
+log.debug("Found the following topic partitions 
(to reset offsets) for sink connector {} and consumer group ID {}: {}",
+connName, groupId, 
offsetsToWrite.keySet());
+} catch (Exception e) {
+Utils.closeQuietly(admin, "Offset reset admin for 
sink connector " + connName);
+log.error("Failed to list offsets prior to 
resetting sink connector offsets", e);
+cb.onCompletion(new ConnectException("Failed to 
list offsets prior to resetting sink connector offsets", e), null);
+return;
+}
+} else {
+offsetsToWrite = 
SinkUtils.parseSinkConnectorOffsets(offsets);
 }
 
-Set partitionsToReset = 
parsedOffsets.entrySet()
-.stream()
-.filter(entry -> entry.getValue() == null)
-.map(Map.Entry::getKey)
-.collect(Collectors.toSet());
-
-if (!partitionsToReset.isEmpty()) {
-log.debug("Deleting the consumer group offsets for the 
following topic partitions using an admin client for sink connector {}: {}.",
-connName, partitionsToReset);
-DeleteConsumerGroupOffsetsOptions 
deleteConsumerGroupOffsetsOptions = new 
DeleteConsumerGroupOffsetsOptions().timeoutMs(
-(int) 
ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
-DeleteConsumerGroupOffsetsResult 
deleteConsumerGroupOffsetsResult = admin.deleteConsumerGroupOffsets(groupId, 
partitionsToReset,
-deleteConsumerGroupOffsetsOptions);
+boolean alterOffsetsResult;
+try {
+alterOffsetsResult = ((SinkConnector) 
connector).alterOffsets(connectorConfig, offsetsToWrite);
+} catch (UnsupportedOperationException e) {
+throw new ConnectException("Failed to modify offsets 
for connector " + connName + " because it doesn't support external " +
+"modification of offsets", e);
+}
 
-
adminFutures.add(deleteConsumerGroupOffsetsResult.all());
+// This should only occur for an offset reset request when:
+// 1. There was a prior attempt to reset offsets
+// OR
+// 2. No offsets have been committed yet
+if (offsetsToWrite.isEmpty()) {

Review Comment:
   I've made this change but it also 

[GitHub] [kafka] yashmayya commented on a diff in pull request #13818: KAFKA-14784: Connect offset reset REST API

2023-06-09 Thread via GitHub


yashmayya commented on code in PR #13818:
URL: https://github.com/apache/kafka/pull/13818#discussion_r1224138055


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##
@@ -1320,89 +1338,188 @@ void alterSinkConnectorOffsets(String connName, 
Connector connector, Map> adminFutures = new ArrayList<>();
-
-Map offsetsToAlter = 
parsedOffsets.entrySet()
-.stream()
-.filter(entry -> entry.getValue() != null)
-.collect(Collectors.toMap(Map.Entry::getKey, e -> 
new OffsetAndMetadata(e.getValue(;
-
-if (!offsetsToAlter.isEmpty()) {
-log.debug("Committing the following consumer group 
offsets using an admin client for sink connector {}: {}.",
-connName, offsetsToAlter);
-AlterConsumerGroupOffsetsOptions 
alterConsumerGroupOffsetsOptions = new 
AlterConsumerGroupOffsetsOptions().timeoutMs(
+Map offsetsToWrite;
+if (isReset) {
+offsetsToWrite = new HashMap<>();
+ListConsumerGroupOffsetsOptions 
listConsumerGroupOffsetsOptions = new 
ListConsumerGroupOffsetsOptions().timeoutMs(
 (int) 
ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
-AlterConsumerGroupOffsetsResult 
alterConsumerGroupOffsetsResult = admin.alterConsumerGroupOffsets(groupId, 
offsetsToAlter,
-alterConsumerGroupOffsetsOptions);
-
-
adminFutures.add(alterConsumerGroupOffsetsResult.all());
+try {
+admin.listConsumerGroupOffsets(groupId, 
listConsumerGroupOffsetsOptions)
+.partitionsToOffsetAndMetadata()
+
.get(ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS)
+.forEach((topicPartition, 
offsetAndMetadata) -> offsetsToWrite.put(topicPartition, null));
+
+log.debug("Found the following topic partitions 
(to reset offsets) for sink connector {} and consumer group ID {}: {}",
+connName, groupId, 
offsetsToWrite.keySet());
+} catch (Exception e) {
+Utils.closeQuietly(admin, "Offset reset admin for 
sink connector " + connName);
+log.error("Failed to list offsets prior to 
resetting sink connector offsets", e);
+cb.onCompletion(new ConnectException("Failed to 
list offsets prior to resetting sink connector offsets", e), null);
+return;
+}
+} else {
+offsetsToWrite = 
SinkUtils.parseSinkConnectorOffsets(offsets);
 }
 
-Set partitionsToReset = 
parsedOffsets.entrySet()
-.stream()
-.filter(entry -> entry.getValue() == null)
-.map(Map.Entry::getKey)
-.collect(Collectors.toSet());
-
-if (!partitionsToReset.isEmpty()) {
-log.debug("Deleting the consumer group offsets for the 
following topic partitions using an admin client for sink connector {}: {}.",
-connName, partitionsToReset);
-DeleteConsumerGroupOffsetsOptions 
deleteConsumerGroupOffsetsOptions = new 
DeleteConsumerGroupOffsetsOptions().timeoutMs(
-(int) 
ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
-DeleteConsumerGroupOffsetsResult 
deleteConsumerGroupOffsetsResult = admin.deleteConsumerGroupOffsets(groupId, 
partitionsToReset,
-deleteConsumerGroupOffsetsOptions);
+boolean alterOffsetsResult;
+try {
+alterOffsetsResult = ((SinkConnector) 
connector).alterOffsets(connectorConfig, offsetsToWrite);
+} catch (UnsupportedOperationException e) {
+throw new ConnectException("Failed to modify offsets 
for connector " + connName + " because it doesn't support external " +
+"modification of offsets", e);
+}
 
-
adminFutures.add(deleteConsumerGroupOffsetsResult.all());
+// This should only occur for an offset reset request when:
+// 1. There was a prior attempt to reset offsets
+// OR
+// 2. No offsets have been committed yet
+if (offsetsToWrite.isEmpty()) {

Review Comment:
   I've made this change but it also 

[GitHub] [kafka] yashmayya commented on a diff in pull request #13818: KAFKA-14784: Connect offset reset REST API

2023-06-08 Thread via GitHub


yashmayya commented on code in PR #13818:
URL: https://github.com/apache/kafka/pull/13818#discussion_r1222869378


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##
@@ -1268,39 +1270,55 @@ public void alterConnectorOffsets(String connName, 
Map connector
 connector = plugins.newConnector(connectorClassOrAlias);
 if (ConnectUtils.isSinkConnector(connector)) {
 log.debug("Altering consumer group offsets for sink connector: 
{}", connName);
-alterSinkConnectorOffsets(connName, connector, 
connectorConfig, offsets, connectorLoader, cb);
+modifySinkConnectorOffsets(connName, connector, 
connectorConfig, offsets, connectorLoader, cb);
 } else {
 log.debug("Altering offsets for source connector: {}", 
connName);
-alterSourceConnectorOffsets(connName, connector, 
connectorConfig, offsets, connectorLoader, cb);
+modifySourceConnectorOffsets(connName, connector, 
connectorConfig, offsets, connectorLoader, cb);
+}
+}
+}
+
+/**
+ * Reset a connector's offsets.
+ *
+ * @param connName the name of the connector whose offsets are to be reset
+ * @param connectorConfig the connector's configurations
+ * @param cb callback to invoke upon completion
+ */
+public void resetConnectorOffsets(String connName, Map 
connectorConfig, Callback cb) {

Review Comment:
   > If we're worried about accidentally introducing a nasty bug where an 
empty-bodied alter request causes an unintentional reset, we can add an 
integration test for that case.
   
   Yeah, this was exactly my worry and the reason why I'd kept them separate. 
Based on your feedback, I've added a new integration test and also moved the 
second-level check to `AbstractHerder::alterConnectorOffsets` (so that we can 
consolidate the two methods in the `Worker`). While we could in theory do a 
similar consolidation for the `Herder` methods, I think it's probably a better 
idea to have cleaner and more well-defined interface methods there.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] yashmayya commented on a diff in pull request #13818: KAFKA-14784: Connect offset reset REST API

2023-06-07 Thread via GitHub


yashmayya commented on code in PR #13818:
URL: https://github.com/apache/kafka/pull/13818#discussion_r1221937078


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##
@@ -1587,10 +1595,11 @@ private boolean alterConnectorOffsetsChecks(String 
connName, Callback c
 // If the target state for the connector is stopped, its task count is 
0, and there is no rebalance pending (checked above),
 // we can be sure that the tasks have at least been attempted to be 
stopped (or cancelled if they took too long to stop).
 // Zombie tasks are handled by a round of zombie fencing for exactly 
once source connectors. Zombie sink tasks are handled
-// naturally because requests to alter consumer group offsets will 
fail if there are still active members in the group.
+// naturally because requests to alter consumer group offsets / delete 
consumer groups will fail if there are still active members
+// in the group.
 if (configState.targetState(connName) != TargetState.STOPPED || 
configState.taskCount(connName) != 0) {
-callback.onCompletion(new BadRequestException("Connectors must be 
in the STOPPED state before their offsets can be altered. This " +
-"can be done for the specified connector by issuing a PUT 
request to the /connectors/" + connName + "/stop endpoint"), null);
+callback.onCompletion(new BadRequestException("Connectors must be 
in the STOPPED state before their offsets can be modified externally. " +

Review Comment:
   I think just "modified" should be clear enough to users since this is 
synchronously surfaced to the user via the alter / reset offsets REST API 
response.



##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##
@@ -1320,89 +1338,188 @@ void alterSinkConnectorOffsets(String connName, 
Connector connector, Map> adminFutures = new ArrayList<>();
-
-Map offsetsToAlter = 
parsedOffsets.entrySet()
-.stream()
-.filter(entry -> entry.getValue() != null)
-.collect(Collectors.toMap(Map.Entry::getKey, e -> 
new OffsetAndMetadata(e.getValue(;
-
-if (!offsetsToAlter.isEmpty()) {
-log.debug("Committing the following consumer group 
offsets using an admin client for sink connector {}: {}.",
-connName, offsetsToAlter);
-AlterConsumerGroupOffsetsOptions 
alterConsumerGroupOffsetsOptions = new 
AlterConsumerGroupOffsetsOptions().timeoutMs(
+Map offsetsToWrite;
+if (isReset) {
+offsetsToWrite = new HashMap<>();
+ListConsumerGroupOffsetsOptions 
listConsumerGroupOffsetsOptions = new 
ListConsumerGroupOffsetsOptions().timeoutMs(
 (int) 
ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
-AlterConsumerGroupOffsetsResult 
alterConsumerGroupOffsetsResult = admin.alterConsumerGroupOffsets(groupId, 
offsetsToAlter,
-alterConsumerGroupOffsetsOptions);
-
-
adminFutures.add(alterConsumerGroupOffsetsResult.all());
+try {
+admin.listConsumerGroupOffsets(groupId, 
listConsumerGroupOffsetsOptions)
+.partitionsToOffsetAndMetadata()
+
.get(ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS)
+.forEach((topicPartition, 
offsetAndMetadata) -> offsetsToWrite.put(topicPartition, null));
+
+log.debug("Found the following topic partitions 
(to reset offsets) for sink connector {} and consumer group ID {}: {}",
+connName, groupId, 
offsetsToWrite.keySet());
+} catch (Exception e) {
+Utils.closeQuietly(admin, "Offset reset admin for 
sink connector " + connName);
+log.error("Failed to list offsets prior to 
resetting sink connector offsets", e);
+cb.onCompletion(new ConnectException("Failed to 
list offsets prior to resetting sink connector offsets", e), null);
+return;
+}
+} else {
+offsetsToWrite = 
SinkUtils.parseSinkConnectorOffsets(offsets);
 }
 
-Set partitionsToReset = 
parsedOffsets.entrySet()
-.stream()
-.filter(entry -> entry.getValue() == null)
-.map(Map.Entry::getKey)
-.collect(Collectors.toSet());
-
-  

[GitHub] [kafka] yashmayya commented on a diff in pull request #13818: KAFKA-14784: Connect offset reset REST API

2023-06-06 Thread via GitHub


yashmayya commented on code in PR #13818:
URL: https://github.com/apache/kafka/pull/13818#discussion_r1219524567


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##
@@ -1320,89 +1338,188 @@ void alterSinkConnectorOffsets(String connName, 
Connector connector, Map> adminFutures = new ArrayList<>();
-
-Map offsetsToAlter = 
parsedOffsets.entrySet()
-.stream()
-.filter(entry -> entry.getValue() != null)
-.collect(Collectors.toMap(Map.Entry::getKey, e -> 
new OffsetAndMetadata(e.getValue(;
-
-if (!offsetsToAlter.isEmpty()) {
-log.debug("Committing the following consumer group 
offsets using an admin client for sink connector {}: {}.",
-connName, offsetsToAlter);
-AlterConsumerGroupOffsetsOptions 
alterConsumerGroupOffsetsOptions = new 
AlterConsumerGroupOffsetsOptions().timeoutMs(
+Map offsetsToWrite;
+if (isReset) {
+offsetsToWrite = new HashMap<>();
+ListConsumerGroupOffsetsOptions 
listConsumerGroupOffsetsOptions = new 
ListConsumerGroupOffsetsOptions().timeoutMs(
 (int) 
ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
-AlterConsumerGroupOffsetsResult 
alterConsumerGroupOffsetsResult = admin.alterConsumerGroupOffsets(groupId, 
offsetsToAlter,
-alterConsumerGroupOffsetsOptions);
-
-
adminFutures.add(alterConsumerGroupOffsetsResult.all());
+try {
+admin.listConsumerGroupOffsets(groupId, 
listConsumerGroupOffsetsOptions)
+.partitionsToOffsetAndMetadata()
+
.get(ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS)
+.forEach((topicPartition, 
offsetAndMetadata) -> offsetsToWrite.put(topicPartition, null));
+
+log.debug("Found the following topic partitions 
(to reset offsets) for sink connector {} and consumer group ID {}: {}",
+connName, groupId, 
offsetsToWrite.keySet());
+} catch (Exception e) {
+Utils.closeQuietly(admin, "Offset reset admin for 
sink connector " + connName);
+log.error("Failed to list offsets prior to 
resetting sink connector offsets", e);
+cb.onCompletion(new ConnectException("Failed to 
list offsets prior to resetting sink connector offsets", e), null);
+return;
+}
+} else {
+offsetsToWrite = 
SinkUtils.parseSinkConnectorOffsets(offsets);
 }
 
-Set partitionsToReset = 
parsedOffsets.entrySet()
-.stream()
-.filter(entry -> entry.getValue() == null)
-.map(Map.Entry::getKey)
-.collect(Collectors.toSet());
-
-if (!partitionsToReset.isEmpty()) {
-log.debug("Deleting the consumer group offsets for the 
following topic partitions using an admin client for sink connector {}: {}.",
-connName, partitionsToReset);
-DeleteConsumerGroupOffsetsOptions 
deleteConsumerGroupOffsetsOptions = new 
DeleteConsumerGroupOffsetsOptions().timeoutMs(
-(int) 
ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
-DeleteConsumerGroupOffsetsResult 
deleteConsumerGroupOffsetsResult = admin.deleteConsumerGroupOffsets(groupId, 
partitionsToReset,
-deleteConsumerGroupOffsetsOptions);
+boolean alterOffsetsResult;
+try {
+alterOffsetsResult = ((SinkConnector) 
connector).alterOffsets(connectorConfig, offsetsToWrite);
+} catch (UnsupportedOperationException e) {
+throw new ConnectException("Failed to modify offsets 
for connector " + connName + " because it doesn't support external " +
+"modification of offsets", e);
+}
 
-
adminFutures.add(deleteConsumerGroupOffsetsResult.all());
+// This should only occur for an offset reset request when:
+// 1. There was a prior attempt to reset offsets
+// OR
+// 2. No offsets have been committed yet
+if (offsetsToWrite.isEmpty()) {

Review Comment:
   I'm wondering whether we should go