C0urante commented on code in PR #13373:
URL: https://github.com/apache/kafka/pull/13373#discussion_r1149857584


##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java:
##########
@@ -514,6 +543,41 @@ private void updateTopicConfigs(Map<String, Config> topicConfigs) {
         }));
     }
 
+    // visible for testing
+    void incrementalAlterConfigs(Map<String, Config> topicConfigs) {
+        Map<ConfigResource, Collection<AlterConfigOp>> configOps = new HashMap<>();
+        for (Map.Entry<String, Config> topicConfig : topicConfigs.entrySet()) {
+            Collection<AlterConfigOp> ops = new ArrayList<>();
+            ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, topicConfig.getKey());
+            for (ConfigEntry config : topicConfig.getValue().entries()) {
+                if (config.isDefault() && !shouldReplicateSourceDefault(config.name())) {
+                    ops.add(new AlterConfigOp(config, AlterConfigOp.OpType.DELETE));
+                } else {
+                    ops.add(new AlterConfigOp(config, AlterConfigOp.OpType.SET));
+                }
+            }
+            configOps.put(configResource, ops);
+        }
+        log.trace("Syncing configs for {} topics.", configOps.size());
+        targetAdminClient.incrementalAlterConfigs(configOps).values().forEach((k, v) -> v.whenComplete((x, e) -> {
+            if (e != null) {
+                if (useIncrementalAlterConfigs.equals(MirrorSourceConfig.REQUEST_INCREMENTAL_ALTER_CONFIG)
+                        && e instanceof UnsupportedVersionException) {
+                    //Fallback logic
+                    log.warn("The target cluster {} is not compatible with IncrementalAlterConfigs API. Therefore using deprecated AlterConfigs API for syncing topic configurations", sourceAndTarget.target(), e);
+                    useIncrementalAlterConfigs = MirrorSourceConfig.NEVER_USE_INCREMENTAL_ALTER_CONFIG;
+                } else if (useIncrementalAlterConfigs.equals(MirrorSourceConfig.REQUIRE_INCREMENTAL_ALTER_CONFIG)
+                        && e instanceof UnsupportedVersionException) {
+                        log.error("Failed to sync configs for topic {} on cluster {} with IncrementalAlterConfigs API", k.name(), sourceAndTarget.target(), e);
+                        context.raiseError(new ConnectException("use.incremental.alter.configs was set to \"required\", but the target cluster '"
+                                + sourceAndTarget.target() + "' is not compatible with IncrementalAlterConfigs API", e));

Review Comment:
   Oh, one more thing: the indentation is off here, which is causing Checkstyle 
to fail the build:
   ```suggestion
                       log.error("Failed to sync configs for topic {} on cluster {} with IncrementalAlterConfigs API", k.name(), sourceAndTarget.target(), e);
                       context.raiseError(new ConnectException("use.incremental.alter.configs was set to \"required\", but the target cluster '"
                               + sourceAndTarget.target() + "' is not compatible with IncrementalAlterConfigs API", e));
   ```
   
   Can you try to make sure that the project builds before pushing? `./gradlew :connect:mirror:build` should do the trick, or if you want to skip integration tests (since those can take a while) you can do `./gradlew :connect:mirror:build :connect:mirror:unitTest -x test`.
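
For readers following the branch under review, here is a minimal, self-contained sketch of the decision it makes when an `IncrementalAlterConfigs` call fails. The enum and class names (`AlterConfigsMode`, `Outcome`, `AlterConfigsFallbackSketch`) are hypothetical and are not part of the PR. The `LOG_ONLY` result for every other case is an assumption, since the hunk above ends before that branch.

```java
// Hypothetical stand-in for the three use.incremental.alter.configs modes
// that the hunk compares against; these names are illustrative only.
enum AlterConfigsMode { REQUESTED, REQUIRED, NEVER }

// Hypothetical outcome of handling a failed IncrementalAlterConfigs call.
enum Outcome { FALL_BACK_TO_LEGACY_API, FAIL_CONNECTOR, LOG_ONLY }

final class AlterConfigsFallbackSketch {
    private AlterConfigsFallbackSketch() {
    }

    // Mirrors the two branches visible in the hunk: the "requested" mode
    // falls back to the deprecated API, the "required" mode fails the
    // connector. Both apply only when the failure is an unsupported version.
    static Outcome onFailure(AlterConfigsMode mode, boolean unsupportedVersion) {
        if (mode == AlterConfigsMode.REQUESTED && unsupportedVersion) {
            return Outcome.FALL_BACK_TO_LEGACY_API;
        } else if (mode == AlterConfigsMode.REQUIRED && unsupportedVersion) {
            return Outcome.FAIL_CONNECTOR;
        }
        // Assumption: the remaining branch is not shown in the hunk.
        return Outcome.LOG_ONLY;
    }
}
```

The body of the `else if` sits exactly one level (four spaces) deeper than the condition, which is the indentation the suggestion above restores.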


