mimaison commented on a change in pull request #11431: URL: https://github.com/apache/kafka/pull/11431#discussion_r742038737
##########
File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
##########
@@ -183,12 +187,18 @@ public MirrorClientConfig clientConfig(String cluster) {
         // fill in reasonable defaults
         props.putIfAbsent(GROUP_ID_CONFIG, sourceAndTarget.source() + "-mm2");
-        props.putIfAbsent(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, "mm2-offsets."
-                + sourceAndTarget.source() + ".internal");
-        props.putIfAbsent(DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG, "mm2-status."
-                + sourceAndTarget.source() + ".internal");
-        props.putIfAbsent(DistributedConfig.CONFIG_TOPIC_CONFIG, "mm2-configs."
-                + sourceAndTarget.source() + ".internal");
+
+        String separator = originalsStrings().getOrDefault(REPLICATION_POLICY_SEPARATOR, REPLICATION_POLICY_SEPARATOR_DEFAULT);
+        if (separator.equals("-")) {
+            throw new ConfigException("You should not use a single dash as a " + REPLICATION_POLICY_SEPARATOR);
+        }
+
+        props.putIfAbsent(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, "mm2-offsets" + separator

Review comment:
My bad, I missed the fact that this line is about Kafka Connect's internal topics and not MM2's. I think it's fine to keep the code as is.

##########
File path: connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClientConfig.java
##########
@@ -50,8 +50,7 @@
     public static final Class<?> REPLICATION_POLICY_CLASS_DEFAULT = DefaultReplicationPolicy.class;
     public static final String REPLICATION_POLICY_SEPARATOR = "replication.policy.separator";
     private static final String REPLICATION_POLICY_SEPARATOR_DOC = "Separator used in remote topic naming convention.";
-    public static final String REPLICATION_POLICY_SEPARATOR_DEFAULT =
-        DefaultReplicationPolicy.SEPARATOR_DEFAULT;
+    public static final String REPLICATION_POLICY_SEPARATOR_DEFAULT = ".";

Review comment:
Yeah, but I expect the value for this config to come from a class implementing the interface, so in this case `DefaultReplicationPolicy`.
##########
File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
##########
@@ -183,12 +187,18 @@ public MirrorClientConfig clientConfig(String cluster) {
         // fill in reasonable defaults
         props.putIfAbsent(GROUP_ID_CONFIG, sourceAndTarget.source() + "-mm2");
-        props.putIfAbsent(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, "mm2-offsets."
-                + sourceAndTarget.source() + ".internal");
-        props.putIfAbsent(DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG, "mm2-status."
-                + sourceAndTarget.source() + ".internal");
-        props.putIfAbsent(DistributedConfig.CONFIG_TOPIC_CONFIG, "mm2-configs."
-                + sourceAndTarget.source() + ".internal");
+
+        String separator = originalsStrings().getOrDefault(REPLICATION_POLICY_SEPARATOR, REPLICATION_POLICY_SEPARATOR_DEFAULT);
+        if (separator.equals("-")) {
+            throw new ConfigException("You should not use a single dash as a " + REPLICATION_POLICY_SEPARATOR);
+        }
+
+        props.putIfAbsent(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, "mm2-offsets" + separator

Review comment:
My bad, I missed the fact that this line is about Kafka Connect's internal topics and not MM2's.

If I understand correctly, this behaviour is not new and is not related to the changes made in KIP-690. I agree it's unfortunate that if users change the default separator, these Connect internal topics get mirrored in dedicated mode. This could also happen in Connect mode, for example if `offset.storage.topic` is not set to a value that is caught by `isInternalTopic()`.

If we wanted to use the separator for Connect's topics, I'd prefer doing it via a small KIP. As it's likely been like this since day 1, it seems this may not be a very common use case, and I'm not sure we necessarily need to do something. If a user in dedicated mode wants a different separator, they will also need to override the Connect topic names to ensure they match the separator.
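
To make the last point concrete, here is a minimal, self-contained sketch of why the hard-coded Connect topic names stop matching once the separator changes. It is not MM2 code: `SeparatorSketch`, `looksInternal()`, and `connectTopicOverrides()` are hypothetical names, and the suffix check (topic ends with separator + "internal") is only an assumption about how an internal-topic filter might behave. The property keys `offset.storage.topic`, `status.storage.topic`, and `config.storage.topic` are the standard Connect distributed-worker settings.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class SeparatorSketch {

    // Hypothetical stand-in for an internal-topic check. ASSUMPTION: a topic
    // counts as internal when it ends with <separator> + "internal".
    static boolean looksInternal(String topic, String separator) {
        return topic.endsWith(separator + "internal");
    }

    // Same shape as the hard-coded default in the removed lines of the diff.
    static String defaultOffsetTopic(String source) {
        return "mm2-offsets." + source + ".internal";
    }

    // Hypothetical overrides a dedicated-mode user could supply so that the
    // Connect topic names follow a custom separator.
    static Map<String, String> connectTopicOverrides(String source, String separator) {
        Map<String, String> props = new LinkedHashMap<>();
        String suffix = separator + source + separator + "internal";
        props.put("offset.storage.topic", "mm2-offsets" + suffix);
        props.put("status.storage.topic", "mm2-status" + suffix);
        props.put("config.storage.topic", "mm2-configs" + suffix);
        return props;
    }

    public static void main(String[] args) {
        String source = "primary"; // illustrative cluster alias
        String custom = "_";       // illustrative non-default separator

        String hardCoded = defaultOffsetTopic(source);
        System.out.println(hardCoded + " caught with '.': " + looksInternal(hardCoded, "."));
        System.out.println(hardCoded + " caught with '_': " + looksInternal(hardCoded, custom));

        // With explicit overrides, every Connect topic matches the custom separator.
        connectTopicOverrides(source, custom).forEach((key, topic) ->
                System.out.println(key + "=" + topic + " caught: " + looksInternal(topic, custom)));
    }
}
```

Under that assumption, the default name is only filtered while the separator stays at ".", which is why the overrides have to be set together with `replication.policy.separator`.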
-- This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org