mimaison commented on code in PR #13373:
URL: https://github.com/apache/kafka/pull/13373#discussion_r1135392026
##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java:
##
@@ -539,11 +596,20 @@ Map describeTopicConfigs(Set
topics)
}
Config targetConfig(Config sourceConfig) {
-List entries = sourceConfig.entries().stream()
-.filter(x -> !x.isDefault() && !x.isReadOnly() && !x.isSensitive())
-.filter(x -> x.source() !=
ConfigEntry.ConfigSource.STATIC_BROKER_CONFIG)
-.filter(x -> shouldReplicateTopicConfigurationProperty(x.name()))
-.collect(Collectors.toList());
+List entries;
+if (useIncrementalAlterConfigs ==
MirrorSourceConfig.NEVER_USE_INCREMENTAL_ALTER_CONFIG) {
Review Comment:
We should use `equals()` to compare String objects
##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java:
##
@@ -117,6 +122,16 @@ public MirrorSourceConnector() {
this.configPropertyFilter = configPropertyFilter;
}
+// visible for testing the deprecated setting
"use.incremental.alter.configs"
+// this constructor should be removed when the deprecated setting is
removed in Kafka 4.0
+MirrorSourceConnector(SourceAndTarget sourceAndTarget, ReplicationPolicy
replicationPolicy,
+ String useIncrementalAlterConfigs,
ConfigPropertyFilter configPropertyFilter, Admin targetAdmin) {
+this.sourceAndTarget = sourceAndTarget;
+this.replicationPolicy = replicationPolicy;
+this.configPropertyFilter = configPropertyFilter;
+this.useIncrementalAlterConfigs = useIncrementalAlterConfigs;
+this.targetAdminClient = targetAdmin;
+
Review Comment:
We're missing a closing bracket here!
##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/DefaultConfigPropertyFilter.java:
##
@@ -30,6 +31,9 @@ public class DefaultConfigPropertyFilter implements
ConfigPropertyFilter {
public static final String CONFIG_PROPERTIES_EXCLUDE_CONFIG =
"config.properties.exclude";
public static final String CONFIG_PROPERTIES_EXCLUDE_ALIAS_CONFIG =
"config.properties.blacklist";
+public static final String USE_DEFAULTS_FROM = "use.defaults.from";;
Review Comment:
We can remove the trailing semi colon
##
clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java:
##
@@ -78,6 +80,19 @@ public static ListTopicsResult listTopicsResult(String
topic) {
return new ListTopicsResult(future);
}
+/**
+ * Helper to create a AlterConfigsResult instance for a given Throwable.
+ * AlterConfigsResult's constructor is only accessible from within the
+ * admin package.
+ */
+public static AlterConfigsResult alterConfigsResult(ConfigResource cr,
Throwable t) {
+KafkaFutureImpl future = new KafkaFutureImpl<>();
+Map> futures = new HashMap<>();
+futures.put(cr, future);
Review Comment:
We can use singletonMap here:
```suggestion
Map> futures =
Collections.singletonMap(cr, future);
```
##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/DefaultConfigPropertyFilter.java:
##
@@ -30,6 +31,9 @@ public class DefaultConfigPropertyFilter implements
ConfigPropertyFilter {
public static final String CONFIG_PROPERTIES_EXCLUDE_CONFIG =
"config.properties.exclude";
public static final String CONFIG_PROPERTIES_EXCLUDE_ALIAS_CONFIG =
"config.properties.blacklist";
+public static final String USE_DEFAULTS_FROM = "use.defaults.from";;
+private static final String USE_DEFAULTS_FROM_DOC = "Which cluster's
default to use when syncing topic configurations.";
Review Comment:
Should it be `defaults` instead of `default`?
##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java:
##
@@ -64,6 +67,7 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
Review Comment:
This is already imported just above
##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/DefaultConfigPropertyFilter.java:
##
@@ -40,11 +44,13 @@ public class DefaultConfigPropertyFilter implements
ConfigPropertyFilter {
+
"unclean\\.leader\\.election\\.enable, "
+
"min\\.insync\\.replicas";
private Pattern excludePattern =
MirrorUtils.compilePatternList(CONFIG_PROPERTIES_EXCLUDE_DEFAULT);
+private String useDefaultsFrom = USE_DEFAULTS_FROM_DEFAULT;
Review Comment:
Do we need this field? I think we could remove it and do
```
"source".equals(config.useDefaultsFrom())