mimaison commented on code in PR #14005:
URL: https://github.com/apache/kafka/pull/14005#discussion_r1269268948
##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorUtils.java:
##
@@ -65,27 +72,170 @@ static String encodeTopicPartition(TopicPartition
topicPartition) {
static Map wrapPartition(TopicPartition topicPartition,
String sourceClusterAlias) {
Map wrapped = new HashMap<>();
-wrapped.put("topic", topicPartition.topic());
-wrapped.put("partition", topicPartition.partition());
-wrapped.put("cluster", sourceClusterAlias);
+wrapped.put(TOPIC_KEY, topicPartition.topic());
+wrapped.put(PARTITION_KEY, topicPartition.partition());
+wrapped.put(SOURCE_CLUSTER_KEY, sourceClusterAlias);
return wrapped;
}
-static Map wrapOffset(long offset) {
-return Collections.singletonMap("offset", offset);
+public static Map wrapOffset(long offset) {
+return Collections.singletonMap(OFFSET_KEY, offset);
}
-static TopicPartition unwrapPartition(Map wrapped) {
-String topic = (String) wrapped.get("topic");
-int partition = (Integer) wrapped.get("partition");
+public static TopicPartition unwrapPartition(Map wrapped) {
+String topic = (String) wrapped.get(TOPIC_KEY);
+int partition = (Integer) wrapped.get(PARTITION_KEY);
return new TopicPartition(topic, partition);
}
static Long unwrapOffset(Map wrapped) {
-if (wrapped == null || wrapped.get("offset") == null) {
+if (wrapped == null || wrapped.get(OFFSET_KEY) == null) {
return -1L;
}
-return (Long) wrapped.get("offset");
+return (Long) wrapped.get(OFFSET_KEY);
+}
+
+
+/**
+ * Validate a specific key in a source partition that may be written to
the offsets topic for one of the MM2 connectors.
+ * This method ensures that the key is present in the source partition map
and that its value is a string.
+ *
+ * @see org.apache.kafka.connect.source.SourceConnector#alterOffsets(Map,
Map)
+ * @see SourceRecord#sourcePartition()
+ *
+ * @param sourcePartition the to-be-validated source partition; may not be
null
+ * @param key the key to check for in the source partition; may be null
Review Comment:
It's not public API, all existing callers pass a non-null value and having
null should cause an NPE below in get(), so maybe we can remove `may be null`?
##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorUtils.java:
##
@@ -65,27 +72,170 @@ static String encodeTopicPartition(TopicPartition
topicPartition) {
static Map wrapPartition(TopicPartition topicPartition,
String sourceClusterAlias) {
Map wrapped = new HashMap<>();
-wrapped.put("topic", topicPartition.topic());
-wrapped.put("partition", topicPartition.partition());
-wrapped.put("cluster", sourceClusterAlias);
+wrapped.put(TOPIC_KEY, topicPartition.topic());
+wrapped.put(PARTITION_KEY, topicPartition.partition());
+wrapped.put(SOURCE_CLUSTER_KEY, sourceClusterAlias);
return wrapped;
}
-static Map wrapOffset(long offset) {
-return Collections.singletonMap("offset", offset);
+public static Map wrapOffset(long offset) {
+return Collections.singletonMap(OFFSET_KEY, offset);
}
-static TopicPartition unwrapPartition(Map wrapped) {
-String topic = (String) wrapped.get("topic");
-int partition = (Integer) wrapped.get("partition");
+public static TopicPartition unwrapPartition(Map wrapped) {
+String topic = (String) wrapped.get(TOPIC_KEY);
+int partition = (Integer) wrapped.get(PARTITION_KEY);
return new TopicPartition(topic, partition);
}
static Long unwrapOffset(Map wrapped) {
-if (wrapped == null || wrapped.get("offset") == null) {
+if (wrapped == null || wrapped.get(OFFSET_KEY) == null) {
return -1L;
}
-return (Long) wrapped.get("offset");
+return (Long) wrapped.get(OFFSET_KEY);
+}
+
+
+/**
+ * Validate a specific key in a source partition that may be written to
the offsets topic for one of the MM2 connectors.
+ * This method ensures that the key is present in the source partition map
and that its value is a string.
+ *
+ * @see org.apache.kafka.connect.source.SourceConnector#alterOffsets(Map,
Map)
+ * @see SourceRecord#sourcePartition()
+ *
+ * @param sourcePartition the to-be-validated source partition; may not be
null
+ * @param key the key to check for in the source partition; may be null
+ *
+ * @throws ConnectException if the offset is invalid
+ */
+static void validateSourcePartitionString(Map sourcePartition,
String key) {
+Objects.requireNonNull(sourcePartition, "Source partition may not be