This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch 3.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.1 by this push:
     new 417630d  KAFKA-13397: MirrorMaker should not mirror topics ending with `.internal` (#11431)
417630d is described below

commit 417630d9be91bc62b908d8337fe7fc937eca823c
Author: Lee Dongjin <[email protected]>
AuthorDate: Thu Nov 18 02:14:02 2021 +0900

    KAFKA-13397: MirrorMaker should not mirror topics ending with `.internal` (#11431)
    
    When running in dedicated mode, Connect runtimes are configured to use the `.internal` suffix for their topics.
    
    Reviewers: Mickael Maison <[email protected]>, Omnia G H Ibrahim <[email protected]>
---
 .../kafka/connect/mirror/ReplicationPolicy.java    |  5 +--
 .../connect/mirror/ReplicationPolicyTest.java      | 42 ++++++++++++++++++++++
 .../connect/mirror/MirrorMakerConfigTest.java      |  2 ++
 3 files changed, 47 insertions(+), 2 deletions(-)

diff --git 
a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/ReplicationPolicy.java
 
b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/ReplicationPolicy.java
index 0a9130b..d8d5593 100644
--- 
a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/ReplicationPolicy.java
+++ 
b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/ReplicationPolicy.java
@@ -84,7 +84,8 @@ public interface ReplicationPolicy {
 
     /** Internal topics are never replicated. */
     default boolean isInternalTopic(String topic) {
-        boolean isKafkaInternalTopic = topic.startsWith("__") || 
topic.startsWith(".") ||  topic.endsWith("-internal");
-        return isMM2InternalTopic(topic) || isKafkaInternalTopic;
+        boolean isKafkaInternalTopic = topic.startsWith("__") || 
topic.startsWith(".");
+        boolean isDefaultConnectTopic =  topic.endsWith("-internal") ||  
topic.endsWith(".internal");
+        return isMM2InternalTopic(topic) || isKafkaInternalTopic || 
isDefaultConnectTopic;
     }
 }
diff --git 
a/connect/mirror-client/src/test/java/org/apache/kafka/connect/mirror/ReplicationPolicyTest.java
 
b/connect/mirror-client/src/test/java/org/apache/kafka/connect/mirror/ReplicationPolicyTest.java
new file mode 100644
index 0000000..4810f0e
--- /dev/null
+++ 
b/connect/mirror-client/src/test/java/org/apache/kafka/connect/mirror/ReplicationPolicyTest.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.mirror;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+public class ReplicationPolicyTest {
+    private static final DefaultReplicationPolicy DEFAULT_REPLICATION_POLICY = 
new DefaultReplicationPolicy();
+
+    @Test
+    public void testInternalTopic() {
+        // starts with '__'
+        
assertTrue(DEFAULT_REPLICATION_POLICY.isInternalTopic("__consumer_offsets"));
+        // starts with '.'
+        assertTrue(DEFAULT_REPLICATION_POLICY.isInternalTopic(".hiddentopic"));
+
+        // ends with '.internal': default 
DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG in standalone mode.
+        
assertTrue(DEFAULT_REPLICATION_POLICY.isInternalTopic("mm2-offsets.CLUSTER.internal"));
+        // ends with '-internal'
+        
assertTrue(DEFAULT_REPLICATION_POLICY.isInternalTopic("mm2-offsets-CLUSTER-internal"));
+        // non-internal topic.
+        
assertFalse(DEFAULT_REPLICATION_POLICY.isInternalTopic("mm2-offsets_CLUSTER_internal"));
+    }
+}
diff --git 
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java
 
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java
index 4787ecd..41bcacb 100644
--- 
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java
+++ 
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java
@@ -236,6 +236,7 @@ public class MirrorMakerConfigTest {
         SourceAndTarget b = new SourceAndTarget("a", "b");
         Map<String, String> aProps = mirrorConfig.workerConfig(a);
         assertEquals("123", aProps.get("offset.storage.replication.factor"));
+        assertEquals("__", aProps.get("replication.policy.separator"));
         Map<String, String> bProps = mirrorConfig.workerConfig(b);
         assertEquals("456", bProps.get("status.storage.replication.factor"));
         assertEquals("client-one", bProps.get("producer.client.id"),
@@ -254,6 +255,7 @@ public class MirrorMakerConfigTest {
             "security properties should be transformed in worker config");
         assertEquals("secret2", bProps.get("producer.ssl.key.password"),
             "security properties should be transformed in worker producer 
config");
+        assertEquals("__", bProps.get("replication.policy.separator"));
     }
 
     @Test

Reply via email to