[GitHub] [kafka] mimaison commented on a diff in pull request #13373: Kafka-14420 Use incrementalAlterConfigs API for syncing topic configurations (KIP-894)

2023-03-17 Thread via GitHub


mimaison commented on code in PR #13373:
URL: https://github.com/apache/kafka/pull/13373#discussion_r1140384184


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConfig.java:
##
@@ -162,6 +174,15 @@ Duration syncTopicConfigsInterval() {
 }
 }
 
+String useIncrementalAlterConfigs() {
+String prop = getString(USE_INCREMENTAL_ALTER_CONFIG);
+if (prop.equals(NEVER_USE_INCREMENTAL_ALTER_CONFIG) || 
prop.equals(REQUIRE_INCREMENTAL_ALTER_CONFIG)) {

Review Comment:
   We can set the valid values when defining the config. For example see 
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java#L354-L355



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mimaison commented on a diff in pull request #13373: Kafka-14420 Use incrementalAlterConfigs API for syncing topic configurations (KIP-894)

2023-03-14 Thread via GitHub


mimaison commented on code in PR #13373:
URL: https://github.com/apache/kafka/pull/13373#discussion_r1135392026


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java:
##
@@ -539,11 +596,20 @@ Map describeTopicConfigs(Set 
topics)
 }
 
 Config targetConfig(Config sourceConfig) {
-List entries = sourceConfig.entries().stream()
-.filter(x -> !x.isDefault() && !x.isReadOnly() && !x.isSensitive())
-.filter(x -> x.source() != 
ConfigEntry.ConfigSource.STATIC_BROKER_CONFIG)
-.filter(x -> shouldReplicateTopicConfigurationProperty(x.name()))
-.collect(Collectors.toList());
+List entries;
+if (useIncrementalAlterConfigs == 
MirrorSourceConfig.NEVER_USE_INCREMENTAL_ALTER_CONFIG) {

Review Comment:
   We should use `equals()` to compare String objects



##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java:
##
@@ -117,6 +122,16 @@ public MirrorSourceConnector() {
 this.configPropertyFilter = configPropertyFilter;
 }
 
+// visible for testing the deprecated setting 
"use.incremental.alter.configs"
+// this constructor should be removed when the deprecated setting is 
removed in Kafka 4.0
+MirrorSourceConnector(SourceAndTarget sourceAndTarget, ReplicationPolicy 
replicationPolicy,
+  String useIncrementalAlterConfigs, 
ConfigPropertyFilter configPropertyFilter, Admin targetAdmin) {
+this.sourceAndTarget = sourceAndTarget;
+this.replicationPolicy = replicationPolicy;
+this.configPropertyFilter = configPropertyFilter;
+this.useIncrementalAlterConfigs = useIncrementalAlterConfigs;
+this.targetAdminClient = targetAdmin;
+

Review Comment:
   We're missing a closing bracket here!



##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/DefaultConfigPropertyFilter.java:
##
@@ -30,6 +31,9 @@ public class DefaultConfigPropertyFilter implements 
ConfigPropertyFilter {
 
 public static final String CONFIG_PROPERTIES_EXCLUDE_CONFIG = 
"config.properties.exclude";
 public static final String CONFIG_PROPERTIES_EXCLUDE_ALIAS_CONFIG = 
"config.properties.blacklist";
+public static final String USE_DEFAULTS_FROM = "use.defaults.from";;

Review Comment:
   We can remove the trailing semi colon



##
clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java:
##
@@ -78,6 +80,19 @@ public static ListTopicsResult listTopicsResult(String 
topic) {
 return new ListTopicsResult(future);
 }
 
+/**
+ * Helper to create a AlterConfigsResult instance for a given Throwable.
+ * AlterConfigsResult's constructor is only accessible from within the
+ * admin package.
+ */
+public static AlterConfigsResult alterConfigsResult(ConfigResource cr, 
Throwable t) {
+KafkaFutureImpl future = new KafkaFutureImpl<>();
+Map> futures = new HashMap<>();
+futures.put(cr, future);

Review Comment:
   We can use singletonMap here:
   ```suggestion
   Map> futures = 
Collections.singletonMap(cr, future);
   ```



##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/DefaultConfigPropertyFilter.java:
##
@@ -30,6 +31,9 @@ public class DefaultConfigPropertyFilter implements 
ConfigPropertyFilter {
 
 public static final String CONFIG_PROPERTIES_EXCLUDE_CONFIG = 
"config.properties.exclude";
 public static final String CONFIG_PROPERTIES_EXCLUDE_ALIAS_CONFIG = 
"config.properties.blacklist";
+public static final String USE_DEFAULTS_FROM = "use.defaults.from";;
+private static final String USE_DEFAULTS_FROM_DOC = "Which cluster's 
default to use when syncing topic configurations.";

Review Comment:
   Should it be `defaults` instead of `default`?



##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java:
##
@@ -64,6 +67,7 @@
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;

Review Comment:
   This is already imported just above



##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/DefaultConfigPropertyFilter.java:
##
@@ -40,11 +44,13 @@ public class DefaultConfigPropertyFilter implements 
ConfigPropertyFilter {
+ 
"unclean\\.leader\\.election\\.enable, "
+ 
"min\\.insync\\.replicas";
 private Pattern excludePattern = 
MirrorUtils.compilePatternList(CONFIG_PROPERTIES_EXCLUDE_DEFAULT);
+private String useDefaultsFrom = USE_DEFAULTS_FROM_DEFAULT;

Review Comment:
   Do we need this field? I think we could remove it and do
   ```
   "source".equals(config.useDefaultsFrom())