mimaison commented on code in PR #13373: URL: https://github.com/apache/kafka/pull/13373#discussion_r1135392026
########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java: ########## @@ -539,11 +596,20 @@ Map<String, Config> describeTopicConfigs(Set<String> topics) } Config targetConfig(Config sourceConfig) { - List<ConfigEntry> entries = sourceConfig.entries().stream() - .filter(x -> !x.isDefault() && !x.isReadOnly() && !x.isSensitive()) - .filter(x -> x.source() != ConfigEntry.ConfigSource.STATIC_BROKER_CONFIG) - .filter(x -> shouldReplicateTopicConfigurationProperty(x.name())) - .collect(Collectors.toList()); + List<ConfigEntry> entries; + if (useIncrementalAlterConfigs == MirrorSourceConfig.NEVER_USE_INCREMENTAL_ALTER_CONFIG) { Review Comment: We should use `equals()` to compare String objects ########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java: ########## @@ -117,6 +122,16 @@ public MirrorSourceConnector() { this.configPropertyFilter = configPropertyFilter; } + // visible for testing the deprecated setting "use.incremental.alter.configs" + // this constructor should be removed when the deprecated setting is removed in Kafka 4.0 + MirrorSourceConnector(SourceAndTarget sourceAndTarget, ReplicationPolicy replicationPolicy, + String useIncrementalAlterConfigs, ConfigPropertyFilter configPropertyFilter, Admin targetAdmin) { + this.sourceAndTarget = sourceAndTarget; + this.replicationPolicy = replicationPolicy; + this.configPropertyFilter = configPropertyFilter; + this.useIncrementalAlterConfigs = useIncrementalAlterConfigs; + this.targetAdminClient = targetAdmin; + Review Comment: We're missing a closing bracket here! 
########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/DefaultConfigPropertyFilter.java: ########## @@ -30,6 +31,9 @@ public class DefaultConfigPropertyFilter implements ConfigPropertyFilter { public static final String CONFIG_PROPERTIES_EXCLUDE_CONFIG = "config.properties.exclude"; public static final String CONFIG_PROPERTIES_EXCLUDE_ALIAS_CONFIG = "config.properties.blacklist"; + public static final String USE_DEFAULTS_FROM = "use.defaults.from";; Review Comment: We can remove the trailing semicolon ########## clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java: ########## @@ -78,6 +80,19 @@ public static ListTopicsResult listTopicsResult(String topic) { return new ListTopicsResult(future); } + /** + * Helper to create a AlterConfigsResult instance for a given Throwable. + * AlterConfigsResult's constructor is only accessible from within the + * admin package. + */ + public static AlterConfigsResult alterConfigsResult(ConfigResource cr, Throwable t) { + KafkaFutureImpl<Void> future = new KafkaFutureImpl<>(); + Map<ConfigResource, KafkaFuture<Void>> futures = new HashMap<>(); + futures.put(cr, future); Review Comment: We can use singletonMap here: ```suggestion Map<ConfigResource, KafkaFuture<Void>> futures = Collections.singletonMap(cr, future); ``` ########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/DefaultConfigPropertyFilter.java: ########## @@ -30,6 +31,9 @@ public class DefaultConfigPropertyFilter implements ConfigPropertyFilter { public static final String CONFIG_PROPERTIES_EXCLUDE_CONFIG = "config.properties.exclude"; public static final String CONFIG_PROPERTIES_EXCLUDE_ALIAS_CONFIG = "config.properties.blacklist"; + public static final String USE_DEFAULTS_FROM = "use.defaults.from";; + private static final String USE_DEFAULTS_FROM_DOC = "Which cluster's default to use when syncing topic configurations."; Review Comment: Should it be `defaults` instead of `default`? 
########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java: ########## @@ -64,6 +67,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicBoolean; Review Comment: This is already imported just above ########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/DefaultConfigPropertyFilter.java: ########## @@ -40,11 +44,13 @@ public class DefaultConfigPropertyFilter implements ConfigPropertyFilter { + "unclean\\.leader\\.election\\.enable, " + "min\\.insync\\.replicas"; private Pattern excludePattern = MirrorUtils.compilePatternList(CONFIG_PROPERTIES_EXCLUDE_DEFAULT); + private String useDefaultsFrom = USE_DEFAULTS_FROM_DEFAULT; Review Comment: Do we need this field? I think we could remove it and do ``` "source".equals(config.useDefaultsFrom()) ``` in `shouldReplicateSourceDefault()` ########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConfig.java: ########## @@ -73,6 +73,18 @@ public class MirrorSourceConfig extends MirrorConnectorConfig { public static final String SYNC_TOPIC_CONFIGS_INTERVAL_SECONDS = SYNC_TOPIC_CONFIGS + INTERVAL_SECONDS_SUFFIX; private static final String SYNC_TOPIC_CONFIGS_INTERVAL_SECONDS_DOC = "Frequency of topic config sync."; public static final long SYNC_TOPIC_CONFIGS_INTERVAL_SECONDS_DEFAULT = 10 * 60; + @Deprecated + public static final String USE_INCREMENTAL_ALTER_CONFIG = "use.incremental.alter.configs"; + private static final String USE_INCREMENTAL_ALTER_CONFIG_DOC = "Deprecated. Which API to use for syncing topic configs. " + + "The valid values are \"requested\", \"required\" and \"never\". " + + "By default, set to \"requested\", which means the IncrementalAlterConfigs is being used for syncing topic configurations " + + "and if any request receives an error from an incompatible broker, it will fallback to using the deprecated AlterConfigs API. 
" + + "If explicitly set to \"required\", the IncrementalAlterConfigs is used without the fallback logic. " + + "If explicitly set to \"never\", the AlterConfig is always used." + + "This setting will be removed in Kafka 4.0, therefore users should ensure that target broker is at least 2.3.0"; Review Comment: I think we should mention that the behavior of `required` will be used from Kafka 4.0 ########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConfig.java: ########## @@ -73,6 +73,18 @@ public class MirrorSourceConfig extends MirrorConnectorConfig { public static final String SYNC_TOPIC_CONFIGS_INTERVAL_SECONDS = SYNC_TOPIC_CONFIGS + INTERVAL_SECONDS_SUFFIX; private static final String SYNC_TOPIC_CONFIGS_INTERVAL_SECONDS_DOC = "Frequency of topic config sync."; public static final long SYNC_TOPIC_CONFIGS_INTERVAL_SECONDS_DEFAULT = 10 * 60; + @Deprecated + public static final String USE_INCREMENTAL_ALTER_CONFIG = "use.incremental.alter.configs"; + private static final String USE_INCREMENTAL_ALTER_CONFIG_DOC = "Deprecated. Which API to use for syncing topic configs. " + + "The valid values are \"requested\", \"required\" and \"never\". 
" + + "By default, set to \"requested\", which means the IncrementalAlterConfigs is being used for syncing topic configurations " + Review Comment: `IncrementalAlterConfigs` -> `IncrementalAlterConfigs API` ########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConfig.java: ########## @@ -73,6 +73,18 @@ public class MirrorSourceConfig extends MirrorConnectorConfig { public static final String SYNC_TOPIC_CONFIGS_INTERVAL_SECONDS = SYNC_TOPIC_CONFIGS + INTERVAL_SECONDS_SUFFIX; private static final String SYNC_TOPIC_CONFIGS_INTERVAL_SECONDS_DOC = "Frequency of topic config sync."; public static final long SYNC_TOPIC_CONFIGS_INTERVAL_SECONDS_DEFAULT = 10 * 60; + @Deprecated + public static final String USE_INCREMENTAL_ALTER_CONFIG = "use.incremental.alter.configs"; + private static final String USE_INCREMENTAL_ALTER_CONFIG_DOC = "Deprecated. Which API to use for syncing topic configs. " + + "The valid values are \"requested\", \"required\" and \"never\". " + + "By default, set to \"requested\", which means the IncrementalAlterConfigs is being used for syncing topic configurations " + + "and if any request receives an error from an incompatible broker, it will fallback to using the deprecated AlterConfigs API. " + + "If explicitly set to \"required\", the IncrementalAlterConfigs is used without the fallback logic. " + + "If explicitly set to \"never\", the AlterConfig is always used." + + "This setting will be removed in Kafka 4.0, therefore users should ensure that target broker is at least 2.3.0"; + public static final String USE_INCREMENTAL_ALTER_CONFIG_DEFAULT = "requested"; Review Comment: Should we rename this field to `REQUESTED_INCREMENTAL_ALTER_CONFIG`? 
########## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java: ########## @@ -209,11 +227,34 @@ public void testConfigPropertyFiltering() { new DefaultReplicationPolicy(), x -> true, new DefaultConfigPropertyFilter()); ArrayList<ConfigEntry> entries = new ArrayList<>(); entries.add(new ConfigEntry("name-1", "value-1")); + entries.add(new ConfigEntry("name-2", "value-2", ConfigEntry.ConfigSource.DEFAULT_CONFIG, false, false, Collections.emptyList(), ConfigEntry.ConfigType.STRING, "")); + entries.add(new ConfigEntry("min.insync.replicas", "2")); + Config config = new Config(entries); + Config targetConfig = connector.targetConfig(config); + assertTrue(targetConfig.entries().stream() + .anyMatch(x -> x.name().equals("name-1")), "should replicate properties"); + assertTrue(targetConfig.entries().stream() + .anyMatch(x -> x.name().equals("name-2")), "should include default properties"); + assertFalse(targetConfig.entries().stream() + .anyMatch(x -> x.name().equals("min.insync.replicas")), "should not replicate excluded properties"); + } + + @Test + @Deprecated + public void testConfigPropertyFilteringWithAlterConfigs() { + MirrorSourceConnector connector = new MirrorSourceConnector(new SourceAndTarget("source", "target"), + new DefaultReplicationPolicy(), MirrorSourceConfig.NEVER_USE_INCREMENTAL_ALTER_CONFIG, new DefaultConfigPropertyFilter(), null); + ArrayList<ConfigEntry> entries = new ArrayList<>(); Review Comment: Left side can be `List<ConfigEntry>` ########## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java: ########## @@ -261,6 +302,54 @@ public void testNewTopicConfigs() throws Exception { verify(connector).createNewTopics(any(), any()); } + @Test + public void testIncrementalAlterConfigsRequested() throws Exception { + MockAdminClient admin = spy(new MockAdminClient()); + MirrorSourceConnector connector = spy(new MirrorSourceConnector(new SourceAndTarget("source", "target"), + 
new DefaultReplicationPolicy(), MirrorSourceConfig.USE_INCREMENTAL_ALTER_CONFIG_DEFAULT, new DefaultConfigPropertyFilter(), admin)); + final String topic = "testtopic"; + List<ConfigEntry> entries = new ArrayList<>(); + entries.add(new ConfigEntry("name-1", "value-1")); + Config config = new Config(entries); + doReturn(Collections.singletonMap(topic, config)).when(connector).describeTopicConfigs(any()); + doReturn(alterConfigsResult(new ConfigResource(ConfigResource.Type.TOPIC, topic), new UnsupportedVersionException("Unsupported API"))).when(admin).incrementalAlterConfigs(any()); + doNothing().when(connector).alterConfigs(any()); + connector.syncTopicConfigs(); + verify(connector).syncTopicConfigs(); + verify(connector).incrementalAlterConfigs(any()); + verify(connector, times(1)).alterConfigs(any()); + } + + @Test + public void testIncrementalAlterConfigsRequired() throws Exception { Review Comment: Can we add a test that causes `incrementalAlterConfig()` to emit a `OpType.DELETE` op? 
########## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java: ########## @@ -261,6 +302,54 @@ public void testNewTopicConfigs() throws Exception { verify(connector).createNewTopics(any(), any()); } + @Test + public void testIncrementalAlterConfigsRequested() throws Exception { + MockAdminClient admin = spy(new MockAdminClient()); + MirrorSourceConnector connector = spy(new MirrorSourceConnector(new SourceAndTarget("source", "target"), + new DefaultReplicationPolicy(), MirrorSourceConfig.USE_INCREMENTAL_ALTER_CONFIG_DEFAULT, new DefaultConfigPropertyFilter(), admin)); + final String topic = "testtopic"; + List<ConfigEntry> entries = new ArrayList<>(); + entries.add(new ConfigEntry("name-1", "value-1")); + Config config = new Config(entries); + doReturn(Collections.singletonMap(topic, config)).when(connector).describeTopicConfigs(any()); + doReturn(alterConfigsResult(new ConfigResource(ConfigResource.Type.TOPIC, topic), new UnsupportedVersionException("Unsupported API"))).when(admin).incrementalAlterConfigs(any()); + doNothing().when(connector).alterConfigs(any()); Review Comment: Can we verify the arguments passed to `incrementalAlterConfigs()` and `alterConfigs()` in these tests? 
########## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/IdentityReplicationIntegrationTest.java: ########## @@ -140,6 +147,143 @@ public void testReplication() throws Exception { "New topic was not replicated to backup cluster."); } + @Test + public void testSyncTopicConfigs() throws Exception { + mm2Config = new MirrorMakerConfig(mm2Props); + + waitUntilMirrorMakerIsRunning(backup, CONNECTOR_LIST, mm2Config, PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS); + waitUntilMirrorMakerIsRunning(primary, Collections.singletonList(MirrorHeartbeatConnector.class), mm2Config, BACKUP_CLUSTER_ALIAS, PRIMARY_CLUSTER_ALIAS); + + // create topic with configuration to test: + final Map<String, String> topicConfig = new HashMap<>(); + topicConfig.put("delete.retention.ms", "1000"); // should be excluded (default value is 86400000) + topicConfig.put("retention.bytes", "1000"); // should be included, default value is -1 + + final String topic = "test-topic-with-config"; + final String backupTopic = remoteTopicName(topic, PRIMARY_CLUSTER_ALIAS); + + primary.kafka().createTopic(topic, NUM_PARTITIONS, 1, topicConfig); + waitForTopicCreated(backup, backupTopic); + + // alter configs on the target topic + ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, backupTopic); + Collection<AlterConfigOp> ops = new ArrayList<>(); + ops.add(new AlterConfigOp(new ConfigEntry("delete.retention.ms", "2000"), AlterConfigOp.OpType.SET)); + ops.add(new AlterConfigOp(new ConfigEntry("retention.bytes", "2000"), AlterConfigOp.OpType.SET)); + Map<ConfigResource, Collection<AlterConfigOp>> configOps = new HashMap<>(1); + configOps.put(configResource, ops); + // alter configs on target cluster + backup.kafka().incrementalAlterConfigs(configOps); + + // sleep few seconds to have MM2 sync topic configuration + Thread.sleep(TimeUnit.SECONDS.toMillis(3)); Review Comment: Can we avoid using `sleep()` directly and instead use `waitForCondition()`? 
########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/ConfigPropertyFilter.java: ########## @@ -27,6 +28,8 @@ public interface ConfigPropertyFilter extends Configurable, AutoCloseable { boolean shouldReplicateConfigProperty(String prop); + boolean shouldReplicateSourceDefault(ConfigEntry.ConfigSource source); Review Comment: We need a default method here otherwise all existing implementations of this interface will break. ########## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java: ########## @@ -261,6 +302,54 @@ public void testNewTopicConfigs() throws Exception { verify(connector).createNewTopics(any(), any()); } + @Test + public void testIncrementalAlterConfigsRequested() throws Exception { + MockAdminClient admin = spy(new MockAdminClient()); + MirrorSourceConnector connector = spy(new MirrorSourceConnector(new SourceAndTarget("source", "target"), + new DefaultReplicationPolicy(), MirrorSourceConfig.USE_INCREMENTAL_ALTER_CONFIG_DEFAULT, new DefaultConfigPropertyFilter(), admin)); + final String topic = "testtopic"; + List<ConfigEntry> entries = new ArrayList<>(); + entries.add(new ConfigEntry("name-1", "value-1")); Review Comment: Can be simplified with: ```suggestion List<ConfigEntry> entries = Collections.singletonList(new ConfigEntry("name-1", "value-1")); ``` ########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConfig.java: ########## @@ -162,6 +174,15 @@ Duration syncTopicConfigsInterval() { } } + String useIncrementalAlterConfigs() { + String prop = getString(USE_INCREMENTAL_ALTER_CONFIG); + if (prop.equals(NEVER_USE_INCREMENTAL_ALTER_CONFIG) || prop.equals(REQUIRE_INCREMENTAL_ALTER_CONFIG)) { Review Comment: Do we need this if/else block? The config is defined with a default value so it should already return `USE_INCREMENTAL_ALTER_CONFIG_DEFAULT` if it is not set explicitly. 
########## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/IdentityReplicationIntegrationTest.java: ########## @@ -140,6 +147,143 @@ public void testReplication() throws Exception { "New topic was not replicated to backup cluster."); } + @Test + public void testSyncTopicConfigs() throws Exception { + mm2Config = new MirrorMakerConfig(mm2Props); + + waitUntilMirrorMakerIsRunning(backup, CONNECTOR_LIST, mm2Config, PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS); + waitUntilMirrorMakerIsRunning(primary, Collections.singletonList(MirrorHeartbeatConnector.class), mm2Config, BACKUP_CLUSTER_ALIAS, PRIMARY_CLUSTER_ALIAS); Review Comment: Do we need to start 2 instances of MirrorMaker for this test? A single instance would simplify this test a bit. ########## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/IdentityReplicationIntegrationTest.java: ########## @@ -140,6 +147,143 @@ public void testReplication() throws Exception { "New topic was not replicated to backup cluster."); } + @Test + public void testSyncTopicConfigs() throws Exception { + mm2Config = new MirrorMakerConfig(mm2Props); + + waitUntilMirrorMakerIsRunning(backup, CONNECTOR_LIST, mm2Config, PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS); + waitUntilMirrorMakerIsRunning(primary, Collections.singletonList(MirrorHeartbeatConnector.class), mm2Config, BACKUP_CLUSTER_ALIAS, PRIMARY_CLUSTER_ALIAS); + + // create topic with configuration to test: + final Map<String, String> topicConfig = new HashMap<>(); + topicConfig.put("delete.retention.ms", "1000"); // should be excluded (default value is 86400000) + topicConfig.put("retention.bytes", "1000"); // should be included, default value is -1 + + final String topic = "test-topic-with-config"; + final String backupTopic = remoteTopicName(topic, PRIMARY_CLUSTER_ALIAS); + + primary.kafka().createTopic(topic, NUM_PARTITIONS, 1, topicConfig); + waitForTopicCreated(backup, backupTopic); + + // alter configs on the target topic + 
ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, backupTopic); + Collection<AlterConfigOp> ops = new ArrayList<>(); + ops.add(new AlterConfigOp(new ConfigEntry("delete.retention.ms", "2000"), AlterConfigOp.OpType.SET)); + ops.add(new AlterConfigOp(new ConfigEntry("retention.bytes", "2000"), AlterConfigOp.OpType.SET)); + Map<ConfigResource, Collection<AlterConfigOp>> configOps = new HashMap<>(1); + configOps.put(configResource, ops); Review Comment: We can use: ```suggestion Map<ConfigResource, Collection<AlterConfigOp>> configOps = Collections.singletonMap(configResource, ops); ``` ########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java: ########## @@ -539,11 +596,20 @@ Map<String, Config> describeTopicConfigs(Set<String> topics) } Config targetConfig(Config sourceConfig) { - List<ConfigEntry> entries = sourceConfig.entries().stream() - .filter(x -> !x.isDefault() && !x.isReadOnly() && !x.isSensitive()) - .filter(x -> x.source() != ConfigEntry.ConfigSource.STATIC_BROKER_CONFIG) - .filter(x -> shouldReplicateTopicConfigurationProperty(x.name())) - .collect(Collectors.toList()); + List<ConfigEntry> entries; + if (useIncrementalAlterConfigs == MirrorSourceConfig.NEVER_USE_INCREMENTAL_ALTER_CONFIG) { Review Comment: Both branches are very similar, can we merge them and only conditionally do the `isDefault()` check? ########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConfig.java: ########## @@ -264,6 +285,13 @@ Duration consumerPollTimeout() { SYNC_TOPIC_CONFIGS_INTERVAL_SECONDS_DEFAULT, ConfigDef.Importance.LOW, SYNC_TOPIC_CONFIGS_INTERVAL_SECONDS_DOC) + .define( + USE_INCREMENTAL_ALTER_CONFIG, + ConfigDef.Type.STRING, + USE_INCREMENTAL_ALTER_CONFIG_DEFAULT, + ConfigDef.Importance.LOW, + USE_INCREMENTAL_ALTER_CONFIG_DOC + ) Review Comment: nit: All other configs put the closing parenthesis on the last line. Let's use the same formatting. 
########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/ConfigPropertyFilter.java: ########## @@ -27,6 +28,8 @@ public interface ConfigPropertyFilter extends Configurable, AutoCloseable { boolean shouldReplicateConfigProperty(String prop); + boolean shouldReplicateSourceDefault(ConfigEntry.ConfigSource source); Review Comment: In the KIP the argument was the config name as a String, why is it a ConfigEntry.ConfigSource object here? ########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/ConfigPropertyFilter.java: ########## @@ -27,6 +28,8 @@ public interface ConfigPropertyFilter extends Configurable, AutoCloseable { boolean shouldReplicateConfigProperty(String prop); + boolean shouldReplicateSourceDefault(ConfigEntry.ConfigSource source); Review Comment: Can we also add some javadoc? ########## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java: ########## @@ -72,8 +76,22 @@ import java.util.Optional; import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; +import org.mockito.Mockito; Review Comment: Unused import ########## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/IdentityReplicationIntegrationTest.java: ########## @@ -140,6 +147,143 @@ public void testReplication() throws Exception { "New topic was not replicated to backup cluster."); } + @Test Review Comment: Should this test be in `MirrorConnectorsIntegrationBaseTest` instead? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org