yashmayya commented on code in PR #13818:
URL: https://github.com/apache/kafka/pull/13818#discussion_r1222869378


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -1268,39 +1270,55 @@ public void alterConnectorOffsets(String connName, Map<String, String> connector
             connector = plugins.newConnector(connectorClassOrAlias);
             if (ConnectUtils.isSinkConnector(connector)) {
                 log.debug("Altering consumer group offsets for sink connector: {}", connName);
-                alterSinkConnectorOffsets(connName, connector, connectorConfig, offsets, connectorLoader, cb);
+                modifySinkConnectorOffsets(connName, connector, connectorConfig, offsets, connectorLoader, cb);
             } else {
                 log.debug("Altering offsets for source connector: {}", connName);
-                alterSourceConnectorOffsets(connName, connector, connectorConfig, offsets, connectorLoader, cb);
+                modifySourceConnectorOffsets(connName, connector, connectorConfig, offsets, connectorLoader, cb);
+            }
+        }
+    }
+
+    /**
+     * Reset a connector's offsets.
+     *
+     * @param connName the name of the connector whose offsets are to be reset
+     * @param connectorConfig the connector's configurations
+     * @param cb callback to invoke upon completion
+     */
+    public void resetConnectorOffsets(String connName, Map<String, String> connectorConfig, Callback<Message> cb) {

Review Comment:
   > If we're worried about accidentally introducing a nasty bug where an 
empty-bodied alter request causes an unintentional reset, we can add an 
integration test for that case.
   
   Yeah, this was exactly my worry and the reason I'd kept them separate. 
Based on your feedback, I've added a new integration test and also moved the 
second-level check to `AbstractHerder::alterConnectorOffsets` (so that we can 
consolidate the two methods in the `Worker`). While we could in theory do a 
similar consolidation for the `Herder` methods, I think it's better to keep 
cleaner, more well-defined interface methods there.
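
   To make the arrangement above concrete, here is a rough sketch of the idea, not the actual PR code. It assumes that the consolidated worker method treats `null` offsets as "reset", which is why the herder-level alter method must reject empty bodies before delegating. All names (`OffsetsGuardSketch`, the simplified `Worker` and `Herder` classes, `modifyConnectorOffsets`, the `Map<String, Long>` offsets type) and the use of `IllegalArgumentException` are hypothetical simplifications for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-ins; not the real Kafka Connect classes.
class OffsetsGuardSketch {

    /** One consolidated worker method serves both alter and reset. */
    static class Worker {
        String lastAction = "none";

        // Assumption: null offsets mean "reset", a non-null map means "alter".
        void modifyConnectorOffsets(String connName, Map<String, Long> offsets) {
            if (offsets == null) {
                lastAction = "reset:" + connName;
            } else {
                lastAction = "alter:" + connName + ":" + offsets.size();
            }
        }
    }

    /** The herder keeps two well-defined entry points and owns the guard. */
    static class Herder {
        private final Worker worker;

        Herder(Worker worker) {
            this.worker = worker;
        }

        void alterConnectorOffsets(String connName, Map<String, Long> offsets) {
            // Guard: an empty-bodied alter request must never reach the worker,
            // where it could be mistaken for a reset.
            if (offsets == null || offsets.isEmpty()) {
                throw new IllegalArgumentException(
                    "Alter request for connector " + connName + " must contain offsets");
            }
            worker.modifyConnectorOffsets(connName, offsets);
        }

        void resetConnectorOffsets(String connName) {
            // Reset is the only caller allowed to pass null offsets.
            worker.modifyConnectorOffsets(connName, null);
        }
    }

    public static void main(String[] args) {
        Worker worker = new Worker();
        Herder herder = new Herder(worker);

        try {
            herder.alterConnectorOffsets("my-connector", new HashMap<>());
        } catch (IllegalArgumentException e) {
            System.out.println("Rejected: " + e.getMessage());
        }

        herder.resetConnectorOffsets("my-connector");
        System.out.println(worker.lastAction);
    }
}
```

   With the guard living in the herder, an integration test for the empty-bodied alter case only has to check that the request is rejected and that existing offsets are left untouched.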


