C0urante commented on code in PR #13373: URL: https://github.com/apache/kafka/pull/13373#discussion_r1149857584
########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java: ########## @@ -514,6 +543,41 @@ private void updateTopicConfigs(Map<String, Config> topicConfigs) { })); } + // visible for testing + void incrementalAlterConfigs(Map<String, Config> topicConfigs) { + Map<ConfigResource, Collection<AlterConfigOp>> configOps = new HashMap<>(); + for (Map.Entry<String, Config> topicConfig : topicConfigs.entrySet()) { + Collection<AlterConfigOp> ops = new ArrayList<>(); + ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, topicConfig.getKey()); + for (ConfigEntry config : topicConfig.getValue().entries()) { + if (config.isDefault() && !shouldReplicateSourceDefault(config.name())) { + ops.add(new AlterConfigOp(config, AlterConfigOp.OpType.DELETE)); + } else { + ops.add(new AlterConfigOp(config, AlterConfigOp.OpType.SET)); + } + } + configOps.put(configResource, ops); + } + log.trace("Syncing configs for {} topics.", configOps.size()); + targetAdminClient.incrementalAlterConfigs(configOps).values().forEach((k, v) -> v.whenComplete((x, e) -> { + if (e != null) { + if (useIncrementalAlterConfigs.equals(MirrorSourceConfig.REQUEST_INCREMENTAL_ALTER_CONFIG) + && e instanceof UnsupportedVersionException) { + //Fallback logic + log.warn("The target cluster {} is not compatible with IncrementalAlterConfigs API. Therefore using deprecated AlterConfigs API for syncing topic configurations", sourceAndTarget.target(), e); + useIncrementalAlterConfigs = MirrorSourceConfig.NEVER_USE_INCREMENTAL_ALTER_CONFIG; + } else if (useIncrementalAlterConfigs.equals(MirrorSourceConfig.REQUIRE_INCREMENTAL_ALTER_CONFIG) + && e instanceof UnsupportedVersionException) { + log.error("Failed to sync configs for topic {} on cluster {} with IncrementalAlterConfigs API", k.name(), sourceAndTarget.target(), e); + context.raiseError(new ConnectException("use.incremental.alter.configs was set to \"required\", but the target cluster '" + + sourceAndTarget.target() + "' is not compatible with IncrementalAlterConfigs API", e)); Review Comment: Oh, one more thing: the indentation is off here, which is causing Checkstyle to fail the build: ```suggestion log.error("Failed to sync configs for topic {} on cluster {} with IncrementalAlterConfigs API", k.name(), sourceAndTarget.target(), e); context.raiseError(new ConnectException("use.incremental.alter.configs was set to \"required\", but the target cluster '" + sourceAndTarget.target() + "' is not compatible with IncrementalAlterConfigs API", e)); ``` Can you try to make sure that the project builds before pushing? `./gradlew :connect:mirror:build` should do the trick, or if you want to skip integration tests (since those can take a while) you can do `./gradlew :connect:mirror:build :connect:mirror:unitTest -x test`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org