This is an automated email from the ASF dual-hosted git repository.
mimaison pushed a commit to branch 3.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.1 by this push:
new 417630d KAFKA-13397: MirrorMaker should not mirror topics ending with `.internal` (#11431)
417630d is described below
commit 417630d9be91bc62b908d8337fe7fc937eca823c
Author: Lee Dongjin <[email protected]>
AuthorDate: Thu Nov 18 02:14:02 2021 +0900
KAFKA-13397: MirrorMaker should not mirror topics ending with `.internal` (#11431)
When running in dedicated mode, Connect runtimes are configured to use the
`.internal` suffix for their topics.
Reviewers: Mickael Maison <[email protected]>, Omnia G H Ibrahim <[email protected]>
---
.../kafka/connect/mirror/ReplicationPolicy.java | 5 +--
.../connect/mirror/ReplicationPolicyTest.java | 42 ++++++++++++++++++++++
.../connect/mirror/MirrorMakerConfigTest.java | 2 ++
3 files changed, 47 insertions(+), 2 deletions(-)
diff --git a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/ReplicationPolicy.java b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/ReplicationPolicy.java
index 0a9130b..d8d5593 100644
--- a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/ReplicationPolicy.java
+++ b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/ReplicationPolicy.java
@@ -84,7 +84,8 @@ public interface ReplicationPolicy {
/** Internal topics are never replicated. */
default boolean isInternalTopic(String topic) {
- boolean isKafkaInternalTopic = topic.startsWith("__") || topic.startsWith(".") || topic.endsWith("-internal");
- return isMM2InternalTopic(topic) || isKafkaInternalTopic;
+ boolean isKafkaInternalTopic = topic.startsWith("__") || topic.startsWith(".");
+ boolean isDefaultConnectTopic = topic.endsWith("-internal") || topic.endsWith(".internal");
+ return isMM2InternalTopic(topic) || isKafkaInternalTopic || isDefaultConnectTopic;
}
}
diff --git a/connect/mirror-client/src/test/java/org/apache/kafka/connect/mirror/ReplicationPolicyTest.java b/connect/mirror-client/src/test/java/org/apache/kafka/connect/mirror/ReplicationPolicyTest.java
new file mode 100644
index 0000000..4810f0e
--- /dev/null
+++ b/connect/mirror-client/src/test/java/org/apache/kafka/connect/mirror/ReplicationPolicyTest.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.mirror;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+public class ReplicationPolicyTest {
+ private static final DefaultReplicationPolicy DEFAULT_REPLICATION_POLICY = new DefaultReplicationPolicy();
+
+ @Test
+ public void testInternalTopic() {
+ // starts with '__'
+ assertTrue(DEFAULT_REPLICATION_POLICY.isInternalTopic("__consumer_offsets"));
+ // starts with '.'
+ assertTrue(DEFAULT_REPLICATION_POLICY.isInternalTopic(".hiddentopic"));
+
+ // ends with '.internal': default DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG in standalone mode.
+ assertTrue(DEFAULT_REPLICATION_POLICY.isInternalTopic("mm2-offsets.CLUSTER.internal"));
+ // ends with '-internal'
+ assertTrue(DEFAULT_REPLICATION_POLICY.isInternalTopic("mm2-offsets-CLUSTER-internal"));
+ // non-internal topic.
+ assertFalse(DEFAULT_REPLICATION_POLICY.isInternalTopic("mm2-offsets_CLUSTER_internal"));
+ }
+}
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java
index 4787ecd..41bcacb 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java
@@ -236,6 +236,7 @@ public class MirrorMakerConfigTest {
SourceAndTarget b = new SourceAndTarget("a", "b");
Map<String, String> aProps = mirrorConfig.workerConfig(a);
assertEquals("123", aProps.get("offset.storage.replication.factor"));
+ assertEquals("__", aProps.get("replication.policy.separator"));
Map<String, String> bProps = mirrorConfig.workerConfig(b);
assertEquals("456", bProps.get("status.storage.replication.factor"));
assertEquals("client-one", bProps.get("producer.client.id"),
@@ -254,6 +255,7 @@ public class MirrorMakerConfigTest {
"security properties should be transformed in worker config");
assertEquals("secret2", bProps.get("producer.ssl.key.password"),
"security properties should be transformed in worker producer config");
+ assertEquals("__", bProps.get("replication.policy.separator"));
}
@Test