C0urante commented on code in PR #13458:
URL: https://github.com/apache/kafka/pull/13458#discussion_r1152299630


##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java:
##########
@@ -223,6 +231,15 @@ ForwardingAdmin forwardingAdmin(Map<String, Object> 
config) {
         }
     }
 
+    void addClientId(Map<String, Object> props, String role) {
+        String clientId = entityLabel() + (role == null ? "" : "|" + role);
+        props.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId);

Review Comment:
   Doesn't this overwrite user-supplied client IDs?
   
   We should probably do something similar to 
[ConnectUtils::clientIdBase](https://github.com/apache/kafka/blob/379b6978a04c171bcc64331a095f1c97eb4e1830/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java#L193-L211),
 where we use the value specified by the user for the `client.id` property (if 
one is found) as part of the complete client ID that we end up constructing.



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorHeartbeatConnector.java:
##########
@@ -48,8 +48,8 @@ public MirrorHeartbeatConnector() {
     @Override
     public void start(Map<String, String> props) {
         config = new MirrorHeartbeatConfig(props);
-        targetAdminClient = config.forwardingAdmin(config.targetAdminConfig());
-        scheduler = new Scheduler(MirrorHeartbeatConnector.class, 
config.adminTimeout());
+        targetAdminClient = 
config.forwardingAdmin(config.targetAdminConfig("target-admin"));

Review Comment:
   Isn't this a bit vague? Don't we want something like 
"heartbeats-target-admin" instead?



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java:
##########
@@ -100,7 +100,7 @@ public void start(Map<String, String> props) {
         replicationPolicy = config.replicationPolicy();
         partitionStates = new HashMap<>();
         offsetSyncsTopic = config.offsetSyncsTopic();
-        consumer = MirrorUtils.newConsumer(config.sourceConsumerConfig());
+        consumer = 
MirrorUtils.newConsumer(config.sourceConsumerConfig("data-consumer"));

Review Comment:
   Nit: "replication" is more specific than "data" (every consumer could be 
thought of as one that reads "data")



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##########
@@ -291,6 +291,7 @@ public DistributedHerder(DistributedConfig config,
 
         String clientIdConfig = 
config.getString(CommonClientConfigs.CLIENT_ID_CONFIG);
         String clientId = clientIdConfig.length() <= 0 ? "connect-" + 
CONNECT_CLIENT_ID_SEQUENCE.getAndIncrement() : clientIdConfig;
+        String escapedClientId = clientId.replace("%", "%%");

Review Comment:
   This isn't mentioned in the description for the PR or the Jira ticket; why 
is this changed?



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java:
##########
@@ -264,26 +264,25 @@ private void addHerder(SourceAndTarget sourceAndTarget) {
         plugins.compareAndSwapWithDelegatingLoader();
         DistributedConfig distributedConfig = new 
DistributedConfig(workerProps);
         String kafkaClusterId = distributedConfig.kafkaClusterId();
-        String clientIdBase = ConnectUtils.clientIdBase(distributedConfig);
         // Create the admin client to be shared by all backing stores for this 
herder
         Map<String, Object> adminProps = new 
HashMap<>(distributedConfig.originals());
         ConnectUtils.addMetricsContextProperties(adminProps, 
distributedConfig, kafkaClusterId);
         SharedTopicAdmin sharedAdmin = new SharedTopicAdmin(adminProps);
-        KafkaOffsetBackingStore offsetBackingStore = new 
KafkaOffsetBackingStore(sharedAdmin, () -> clientIdBase);
+        KafkaOffsetBackingStore offsetBackingStore = new 
KafkaOffsetBackingStore(sharedAdmin, () -> workerId);

Review Comment:
   I'm not sure about this change. It overrides any user-specified values for 
the `client.id` property and loses the group ID. We should definitely try to 
retain the former, and the latter would be nice if not too much trouble.



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java:
##########
@@ -71,9 +72,9 @@ public void start(Map<String, String> props) {
         String connectorName = config.connectorName();
         sourceAndTarget = new SourceAndTarget(config.sourceClusterAlias(), 
config.targetClusterAlias());
         groupFilter = config.groupFilter();
-        sourceAdminClient = config.forwardingAdmin(config.sourceAdminConfig());
-        targetAdminClient = config.forwardingAdmin(config.targetAdminConfig());
-        scheduler = new Scheduler(MirrorCheckpointConnector.class, 
config.adminTimeout());
+        sourceAdminClient = 
config.forwardingAdmin(config.sourceAdminConfig("source-admin"));
+        targetAdminClient = 
config.forwardingAdmin(config.targetAdminConfig("target-admin"));
+        scheduler = new Scheduler(config.entityLabel(), config.adminTimeout());

Review Comment:
   Doesn't this drop information? If the user isn't running MM2 in dedicated 
mode, the name of the connector may be different than its class. We might want 
to preserve the class name in these messages.



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java:
##########
@@ -71,9 +72,9 @@ public void start(Map<String, String> props) {
         String connectorName = config.connectorName();
         sourceAndTarget = new SourceAndTarget(config.sourceClusterAlias(), 
config.targetClusterAlias());
         groupFilter = config.groupFilter();
-        sourceAdminClient = config.forwardingAdmin(config.sourceAdminConfig());
-        targetAdminClient = config.forwardingAdmin(config.targetAdminConfig());
-        scheduler = new Scheduler(MirrorCheckpointConnector.class, 
config.adminTimeout());
+        sourceAdminClient = 
config.forwardingAdmin(config.sourceAdminConfig("source-admin"));

Review Comment:
   Maybe "checkpoint-source-admin"?



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java:
##########
@@ -136,10 +137,10 @@ public void start(Map<String, String> props) {
         configPropertyFilter = config.configPropertyFilter();
         replicationPolicy = config.replicationPolicy();
         replicationFactor = config.replicationFactor();
-        sourceAdminClient = config.forwardingAdmin(config.sourceAdminConfig());
-        targetAdminClient = config.forwardingAdmin(config.targetAdminConfig());
+        sourceAdminClient = 
config.forwardingAdmin(config.sourceAdminConfig("source-admin"));

Review Comment:
   More detail? Maybe "replication-source-admin"?



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java:
##########
@@ -199,6 +199,7 @@ public Map<String, String> workerConfig(SourceAndTarget 
sourceAndTarget) {
         props.putAll(stringsWithPrefix(CONFIG_PROVIDERS_CONFIG));
 
         // fill in reasonable defaults
+        props.putIfAbsent(CommonClientConfigs.CLIENT_ID_CONFIG, 
sourceAndTarget.toString());

Review Comment:
   What does this change? I haven't looked too closely but I haven't seen a 
place where this will have any effect yet.



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java:
##########
@@ -96,12 +96,12 @@ public void start(Map<String, String> props) {
         interval = config.emitCheckpointsInterval();
         pollTimeout = config.consumerPollTimeout();
         offsetSyncStore = new OffsetSyncStore(config);
-        sourceAdminClient = config.forwardingAdmin(config.sourceAdminConfig());
-        targetAdminClient = config.forwardingAdmin(config.targetAdminConfig());
+        sourceAdminClient = 
config.forwardingAdmin(config.sourceAdminConfig("source-admin"));

Review Comment:
   "checkpoint-source-admin"?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to