This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch 3.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.6 by this push:
     new d67f6872892 KAFKA-13988: Enable replicating from latest offset with MirrorMaker 2 (#14567)
d67f6872892 is described below

commit d67f6872892e2a2c76f54c4baafbc5f5bccd33a7
Author: Chris Egerton <chr...@aiven.io>
AuthorDate: Mon Dec 4 16:37:37 2023 -0500

    KAFKA-13988: Enable replicating from latest offset with MirrorMaker 2 (#14567)
    
    Reviewers: hudeqi <1217150...@qq.com>, Federico Valeri <fedeval...@gmail.com>, Greg Harris <gharris1...@gmail.com>
---
 .../kafka/connect/mirror/MirrorSourceTask.java     | 33 +++++++++---
 .../kafka/connect/mirror/MirrorSourceTaskTest.java | 63 ++++++++++++++++++++++
 .../MirrorConnectorsIntegrationBaseTest.java       | 29 ++++++++++
 .../ExactlyOnceSourceIntegrationTest.java          |  8 +--
 .../util/clusters/EmbeddedKafkaCluster.java        | 13 +++++
 5 files changed, 132 insertions(+), 14 deletions(-)
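
For context on how the feature is exercised: the new integration test below overrides the
source consumer's auto.offset.reset for the primary cluster, so the MirrorSourceConnector
consumer starts from the log end whenever no offsets have previously been committed for a
topic-partition. A minimal mm2.properties sketch of the same idea (the aliases "primary"
and "backup" mirror the test constants and are assumptions, not required names):

    # Illustrative sketch only: skip records that predate the first MirrorMaker 2 run
    clusters = primary, backup
    primary->backup.enabled = true
    primary.consumer.auto.offset.reset = latest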

diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java
index 84e393edb36..cad57d4ad02 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java
@@ -103,12 +103,8 @@ public class MirrorSourceTask extends SourceTask {
         consumer = MirrorUtils.newConsumer(config.sourceConsumerConfig("replication-consumer"));
         offsetProducer = MirrorUtils.newProducer(config.offsetSyncsTopicProducerConfig());
         Set<TopicPartition> taskTopicPartitions = config.taskTopicPartitions();
-        Map<TopicPartition, Long> topicPartitionOffsets = loadOffsets(taskTopicPartitions);
-        consumer.assign(topicPartitionOffsets.keySet());
-        log.info("Starting with {} previously uncommitted partitions.", topicPartitionOffsets.entrySet().stream()
-            .filter(x -> x.getValue() == 0L).count());
-        log.trace("Seeking offsets: {}", topicPartitionOffsets);
-        topicPartitionOffsets.forEach(consumer::seek);
+        initializeConsumer(taskTopicPartitions);
+
         log.info("{} replicating {} topic-partitions {}->{}: {}.", Thread.currentThread().getName(),
             taskTopicPartitions.size(), sourceClusterAlias, config.targetClusterAlias(), taskTopicPartitions);
     }
@@ -266,7 +262,26 @@ public class MirrorSourceTask extends SourceTask {
     private Long loadOffset(TopicPartition topicPartition) {
         Map<String, Object> wrappedPartition = MirrorUtils.wrapPartition(topicPartition, sourceClusterAlias);
         Map<String, Object> wrappedOffset = context.offsetStorageReader().offset(wrappedPartition);
-        return MirrorUtils.unwrapOffset(wrappedOffset) + 1;
+        return MirrorUtils.unwrapOffset(wrappedOffset);
+    }
+
+    // visible for testing
+    void initializeConsumer(Set<TopicPartition> taskTopicPartitions) {
+        Map<TopicPartition, Long> topicPartitionOffsets = loadOffsets(taskTopicPartitions);
+        consumer.assign(topicPartitionOffsets.keySet());
+        log.info("Starting with {} previously uncommitted partitions.", topicPartitionOffsets.values().stream()
+                .filter(this::isUncommitted).count());
+
+        topicPartitionOffsets.forEach((topicPartition, offset) -> {
+            // Do not call seek on partitions that don't have an existing offset committed.
+            if (isUncommitted(offset)) {
+                log.trace("Skipping seeking offset for topicPartition: {}", topicPartition);
+                return;
+            }
+            long nextOffsetToCommittedOffset = offset + 1L;
+            log.trace("Seeking to offset {} for topicPartition: {}", nextOffsetToCommittedOffset, topicPartition);
+            consumer.seek(topicPartition, nextOffsetToCommittedOffset);
+        });
     }
 
     // visible for testing 
@@ -302,6 +317,10 @@ public class MirrorSourceTask extends SourceTask {
         }
     }
 
+    private boolean isUncommitted(Long offset) {
+        return offset == null || offset < 0;
+    }
+
     static class PartitionState {
         long previousUpstreamOffset = -1L;
         long previousDownstreamOffset = -1L;
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java
index 0c566eb596b..647935eb356 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java
@@ -31,25 +31,33 @@ import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.connect.mirror.MirrorSourceTask.PartitionState;
 import org.apache.kafka.connect.source.SourceRecord;
 
+import org.apache.kafka.connect.source.SourceTaskContext;
+import org.apache.kafka.connect.storage.OffsetStorageReader;
 import org.junit.jupiter.api.Test;
 import org.mockito.ArgumentCaptor;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.Semaphore;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyMap;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
 import static org.mockito.Mockito.verifyNoInteractions;
 
@@ -214,6 +222,61 @@ public class MirrorSourceTaskTest {
         }
     }
 
+    @Test
+    public void testSeekBehaviorDuringStart() {
+        // Setting up mock behavior.
+        @SuppressWarnings("unchecked")
+        KafkaConsumer<byte[], byte[]> mockConsumer = mock(KafkaConsumer.class);
+
+        SourceTaskContext mockSourceTaskContext = mock(SourceTaskContext.class);
+        OffsetStorageReader mockOffsetStorageReader = mock(OffsetStorageReader.class);
+        when(mockSourceTaskContext.offsetStorageReader()).thenReturn(mockOffsetStorageReader);
+
+        Set<TopicPartition> topicPartitions = new HashSet<>(Arrays.asList(
+                new TopicPartition("previouslyReplicatedTopic", 8),
+                new TopicPartition("previouslyReplicatedTopic1", 0),
+                new TopicPartition("previouslyReplicatedTopic", 1),
+                new TopicPartition("newTopicToReplicate1", 1),
+                new TopicPartition("newTopicToReplicate1", 4),
+                new TopicPartition("newTopicToReplicate2", 0)
+        ));
+
+        long arbitraryCommittedOffset = 4L;
+        long offsetToSeek = arbitraryCommittedOffset + 1L;
+        when(mockOffsetStorageReader.offset(anyMap())).thenAnswer(testInvocation -> {
+            Map<String, Object> topicPartitionOffsetMap = testInvocation.getArgument(0);
+            String topicName = topicPartitionOffsetMap.get("topic").toString();
+
+            // Only return the offset for previously replicated topics.
+            // For others, there is no value set.
+            if (topicName.startsWith("previouslyReplicatedTopic")) {
+                topicPartitionOffsetMap.put("offset", arbitraryCommittedOffset);
+            }
+            return topicPartitionOffsetMap;
+        });
+
+        MirrorSourceTask mirrorSourceTask = new MirrorSourceTask(mockConsumer, null, null,
+                new DefaultReplicationPolicy(), 50, null, null, null, null);
+        mirrorSourceTask.initialize(mockSourceTaskContext);
+
+        // Call test subject
+        mirrorSourceTask.initializeConsumer(topicPartitions);
+
+        // Verifications
+        // Ensure all the topic partitions are assigned to consumer
+        verify(mockConsumer, times(1)).assign(topicPartitions);
+
+        // Ensure seek is only called for previously committed topic partitions.
+        verify(mockConsumer, times(1))
+                .seek(new TopicPartition("previouslyReplicatedTopic", 8), offsetToSeek);
+        verify(mockConsumer, times(1))
+                .seek(new TopicPartition("previouslyReplicatedTopic", 1), offsetToSeek);
+        verify(mockConsumer, times(1))
+                .seek(new TopicPartition("previouslyReplicatedTopic1", 0), offsetToSeek);
+
+        verifyNoMoreInteractions(mockConsumer);
+    }
+
     @Test
     public void testCommitRecordWithNullMetadata() {
         // Create a consumer mock
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
index 6ea080a59a3..4cb51b1153c 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
@@ -81,6 +81,7 @@ import org.junit.jupiter.api.Tag;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG;
 import static org.apache.kafka.test.TestUtils.waitForCondition;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
@@ -874,6 +875,34 @@ public class MirrorConnectorsIntegrationBaseTest {
         }, 30000, "Topic configurations were not synced");
     }
 
+    @Test
+    public void testReplicateFromLatest() throws Exception {
+        // populate topic with records that should not be replicated
+        String topic = "test-topic-1";
+        produceMessages(primaryProducer, topic, NUM_PARTITIONS);
+
+        // consume from the ends of topics when no committed offsets are found
+        mm2Props.put(PRIMARY_CLUSTER_ALIAS + ".consumer." + AUTO_OFFSET_RESET_CONFIG, "latest");
+        // one way replication from primary to backup
+        mm2Props.put(BACKUP_CLUSTER_ALIAS + "->" + PRIMARY_CLUSTER_ALIAS + ".enabled", "false");
+        mm2Config = new MirrorMakerConfig(mm2Props);
+        waitUntilMirrorMakerIsRunning(backup, CONNECTOR_LIST, mm2Config, PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS);
+
+        // produce some more messages to the topic, now that MM2 is running and replication should be taking place
+        produceMessages(primaryProducer, topic, NUM_PARTITIONS);
+
+        String backupTopic = remoteTopicName(topic, PRIMARY_CLUSTER_ALIAS);
+        // wait for at least the expected number of records to be replicated to the backup cluster
+        backup.kafka().consume(NUM_PARTITIONS * NUM_RECORDS_PER_PARTITION, RECORD_TRANSFER_DURATION_MS, backupTopic);
+        // consume all records from backup cluster
+        ConsumerRecords<byte[], byte[]> replicatedRecords = backup.kafka().consumeAll(RECORD_TRANSFER_DURATION_MS, backupTopic);
+        // ensure that we only replicated the records produced after startup
+        replicatedRecords.partitions().forEach(topicPartition -> {
+            int replicatedCount = replicatedRecords.records(topicPartition).size();
+            assertEquals(NUM_RECORDS_PER_PARTITION, replicatedCount);
+        });
+    }
+
     private TopicPartition remoteTopicPartition(TopicPartition tp, String alias) {
         return new TopicPartition(remoteTopicName(tp.topic(), alias), tp.partition());
     }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java
index 21cd7340653..26b2d7cba16 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java
@@ -809,13 +809,7 @@ public class ExactlyOnceSourceIntegrationTest {
             );
 
             // also consume from the cluster's global offsets topic
-            offsetRecords = connect.kafka()
-                    .consumeAll(
-                            TimeUnit.MINUTES.toMillis(1),
-                            null,
-                            null,
-                            globalOffsetsTopic
-                    );
+            offsetRecords = connect.kafka().consumeAll(TimeUnit.MINUTES.toMillis(1), globalOffsetsTopic);
             seqnos = parseAndAssertOffsetsForSingleTask(offsetRecords);
             seqnos.forEach(seqno ->
                 assertEquals("Offset commits should occur on connector-defined poll boundaries, which happen every " + MINIMUM_MESSAGES + " records",
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java
index df8a2253531..98af3e827f8 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java
@@ -525,6 +525,19 @@ public class EmbeddedKafkaCluster {
         throw new RuntimeException("Could not find enough records. found " + consumedRecords + ", expected " + n);
     }
 
+    /**
+     * Consume all currently-available records for the specified topics in a given duration, or throw an exception.
+     * @param maxDurationMs the max duration to wait for these records (in milliseconds).
+     * @param topics the topics to consume from
+     * @return a {@link ConsumerRecords} collection containing the records for all partitions of the given topics
+     */
+    public ConsumerRecords<byte[], byte[]> consumeAll(
+            long maxDurationMs,
+            String... topics
+    ) throws TimeoutException, InterruptedException, ExecutionException {
+        return consumeAll(maxDurationMs, null, null, topics);
+    }
+
     /**
      * Consume all currently-available records for the specified topics in a given duration, or throw an exception.
      * @param maxDurationMs the max duration to wait for these records (in milliseconds).
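
As a usage note on the convenience overload added above: it simply delegates to the
existing four-argument consumeAll, passing null for the two middle arguments, so callers
no longer need to spell those out. A minimal sketch (the topic name and timeout below are
illustrative, not taken from the patch):

    // Hypothetical caller: read everything currently in "my-topic", waiting at
    // most one minute, via the new two-argument convenience overload.
    ConsumerRecords<byte[], byte[]> records =
            connect.kafka().consumeAll(TimeUnit.MINUTES.toMillis(1), "my-topic");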
