showuon commented on code in PR #17027:
URL: https://github.com/apache/kafka/pull/17027#discussion_r2641755412
##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java:
##########
@@ -644,34 +604,24 @@ void incrementalAlterConfigs(Map<String, Config>
topicConfigs) throws ExecutionE
configOps.put(configResource, ops);
}
log.trace("Syncing configs for {} topics.", configOps.size());
- AtomicReference<Boolean> encounteredError = new
AtomicReference<>(false);
- adminCall(
- () -> {
-
targetAdminClient.incrementalAlterConfigs(configOps).values().forEach((k, v) ->
v.whenComplete((x, e) -> {
- if (e != null) {
- if
(config.useIncrementalAlterConfigs().equals(MirrorSourceConfig.REQUEST_INCREMENTAL_ALTER_CONFIGS)
- && e instanceof
UnsupportedVersionException && !encounteredError.get()) {
- //Fallback logic
- log.warn("The target cluster {} is not
compatible with IncrementalAlterConfigs API. "
- + "Therefore using deprecated
AlterConfigs API for syncing configs for topic {}",
- sourceAndTarget.target(), k.name(), e);
- encounteredError.set(true);
- useIncrementalAlterConfigs = false;
- } else if
(config.useIncrementalAlterConfigs().equals(MirrorSourceConfig.REQUIRE_INCREMENTAL_ALTER_CONFIGS)
- && e instanceof
UnsupportedVersionException && !encounteredError.get()) {
- log.error("Failed to sync configs for topic {}
on cluster {} with IncrementalAlterConfigs API", k.name(),
sourceAndTarget.target(), e);
- encounteredError.set(true);
- context.raiseError(new
ConnectException("use.incremental.alter.configs was set to \"required\", but
the target cluster '"
- + sourceAndTarget.target() + "' is not
compatible with IncrementalAlterConfigs API", e));
- } else {
- log.warn("Could not alter configuration of
topic {}.", k.name(), e);
- }
- }
- }));
- return null;
- },
- () -> String.format("incremental alter topic configs %s on %s
cluster", topicConfigs, config.targetClusterAlias())
- );
+ adminCall(() -> {
+ targetAdminClient.incrementalAlterConfigs(configOps).values()
+ .forEach((k, v) -> v.whenComplete((x, e) -> {
+ if (e instanceof UnsupportedVersionException) {
+ log.error("Failed to sync configs for topic {} on
cluster {} with " +
+ "IncrementalAlterConfigs API", k.name(),
sourceAndTarget.target(), e);
+ context.raiseError(new ConnectException("the target
cluster '"
+ + sourceAndTarget.target() + "' is not
compatible with " +
+ "IncrementalAlterConfigs " +
+ "API", e));
+ } else {
+ log.warn("Could not alter configuration of topic {}.",
k.name(), e);
+ }
+ }));
Review Comment:
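An unexpected WARN log is printed even when the alter-configs call completes
without error (i.e. `e == null`): `null instanceof UnsupportedVersionException`
is false, so the `else` branch runs for successful completions as well. Opened
[KAFKA-20017](https://issues.apache.org/jira/browse/KAFKA-20017) to fix it.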
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]