This is an automated email from the ASF dual-hosted git repository.
rsivaram pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.8 by this push:
new 169ec72 KAFKA-12254: Ensure MM2 creates topics with source topic
configs (#10217)
169ec72 is described below
commit 169ec72c782644806a118ae53b864728842c5b57
Author: Dhruvil Shah <[email protected]>
AuthorDate: Mon Mar 1 01:30:30 2021 -0800
KAFKA-12254: Ensure MM2 creates topics with source topic configs (#10217)
MM2 creates new topics on the destination cluster with default
configurations. It has an async periodic task to refresh topic configurations
from the source to destination. However, this opens up a window where the
destination cluster has data produced to it with default configurations. In the
worst case, this could cause data loss if the destination topic is created
without the right cleanup.policy. This commit fixes the above issue by ensuring
that the right configurations are supplied when the topic is created. [truncated in archive]
Reviewers: Rajini Sivaram <[email protected]>
---
.../connect/mirror/MirrorSourceConnector.java | 90 +++++++++++++++++-----
.../connect/mirror/MirrorSourceConnectorTest.java | 31 ++++++--
2 files changed, 92 insertions(+), 29 deletions(-)
diff --git
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
index 3e7f0c7..7b844c8 100644
---
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
+++
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
@@ -49,6 +49,7 @@ import java.util.Set;
import java.util.HashSet;
import java.util.Collection;
import java.util.Collections;
+import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.concurrent.ExecutionException;
@@ -306,40 +307,82 @@ public class MirrorSourceConnector extends
SourceConnector {
MirrorUtils.createSinglePartitionCompactedTopic(config.offsetSyncsTopic(),
config.offsetSyncsTopicReplicationFactor(), config.sourceAdminConfig());
}
- // visible for testing
- void computeAndCreateTopicPartitions()
- throws InterruptedException, ExecutionException {
- Map<String, Long> partitionCounts = knownSourceTopicPartitions.stream()
- .collect(Collectors.groupingBy(TopicPartition::topic,
Collectors.counting())).entrySet().stream()
- .collect(Collectors.toMap(x -> formatRemoteTopic(x.getKey()),
Entry::getValue));
- Set<String> knownTargetTopics = toTopics(knownTargetTopicPartitions);
- List<NewTopic> newTopics = partitionCounts.entrySet().stream()
- .filter(x -> !knownTargetTopics.contains(x.getKey()))
- .map(x -> new NewTopic(x.getKey(), x.getValue().intValue(),
(short) replicationFactor))
- .collect(Collectors.toList());
- Map<String, NewPartitions> newPartitions =
partitionCounts.entrySet().stream()
- .filter(x -> knownTargetTopics.contains(x.getKey()))
- .collect(Collectors.toMap(Entry::getKey, x ->
NewPartitions.increaseTo(x.getValue().intValue())));
- createTopicPartitions(partitionCounts, newTopics, newPartitions);
+ void computeAndCreateTopicPartitions() throws ExecutionException,
InterruptedException {
+ // get source and target topics with respective partition counts
+ Map<String, Long> sourceTopicToPartitionCounts =
knownSourceTopicPartitions.stream()
+ .collect(Collectors.groupingBy(TopicPartition::topic,
Collectors.counting())).entrySet().stream()
+ .collect(Collectors.toMap(Entry::getKey, Entry::getValue));
+ Map<String, Long> targetTopicToPartitionCounts =
knownTargetTopicPartitions.stream()
+ .collect(Collectors.groupingBy(TopicPartition::topic,
Collectors.counting())).entrySet().stream()
+ .collect(Collectors.toMap(Entry::getKey, Entry::getValue));
+
+ Set<String> knownSourceTopics = sourceTopicToPartitionCounts.keySet();
+ Set<String> knownTargetTopics = targetTopicToPartitionCounts.keySet();
+ Map<String, String> sourceToRemoteTopics = knownSourceTopics.stream()
+ .collect(Collectors.toMap(Function.identity(), sourceTopic ->
formatRemoteTopic(sourceTopic)));
+
+ // compute existing and new source topics
+ Map<Boolean, Set<String>> partitionedSourceTopics =
knownSourceTopics.stream()
+ .collect(Collectors.partitioningBy(sourceTopic ->
knownTargetTopics.contains(sourceToRemoteTopics.get(sourceTopic)),
+ Collectors.toSet()));
+ Set<String> existingSourceTopics = partitionedSourceTopics.get(true);
+ Set<String> newSourceTopics = partitionedSourceTopics.get(false);
+
+ // create new topics
+ if (!newSourceTopics.isEmpty())
+ createNewTopics(newSourceTopics, sourceTopicToPartitionCounts);
+
+ // compute topics with new partitions
+ Map<String, Long> sourceTopicsWithNewPartitions =
existingSourceTopics.stream()
+ .filter(sourceTopic -> {
+ String targetTopic = sourceToRemoteTopics.get(sourceTopic);
+ return sourceTopicToPartitionCounts.get(sourceTopic) >
targetTopicToPartitionCounts.get(targetTopic);
+ })
+ .collect(Collectors.toMap(Function.identity(),
sourceTopicToPartitionCounts::get));
+
+ // create new partitions
+ if (!sourceTopicsWithNewPartitions.isEmpty()) {
+ Map<String, NewPartitions> newTargetPartitions =
sourceTopicsWithNewPartitions.entrySet().stream()
+ .collect(Collectors.toMap(sourceTopicAndPartitionCount ->
sourceToRemoteTopics.get(sourceTopicAndPartitionCount.getKey()),
+ sourceTopicAndPartitionCount ->
NewPartitions.increaseTo(sourceTopicAndPartitionCount.getValue().intValue())));
+ createNewPartitions(newTargetPartitions);
+ }
+ }
+
+ private void createNewTopics(Set<String> newSourceTopics, Map<String,
Long> sourceTopicToPartitionCounts)
+ throws ExecutionException, InterruptedException {
+ Map<String, Config> sourceTopicToConfig =
describeTopicConfigs(newSourceTopics);
+ Map<String, NewTopic> newTopics = newSourceTopics.stream()
+ .map(sourceTopic -> {
+ String remoteTopic = formatRemoteTopic(sourceTopic);
+ int partitionCount =
sourceTopicToPartitionCounts.get(sourceTopic).intValue();
+ Map<String, String> configs =
configToMap(sourceTopicToConfig.get(sourceTopic));
+ return new NewTopic(remoteTopic, partitionCount, (short)
replicationFactor)
+ .configs(configs);
+ })
+ .collect(Collectors.toMap(NewTopic::name,
Function.identity()));
+ createNewTopics(newTopics);
}
// visible for testing
- void createTopicPartitions(Map<String, Long> partitionCounts,
List<NewTopic> newTopics,
- Map<String, NewPartitions> newPartitions) {
- targetAdminClient.createTopics(newTopics, new
CreateTopicsOptions()).values().forEach((k, v) -> v.whenComplete((x, e) -> {
+ void createNewTopics(Map<String, NewTopic> newTopics) {
+ targetAdminClient.createTopics(newTopics.values(), new
CreateTopicsOptions()).values().forEach((k, v) -> v.whenComplete((x, e) -> {
if (e != null) {
log.warn("Could not create topic {}.", k, e);
} else {
- log.info("Created remote topic {} with {} partitions.", k,
partitionCounts.get(k));
+ log.info("Created remote topic {} with {} partitions.", k,
newTopics.get(k).numPartitions());
}
}));
+ }
+
+ void createNewPartitions(Map<String, NewPartitions> newPartitions) {
targetAdminClient.createPartitions(newPartitions).values().forEach((k,
v) -> v.whenComplete((x, e) -> {
if (e instanceof InvalidPartitionsException) {
// swallow, this is normal
} else if (e != null) {
log.warn("Could not create topic-partitions for {}.", k, e);
} else {
- log.info("Increased size of {} to {} partitions.", k,
partitionCounts.get(k));
+ log.info("Increased size of {} to {} partitions.", k,
newPartitions.get(k).totalCount());
}
}));
}
@@ -359,6 +402,11 @@ public class MirrorSourceConnector extends SourceConnector
{
return adminClient.describeTopics(topics).all().get().values();
}
+ static Map<String, String> configToMap(Config config) {
+ return config.entries().stream()
+ .collect(Collectors.toMap(ConfigEntry::name,
ConfigEntry::value));
+ }
+
@SuppressWarnings("deprecation")
// use deprecated alterConfigs API for broker compatibility back to 0.11.0
private void updateTopicConfigs(Map<String, Config> topicConfigs)
@@ -390,7 +438,7 @@ public class MirrorSourceConnector extends SourceConnector {
.map(x -> new TopicPartition(topic, x.partition()));
}
- private Map<String, Config> describeTopicConfigs(Set<String> topics)
+ Map<String, Config> describeTopicConfigs(Set<String> topics)
throws InterruptedException, ExecutionException {
Set<ConfigResource> resources = topics.stream()
.map(x -> new ConfigResource(ConfigResource.Type.TOPIC, x))
diff --git
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java
index a963391..42d7951 100644
---
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java
+++
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java
@@ -183,10 +183,16 @@ public class MirrorSourceConnectorTest {
connector.initialize(mock(ConnectorContext.class));
connector = spy(connector);
+ Config topicConfig = new Config(Arrays.asList(
+ new ConfigEntry("cleanup.policy", "compact"),
+ new ConfigEntry("segment.bytes", "100")));
+ Map<String, Config> configs = Collections.singletonMap("topic",
topicConfig);
+
List<TopicPartition> sourceTopicPartitions =
Collections.singletonList(new TopicPartition("topic", 0));
doReturn(sourceTopicPartitions).when(connector).findSourceTopicPartitions();
doReturn(Collections.emptyList()).when(connector).findTargetTopicPartitions();
- doNothing().when(connector).createTopicPartitions(any(), any(), any());
+
doReturn(configs).when(connector).describeTopicConfigs(Collections.singleton("topic"));
+ doNothing().when(connector).createNewTopics(any());
connector.refreshTopicPartitions();
// if target topic is not created, refreshTopicPartitions() will call
createTopicPartitions() again
@@ -194,13 +200,15 @@ public class MirrorSourceConnectorTest {
Map<String, Long> expectedPartitionCounts = new HashMap<>();
expectedPartitionCounts.put("source.topic", 1L);
- List<NewTopic> expectedNewTopics = Arrays.asList(new
NewTopic("source.topic", 1, (short) 0));
+ Map<String, String> configMap =
MirrorSourceConnector.configToMap(topicConfig);
+ assertEquals(2, configMap.size());
+
+ Map<String, NewTopic> expectedNewTopics = new HashMap<>();
+ expectedNewTopics.put("source.topic", new NewTopic("source.topic", 1,
(short) 0).configs(configMap));
verify(connector, times(2)).computeAndCreateTopicPartitions();
- verify(connector, times(2)).createTopicPartitions(
- eq(expectedPartitionCounts),
- eq(expectedNewTopics),
- eq(Collections.emptyMap()));
+ verify(connector, times(2)).createNewTopics(eq(expectedNewTopics));
+ verify(connector, times(0)).createNewPartitions(any());
List<TopicPartition> targetTopicPartitions =
Collections.singletonList(new TopicPartition("source.topic", 0));
doReturn(targetTopicPartitions).when(connector).findTargetTopicPartitions();
@@ -217,11 +225,19 @@ public class MirrorSourceConnectorTest {
connector.initialize(mock(ConnectorContext.class));
connector = spy(connector);
+ Config topicConfig = new Config(Arrays.asList(
+ new ConfigEntry("cleanup.policy", "compact"),
+ new ConfigEntry("segment.bytes", "100")));
+ Map<String, Config> configs = Collections.singletonMap("source.topic",
topicConfig);
+
List<TopicPartition> sourceTopicPartitions = Collections.emptyList();
List<TopicPartition> targetTopicPartitions =
Collections.singletonList(new TopicPartition("source.topic", 0));
doReturn(sourceTopicPartitions).when(connector).findSourceTopicPartitions();
doReturn(targetTopicPartitions).when(connector).findTargetTopicPartitions();
- doNothing().when(connector).createTopicPartitions(any(), any(), any());
+
doReturn(configs).when(connector).describeTopicConfigs(Collections.singleton("source.topic"));
+
doReturn(Collections.emptyMap()).when(connector).describeTopicConfigs(Collections.emptySet());
+ doNothing().when(connector).createNewTopics(any());
+ doNothing().when(connector).createNewPartitions(any());
// partitions appearing on the target cluster should not cause
reconfiguration
connector.refreshTopicPartitions();
@@ -234,6 +250,5 @@ public class MirrorSourceConnectorTest {
// when partitions are added to the source cluster, reconfiguration is
triggered
connector.refreshTopicPartitions();
verify(connector, times(1)).computeAndCreateTopicPartitions();
-
}
}