[GitHub] [kafka] mimaison commented on a diff in pull request #14005: KAFKA-15177: Implement KIP-875 SourceConnector::alterOffset API in MirrorMaker 2 connectors

2023-07-25 Thread via GitHub


mimaison commented on code in PR #14005:
URL: https://github.com/apache/kafka/pull/14005#discussion_r1269270009


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorHeartbeatConnector.java:
##
@@ -85,6 +89,26 @@ public String version() {
 return AppInfoParser.getVersion();
 }
 
+@Override
+public boolean alterOffsets(Map<String, String> config, Map<Map<String, ?>, Map<String, ?>> offsets) {

Review Comment:
   I haven't looked at the background, but I agree with Yash: this looks strange.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mimaison commented on a diff in pull request #14005: KAFKA-15177: Implement KIP-875 SourceConnector::alterOffset API in MirrorMaker 2 connectors

2023-07-20 Thread via GitHub


mimaison commented on code in PR #14005:
URL: https://github.com/apache/kafka/pull/14005#discussion_r1269268948


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorUtils.java:
##
@@ -65,27 +72,170 @@ static String encodeTopicPartition(TopicPartition topicPartition) {
 
 static Map<String, Object> wrapPartition(TopicPartition topicPartition, String sourceClusterAlias) {
 Map<String, Object> wrapped = new HashMap<>();
-wrapped.put("topic", topicPartition.topic());
-wrapped.put("partition", topicPartition.partition());
-wrapped.put("cluster", sourceClusterAlias);
+wrapped.put(TOPIC_KEY, topicPartition.topic());
+wrapped.put(PARTITION_KEY, topicPartition.partition());
+wrapped.put(SOURCE_CLUSTER_KEY, sourceClusterAlias);
 return wrapped;
 }
 
-static Map<String, Object> wrapOffset(long offset) {
-return Collections.singletonMap("offset", offset);
+public static Map<String, Object> wrapOffset(long offset) {
+return Collections.singletonMap(OFFSET_KEY, offset);
 }
 
-static TopicPartition unwrapPartition(Map<String, ?> wrapped) {
-String topic = (String) wrapped.get("topic");
-int partition = (Integer) wrapped.get("partition");
+public static TopicPartition unwrapPartition(Map<String, ?> wrapped) {
+String topic = (String) wrapped.get(TOPIC_KEY);
+int partition = (Integer) wrapped.get(PARTITION_KEY);
 return new TopicPartition(topic, partition);
 }
 
 static Long unwrapOffset(Map<String, ?> wrapped) {
-if (wrapped == null || wrapped.get("offset") == null) {
+if (wrapped == null || wrapped.get(OFFSET_KEY) == null) {
 return -1L;
 }
-return (Long) wrapped.get("offset");
+return (Long) wrapped.get(OFFSET_KEY);
+}
+
+
+/**
+ * Validate a specific key in a source partition that may be written to the offsets topic for one of the MM2 connectors.
+ * This method ensures that the key is present in the source partition map and that its value is a string.
+ *
+ * @see org.apache.kafka.connect.source.SourceConnector#alterOffsets(Map, Map)
+ * @see SourceRecord#sourcePartition()
+ *
+ * @param sourcePartition the to-be-validated source partition; may not be null
+ * @param key the key to check for in the source partition; may be null

Review Comment:
   This isn't public API, all existing callers pass a non-null value, and a null key should cause an NPE below in get(), so maybe we can remove `may be null`?
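
   To make the point about the `key` contract concrete, here is a minimal, self-contained sketch of this kind of validation. It is not the code from the PR: the class name `SourcePartitionValidationSketch`, the method `validateString`, and the use of `IllegalArgumentException` in place of `ConnectException` are all assumptions made so the example compiles without Kafka on the classpath. It rejects a null key explicitly rather than relying on `get()`, since whether `Map.get(null)` throws depends on the map implementation (`java.util.HashMap`, for instance, permits null keys).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class SourcePartitionValidationSketch {

    // Hypothetical helper (not the PR's method): checks that `key` is present in the
    // source partition map and that the associated value is a String.
    public static void validateString(Map<?, ?> sourcePartition, String key) {
        Objects.requireNonNull(sourcePartition, "Source partition may not be null");
        // Assumption: fail fast on a null key so the non-null contract is explicit.
        Objects.requireNonNull(key, "Key may not be null");

        if (!sourcePartition.containsKey(key)) {
            throw new IllegalArgumentException(
                    "Source partition is missing the '" + key + "' key");
        }
        Object value = sourcePartition.get(key);
        if (!(value instanceof String)) {
            throw new IllegalArgumentException(
                    "Value for the '" + key + "' key must be a string, but was: " + value);
        }
    }

    public static void main(String[] args) {
        Map<String, Object> partition = new HashMap<>();
        partition.put("topic", "orders");
        partition.put("partition", 0);

        // Passes: the key is present and its value is a String.
        validateString(partition, "topic");

        try {
            // Fails: the value stored under "partition" is an Integer.
            validateString(partition, "partition");
        } catch (IllegalArgumentException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```

   With an explicit null check like this, the Javadoc for `key` could say `may not be null`, matching the wording already used for `sourcePartition`.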



##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorUtils.java:
##
@@ -65,27 +72,170 @@ static String encodeTopicPartition(TopicPartition topicPartition) {
 
 static Map<String, Object> wrapPartition(TopicPartition topicPartition, String sourceClusterAlias) {
 Map<String, Object> wrapped = new HashMap<>();
-wrapped.put("topic", topicPartition.topic());
-wrapped.put("partition", topicPartition.partition());
-wrapped.put("cluster", sourceClusterAlias);
+wrapped.put(TOPIC_KEY, topicPartition.topic());
+wrapped.put(PARTITION_KEY, topicPartition.partition());
+wrapped.put(SOURCE_CLUSTER_KEY, sourceClusterAlias);
 return wrapped;
 }
 
-static Map<String, Object> wrapOffset(long offset) {
-return Collections.singletonMap("offset", offset);
+public static Map<String, Object> wrapOffset(long offset) {
+return Collections.singletonMap(OFFSET_KEY, offset);
 }
 
-static TopicPartition unwrapPartition(Map<String, ?> wrapped) {
-String topic = (String) wrapped.get("topic");
-int partition = (Integer) wrapped.get("partition");
+public static TopicPartition unwrapPartition(Map<String, ?> wrapped) {
+String topic = (String) wrapped.get(TOPIC_KEY);
+int partition = (Integer) wrapped.get(PARTITION_KEY);
 return new TopicPartition(topic, partition);
 }
 
 static Long unwrapOffset(Map<String, ?> wrapped) {
-if (wrapped == null || wrapped.get("offset") == null) {
+if (wrapped == null || wrapped.get(OFFSET_KEY) == null) {
 return -1L;
 }
-return (Long) wrapped.get("offset");
+return (Long) wrapped.get(OFFSET_KEY);
+}
+
+
+/**
+ * Validate a specific key in a source partition that may be written to the offsets topic for one of the MM2 connectors.
+ * This method ensures that the key is present in the source partition map and that its value is a string.
+ *
+ * @see org.apache.kafka.connect.source.SourceConnector#alterOffsets(Map, Map)
+ * @see SourceRecord#sourcePartition()
+ *
+ * @param sourcePartition the to-be-validated source partition; may not be null
+ * @param key the key to check for in the source partition; may be null
+ *
+ * @throws ConnectException if the offset is invalid
+ */
+static void validateSourcePartitionString(Map sourcePartition, String key) {
+Objects.requireNonNull(sourcePartition, "Source partition may not be