C0urante commented on code in PR #13458: URL: https://github.com/apache/kafka/pull/13458#discussion_r1152299630
########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java: ########## @@ -223,6 +231,15 @@ ForwardingAdmin forwardingAdmin(Map<String, Object> config) { } } + void addClientId(Map<String, Object> props, String role) { + String clientId = entityLabel() + (role == null ? "" : "|" + role); + props.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId); Review Comment: Doesn't this overwrite user-supplied client IDs? We should probably do something similar to [ConnectUtils::clientIdBase](https://github.com/apache/kafka/blob/379b6978a04c171bcc64331a095f1c97eb4e1830/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java#L193-L211), where we use the value specified by the user for the `client.id` property (if one is found) as part of the complete client ID that we end up constructing. ########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorHeartbeatConnector.java: ########## @@ -48,8 +48,8 @@ public MirrorHeartbeatConnector() { @Override public void start(Map<String, String> props) { config = new MirrorHeartbeatConfig(props); - targetAdminClient = config.forwardingAdmin(config.targetAdminConfig()); - scheduler = new Scheduler(MirrorHeartbeatConnector.class, config.adminTimeout()); + targetAdminClient = config.forwardingAdmin(config.targetAdminConfig("target-admin")); Review Comment: Isn't this a bit vague? Don't we want something like "heartbeats-target-admin" instead? ########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java: ########## @@ -100,7 +100,7 @@ public void start(Map<String, String> props) { replicationPolicy = config.replicationPolicy(); partitionStates = new HashMap<>(); offsetSyncsTopic = config.offsetSyncsTopic(); - consumer = MirrorUtils.newConsumer(config.sourceConsumerConfig()); + consumer = MirrorUtils.newConsumer(config.sourceConsumerConfig("data-consumer")); Review Comment: Nit: "replication" is more specific than "data" (every consumer could be thought of as one that reads "data") ########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java: ########## @@ -291,6 +291,7 @@ public DistributedHerder(DistributedConfig config, String clientIdConfig = config.getString(CommonClientConfigs.CLIENT_ID_CONFIG); String clientId = clientIdConfig.length() <= 0 ? "connect-" + CONNECT_CLIENT_ID_SEQUENCE.getAndIncrement() : clientIdConfig; + String escapedClientId = clientId.replace("%", "%%"); Review Comment: This isn't mentioned in the description for the PR or the Jira ticket; why is this changed? ########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java: ########## @@ -264,26 +264,25 @@ private void addHerder(SourceAndTarget sourceAndTarget) { plugins.compareAndSwapWithDelegatingLoader(); DistributedConfig distributedConfig = new DistributedConfig(workerProps); String kafkaClusterId = distributedConfig.kafkaClusterId(); - String clientIdBase = ConnectUtils.clientIdBase(distributedConfig); // Create the admin client to be shared by all backing stores for this herder Map<String, Object> adminProps = new HashMap<>(distributedConfig.originals()); ConnectUtils.addMetricsContextProperties(adminProps, distributedConfig, kafkaClusterId); SharedTopicAdmin sharedAdmin = new SharedTopicAdmin(adminProps); - KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore(sharedAdmin, () -> clientIdBase); + KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore(sharedAdmin, () -> workerId); Review Comment: I'm not sure about this change. It overrides any user-specified values for the `client.id` property and loses the group ID. We should definitely try to retain the former, and the latter would be nice if not too much trouble. ########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java: ########## @@ -71,9 +72,9 @@ public void start(Map<String, String> props) { String connectorName = config.connectorName(); sourceAndTarget = new SourceAndTarget(config.sourceClusterAlias(), config.targetClusterAlias()); groupFilter = config.groupFilter(); - sourceAdminClient = config.forwardingAdmin(config.sourceAdminConfig()); - targetAdminClient = config.forwardingAdmin(config.targetAdminConfig()); - scheduler = new Scheduler(MirrorCheckpointConnector.class, config.adminTimeout()); + sourceAdminClient = config.forwardingAdmin(config.sourceAdminConfig("source-admin")); + targetAdminClient = config.forwardingAdmin(config.targetAdminConfig("target-admin")); + scheduler = new Scheduler(config.entityLabel(), config.adminTimeout()); Review Comment: Doesn't this drop information? If the user isn't running MM2 in dedicated mode, the name of the connector may be different than its class. We might want to preserve the class name in these messages. ########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java: ########## @@ -71,9 +72,9 @@ public void start(Map<String, String> props) { String connectorName = config.connectorName(); sourceAndTarget = new SourceAndTarget(config.sourceClusterAlias(), config.targetClusterAlias()); groupFilter = config.groupFilter(); - sourceAdminClient = config.forwardingAdmin(config.sourceAdminConfig()); - targetAdminClient = config.forwardingAdmin(config.targetAdminConfig()); - scheduler = new Scheduler(MirrorCheckpointConnector.class, config.adminTimeout()); + sourceAdminClient = config.forwardingAdmin(config.sourceAdminConfig("source-admin")); Review Comment: Maybe "checkpoint-source-admin"? ########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java: ########## @@ -136,10 +137,10 @@ public void start(Map<String, String> props) { configPropertyFilter = config.configPropertyFilter(); replicationPolicy = config.replicationPolicy(); replicationFactor = config.replicationFactor(); - sourceAdminClient = config.forwardingAdmin(config.sourceAdminConfig()); - targetAdminClient = config.forwardingAdmin(config.targetAdminConfig()); + sourceAdminClient = config.forwardingAdmin(config.sourceAdminConfig("source-admin")); Review Comment: More detail? Maybe "replication-source-admin"? ########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java: ########## @@ -199,6 +199,7 @@ public Map<String, String> workerConfig(SourceAndTarget sourceAndTarget) { props.putAll(stringsWithPrefix(CONFIG_PROVIDERS_CONFIG)); // fill in reasonable defaults + props.putIfAbsent(CommonClientConfigs.CLIENT_ID_CONFIG, sourceAndTarget.toString()); Review Comment: What does this change? I haven't looked too closely but I haven't seen a place where this will have any effect yet. ########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java: ########## @@ -96,12 +96,12 @@ public void start(Map<String, String> props) { interval = config.emitCheckpointsInterval(); pollTimeout = config.consumerPollTimeout(); offsetSyncStore = new OffsetSyncStore(config); - sourceAdminClient = config.forwardingAdmin(config.sourceAdminConfig()); - targetAdminClient = config.forwardingAdmin(config.targetAdminConfig()); + sourceAdminClient = config.forwardingAdmin(config.sourceAdminConfig("source-admin")); Review Comment: "checkpoint-source-admin"? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org