This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new a83bc2d977d KAFKA-13988: Enable replicating from latest offset with MirrorMaker 2 (#14567) a83bc2d977d is described below commit a83bc2d977d2af85d4edfc8096854137481001e9 Author: Chris Egerton <chr...@aiven.io> AuthorDate: Mon Dec 4 16:37:37 2023 -0500 KAFKA-13988: Enable replicating from latest offset with MirrorMaker 2 (#14567) Reviewers: hudeqi <1217150...@qq.com>, Federico Valeri <fedeval...@gmail.com>, Greg Harris <gharris1...@gmail.com> --- .../kafka/connect/mirror/MirrorSourceTask.java | 33 +++++++++--- .../kafka/connect/mirror/MirrorSourceTaskTest.java | 63 ++++++++++++++++++++++ .../MirrorConnectorsIntegrationBaseTest.java | 29 ++++++++++ 3 files changed, 118 insertions(+), 7 deletions(-) diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java index 84e393edb36..cad57d4ad02 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java @@ -103,12 +103,8 @@ public class MirrorSourceTask extends SourceTask { consumer = MirrorUtils.newConsumer(config.sourceConsumerConfig("replication-consumer")); offsetProducer = MirrorUtils.newProducer(config.offsetSyncsTopicProducerConfig()); Set<TopicPartition> taskTopicPartitions = config.taskTopicPartitions(); - Map<TopicPartition, Long> topicPartitionOffsets = loadOffsets(taskTopicPartitions); - consumer.assign(topicPartitionOffsets.keySet()); - log.info("Starting with {} previously uncommitted partitions.", topicPartitionOffsets.entrySet().stream() - .filter(x -> x.getValue() == 0L).count()); - log.trace("Seeking offsets: {}", topicPartitionOffsets); - topicPartitionOffsets.forEach(consumer::seek); + initializeConsumer(taskTopicPartitions); + log.info("{} replicating {} topic-partitions {}->{}: {}.", 
Thread.currentThread().getName(), taskTopicPartitions.size(), sourceClusterAlias, config.targetClusterAlias(), taskTopicPartitions); } @@ -266,7 +262,26 @@ public class MirrorSourceTask extends SourceTask { private Long loadOffset(TopicPartition topicPartition) { Map<String, Object> wrappedPartition = MirrorUtils.wrapPartition(topicPartition, sourceClusterAlias); Map<String, Object> wrappedOffset = context.offsetStorageReader().offset(wrappedPartition); - return MirrorUtils.unwrapOffset(wrappedOffset) + 1; + return MirrorUtils.unwrapOffset(wrappedOffset); + } + + // visible for testing + void initializeConsumer(Set<TopicPartition> taskTopicPartitions) { + Map<TopicPartition, Long> topicPartitionOffsets = loadOffsets(taskTopicPartitions); + consumer.assign(topicPartitionOffsets.keySet()); + log.info("Starting with {} previously uncommitted partitions.", topicPartitionOffsets.values().stream() + .filter(this::isUncommitted).count()); + + topicPartitionOffsets.forEach((topicPartition, offset) -> { + // Do not call seek on partitions that don't have an existing offset committed. 
+ if (isUncommitted(offset)) { + log.trace("Skipping seeking offset for topicPartition: {}", topicPartition); + return; + } + long nextOffsetToCommittedOffset = offset + 1L; + log.trace("Seeking to offset {} for topicPartition: {}", nextOffsetToCommittedOffset, topicPartition); + consumer.seek(topicPartition, nextOffsetToCommittedOffset); + }); } // visible for testing @@ -302,6 +317,10 @@ public class MirrorSourceTask extends SourceTask { } } + private boolean isUncommitted(Long offset) { + return offset == null || offset < 0; + } + static class PartitionState { long previousUpstreamOffset = -1L; long previousDownstreamOffset = -1L; diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java index 0c566eb596b..647935eb356 100644 --- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java +++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java @@ -31,25 +31,33 @@ import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.connect.mirror.MirrorSourceTask.PartitionState; import org.apache.kafka.connect.source.SourceRecord; +import org.apache.kafka.connect.source.SourceTaskContext; +import org.apache.kafka.connect.storage.OffsetStorageReader; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.concurrent.Semaphore; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; +import static 
org.mockito.ArgumentMatchers.anyMap; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; + import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; import static org.mockito.Mockito.verifyNoInteractions; @@ -214,6 +222,61 @@ public class MirrorSourceTaskTest { } } + @Test + public void testSeekBehaviorDuringStart() { + // Setting up mock behavior. + @SuppressWarnings("unchecked") + KafkaConsumer<byte[], byte[]> mockConsumer = mock(KafkaConsumer.class); + + SourceTaskContext mockSourceTaskContext = mock(SourceTaskContext.class); + OffsetStorageReader mockOffsetStorageReader = mock(OffsetStorageReader.class); + when(mockSourceTaskContext.offsetStorageReader()).thenReturn(mockOffsetStorageReader); + + Set<TopicPartition> topicPartitions = new HashSet<>(Arrays.asList( + new TopicPartition("previouslyReplicatedTopic", 8), + new TopicPartition("previouslyReplicatedTopic1", 0), + new TopicPartition("previouslyReplicatedTopic", 1), + new TopicPartition("newTopicToReplicate1", 1), + new TopicPartition("newTopicToReplicate1", 4), + new TopicPartition("newTopicToReplicate2", 0) + )); + + long arbitraryCommittedOffset = 4L; + long offsetToSeek = arbitraryCommittedOffset + 1L; + when(mockOffsetStorageReader.offset(anyMap())).thenAnswer(testInvocation -> { + Map<String, Object> topicPartitionOffsetMap = testInvocation.getArgument(0); + String topicName = topicPartitionOffsetMap.get("topic").toString(); + + // Only return the offset for previously replicated topics. + // For others, there is no value set. 
+ if (topicName.startsWith("previouslyReplicatedTopic")) { + topicPartitionOffsetMap.put("offset", arbitraryCommittedOffset); + } + return topicPartitionOffsetMap; + }); + + MirrorSourceTask mirrorSourceTask = new MirrorSourceTask(mockConsumer, null, null, + new DefaultReplicationPolicy(), 50, null, null, null, null); + mirrorSourceTask.initialize(mockSourceTaskContext); + + // Call test subject + mirrorSourceTask.initializeConsumer(topicPartitions); + + // Verifications + // Ensure all the topic partitions are assigned to consumer + verify(mockConsumer, times(1)).assign(topicPartitions); + + // Ensure seek is only called for previously committed topic partitions. + verify(mockConsumer, times(1)) + .seek(new TopicPartition("previouslyReplicatedTopic", 8), offsetToSeek); + verify(mockConsumer, times(1)) + .seek(new TopicPartition("previouslyReplicatedTopic", 1), offsetToSeek); + verify(mockConsumer, times(1)) + .seek(new TopicPartition("previouslyReplicatedTopic1", 0), offsetToSeek); + + verifyNoMoreInteractions(mockConsumer); + } + @Test public void testCommitRecordWithNullMetadata() { // Create a consumer mock diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java index 0f3a189b05e..3595eebe1d4 100644 --- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java +++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java @@ -81,6 +81,7 @@ import org.junit.jupiter.api.Tag; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG; import static org.apache.kafka.test.TestUtils.waitForCondition; import static org.junit.jupiter.api.Assertions.assertEquals; import static 
org.junit.jupiter.api.Assertions.assertNotEquals; @@ -874,6 +875,34 @@ public class MirrorConnectorsIntegrationBaseTest { }, 30000, "Topic configurations were not synced"); } + @Test + public void testReplicateFromLatest() throws Exception { + // populate topic with records that should not be replicated + String topic = "test-topic-1"; + produceMessages(primaryProducer, topic, NUM_PARTITIONS); + + // consume from the ends of topics when no committed offsets are found + mm2Props.put(PRIMARY_CLUSTER_ALIAS + ".consumer." + AUTO_OFFSET_RESET_CONFIG, "latest"); + // one way replication from primary to backup + mm2Props.put(BACKUP_CLUSTER_ALIAS + "->" + PRIMARY_CLUSTER_ALIAS + ".enabled", "false"); + mm2Config = new MirrorMakerConfig(mm2Props); + waitUntilMirrorMakerIsRunning(backup, CONNECTOR_LIST, mm2Config, PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS); + + // produce some more messages to the topic, now that MM2 is running and replication should be taking place + produceMessages(primaryProducer, topic, NUM_PARTITIONS); + + String backupTopic = remoteTopicName(topic, PRIMARY_CLUSTER_ALIAS); + // wait for at least the expected number of records to be replicated to the backup cluster + backup.kafka().consume(NUM_PARTITIONS * NUM_RECORDS_PER_PARTITION, RECORD_TRANSFER_DURATION_MS, backupTopic); + // consume all records from backup cluster + ConsumerRecords<byte[], byte[]> replicatedRecords = backup.kafka().consumeAll(RECORD_TRANSFER_DURATION_MS, backupTopic); + // ensure that we only replicated the records produced after startup + replicatedRecords.partitions().forEach(topicPartition -> { + int replicatedCount = replicatedRecords.records(topicPartition).size(); + assertEquals(NUM_RECORDS_PER_PARTITION, replicatedCount); + }); + } + private TopicPartition remoteTopicPartition(TopicPartition tp, String alias) { return new TopicPartition(remoteTopicName(tp.topic(), alias), tp.partition()); }