This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.8 by this push:
     new 169ec72  KAFKA-12254: Ensure MM2 creates topics with source topic configs (#10217)
169ec72 is described below

commit 169ec72c782644806a118ae53b864728842c5b57
Author: Dhruvil Shah <[email protected]>
AuthorDate: Mon Mar 1 01:30:30 2021 -0800

    KAFKA-12254: Ensure MM2 creates topics with source topic configs (#10217)
    
    MM2 creates new topics on the destination cluster with default configurations.
    It has an async periodic task to refresh topic configurations from the source
    to the destination. However, this opens up a window where the destination
    cluster has data produced to it with default configurations. In the worst
    case, this could cause data loss if the destination topic is created without
    the right cleanup.policy. This commit fixes the above issue by ensuring that
    the right configurations are supplied to AdminClient#createTopics when MM2
    creates topics on the destination cluster.
    
    Reviewers: Rajini Sivaram <[email protected]>
---
 .../connect/mirror/MirrorSourceConnector.java      | 90 +++++++++++++++++-----
 .../connect/mirror/MirrorSourceConnectorTest.java  | 31 ++++++--
 2 files changed, 92 insertions(+), 29 deletions(-)

diff --git 
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
 
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
index 3e7f0c7..7b844c8 100644
--- 
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
+++ 
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
@@ -49,6 +49,7 @@ import java.util.Set;
 import java.util.HashSet;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import java.util.concurrent.ExecutionException;
@@ -306,40 +307,82 @@ public class MirrorSourceConnector extends 
SourceConnector {
         
MirrorUtils.createSinglePartitionCompactedTopic(config.offsetSyncsTopic(), 
config.offsetSyncsTopicReplicationFactor(), config.sourceAdminConfig());
     }
 
-    // visible for testing
-    void computeAndCreateTopicPartitions()
-            throws InterruptedException, ExecutionException {
-        Map<String, Long> partitionCounts = knownSourceTopicPartitions.stream()
-            .collect(Collectors.groupingBy(TopicPartition::topic, 
Collectors.counting())).entrySet().stream()
-            .collect(Collectors.toMap(x -> formatRemoteTopic(x.getKey()), 
Entry::getValue));
-        Set<String> knownTargetTopics = toTopics(knownTargetTopicPartitions);
-        List<NewTopic> newTopics = partitionCounts.entrySet().stream()
-            .filter(x -> !knownTargetTopics.contains(x.getKey()))
-            .map(x -> new NewTopic(x.getKey(), x.getValue().intValue(), 
(short) replicationFactor))
-            .collect(Collectors.toList());
-        Map<String, NewPartitions> newPartitions = 
partitionCounts.entrySet().stream()
-            .filter(x -> knownTargetTopics.contains(x.getKey()))
-            .collect(Collectors.toMap(Entry::getKey, x -> 
NewPartitions.increaseTo(x.getValue().intValue())));
-        createTopicPartitions(partitionCounts, newTopics, newPartitions);
+    void computeAndCreateTopicPartitions() throws ExecutionException, 
InterruptedException {
+        // get source and target topics with respective partition counts
+        Map<String, Long> sourceTopicToPartitionCounts = 
knownSourceTopicPartitions.stream()
+                .collect(Collectors.groupingBy(TopicPartition::topic, 
Collectors.counting())).entrySet().stream()
+                .collect(Collectors.toMap(Entry::getKey, Entry::getValue));
+        Map<String, Long> targetTopicToPartitionCounts = 
knownTargetTopicPartitions.stream()
+                .collect(Collectors.groupingBy(TopicPartition::topic, 
Collectors.counting())).entrySet().stream()
+                .collect(Collectors.toMap(Entry::getKey, Entry::getValue));
+
+        Set<String> knownSourceTopics = sourceTopicToPartitionCounts.keySet();
+        Set<String> knownTargetTopics = targetTopicToPartitionCounts.keySet();
+        Map<String, String> sourceToRemoteTopics = knownSourceTopics.stream()
+                .collect(Collectors.toMap(Function.identity(), sourceTopic -> 
formatRemoteTopic(sourceTopic)));
+
+        // compute existing and new source topics
+        Map<Boolean, Set<String>> partitionedSourceTopics = 
knownSourceTopics.stream()
+                .collect(Collectors.partitioningBy(sourceTopic -> 
knownTargetTopics.contains(sourceToRemoteTopics.get(sourceTopic)),
+                        Collectors.toSet()));
+        Set<String> existingSourceTopics = partitionedSourceTopics.get(true);
+        Set<String> newSourceTopics = partitionedSourceTopics.get(false);
+
+        // create new topics
+        if (!newSourceTopics.isEmpty())
+            createNewTopics(newSourceTopics, sourceTopicToPartitionCounts);
+
+        // compute topics with new partitions
+        Map<String, Long> sourceTopicsWithNewPartitions = 
existingSourceTopics.stream()
+                .filter(sourceTopic -> {
+                    String targetTopic = sourceToRemoteTopics.get(sourceTopic);
+                    return sourceTopicToPartitionCounts.get(sourceTopic) > 
targetTopicToPartitionCounts.get(targetTopic);
+                })
+                .collect(Collectors.toMap(Function.identity(), 
sourceTopicToPartitionCounts::get));
+
+        // create new partitions
+        if (!sourceTopicsWithNewPartitions.isEmpty()) {
+            Map<String, NewPartitions> newTargetPartitions = 
sourceTopicsWithNewPartitions.entrySet().stream()
+                    .collect(Collectors.toMap(sourceTopicAndPartitionCount -> 
sourceToRemoteTopics.get(sourceTopicAndPartitionCount.getKey()),
+                        sourceTopicAndPartitionCount -> 
NewPartitions.increaseTo(sourceTopicAndPartitionCount.getValue().intValue())));
+            createNewPartitions(newTargetPartitions);
+        }
+    }
+
+    private void createNewTopics(Set<String> newSourceTopics, Map<String, 
Long> sourceTopicToPartitionCounts)
+            throws ExecutionException, InterruptedException {
+        Map<String, Config> sourceTopicToConfig = 
describeTopicConfigs(newSourceTopics);
+        Map<String, NewTopic> newTopics = newSourceTopics.stream()
+                .map(sourceTopic -> {
+                    String remoteTopic = formatRemoteTopic(sourceTopic);
+                    int partitionCount = 
sourceTopicToPartitionCounts.get(sourceTopic).intValue();
+                    Map<String, String> configs = 
configToMap(sourceTopicToConfig.get(sourceTopic));
+                    return new NewTopic(remoteTopic, partitionCount, (short) 
replicationFactor)
+                            .configs(configs);
+                })
+                .collect(Collectors.toMap(NewTopic::name, 
Function.identity()));
+        createNewTopics(newTopics);
     }
 
     // visible for testing
-    void createTopicPartitions(Map<String, Long> partitionCounts, 
List<NewTopic> newTopics,
-            Map<String, NewPartitions> newPartitions) {
-        targetAdminClient.createTopics(newTopics, new 
CreateTopicsOptions()).values().forEach((k, v) -> v.whenComplete((x, e) -> {
+    void createNewTopics(Map<String, NewTopic> newTopics) {
+        targetAdminClient.createTopics(newTopics.values(), new 
CreateTopicsOptions()).values().forEach((k, v) -> v.whenComplete((x, e) -> {
             if (e != null) {
                 log.warn("Could not create topic {}.", k, e);
             } else {
-                log.info("Created remote topic {} with {} partitions.", k, 
partitionCounts.get(k));
+                log.info("Created remote topic {} with {} partitions.", k, 
newTopics.get(k).numPartitions());
             }
         }));
+    }
+
+    void createNewPartitions(Map<String, NewPartitions> newPartitions) {
         targetAdminClient.createPartitions(newPartitions).values().forEach((k, 
v) -> v.whenComplete((x, e) -> {
             if (e instanceof InvalidPartitionsException) {
                 // swallow, this is normal
             } else if (e != null) {
                 log.warn("Could not create topic-partitions for {}.", k, e);
             } else {
-                log.info("Increased size of {} to {} partitions.", k, 
partitionCounts.get(k));
+                log.info("Increased size of {} to {} partitions.", k, 
newPartitions.get(k).totalCount());
             }
         }));
     }
@@ -359,6 +402,11 @@ public class MirrorSourceConnector extends SourceConnector 
{
         return adminClient.describeTopics(topics).all().get().values();
     }
 
+    static Map<String, String> configToMap(Config config) {
+        return config.entries().stream()
+                .collect(Collectors.toMap(ConfigEntry::name, 
ConfigEntry::value));
+    }
+
     @SuppressWarnings("deprecation")
     // use deprecated alterConfigs API for broker compatibility back to 0.11.0
     private void updateTopicConfigs(Map<String, Config> topicConfigs)
@@ -390,7 +438,7 @@ public class MirrorSourceConnector extends SourceConnector {
             .map(x -> new TopicPartition(topic, x.partition()));
     }
 
-    private Map<String, Config> describeTopicConfigs(Set<String> topics)
+    Map<String, Config> describeTopicConfigs(Set<String> topics)
             throws InterruptedException, ExecutionException {
         Set<ConfigResource> resources = topics.stream()
             .map(x -> new ConfigResource(ConfigResource.Type.TOPIC, x))
diff --git 
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java
 
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java
index a963391..42d7951 100644
--- 
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java
+++ 
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java
@@ -183,10 +183,16 @@ public class MirrorSourceConnectorTest {
         connector.initialize(mock(ConnectorContext.class));
         connector = spy(connector);
 
+        Config topicConfig = new Config(Arrays.asList(
+                new ConfigEntry("cleanup.policy", "compact"),
+                new ConfigEntry("segment.bytes", "100")));
+        Map<String, Config> configs = Collections.singletonMap("topic", 
topicConfig);
+
         List<TopicPartition> sourceTopicPartitions = 
Collections.singletonList(new TopicPartition("topic", 0));
         
doReturn(sourceTopicPartitions).when(connector).findSourceTopicPartitions();
         
doReturn(Collections.emptyList()).when(connector).findTargetTopicPartitions();
-        doNothing().when(connector).createTopicPartitions(any(), any(), any());
+        
doReturn(configs).when(connector).describeTopicConfigs(Collections.singleton("topic"));
+        doNothing().when(connector).createNewTopics(any());
 
         connector.refreshTopicPartitions();
         // if target topic is not created, refreshTopicPartitions() will call 
createTopicPartitions() again
@@ -194,13 +200,15 @@ public class MirrorSourceConnectorTest {
 
         Map<String, Long> expectedPartitionCounts = new HashMap<>();
         expectedPartitionCounts.put("source.topic", 1L);
-        List<NewTopic> expectedNewTopics = Arrays.asList(new 
NewTopic("source.topic", 1, (short) 0));
+        Map<String, String> configMap = 
MirrorSourceConnector.configToMap(topicConfig);
+        assertEquals(2, configMap.size());
+
+        Map<String, NewTopic> expectedNewTopics = new HashMap<>();
+        expectedNewTopics.put("source.topic", new NewTopic("source.topic", 1, 
(short) 0).configs(configMap));
 
         verify(connector, times(2)).computeAndCreateTopicPartitions();
-        verify(connector, times(2)).createTopicPartitions(
-                eq(expectedPartitionCounts),
-                eq(expectedNewTopics),
-                eq(Collections.emptyMap()));
+        verify(connector, times(2)).createNewTopics(eq(expectedNewTopics));
+        verify(connector, times(0)).createNewPartitions(any());
 
         List<TopicPartition> targetTopicPartitions = 
Collections.singletonList(new TopicPartition("source.topic", 0));
         
doReturn(targetTopicPartitions).when(connector).findTargetTopicPartitions();
@@ -217,11 +225,19 @@ public class MirrorSourceConnectorTest {
         connector.initialize(mock(ConnectorContext.class));
         connector = spy(connector);
 
+        Config topicConfig = new Config(Arrays.asList(
+                new ConfigEntry("cleanup.policy", "compact"),
+                new ConfigEntry("segment.bytes", "100")));
+        Map<String, Config> configs = Collections.singletonMap("source.topic", 
topicConfig);
+
         List<TopicPartition> sourceTopicPartitions = Collections.emptyList();
         List<TopicPartition> targetTopicPartitions = 
Collections.singletonList(new TopicPartition("source.topic", 0));
         
doReturn(sourceTopicPartitions).when(connector).findSourceTopicPartitions();
         
doReturn(targetTopicPartitions).when(connector).findTargetTopicPartitions();
-        doNothing().when(connector).createTopicPartitions(any(), any(), any());
+        
doReturn(configs).when(connector).describeTopicConfigs(Collections.singleton("source.topic"));
+        
doReturn(Collections.emptyMap()).when(connector).describeTopicConfigs(Collections.emptySet());
+        doNothing().when(connector).createNewTopics(any());
+        doNothing().when(connector).createNewPartitions(any());
 
         // partitions appearing on the target cluster should not cause 
reconfiguration
         connector.refreshTopicPartitions();
@@ -234,6 +250,5 @@ public class MirrorSourceConnectorTest {
         // when partitions are added to the source cluster, reconfiguration is 
triggered
         connector.refreshTopicPartitions();
         verify(connector, times(1)).computeAndCreateTopicPartitions();
-
     }
 }

Reply via email to