showuon commented on code in PR #17027:
URL: https://github.com/apache/kafka/pull/17027#discussion_r2641755412


##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java:
##########
@@ -644,34 +604,24 @@ void incrementalAlterConfigs(Map<String, Config> topicConfigs) throws ExecutionE
             configOps.put(configResource, ops);
         }
         log.trace("Syncing configs for {} topics.", configOps.size());
-        AtomicReference<Boolean> encounteredError = new AtomicReference<>(false);
-        adminCall(
-                () -> {
-                    targetAdminClient.incrementalAlterConfigs(configOps).values().forEach((k, v) -> v.whenComplete((x, e) -> {
-                        if (e != null) {
-                            if (config.useIncrementalAlterConfigs().equals(MirrorSourceConfig.REQUEST_INCREMENTAL_ALTER_CONFIGS)
-                                    && e instanceof UnsupportedVersionException && !encounteredError.get()) {
-                                //Fallback logic
-                                log.warn("The target cluster {} is not compatible with IncrementalAlterConfigs API. "
-                                                + "Therefore using deprecated AlterConfigs API for syncing configs for topic {}",
-                                        sourceAndTarget.target(), k.name(), e);
-                                encounteredError.set(true);
-                                useIncrementalAlterConfigs = false;
-                            } else if (config.useIncrementalAlterConfigs().equals(MirrorSourceConfig.REQUIRE_INCREMENTAL_ALTER_CONFIGS)
-                                    && e instanceof UnsupportedVersionException && !encounteredError.get()) {
-                                log.error("Failed to sync configs for topic {} on cluster {} with IncrementalAlterConfigs API", k.name(), sourceAndTarget.target(), e);
-                                encounteredError.set(true);
-                                context.raiseError(new ConnectException("use.incremental.alter.configs was set to \"required\", but the target cluster '"
-                                        + sourceAndTarget.target() + "' is not compatible with IncrementalAlterConfigs API", e));
-                            } else {
-                                log.warn("Could not alter configuration of topic {}.", k.name(), e);
-                            }
-                        }
-                    }));
-                    return null;
-                },
-                () -> String.format("incremental alter topic configs %s on %s cluster", topicConfigs, config.targetClusterAlias())
-        );
+        adminCall(() -> {
+            targetAdminClient.incrementalAlterConfigs(configOps).values()
+                .forEach((k, v) -> v.whenComplete((x, e) -> {
+                    if (e instanceof UnsupportedVersionException) {
+                        log.error("Failed to sync configs for topic {} on cluster {} with " +
+                                "IncrementalAlterConfigs API", k.name(), sourceAndTarget.target(), e);
+                        context.raiseError(new ConnectException("the target cluster '"
+                                + sourceAndTarget.target() + "' is not compatible with " +
+                                "IncrementalAlterConfigs " +
+                                "API", e));
+                    } else {
+                        log.warn("Could not alter configuration of topic {}.", k.name(), e);
+                    }
+                }));

Review Comment:
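   An unexpected WARN log is printed even when the alterConfig call completes 
without error: the removed `if (e != null)` guard no longer wraps the callback, 
so the `else` branch also runs when `e` is null. Opened 
[KAFKA-20017](https://issues.apache.org/jira/browse/KAFKA-20017) to fix it.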
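
   To illustrate the point, here is a minimal, self-contained sketch of a 
completion callback that returns early on success. It is not the patch for 
KAFKA-20017, only one way the guard could look. Everything in it is an 
assumption made for illustration: `CompletableFuture` stands in for 
`KafkaFuture`, `UnsupportedVersionStandIn` stands in for 
`UnsupportedVersionException`, and a plain list replaces the logger.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class AlterConfigCallbackSketch {

    // Hypothetical stand-in for Kafka's UnsupportedVersionException,
    // so the sketch compiles without a Kafka dependency.
    static class UnsupportedVersionStandIn extends RuntimeException {
        UnsupportedVersionStandIn(String message) {
            super(message);
        }
    }

    // Collects messages instead of calling a real logger (assumption for the sketch).
    static final List<String> LOG = new ArrayList<>();

    // Completion callback with an explicit success guard:
    // nothing is logged when the future completed without an exception.
    static void onComplete(String topic, Throwable e) {
        if (e == null) {
            return; // success: no WARN, no ERROR
        }
        if (e instanceof UnsupportedVersionStandIn) {
            LOG.add("ERROR " + topic);
        } else {
            LOG.add("WARN " + topic);
        }
    }

    // Runs the callback against one successful and two failed futures
    // and returns the messages that were recorded.
    static List<String> run() {
        LOG.clear();

        CompletableFuture<Void> ok = CompletableFuture.completedFuture(null);

        CompletableFuture<Void> unsupported = new CompletableFuture<>();
        unsupported.completeExceptionally(new UnsupportedVersionStandIn("not supported"));

        CompletableFuture<Void> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("boom"));

        ok.whenComplete((x, e) -> onComplete("topic-ok", e));
        unsupported.whenComplete((x, e) -> onComplete("topic-unsupported", e));
        failed.whenComplete((x, e) -> onComplete("topic-failed", e));

        return new ArrayList<>(LOG);
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

   In this sketch the successful future records nothing, while the two failed 
futures record one ERROR and one WARN entry respectively. Without the 
`e == null` early return, `topic-ok` would also fall into the WARN branch, 
which is the behavior described above.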


