This is an automated email from the ASF dual-hosted git repository.
mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new fdeabe552f1 KAFKA-20229: Batch offset translation in RemoteClusterUtils (#21591)
fdeabe552f1 is described below
commit fdeabe552f19cdc4d52d824a2d4594828cd5cf63
Author: Mickael Maison <[email protected]>
AuthorDate: Fri Feb 27 12:20:43 2026 +0100
KAFKA-20229: Batch offset translation in RemoteClusterUtils (#21591)
Implements [KIP-1239](https://cwiki.apache.org/confluence/x/h4HMFw)
Reviewers: Viktor Somogyi-Vass <[email protected]>
---
.../apache/kafka/connect/mirror/MirrorClient.java | 55 ++++++++---
.../kafka/connect/mirror/RemoteClusterUtils.java | 17 ++++
.../kafka/connect/mirror/MirrorClientTest.java | 103 +++++++++++++++++++++
3 files changed, 162 insertions(+), 13 deletions(-)
diff --git a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClient.java b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClient.java
index 06dec5b25ba..7275ce72020 100644
--- a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClient.java
+++ b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClient.java
@@ -39,6 +39,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
+import java.util.regex.Pattern;
import java.util.stream.Collectors;
/**
@@ -70,6 +71,11 @@ public class MirrorClient implements AutoCloseable {
this.consumerConfig = consumerConfig;
}
+    /**
+     * Creates a new byte-array consumer from this client's consumer configuration. Every call returns a
+     * fresh instance that the caller is responsible for closing. Package-private so tests can override it.
+     */
+    Consumer<byte[], byte[]> consumer() {
+        ByteArrayDeserializer keyDeserializer = new ByteArrayDeserializer();
+        ByteArrayDeserializer valueDeserializer = new ByteArrayDeserializer();
+        return new KafkaConsumer<>(consumerConfig, keyDeserializer, valueDeserializer);
+    }
+
/**
* Closes internal clients.
*/
@@ -147,24 +153,32 @@ public class MirrorClient implements AutoCloseable {
}
/**
- * Translates a remote consumer group's offsets into corresponding local
offsets. Topics are automatically
+     * Translates remote consumer groups' offsets into corresponding local offsets. Topics are automatically
* renamed according to the ReplicationPolicy.
+     * A consumer group is included only if the pattern matches its entire group ID. If {@code timeout}
+     * elapses before the end of the checkpoints topic is reached, the returned offsets may be incomplete.
- * @param consumerGroupId The group ID of remote consumer group
+     * @param consumerGroupPattern The regex pattern specifying the consumer groups to translate offsets for
* @param remoteClusterAlias The alias of remote cluster
* @param timeout The maximum time to block when consuming from the
checkpoints topic
+     * @return The translated offsets keyed by consumer group ID; groups without a matching checkpoint are absent
+     * @throws IllegalArgumentException If any of the arguments are null
*/
- public Map<TopicPartition, OffsetAndMetadata> remoteConsumerOffsets(String
consumerGroupId,
- String remoteClusterAlias, Duration timeout) {
+    public Map<String, Map<TopicPartition, OffsetAndMetadata>> remoteConsumerOffsets(Pattern consumerGroupPattern,
+            String remoteClusterAlias, Duration timeout) {
+        if (consumerGroupPattern == null) {
+            throw new IllegalArgumentException("`consumerGroupPattern` must not be null");
+        }
+        if (remoteClusterAlias == null) {
+            throw new IllegalArgumentException("`remoteClusterAlias` must not be null");
+        }
+        if (timeout == null) {
+            throw new IllegalArgumentException("`timeout` must not be null");
+        }
long deadline = System.currentTimeMillis() + timeout.toMillis();
- Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+        Map<String, Map<TopicPartition, OffsetAndMetadata>> offsets = new HashMap<>();
- try (KafkaConsumer<byte[], byte[]> consumer = new
KafkaConsumer<>(consumerConfig,
- new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
+        try (Consumer<byte[], byte[]> consumer = consumer()) {
// checkpoint topics are not "remote topics", as they are not
replicated. So we don't need
// to use ReplicationPolicy to create the checkpoint topic here.
String checkpointTopic =
replicationPolicy.checkpointsTopic(remoteClusterAlias);
- List<TopicPartition> checkpointAssignment =
- List.of(new TopicPartition(checkpointTopic, 0));
+            List<TopicPartition> checkpointAssignment = List.of(new TopicPartition(checkpointTopic, 0));
consumer.assign(checkpointAssignment);
consumer.seekToBeginning(checkpointAssignment);
while (System.currentTimeMillis() < deadline &&
!endOfStream(consumer, checkpointAssignment)) {
@@ -172,20 +186,35 @@ public class MirrorClient implements AutoCloseable {
for (ConsumerRecord<byte[], byte[]> record : records) {
try {
Checkpoint checkpoint =
Checkpoint.deserializeRecord(record);
- if
(checkpoint.consumerGroupId().equals(consumerGroupId)) {
- offsets.put(checkpoint.topicPartition(),
checkpoint.offsetAndMetadata());
+                        String consumerGroupId = checkpoint.consumerGroupId();
+                        if (consumerGroupPattern.matcher(consumerGroupId).matches()) {
+                            offsets.computeIfAbsent(consumerGroupId, k -> new HashMap<>())
+                                    .put(checkpoint.topicPartition(), checkpoint.offsetAndMetadata());
}
} catch (SchemaException e) {
log.info("Could not deserialize record. Skipping.", e);
}
}
}
- log.info("Consumed {} checkpoint records for {} from {}.",
offsets.size(),
- consumerGroupId, checkpointTopic);
+            // offsets is keyed by consumer group, so its size is the number of matched groups, not records
+            log.info("Found checkpoints for {} consumer group(s) matching {} in {}.", offsets.size(),
+                    consumerGroupPattern, checkpointTopic);
}
return offsets;
}
+    /**
+     * Translates a remote consumer group's offsets into corresponding local offsets. Topics are automatically
+     * renamed according to the ReplicationPolicy.
+     * @param consumerGroupId The group ID of remote consumer group
+     * @param remoteClusterAlias The alias of remote cluster
+     * @param timeout The maximum time to block when consuming from the checkpoints topic
+     * @return The translated offsets for the group, or an empty map if no checkpoints were found for it
+     * @throws IllegalArgumentException If any of the arguments are null
+     */
+    public Map<TopicPartition, OffsetAndMetadata> remoteConsumerOffsets(String consumerGroupId,
+            String remoteClusterAlias, Duration timeout) {
+        // Without this check Pattern.quote(null) would fail with a NullPointerException instead of the
+        // IllegalArgumentException the Pattern-based overload uses for null arguments.
+        if (consumerGroupId == null) {
+            throw new IllegalArgumentException("`consumerGroupId` must not be null");
+        }
+        // Quote the ID so regex metacharacters in it (e.g. '.') are matched literally.
+        Pattern consumerGroupPattern = Pattern.compile(Pattern.quote(consumerGroupId));
+        Map<String, Map<TopicPartition, OffsetAndMetadata>> offsets =
+                remoteConsumerOffsets(consumerGroupPattern, remoteClusterAlias, timeout);
+        return offsets.getOrDefault(consumerGroupId, new HashMap<>());
+    }
+
Set<String> listTopics() throws InterruptedException {
try {
return adminClient.listTopics().names().get();
diff --git a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/RemoteClusterUtils.java b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/RemoteClusterUtils.java
index a3eb778b7ee..470d027dff5 100644
--- a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/RemoteClusterUtils.java
+++ b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/RemoteClusterUtils.java
@@ -26,6 +26,7 @@ import java.time.Duration;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeoutException;
+import java.util.regex.Pattern;
/**
@@ -100,4 +101,20 @@ public final class RemoteClusterUtils {
return client.remoteConsumerOffsets(consumerGroupId,
remoteClusterAlias, timeout);
}
}
+
+    /**
+     * Translates remote consumer groups' offsets into corresponding local offsets. Topics are automatically
+     * renamed according to the configured {@link ReplicationPolicy}.
+     * @param properties Map of properties to instantiate a {@link MirrorClient}
+     * @param remoteClusterAlias The alias of the remote cluster
+     * @param consumerGroupPattern The regex pattern specifying the consumer groups to translate offsets for
+     * @param timeout The maximum time to block when consuming from the checkpoints topic
+     * @return The translated offsets, keyed by consumer group ID
+     * @throws IllegalArgumentException If any of the arguments are null
+     */
+    public static Map<String, Map<TopicPartition, OffsetAndMetadata>> translateOffsets(Map<String, Object> properties,
+            String remoteClusterAlias, Pattern consumerGroupPattern, Duration timeout) {
+        // The other arguments are validated by MirrorClient#remoteConsumerOffsets; properties has to be
+        // checked here, before it is used to build the client, to honour the documented contract.
+        if (properties == null) {
+            throw new IllegalArgumentException("`properties` must not be null");
+        }
+        try (MirrorClient client = new MirrorClient(properties)) {
+            return client.remoteConsumerOffsets(consumerGroupPattern, remoteClusterAlias, timeout);
+        }
+    }
}
diff --git a/connect/mirror-client/src/test/java/org/apache/kafka/connect/mirror/MirrorClientTest.java b/connect/mirror-client/src/test/java/org/apache/kafka/connect/mirror/MirrorClientTest.java
index 5e99f6cd74e..f7b6bbb08c3 100644
--- a/connect/mirror-client/src/test/java/org/apache/kafka/connect/mirror/MirrorClientTest.java
+++ b/connect/mirror-client/src/test/java/org/apache/kafka/connect/mirror/MirrorClientTest.java
@@ -16,24 +16,36 @@
*/
package org.apache.kafka.connect.mirror;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy;
import org.apache.kafka.common.Configurable;
+import org.apache.kafka.common.TopicPartition;
import org.junit.jupiter.api.Test;
+import java.time.Duration;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.regex.Pattern;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class MirrorClientTest {
+ private static final String SOURCE = "source";
+
private static class FakeMirrorClient extends MirrorClient {
List<String> topics;
+ public MockConsumer<byte[], byte[]> consumer;
FakeMirrorClient(List<String> topics) {
this(new DefaultReplicationPolicy(), topics);
@@ -52,6 +64,15 @@ public class MirrorClientTest {
protected Set<String> listTopics() {
return new HashSet<>(topics);
}
+
+        /**
+         * Returns the mock consumer installed by the test, or a real consumer when none has been set.
+         */
+        @Override
+        Consumer<byte[], byte[]> consumer() {
+            return consumer != null ? consumer : super.consumer();
+        }
}
@Test
@@ -208,9 +229,91 @@ public class MirrorClientTest {
.topicSource("backup.heartbeats"));
}
+    @Test
+    public void testRemoteConsumerOffsetsIllegalArgs() {
+        FakeMirrorClient client = new FakeMirrorClient();
+        Pattern emptyPattern = Pattern.compile("");
+        Duration oneSecond = Duration.ofSeconds(1L);
+        // Each call passes exactly one null argument; the cast selects the Pattern-based overload.
+        assertThrows(IllegalArgumentException.class,
+                () -> client.remoteConsumerOffsets((Pattern) null, "", oneSecond));
+        assertThrows(IllegalArgumentException.class,
+                () -> client.remoteConsumerOffsets(emptyPattern, null, oneSecond));
+        assertThrows(IllegalArgumentException.class,
+                () -> client.remoteConsumerOffsets(emptyPattern, "", null));
+    }
+
+    @Test
+    public void testRemoteConsumerOffsets() {
+        String group0 = "mygroup0";
+        String group1 = "mygroup1";
+        Duration timeout = Duration.ofSeconds(10L);
+        FakeMirrorClient client = new FakeMirrorClient();
+        TopicPartition checkpointTp = new TopicPartition(client.replicationPolicy().checkpointsTopic(SOURCE), 0);
+
+        TopicPartition topic0Part0 = new TopicPartition("topic0", 0);
+        TopicPartition topic0Part1 = new TopicPartition("topic0", 1);
+
+        // cp1 is a later checkpoint for the same group and partition as cp0, so cp1 is the one expected.
+        Checkpoint cp0 = new Checkpoint(group0, topic0Part0, 1L, 1L, "cp0");
+        Checkpoint cp1 = new Checkpoint(group0, topic0Part0, 2L, 2L, "cp1");
+        Checkpoint cp2 = new Checkpoint(group0, topic0Part1, 3L, 3L, "cp2");
+        Checkpoint cp3 = new Checkpoint(group1, topic0Part1, 4L, 4L, "cp3");
+        Checkpoint[] allCheckpoints = {cp0, cp1, cp2, cp3};
+
+        Map<TopicPartition, OffsetAndMetadata> group0Offsets = Map.of(
+                topic0Part0, cp1.offsetAndMetadata(),
+                topic0Part1, cp2.offsetAndMetadata());
+        Map<TopicPartition, OffsetAndMetadata> group1Offsets = Map.of(
+                topic0Part1, cp3.offsetAndMetadata());
+
+        // remoteConsumerOffsets closes the consumer it obtains, so a fresh mock is installed before every call.
+
+        // Pattern matching only mygroup0
+        client.consumer = buildConsumer(checkpointTp, allCheckpoints);
+        assertEquals(Map.of(group0, group0Offsets),
+                client.remoteConsumerOffsets(Pattern.compile(group0), SOURCE, timeout));
+
+        // Pattern matching every group
+        client.consumer = buildConsumer(checkpointTp, allCheckpoints);
+        assertEquals(Map.of(group0, group0Offsets, group1, group1Offsets),
+                client.remoteConsumerOffsets(Pattern.compile(".*"), SOURCE, timeout));
+
+        // Pattern matching no group
+        client.consumer = buildConsumer(checkpointTp, allCheckpoints);
+        assertTrue(client.remoteConsumerOffsets(Pattern.compile("unknown-group"), SOURCE, timeout).isEmpty());
+
+        // Single-group overload for a known group
+        client.consumer = buildConsumer(checkpointTp, allCheckpoints);
+        assertEquals(group0Offsets, client.remoteConsumerOffsets(group0, SOURCE, timeout));
+
+        // Single-group overload for an unknown group
+        client.consumer = buildConsumer(checkpointTp, allCheckpoints);
+        assertTrue(client.remoteConsumerOffsets("unknown-group", SOURCE, timeout).isEmpty());
+    }
+
/**
* Returns an IdentityReplicationPolicy configured with {@code source} as its source cluster alias.
*/
private ReplicationPolicy identityReplicationPolicy(String source) {
IdentityReplicationPolicy policy = new IdentityReplicationPolicy();
policy.configure(Map.of(IdentityReplicationPolicy.SOURCE_CLUSTER_ALIAS_CONFIG,
source));
return policy;
}
+
+    /**
+     * Builds a mock consumer assigned to {@code checkpointTp} and pre-loaded with the given checkpoints as
+     * consecutive records starting at offset 0. The beginning offset is 0 and the end offset is one past
+     * the last record, matching how a real log reports its end offset.
+     */
+    private MockConsumer<byte[], byte[]> buildConsumer(TopicPartition checkpointTp, Checkpoint... checkpoints) {
+        MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(AutoOffsetResetStrategy.NONE.name());
+        consumer.updateBeginningOffsets(Map.of(checkpointTp, 0L));
+        consumer.assign(Set.of(checkpointTp));
+        for (int offset = 0; offset < checkpoints.length; offset++) {
+            Checkpoint checkpoint = checkpoints[offset];
+            consumer.addRecord(new ConsumerRecord<>(checkpointTp.topic(), checkpointTp.partition(), offset,
+                    checkpoint.recordKey(), checkpoint.recordValue()));
+        }
+        // The end offset is the offset of the next record to be written, i.e. the number of records here;
+        // length - 1 would point at the last record and be -1 for an empty checkpoint list.
+        consumer.updateEndOffsets(Map.of(checkpointTp, (long) checkpoints.length));
+        return consumer;
+    }
}