This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch 3.6
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.6 by this push:
     new d67f6872892  KAFKA-13988: Enable replicating from latest offset with MirrorMaker 2 (#14567)
d67f6872892 is described below

commit d67f6872892e2a2c76f54c4baafbc5f5bccd33a7
Author: Chris Egerton <chr...@aiven.io>
AuthorDate: Mon Dec 4 16:37:37 2023 -0500

    KAFKA-13988: Enable replicating from latest offset with MirrorMaker 2 (#14567)

    Reviewers: hudeqi <1217150...@qq.com>, Federico Valeri <fedeval...@gmail.com>, Greg Harris <gharris1...@gmail.com>
---
 .../kafka/connect/mirror/MirrorSourceTask.java     | 33 +++++++++---
 .../kafka/connect/mirror/MirrorSourceTaskTest.java | 63 ++++++++++++++++++++++
 .../MirrorConnectorsIntegrationBaseTest.java       | 29 ++++++++++
 .../ExactlyOnceSourceIntegrationTest.java          |  8 +--
 .../util/clusters/EmbeddedKafkaCluster.java        | 13 +++++
 5 files changed, 132 insertions(+), 14 deletions(-)

diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java
index 84e393edb36..cad57d4ad02 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java
@@ -103,12 +103,8 @@ public class MirrorSourceTask extends SourceTask {
         consumer = MirrorUtils.newConsumer(config.sourceConsumerConfig("replication-consumer"));
         offsetProducer = MirrorUtils.newProducer(config.offsetSyncsTopicProducerConfig());
         Set<TopicPartition> taskTopicPartitions = config.taskTopicPartitions();
-        Map<TopicPartition, Long> topicPartitionOffsets = loadOffsets(taskTopicPartitions);
-        consumer.assign(topicPartitionOffsets.keySet());
-        log.info("Starting with {} previously uncommitted partitions.", topicPartitionOffsets.entrySet().stream()
-                .filter(x -> x.getValue() == 0L).count());
-        log.trace("Seeking offsets: {}", topicPartitionOffsets);
-        topicPartitionOffsets.forEach(consumer::seek);
+        initializeConsumer(taskTopicPartitions);
+
         log.info("{} replicating {} topic-partitions {}->{}: {}.", Thread.currentThread().getName(),
                 taskTopicPartitions.size(), sourceClusterAlias, config.targetClusterAlias(), taskTopicPartitions);
     }
@@ -266,7 +262,26 @@ public class MirrorSourceTask extends SourceTask {
     private Long loadOffset(TopicPartition topicPartition) {
         Map<String, Object> wrappedPartition = MirrorUtils.wrapPartition(topicPartition, sourceClusterAlias);
         Map<String, Object> wrappedOffset = context.offsetStorageReader().offset(wrappedPartition);
-        return MirrorUtils.unwrapOffset(wrappedOffset) + 1;
+        return MirrorUtils.unwrapOffset(wrappedOffset);
+    }
+
+    // visible for testing
+    void initializeConsumer(Set<TopicPartition> taskTopicPartitions) {
+        Map<TopicPartition, Long> topicPartitionOffsets = loadOffsets(taskTopicPartitions);
+        consumer.assign(topicPartitionOffsets.keySet());
+        log.info("Starting with {} previously uncommitted partitions.", topicPartitionOffsets.values().stream()
+                .filter(this::isUncommitted).count());
+
+        topicPartitionOffsets.forEach((topicPartition, offset) -> {
+            // Do not call seek on partitions that don't have an existing offset committed.
+            if (isUncommitted(offset)) {
+                log.trace("Skipping seeking offset for topicPartition: {}", topicPartition);
+                return;
+            }
+            long nextOffsetToCommittedOffset = offset + 1L;
+            log.trace("Seeking to offset {} for topicPartition: {}", nextOffsetToCommittedOffset, topicPartition);
+            consumer.seek(topicPartition, nextOffsetToCommittedOffset);
+        });
     }
 
     // visible for testing
@@ -302,6 +317,10 @@ public class MirrorSourceTask extends SourceTask {
         }
     }
 
+    private boolean isUncommitted(Long offset) {
+        return offset == null || offset < 0;
+    }
+
     static class PartitionState {
         long previousUpstreamOffset = -1L;
         long previousDownstreamOffset = -1L;
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java
index 0c566eb596b..647935eb356 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java
@@ -31,25 +31,33 @@ import org.apache.kafka.common.header.internals.RecordHeaders;
 
 import org.apache.kafka.connect.mirror.MirrorSourceTask.PartitionState;
 import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.source.SourceTaskContext;
+import org.apache.kafka.connect.storage.OffsetStorageReader;
 
 import org.junit.jupiter.api.Test;
 import org.mockito.ArgumentCaptor;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.Semaphore;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyMap;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
 import static org.mockito.Mockito.verifyNoInteractions;
@@ -214,6 +222,61 @@ public class MirrorSourceTaskTest {
         }
     }
 
+    @Test
+    public void testSeekBehaviorDuringStart() {
+        // Setting up mock behavior.
+        @SuppressWarnings("unchecked")
+        KafkaConsumer<byte[], byte[]> mockConsumer = mock(KafkaConsumer.class);
+
+        SourceTaskContext mockSourceTaskContext = mock(SourceTaskContext.class);
+        OffsetStorageReader mockOffsetStorageReader = mock(OffsetStorageReader.class);
+        when(mockSourceTaskContext.offsetStorageReader()).thenReturn(mockOffsetStorageReader);
+
+        Set<TopicPartition> topicPartitions = new HashSet<>(Arrays.asList(
+                new TopicPartition("previouslyReplicatedTopic", 8),
+                new TopicPartition("previouslyReplicatedTopic1", 0),
+                new TopicPartition("previouslyReplicatedTopic", 1),
+                new TopicPartition("newTopicToReplicate1", 1),
+                new TopicPartition("newTopicToReplicate1", 4),
+                new TopicPartition("newTopicToReplicate2", 0)
+        ));
+
+        long arbitraryCommittedOffset = 4L;
+        long offsetToSeek = arbitraryCommittedOffset + 1L;
+        when(mockOffsetStorageReader.offset(anyMap())).thenAnswer(testInvocation -> {
+            Map<String, Object> topicPartitionOffsetMap = testInvocation.getArgument(0);
+            String topicName = topicPartitionOffsetMap.get("topic").toString();
+
+            // Only return the offset for previously replicated topics.
+            // For others, there is no value set.
+            if (topicName.startsWith("previouslyReplicatedTopic")) {
+                topicPartitionOffsetMap.put("offset", arbitraryCommittedOffset);
+            }
+            return topicPartitionOffsetMap;
+        });
+
+        MirrorSourceTask mirrorSourceTask = new MirrorSourceTask(mockConsumer, null, null,
+                new DefaultReplicationPolicy(), 50, null, null, null, null);
+        mirrorSourceTask.initialize(mockSourceTaskContext);
+
+        // Call test subject
+        mirrorSourceTask.initializeConsumer(topicPartitions);
+
+        // Verifications
+        // Ensure all the topic partitions are assigned to consumer
+        verify(mockConsumer, times(1)).assign(topicPartitions);
+
+        // Ensure seek is only called for previously committed topic partitions.
+        verify(mockConsumer, times(1))
+                .seek(new TopicPartition("previouslyReplicatedTopic", 8), offsetToSeek);
+        verify(mockConsumer, times(1))
+                .seek(new TopicPartition("previouslyReplicatedTopic", 1), offsetToSeek);
+        verify(mockConsumer, times(1))
+                .seek(new TopicPartition("previouslyReplicatedTopic1", 0), offsetToSeek);
+
+        verifyNoMoreInteractions(mockConsumer);
+    }
+
     @Test
     public void testCommitRecordWithNullMetadata() {
         // Create a consumer mock
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
index 6ea080a59a3..4cb51b1153c 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
@@ -81,6 +81,7 @@ import org.junit.jupiter.api.Tag;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG;
 import static org.apache.kafka.test.TestUtils.waitForCondition;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
@@ -874,6 +875,34 @@ public class MirrorConnectorsIntegrationBaseTest {
         }, 30000, "Topic configurations were not synced");
     }
 
+    @Test
+    public void testReplicateFromLatest() throws Exception {
+        // populate topic with records that should not be replicated
+        String topic = "test-topic-1";
+        produceMessages(primaryProducer, topic, NUM_PARTITIONS);
+
+        // consume from the ends of topics when no committed offsets are found
+        mm2Props.put(PRIMARY_CLUSTER_ALIAS + ".consumer." + AUTO_OFFSET_RESET_CONFIG, "latest");
+        // one way replication from primary to backup
+        mm2Props.put(BACKUP_CLUSTER_ALIAS + "->" + PRIMARY_CLUSTER_ALIAS + ".enabled", "false");
+        mm2Config = new MirrorMakerConfig(mm2Props);
+        waitUntilMirrorMakerIsRunning(backup, CONNECTOR_LIST, mm2Config, PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS);
+
+        // produce some more messages to the topic, now that MM2 is running and replication should be taking place
+        produceMessages(primaryProducer, topic, NUM_PARTITIONS);
+
+        String backupTopic = remoteTopicName(topic, PRIMARY_CLUSTER_ALIAS);
+        // wait for at least the expected number of records to be replicated to the backup cluster
+        backup.kafka().consume(NUM_PARTITIONS * NUM_RECORDS_PER_PARTITION, RECORD_TRANSFER_DURATION_MS, backupTopic);
+        // consume all records from backup cluster
+        ConsumerRecords<byte[], byte[]> replicatedRecords = backup.kafka().consumeAll(RECORD_TRANSFER_DURATION_MS, backupTopic);
+        // ensure that we only replicated the records produced after startup
+        replicatedRecords.partitions().forEach(topicPartition -> {
+            int replicatedCount = replicatedRecords.records(topicPartition).size();
+            assertEquals(NUM_RECORDS_PER_PARTITION, replicatedCount);
+        });
+    }
+
     private TopicPartition remoteTopicPartition(TopicPartition tp, String alias) {
         return new TopicPartition(remoteTopicName(tp.topic(), alias), tp.partition());
     }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java
index 21cd7340653..26b2d7cba16 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java
@@ -809,13 +809,7 @@ public class ExactlyOnceSourceIntegrationTest {
         );
 
         // also consume from the cluster's global offsets topic
-        offsetRecords = connect.kafka()
-                .consumeAll(
-                        TimeUnit.MINUTES.toMillis(1),
-                        null,
-                        null,
-                        globalOffsetsTopic
-                );
+        offsetRecords = connect.kafka().consumeAll(TimeUnit.MINUTES.toMillis(1), globalOffsetsTopic);
         seqnos = parseAndAssertOffsetsForSingleTask(offsetRecords);
         seqnos.forEach(seqno ->
             assertEquals("Offset commits should occur on connector-defined poll boundaries, which happen every " + MINIMUM_MESSAGES + " records",
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java
index df8a2253531..98af3e827f8 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java
@@ -525,6 +525,19 @@ public class EmbeddedKafkaCluster {
         throw new RuntimeException("Could not find enough records. found " + consumedRecords + ", expected " + n);
     }
 
+    /**
+     * Consume all currently-available records for the specified topics in a given duration, or throw an exception.
+     * @param maxDurationMs the max duration to wait for these records (in milliseconds).
+     * @param topics the topics to consume from
+     * @return a {@link ConsumerRecords} collection containing the records for all partitions of the given topics
+     */
+    public ConsumerRecords<byte[], byte[]> consumeAll(
+            long maxDurationMs,
+            String... topics
topics + ) throws TimeoutException, InterruptedException, ExecutionException { + return consumeAll(maxDurationMs, null, null, topics); + } + /** * Consume all currently-available records for the specified topics in a given duration, or throw an exception. * @param maxDurationMs the max duration to wait for these records (in milliseconds).