dongjinleekr commented on a change in pull request #11431:
URL: https://github.com/apache/kafka/pull/11431#discussion_r743692427



##########
File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
##########
@@ -183,12 +187,18 @@ public MirrorClientConfig clientConfig(String cluster) {
 
         // fill in reasonable defaults
         props.putIfAbsent(GROUP_ID_CONFIG, sourceAndTarget.source() + "-mm2");
-        props.putIfAbsent(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, "mm2-offsets."
-                + sourceAndTarget.source() + ".internal");
-        props.putIfAbsent(DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG, "mm2-status."
-                + sourceAndTarget.source() + ".internal");
-        props.putIfAbsent(DistributedConfig.CONFIG_TOPIC_CONFIG, "mm2-configs."
-                + sourceAndTarget.source() + ".internal");
+
+        String separator = originalsStrings().getOrDefault(REPLICATION_POLICY_SEPARATOR, REPLICATION_POLICY_SEPARATOR_DEFAULT);
+        if (separator.equals("-")) {
+            throw new ConfigException("You should not use a single dash as a " + REPLICATION_POLICY_SEPARATOR);
+        }
+
+        props.putIfAbsent(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, "mm2-offsets" + separator

Review comment:
   I have also thought this issue over again.
   
   The core of the problem seems to be a gap in our configuration policy, which can cause the Kafka Connect internal topics to be mirrored into the other cluster. At first, I thought this could happen only in standalone mode, but that is not true; my apologies for the confusion.
   
   There are two cases in which this problem may happen:
   
   1. Using a custom separator in standalone mode after KIP-690 (i.e., AK 3.1.0), which is what @OmniaGM and I have discussed so far.
   2. Running MM2 in Kafka Connect with the default `[config, offset, status].storage.topic` configuration (what @mimaison just pointed out).
   
   Case 2 can be prevented by overriding the `topics` configuration, and it seems users are already doing so. (Yes, it has been like this since day 1.) So, let's focus on case 1 only.
   
   To prevent those topics from being mirrored when a custom separator is used, there are three approaches:
   
   a. Fix `ReplicationPolicy#isInternalTopic` only.
   b. Leave it to the users to override the `topics` configuration, as in Kafka Connect mode.
   c. Make the `mm2-[offsets, status, configs].{source}.internal` topics use the given separator.
   
   Approach a does not seem feasible: to determine whether a given `mm2-[offsets, status, configs].{source}.internal` topic is an internal one, the `ReplicationPolicy#isInternalTopic` method would need to know the source cluster's alias, since those topic names include `{source}`. However, it takes only a topic name as a parameter.
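
   To illustrate the point about approach a, here is a minimal sketch of an exact-match check. It is not Kafka code: the class `AliasAwareCheckSketch`, the method `isMm2InternalTopic`, and the aliases `primary` and `backup` are hypothetical names used only for illustration. It shows that matching the full topic name needs the source alias as a second input, which a one-argument method cannot supply.

```java
public class AliasAwareCheckSketch {

    // Hypothetical helper, not part of Kafka: an exact match against the three
    // hard-coded MM2 internal topic names needs the source alias as an input.
    public static boolean isMm2InternalTopic(String topic, String sourceAlias) {
        String[] kinds = {"offsets", "status", "configs"};
        for (String kind : kinds) {
            if (topic.equals("mm2-" + kind + "." + sourceAlias + ".internal")) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Same topic name, different alias: the answer depends on the alias.
        System.out.println(isMm2InternalTopic("mm2-offsets.primary.internal", "primary")); // prints true
        System.out.println(isMm2InternalTopic("mm2-offsets.primary.internal", "backup"));  // prints false
    }
}
```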
   
   Approach b is fine, but it may not be obvious to less experienced users.
   
   With approaches a and b ruled out, only c remains, which is why I thought applying the separator to those topic names is the only way to make it behave the same as the default separator case (which does not mirror those topics by default).
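
   A rough sketch of approach c follows, under stated assumptions. The class `SeparatorTopicNamesSketch` and the helpers `internalTopic` and `hasInternalSuffix` are hypothetical, `IllegalArgumentException` stands in for the `ConfigException` in the diff, and the suffix check is only an assumed model of how a separator-aware policy might recognize internal topics; it is not Kafka's implementation.

```java
public class SeparatorTopicNamesSketch {

    // Approach c: build the internal topic name with the configured separator.
    // IllegalArgumentException stands in for ConfigException in this sketch.
    public static String internalTopic(String kind, String sourceAlias, String separator) {
        if (separator.equals("-")) {
            throw new IllegalArgumentException("A single dash is not allowed as a separator");
        }
        return "mm2-" + kind + separator + sourceAlias + separator + "internal";
    }

    // Assumed model of a separator-aware check: it knows the separator,
    // but not the source alias.
    public static boolean hasInternalSuffix(String topic, String separator) {
        return topic.endsWith(separator + "internal");
    }

    public static void main(String[] args) {
        String hardCoded = "mm2-offsets.primary.internal";

        // With the default separator, the generated name equals the existing one.
        System.out.println(internalTopic("offsets", "primary", ".").equals(hardCoded)); // prints true

        // With a custom separator, the hard-coded name is missed by the suffix check...
        System.out.println(hasInternalSuffix(hardCoded, "_")); // prints false

        // ...while the separator-aware name is recognized.
        System.out.println(hasInternalSuffix(internalTopic("offsets", "primary", "_"), "_")); // prints true
    }
}
```

   The trade-off in this sketch is that the topic names change whenever the separator changes, while the default separator keeps the existing names.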
   
   What do you think?





-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

