C0urante commented on code in PR #13373: URL: https://github.com/apache/kafka/pull/13373#discussion_r1151262807
########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java: ########## @@ -514,6 +539,43 @@ private void updateTopicConfigs(Map<String, Config> topicConfigs) { })); } + // visible for testing + void incrementalAlterConfigs(Map<String, Config> topicConfigs) { + Map<ConfigResource, Collection<AlterConfigOp>> configOps = new HashMap<>(); + for (Map.Entry<String, Config> topicConfig : topicConfigs.entrySet()) { + Collection<AlterConfigOp> ops = new ArrayList<>(); + ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, topicConfig.getKey()); + for (ConfigEntry config : topicConfig.getValue().entries()) { + if (config.isDefault() && !shouldReplicateSourceDefault(config.name())) { + ops.add(new AlterConfigOp(config, AlterConfigOp.OpType.DELETE)); + } else { + ops.add(new AlterConfigOp(config, AlterConfigOp.OpType.SET)); + } + } + configOps.put(configResource, ops); + } + log.trace("Syncing configs for {} topics.", configOps.size()); + targetAdminClient.incrementalAlterConfigs(configOps).values().forEach((k, v) -> v.whenComplete((x, e) -> { Review Comment: Because we're using [AlterConfigsResult::values](https://kafka.apache.org/34/javadoc/org/apache/kafka/clients/admin/AlterConfigsResult.html#values()) instead of [AlterConfigsResult::all](https://kafka.apache.org/34/javadoc/org/apache/kafka/clients/admin/AlterConfigsResult.html#all()) here, the body after this line (starting with `if (e != null)`) may be executed multiple times for a single invocation of `MirrorSourceConnector::incrementalAlterConfigs`. Since we use the topic name in our error messages, it seems acceptable to use this specific API from the results class. But we should probably avoid logging the warning messages about falling back on the deprecated API or the error-level message about failing to sync the configs for the topic more than once. Can we only log those messages once, and if there are any more failures, log the usual warning "Could not alter configurations of topic <topic>" message instead? ########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/DefaultConfigPropertyFilter.java: ########## @@ -30,21 +30,29 @@ public class DefaultConfigPropertyFilter implements ConfigPropertyFilter { public static final String CONFIG_PROPERTIES_EXCLUDE_CONFIG = "config.properties.exclude"; public static final String CONFIG_PROPERTIES_EXCLUDE_ALIAS_CONFIG = "config.properties.blacklist"; + public static final String USE_DEFAULTS_FROM = "use.defaults.from"; + private static final String USE_DEFAULTS_FROM_DOC = "Which cluster's defaults (source or target) to use " + + "when syncing topic configurations that have default values."; + private static final String USE_DEFAULTS_FROM_DEFAULT = "target"; private static final String CONFIG_PROPERTIES_EXCLUDE_DOC = "List of topic configuration properties and/or regexes " - + "that should not be replicated."; + + "that should not be replicated." + + "This setting also applies to the properties with default values" + + "if use.defaults.from is set to 'source'."; Review Comment: I think we should remove this part since this property now affects the behavior for all properties, regardless of the value of `use.defaults.from`.
```suggestion + "that should not be replicated."; ``` ########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/ConfigPropertyFilter.java: ########## @@ -25,8 +25,20 @@ @InterfaceStability.Evolving public interface ConfigPropertyFilter extends Configurable, AutoCloseable { + /** + * Specifies whether to replicate the given topic configuration. + * This does not handle replication of configurations with default values. + * The configurations with default values are controlled by {@link #shouldReplicateSourceDefault(String)}. Review Comment: This part should be updated now. Maybe something like this? ```suggestion * Note that if a property has a default value on the source cluster, * {@link #shouldReplicateSourceDefault(String)} will also be called to * determine how that property should be synced. ``` ########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java: ########## @@ -538,9 +600,13 @@ Map<String, Config> describeTopicConfigs(Set<String> topics) .collect(Collectors.toMap(x -> x.getKey().name(), Entry::getValue)); } - Config targetConfig(Config sourceConfig) { + Config targetConfig(Config sourceConfig, boolean incremental) { +// If using incrementalAlterConfigs API, sync the default property with either SET or DELETE action determined by ConfigPropertyFilter::shouldReplicateSourceDefault later. +// If not using incrementalAlterConfigs API, sync the default property if ConfigPropertyFilter::shouldReplicateSourceDefault returns true. +// If ConfigPropertyFilter::shouldReplicateConfigProperty returns false, we do not sync the property at all. Review Comment: Nit: can we align this with the next line of the method body? ```suggestion // If using incrementalAlterConfigs API, sync the default property with either SET or DELETE action determined by ConfigPropertyFilter::shouldReplicateSourceDefault later. // If not using incrementalAlterConfigs API, sync the default property if ConfigPropertyFilter::shouldReplicateSourceDefault returns true. // If ConfigPropertyFilter::shouldReplicateConfigProperty returns false, we do not sync the property at all. ``` ########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java: ########## @@ -538,9 +600,13 @@ Map<String, Config> describeTopicConfigs(Set<String> topics) .collect(Collectors.toMap(x -> x.getKey().name(), Entry::getValue)); } - Config targetConfig(Config sourceConfig) { + Config targetConfig(Config sourceConfig, boolean incremental) { +// If using incrementalAlterConfigs API, sync the default property with either SET or DELETE action determined by ConfigPropertyFilter::shouldReplicateSourceDefault later. +// If not using incrementalAlterConfigs API, sync the default property if ConfigPropertyFilter::shouldReplicateSourceDefault returns true. +// If ConfigPropertyFilter::shouldReplicateConfigProperty returns false, we do not sync the property at all. 
List<ConfigEntry> entries = sourceConfig.entries().stream() - .filter(x -> !x.isDefault() && !x.isReadOnly() && !x.isSensitive()) + .filter(x -> incremental || x.isDefault() && shouldReplicateSourceDefault(x.name()) || !x.isDefault()) Review Comment: Nit: a pair of parentheses here, though not strictly necessary, could help illustrate the behavior of this boolean expression a bit more clearly: ```suggestion .filter(x -> incremental || (x.isDefault() && shouldReplicateSourceDefault(x.name())) || !x.isDefault()) ``` ########## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java: ########## @@ -209,15 +229,62 @@ public void testConfigPropertyFiltering() { new DefaultReplicationPolicy(), x -> true, new DefaultConfigPropertyFilter()); ArrayList<ConfigEntry> entries = new ArrayList<>(); entries.add(new ConfigEntry("name-1", "value-1")); + entries.add(new ConfigEntry("name-2", "value-2", ConfigEntry.ConfigSource.DEFAULT_CONFIG, false, false, Collections.emptyList(), ConfigEntry.ConfigType.STRING, "")); + entries.add(new ConfigEntry("min.insync.replicas", "2")); + Config config = new Config(entries); + Config targetConfig = connector.targetConfig(config, true); + assertTrue(targetConfig.entries().stream() + .anyMatch(x -> x.name().equals("name-1")), "should replicate properties"); + assertTrue(targetConfig.entries().stream() + .anyMatch(x -> x.name().equals("name-2")), "should include default properties"); + assertFalse(targetConfig.entries().stream() + .anyMatch(x -> x.name().equals("min.insync.replicas")), "should not replicate excluded properties"); + } + + @Test + @Deprecated + public void testConfigPropertyFilteringWithAlterConfigs() { + MirrorSourceConnector connector = new MirrorSourceConnector(new SourceAndTarget("source", "target"), + new DefaultReplicationPolicy(), x -> true, new DefaultConfigPropertyFilter()); + List<ConfigEntry> entries = new ArrayList<>(); + entries.add(new ConfigEntry("name-1", "value-1")); + // When "use.defaults.from" set to "target" by default, the config with default value should be excluded + entries.add(new ConfigEntry("name-2", "value-2", ConfigEntry.ConfigSource.DEFAULT_CONFIG, false, false, Collections.emptyList(), ConfigEntry.ConfigType.STRING, "")); entries.add(new ConfigEntry("min.insync.replicas", "2")); Config config = new Config(entries); - Config targetConfig = connector.targetConfig(config); + Config targetConfig = connector.targetConfig(config, false); assertTrue(targetConfig.entries().stream() .anyMatch(x -> x.name().equals("name-1")), "should replicate properties"); + assertFalse(targetConfig.entries().stream() + .anyMatch(x -> x.name().equals("name-2")), "should not replicate default properties"); assertFalse(targetConfig.entries().stream() .anyMatch(x -> x.name().equals("min.insync.replicas")), "should not replicate excluded properties"); } + @Test + @Deprecated + public void testConfigPropertyFilteringWithAlterConfigsAndSourceDefault() { + Map<String, Object> filterConfig = Collections.singletonMap(DefaultConfigPropertyFilter.USE_DEFAULTS_FROM, "source"); + DefaultConfigPropertyFilter filter = new DefaultConfigPropertyFilter(); + filter.configure(filterConfig); + + MirrorSourceConnector connector = new MirrorSourceConnector(new SourceAndTarget("source", "target"), + new DefaultReplicationPolicy(), x -> true, filter); + List<ConfigEntry> entries = new ArrayList<>(); + entries.add(new ConfigEntry("name-1", "value-1")); + // When "use.defaults.from" explicitly set to "source", the config with default value 
should be replicated + entries.add(new ConfigEntry("name-2", "value-2", ConfigEntry.ConfigSource.DEFAULT_CONFIG, false, false, Collections.emptyList(), ConfigEntry.ConfigType.STRING, "")); + entries.add(new ConfigEntry("min.insync.replicas", "2")); + Config config = new Config(entries); + Config targetConfig = connector.targetConfig(config, false); + assertTrue(targetConfig.entries().stream() + .anyMatch(x -> x.name().equals("name-1")), "should replicate properties"); + assertTrue(targetConfig.entries().stream() + .anyMatch(x -> x.name().equals("name-2")), "should not replicate default properties"); Review Comment: ```suggestion .anyMatch(x -> x.name().equals("name-2")), "should include default properties"); ``` ########## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java: ########## @@ -261,6 +328,114 @@ public void testNewTopicConfigs() throws Exception { verify(connector).createNewTopics(any(), any()); } + @Test + @Deprecated + public void testIncrementalAlterConfigsRequested() throws Exception { + Map<String, String> props = makeProps(); + props.put(MirrorSourceConfig.USE_INCREMENTAL_ALTER_CONFIGS, MirrorSourceConfig.REQUEST_INCREMENTAL_ALTER_CONFIGS); + MirrorSourceConfig connectorConfig = new MirrorSourceConfig(props); + + MockAdminClient admin = spy(new MockAdminClient()); + MirrorSourceConnector connector = spy(new MirrorSourceConnector(new SourceAndTarget("source", "target"), + new DefaultReplicationPolicy(), connectorConfig, new DefaultConfigPropertyFilter(), admin)); + final String topic = "testtopic"; + List<ConfigEntry> entries = Collections.singletonList(new ConfigEntry("name-1", "value-1")); + Config config = new Config(entries); + doReturn(Collections.singletonMap(topic, config)).when(connector).describeTopicConfigs(any()); + doReturn(alterConfigsResult(new ConfigResource(ConfigResource.Type.TOPIC, topic), new UnsupportedVersionException("Unsupported API"))).when(admin).incrementalAlterConfigs(any()); + doNothing().when(connector).deprecatedAlterConfigs(any()); + connector.syncTopicConfigs(); + Map<String, Config> topicConfigs = Collections.singletonMap("source." 
+ topic, config); + verify(connector).incrementalAlterConfigs(topicConfigs); + + // the next time we sync topic configurations, expect to use the deprecated API + connector.syncTopicConfigs(); + verify(connector, times(1)).deprecatedAlterConfigs(topicConfigs); + } + + @Test + @Deprecated + public void testIncrementalAlterConfigsRequired() throws Exception { + Map<String, String> props = makeProps(); + props.put(MirrorSourceConfig.USE_INCREMENTAL_ALTER_CONFIGS, MirrorSourceConfig.REQUIRE_INCREMENTAL_ALTER_CONFIGS); + MirrorSourceConfig connectorConfig = new MirrorSourceConfig(props); + + MockAdminClient admin = spy(new MockAdminClient()); + MirrorSourceConnector connector = spy(new MirrorSourceConnector(new SourceAndTarget("source", "target"), + new DefaultReplicationPolicy(), connectorConfig, new DefaultConfigPropertyFilter(), admin)); + final String topic = "testtopic"; + List<ConfigEntry> entries = new ArrayList<>(); + ConfigEntry entryWithNonDefaultValue = new ConfigEntry("name-1", "value-1"); + ConfigEntry entryWithDefaultValue = new ConfigEntry("name-2", "value-2", ConfigEntry.ConfigSource.DEFAULT_CONFIG, false, false, + Collections.emptyList(), ConfigEntry.ConfigType.STRING, ""); + entries.add(entryWithNonDefaultValue); + entries.add(entryWithDefaultValue); + Config config = new Config(entries); + doReturn(Collections.singletonMap(topic, config)).when(connector).describeTopicConfigs(any()); + + doAnswer(invocation -> { + Map<ConfigResource, Collection<AlterConfigOp>> configOps = invocation.getArgument(0); + assertNotNull(configOps); + assertEquals(1, configOps.size()); + + ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, "source." + topic); + Collection<AlterConfigOp> ops = new ArrayList<>(); + ops.add(new AlterConfigOp(entryWithNonDefaultValue, AlterConfigOp.OpType.SET)); + ops.add(new AlterConfigOp(entryWithDefaultValue, AlterConfigOp.OpType.DELETE)); + + assertEquals(configOps.get(configResource), ops); Review Comment: Nit: the ordering for this `assertEquals` method is expected first, then actual (this matters because the messages generated for assertion failures can be confusing otherwise): ```suggestion assertEquals(ops, configOps.get(configResource)); ``` ########## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java: ########## @@ -261,6 +328,114 @@ public void testNewTopicConfigs() throws Exception { verify(connector).createNewTopics(any(), any()); } + @Test + @Deprecated + public void testIncrementalAlterConfigsRequested() throws Exception { + Map<String, String> props = makeProps(); + props.put(MirrorSourceConfig.USE_INCREMENTAL_ALTER_CONFIGS, MirrorSourceConfig.REQUEST_INCREMENTAL_ALTER_CONFIGS); + MirrorSourceConfig connectorConfig = new MirrorSourceConfig(props); + + MockAdminClient admin = spy(new MockAdminClient()); Review Comment: We don't actually have to use the `MockAdminClient` class at all and can just mock the `Admin` interface for this test: ```suggestion Admin admin = mock(Admin.class); ``` ########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/ConfigPropertyFilter.java: ########## @@ -25,8 +25,20 @@ @InterfaceStability.Evolving public interface ConfigPropertyFilter extends Configurable, AutoCloseable { + /** + * Specifies whether to replicate the given topic configuration. + * This does not handle replication of configurations with default values. + * The configurations with default values are controlled by {@link #shouldReplicateSourceDefault(String)}. 
+ */ boolean shouldReplicateConfigProperty(String prop); + /** + * Specifies whether to replicate the given topic configuration that has a default value Review Comment: I think we can give more detail on the exact behavior for this API: ```suggestion * Specifies how to replicate the given topic configuration property * that has a default value on the source cluster. Only invoked for properties * that {@link #shouldReplicateConfigProperty(String)} has returned * {@code true} for. * * @return {@code true} if the default value from the source topic should be synced * to the target topic, and {@code false} if the default value for the target topic * should be used instead ``` ########## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java: ########## @@ -261,6 +328,114 @@ public void testNewTopicConfigs() throws Exception { verify(connector).createNewTopics(any(), any()); } + @Test + @Deprecated + public void testIncrementalAlterConfigsRequested() throws Exception { + Map<String, String> props = makeProps(); + props.put(MirrorSourceConfig.USE_INCREMENTAL_ALTER_CONFIGS, MirrorSourceConfig.REQUEST_INCREMENTAL_ALTER_CONFIGS); + MirrorSourceConfig connectorConfig = new MirrorSourceConfig(props); + + MockAdminClient admin = spy(new MockAdminClient()); + MirrorSourceConnector connector = spy(new MirrorSourceConnector(new SourceAndTarget("source", "target"), + new DefaultReplicationPolicy(), connectorConfig, new DefaultConfigPropertyFilter(), admin)); + final String topic = "testtopic"; + List<ConfigEntry> entries = Collections.singletonList(new ConfigEntry("name-1", "value-1")); + Config config = new Config(entries); + doReturn(Collections.singletonMap(topic, config)).when(connector).describeTopicConfigs(any()); + doReturn(alterConfigsResult(new ConfigResource(ConfigResource.Type.TOPIC, topic), new UnsupportedVersionException("Unsupported API"))).when(admin).incrementalAlterConfigs(any()); + doNothing().when(connector).deprecatedAlterConfigs(any()); + connector.syncTopicConfigs(); + Map<String, Config> topicConfigs = Collections.singletonMap("source." 
+ topic, config); + verify(connector).incrementalAlterConfigs(topicConfigs); + + // the next time we sync topic configurations, expect to use the deprecated API + connector.syncTopicConfigs(); + verify(connector, times(1)).deprecatedAlterConfigs(topicConfigs); + } + + @Test + @Deprecated + public void testIncrementalAlterConfigsRequired() throws Exception { + Map<String, String> props = makeProps(); + props.put(MirrorSourceConfig.USE_INCREMENTAL_ALTER_CONFIGS, MirrorSourceConfig.REQUIRE_INCREMENTAL_ALTER_CONFIGS); + MirrorSourceConfig connectorConfig = new MirrorSourceConfig(props); + + MockAdminClient admin = spy(new MockAdminClient()); Review Comment: (Same thought RE just mocking the `Admin` interface): ```suggestion Admin admin = mock(Admin.class); ``` ########## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java: ########## @@ -261,6 +328,114 @@ public void testNewTopicConfigs() throws Exception { verify(connector).createNewTopics(any(), any()); } + @Test + @Deprecated + public void testIncrementalAlterConfigsRequested() throws Exception { + Map<String, String> props = makeProps(); + props.put(MirrorSourceConfig.USE_INCREMENTAL_ALTER_CONFIGS, MirrorSourceConfig.REQUEST_INCREMENTAL_ALTER_CONFIGS); + MirrorSourceConfig connectorConfig = new MirrorSourceConfig(props); + + MockAdminClient admin = spy(new MockAdminClient()); + MirrorSourceConnector connector = spy(new MirrorSourceConnector(new SourceAndTarget("source", "target"), + new DefaultReplicationPolicy(), connectorConfig, new DefaultConfigPropertyFilter(), admin)); + final String topic = "testtopic"; + List<ConfigEntry> entries = Collections.singletonList(new ConfigEntry("name-1", "value-1")); + Config config = new Config(entries); + doReturn(Collections.singletonMap(topic, config)).when(connector).describeTopicConfigs(any()); + doReturn(alterConfigsResult(new ConfigResource(ConfigResource.Type.TOPIC, topic), new UnsupportedVersionException("Unsupported API"))).when(admin).incrementalAlterConfigs(any()); + doNothing().when(connector).deprecatedAlterConfigs(any()); + connector.syncTopicConfigs(); + Map<String, Config> topicConfigs = Collections.singletonMap("source." 
+ topic, config); + verify(connector).incrementalAlterConfigs(topicConfigs); + + // the next time we sync topic configurations, expect to use the deprecated API + connector.syncTopicConfigs(); + verify(connector, times(1)).deprecatedAlterConfigs(topicConfigs); + } + + @Test + @Deprecated + public void testIncrementalAlterConfigsRequired() throws Exception { + Map<String, String> props = makeProps(); + props.put(MirrorSourceConfig.USE_INCREMENTAL_ALTER_CONFIGS, MirrorSourceConfig.REQUIRE_INCREMENTAL_ALTER_CONFIGS); + MirrorSourceConfig connectorConfig = new MirrorSourceConfig(props); + + MockAdminClient admin = spy(new MockAdminClient()); + MirrorSourceConnector connector = spy(new MirrorSourceConnector(new SourceAndTarget("source", "target"), + new DefaultReplicationPolicy(), connectorConfig, new DefaultConfigPropertyFilter(), admin)); + final String topic = "testtopic"; + List<ConfigEntry> entries = new ArrayList<>(); + ConfigEntry entryWithNonDefaultValue = new ConfigEntry("name-1", "value-1"); + ConfigEntry entryWithDefaultValue = new ConfigEntry("name-2", "value-2", ConfigEntry.ConfigSource.DEFAULT_CONFIG, false, false, + Collections.emptyList(), ConfigEntry.ConfigType.STRING, ""); + entries.add(entryWithNonDefaultValue); + entries.add(entryWithDefaultValue); + Config config = new Config(entries); + doReturn(Collections.singletonMap(topic, config)).when(connector).describeTopicConfigs(any()); + + doAnswer(invocation -> { + Map<ConfigResource, Collection<AlterConfigOp>> configOps = invocation.getArgument(0); + assertNotNull(configOps); + assertEquals(1, configOps.size()); + + ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, "source." + topic); + Collection<AlterConfigOp> ops = new ArrayList<>(); + ops.add(new AlterConfigOp(entryWithNonDefaultValue, AlterConfigOp.OpType.SET)); + ops.add(new AlterConfigOp(entryWithDefaultValue, AlterConfigOp.OpType.DELETE)); + + assertEquals(configOps.get(configResource), ops); + + return alterConfigsResult(configResource); + }).when(admin).incrementalAlterConfigs(any()); + + connector.syncTopicConfigs(); + Map<String, Config> topicConfigs = Collections.singletonMap("source." 
+ topic, config); + verify(connector).incrementalAlterConfigs(topicConfigs); + } + + @Test + @Deprecated + public void testIncrementalAlterConfigsRequiredButUnsupported() throws Exception { + Map<String, String> props = makeProps(); + props.put(MirrorSourceConfig.USE_INCREMENTAL_ALTER_CONFIGS, MirrorSourceConfig.REQUIRE_INCREMENTAL_ALTER_CONFIGS); + MirrorSourceConfig connectorConfig = new MirrorSourceConfig(props); + + MockAdminClient admin = spy(new MockAdminClient()); Review Comment: ```suggestion Admin admin = mock(Admin.class); ``` ########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java: ########## @@ -540,10 +601,13 @@ Map<String, Config> describeTopicConfigs(Set<String> topics) Config targetConfig(Config sourceConfig) { List<ConfigEntry> entries = sourceConfig.entries().stream() - .filter(x -> !x.isDefault() && !x.isReadOnly() && !x.isSensitive()) - .filter(x -> x.source() != ConfigEntry.ConfigSource.STATIC_BROKER_CONFIG) - .filter(x -> shouldReplicateTopicConfigurationProperty(x.name())) - .collect(Collectors.toList()); + .filter(x -> (!useIncrementalAlterConfigs.equals(MirrorSourceConfig.NEVER_USE_INCREMENTAL_ALTER_CONFIG) + || (x.isDefault() && shouldReplicateSourceDefault(x.name()))) + || (!x.isDefault() && useIncrementalAlterConfigs.equals(MirrorSourceConfig.NEVER_USE_INCREMENTAL_ALTER_CONFIG))) + .filter(x -> !x.isReadOnly() && !x.isSensitive()) + .filter(x -> x.source() != ConfigEntry.ConfigSource.STATIC_BROKER_CONFIG) + .filter(x -> shouldReplicateTopicConfigurationProperty(x.name())) Review Comment: Looks good, thanks! ########## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java: ########## @@ -655,6 +657,135 @@ public void testRestartReplication() throws InterruptedException { assertMonotonicCheckpoints(backup, "primary.checkpoints.internal"); } + @Test + public void testSyncTopicConfigs() throws InterruptedException { + mm2Config = new MirrorMakerConfig(mm2Props); + + waitUntilMirrorMakerIsRunning(backup, CONNECTOR_LIST, mm2Config, PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS); + + // create topic with configuration to test: + final Map<String, String> topicConfig = new HashMap<>(); + topicConfig.put("delete.retention.ms", "1000"); // should be excluded (default value is 86400000) + topicConfig.put("retention.bytes", "1000"); // should be included, default value is -1 + + final String topic = "test-topic-with-config"; + final String backupTopic = remoteTopicName(topic, PRIMARY_CLUSTER_ALIAS); + + primary.kafka().createTopic(topic, NUM_PARTITIONS, 1, topicConfig); + waitForTopicCreated(backup, backupTopic); + + // alter configs on the target topic + ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, backupTopic); + Collection<AlterConfigOp> ops = new ArrayList<>(); + ops.add(new AlterConfigOp(new ConfigEntry("delete.retention.ms", "2000"), AlterConfigOp.OpType.SET)); + ops.add(new AlterConfigOp(new ConfigEntry("retention.bytes", "2000"), AlterConfigOp.OpType.SET)); + Map<ConfigResource, Collection<AlterConfigOp>> configOps = Collections.singletonMap(configResource, ops); + // alter configs on target cluster + backup.kafka().incrementalAlterConfigs(configOps); + + waitForCondition(() -> { + String primaryConfig, backupConfig; + primaryConfig = getTopicConfig(primary.kafka(), topic, "delete.retention.ms"); + backupConfig = getTopicConfig(backup.kafka(), backupTopic, "delete.retention.ms"); + assertNotEquals(primaryConfig, backupConfig, + 
"`delete.retention.ms` should be different, because it's in exclude filter! "); + assertEquals("2000", backupConfig, "`delete.retention.ms` should be 2000, because it's explicitly defined on the target topic! "); + + // regression test for the config that are still supposed to be replicated + primaryConfig = getTopicConfig(primary.kafka(), topic, "retention.bytes"); + backupConfig = getTopicConfig(backup.kafka(), backupTopic, "retention.bytes"); + assertEquals(primaryConfig, backupConfig, + "`retention.bytes` should be the same, because it isn't in exclude filter! "); + return true; + }, 3000, "Topic configurations were not synced"); + } + + @Test + public void testReplicateSourceDefault() throws Exception { + mm2Props.put(DefaultConfigPropertyFilter.USE_DEFAULTS_FROM, "source"); + mm2Config = new MirrorMakerConfig(mm2Props); + + waitUntilMirrorMakerIsRunning(backup, CONNECTOR_LIST, mm2Config, PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS); + + // create topic with default configurations to test + final String topic = "test-topic-with-config"; + final String backupTopic = remoteTopicName(topic, PRIMARY_CLUSTER_ALIAS); + + primary.kafka().createTopic(topic, NUM_PARTITIONS, 1, new HashMap<>()); + waitForTopicCreated(backup, backupTopic); + + // alter target topic configurations + ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, backupTopic); + Collection<AlterConfigOp> ops = new ArrayList<>(); + ops.add(new AlterConfigOp(new ConfigEntry("delete.retention.ms", "2000"), AlterConfigOp.OpType.SET)); + ops.add(new AlterConfigOp(new ConfigEntry("retention.bytes", "2000"), AlterConfigOp.OpType.SET)); + Map<ConfigResource, Collection<AlterConfigOp>> configOps = Collections.singletonMap(configResource, ops); + backup.kafka().incrementalAlterConfigs(configOps); + + waitForCondition(() -> { + String primaryConfig, backupConfig; + // altered configuration of the target topic should be synced with the source cluster's default + primaryConfig = getTopicConfig(primary.kafka(), topic, "retention.bytes"); + backupConfig = getTopicConfig(backup.kafka(), backupTopic, "retention.bytes"); + assertEquals(primaryConfig, backupConfig, + "`retention.bytes` should be the same, because the source cluster default is being used! "); + assertEquals("-1", backupConfig, + "`retention.bytes` should be synced with default value!"); + + // when using the source cluster's default, the excluded configuration should still not be changed + primaryConfig = getTopicConfig(primary.kafka(), topic, "delete.retention.ms"); + backupConfig = getTopicConfig(backup.kafka(), backupTopic, "delete.retention.ms"); + assertNotEquals(primaryConfig, backupConfig, + "`delete.retention.ms` should be different, because it's in exclude filter! "); + assertEquals("2000", backupConfig, "`delete.retention.ms` should be 2000, because it's explicitly defined on the target topic! 
"); + return true; + }, 3000, "Topic configurations were not synced"); + } + + @Test + public void testReplicateTargetDefault() throws Exception { + mm2Config = new MirrorMakerConfig(mm2Props); + + waitUntilMirrorMakerIsRunning(backup, CONNECTOR_LIST, mm2Config, PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS); + + // create topic with configuration to test: + final Map<String, String> topicConfig = new HashMap<>(); + topicConfig.put("retention.bytes", "1000"); + + final String topic = "test-topic-with-config"; + final String backupTopic = remoteTopicName(topic, PRIMARY_CLUSTER_ALIAS); + + primary.kafka().createTopic(topic, NUM_PARTITIONS, 1, topicConfig); + waitForTopicCreated(backup, backupTopic); + + waitForCondition(() -> { + String primaryConfig, backupConfig; + primaryConfig = getTopicConfig(primary.kafka(), topic, "retention.bytes"); + backupConfig = getTopicConfig(backup.kafka(), backupTopic, "retention.bytes"); + assertEquals(primaryConfig, backupConfig, + "`retention.bytes` should be the same"); + assertEquals("1000", backupConfig, + "`retention.bytes` should be synced with default value!"); + + // delete the previously altered configuration of the source topic + ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, topic); + Collection<AlterConfigOp> ops = new ArrayList<>(); + ops.add(new AlterConfigOp(new ConfigEntry("retention.bytes", "1000"), AlterConfigOp.OpType.DELETE)); + Map<ConfigResource, Collection<AlterConfigOp>> configOps = Collections.singletonMap(configResource, ops); + primary.kafka().incrementalAlterConfigs(configOps); Review Comment: Nit: any reason this has to take place inside the call to `waitForCondition`? Would it make more sense to move this part to start at line 778? ########## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java: ########## @@ -261,6 +328,114 @@ public void testNewTopicConfigs() throws Exception { verify(connector).createNewTopics(any(), any()); } + @Test + @Deprecated + public void testIncrementalAlterConfigsRequested() throws Exception { + Map<String, String> props = makeProps(); + props.put(MirrorSourceConfig.USE_INCREMENTAL_ALTER_CONFIGS, MirrorSourceConfig.REQUEST_INCREMENTAL_ALTER_CONFIGS); + MirrorSourceConfig connectorConfig = new MirrorSourceConfig(props); + + MockAdminClient admin = spy(new MockAdminClient()); + MirrorSourceConnector connector = spy(new MirrorSourceConnector(new SourceAndTarget("source", "target"), + new DefaultReplicationPolicy(), connectorConfig, new DefaultConfigPropertyFilter(), admin)); + final String topic = "testtopic"; + List<ConfigEntry> entries = Collections.singletonList(new ConfigEntry("name-1", "value-1")); + Config config = new Config(entries); + doReturn(Collections.singletonMap(topic, config)).when(connector).describeTopicConfigs(any()); + doReturn(alterConfigsResult(new ConfigResource(ConfigResource.Type.TOPIC, topic), new UnsupportedVersionException("Unsupported API"))).when(admin).incrementalAlterConfigs(any()); + doNothing().when(connector).deprecatedAlterConfigs(any()); + connector.syncTopicConfigs(); + Map<String, Config> topicConfigs = Collections.singletonMap("source." 
+ topic, config); + verify(connector).incrementalAlterConfigs(topicConfigs); + + // the next time we sync topic configurations, expect to use the deprecated API + connector.syncTopicConfigs(); + verify(connector, times(1)).deprecatedAlterConfigs(topicConfigs); + } + + @Test + @Deprecated + public void testIncrementalAlterConfigsRequired() throws Exception { + Map<String, String> props = makeProps(); + props.put(MirrorSourceConfig.USE_INCREMENTAL_ALTER_CONFIGS, MirrorSourceConfig.REQUIRE_INCREMENTAL_ALTER_CONFIGS); + MirrorSourceConfig connectorConfig = new MirrorSourceConfig(props); + + MockAdminClient admin = spy(new MockAdminClient()); + MirrorSourceConnector connector = spy(new MirrorSourceConnector(new SourceAndTarget("source", "target"), + new DefaultReplicationPolicy(), connectorConfig, new DefaultConfigPropertyFilter(), admin)); + final String topic = "testtopic"; + List<ConfigEntry> entries = new ArrayList<>(); + ConfigEntry entryWithNonDefaultValue = new ConfigEntry("name-1", "value-1"); + ConfigEntry entryWithDefaultValue = new ConfigEntry("name-2", "value-2", ConfigEntry.ConfigSource.DEFAULT_CONFIG, false, false, + Collections.emptyList(), ConfigEntry.ConfigType.STRING, ""); + entries.add(entryWithNonDefaultValue); + entries.add(entryWithDefaultValue); + Config config = new Config(entries); + doReturn(Collections.singletonMap(topic, config)).when(connector).describeTopicConfigs(any()); + + doAnswer(invocation -> { + Map<ConfigResource, Collection<AlterConfigOp>> configOps = invocation.getArgument(0); + assertNotNull(configOps); + assertEquals(1, configOps.size()); + + ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, "source." + topic); + Collection<AlterConfigOp> ops = new ArrayList<>(); + ops.add(new AlterConfigOp(entryWithNonDefaultValue, AlterConfigOp.OpType.SET)); + ops.add(new AlterConfigOp(entryWithDefaultValue, AlterConfigOp.OpType.DELETE)); + + assertEquals(configOps.get(configResource), ops); + + return alterConfigsResult(configResource); + }).when(admin).incrementalAlterConfigs(any()); + + connector.syncTopicConfigs(); + Map<String, Config> topicConfigs = Collections.singletonMap("source." 
+ topic, config); + verify(connector).incrementalAlterConfigs(topicConfigs); + } + + @Test + @Deprecated + public void testIncrementalAlterConfigsRequiredButUnsupported() throws Exception { + Map<String, String> props = makeProps(); + props.put(MirrorSourceConfig.USE_INCREMENTAL_ALTER_CONFIGS, MirrorSourceConfig.REQUIRE_INCREMENTAL_ALTER_CONFIGS); + MirrorSourceConfig connectorConfig = new MirrorSourceConfig(props); + + MockAdminClient admin = spy(new MockAdminClient()); + ConnectorContext connectorContext = mock(ConnectorContext.class); + MirrorSourceConnector connector = spy(new MirrorSourceConnector(new SourceAndTarget("source", "target"), + new DefaultReplicationPolicy(), connectorConfig, new DefaultConfigPropertyFilter(), admin)); + connector.initialize(connectorContext); + final String topic = "testtopic"; + List<ConfigEntry> entries = Collections.singletonList(new ConfigEntry("name-1", "value-1")); + Config config = new Config(entries); + doReturn(Collections.singletonMap(topic, config)).when(connector).describeTopicConfigs(any()); + doReturn(alterConfigsResult(new ConfigResource(ConfigResource.Type.TOPIC, topic), new UnsupportedVersionException("Unsupported API"))).when(admin).incrementalAlterConfigs(any()); + doNothing().when(connector).deprecatedAlterConfigs(any()); Review Comment: We no longer need this part: ```suggestion ``` ########## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java: ########## @@ -261,6 +328,114 @@ public void testNewTopicConfigs() throws Exception { verify(connector).createNewTopics(any(), any()); } + @Test + @Deprecated + public void testIncrementalAlterConfigsRequested() throws Exception { + Map<String, String> props = makeProps(); + props.put(MirrorSourceConfig.USE_INCREMENTAL_ALTER_CONFIGS, MirrorSourceConfig.REQUEST_INCREMENTAL_ALTER_CONFIGS); + MirrorSourceConfig connectorConfig = new MirrorSourceConfig(props); + + MockAdminClient admin = spy(new MockAdminClient()); + MirrorSourceConnector connector = spy(new MirrorSourceConnector(new SourceAndTarget("source", "target"), + new DefaultReplicationPolicy(), connectorConfig, new DefaultConfigPropertyFilter(), admin)); + final String topic = "testtopic"; + List<ConfigEntry> entries = Collections.singletonList(new ConfigEntry("name-1", "value-1")); + Config config = new Config(entries); + doReturn(Collections.singletonMap(topic, config)).when(connector).describeTopicConfigs(any()); + doReturn(alterConfigsResult(new ConfigResource(ConfigResource.Type.TOPIC, topic), new UnsupportedVersionException("Unsupported API"))).when(admin).incrementalAlterConfigs(any()); + doNothing().when(connector).deprecatedAlterConfigs(any()); + connector.syncTopicConfigs(); + Map<String, Config> topicConfigs = Collections.singletonMap("source." 
+ topic, config); + verify(connector).incrementalAlterConfigs(topicConfigs); + + // the next time we sync topic configurations, expect to use the deprecated API + connector.syncTopicConfigs(); + verify(connector, times(1)).deprecatedAlterConfigs(topicConfigs); + } + + @Test + @Deprecated + public void testIncrementalAlterConfigsRequired() throws Exception { + Map<String, String> props = makeProps(); + props.put(MirrorSourceConfig.USE_INCREMENTAL_ALTER_CONFIGS, MirrorSourceConfig.REQUIRE_INCREMENTAL_ALTER_CONFIGS); + MirrorSourceConfig connectorConfig = new MirrorSourceConfig(props); + + MockAdminClient admin = spy(new MockAdminClient()); + MirrorSourceConnector connector = spy(new MirrorSourceConnector(new SourceAndTarget("source", "target"), + new DefaultReplicationPolicy(), connectorConfig, new DefaultConfigPropertyFilter(), admin)); + final String topic = "testtopic"; + List<ConfigEntry> entries = new ArrayList<>(); + ConfigEntry entryWithNonDefaultValue = new ConfigEntry("name-1", "value-1"); + ConfigEntry entryWithDefaultValue = new ConfigEntry("name-2", "value-2", ConfigEntry.ConfigSource.DEFAULT_CONFIG, false, false, + Collections.emptyList(), ConfigEntry.ConfigType.STRING, ""); + entries.add(entryWithNonDefaultValue); + entries.add(entryWithDefaultValue); + Config config = new Config(entries); + doReturn(Collections.singletonMap(topic, config)).when(connector).describeTopicConfigs(any()); + + doAnswer(invocation -> { + Map<ConfigResource, Collection<AlterConfigOp>> configOps = invocation.getArgument(0); + assertNotNull(configOps); + assertEquals(1, configOps.size()); + + ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, "source." + topic); + Collection<AlterConfigOp> ops = new ArrayList<>(); + ops.add(new AlterConfigOp(entryWithNonDefaultValue, AlterConfigOp.OpType.SET)); + ops.add(new AlterConfigOp(entryWithDefaultValue, AlterConfigOp.OpType.DELETE)); + + assertEquals(configOps.get(configResource), ops); + + return alterConfigsResult(configResource); + }).when(admin).incrementalAlterConfigs(any()); + + connector.syncTopicConfigs(); + Map<String, Config> topicConfigs = Collections.singletonMap("source." 
+ topic, config); + verify(connector).incrementalAlterConfigs(topicConfigs); + } + + @Test + @Deprecated + public void testIncrementalAlterConfigsRequiredButUnsupported() throws Exception { + Map<String, String> props = makeProps(); + props.put(MirrorSourceConfig.USE_INCREMENTAL_ALTER_CONFIGS, MirrorSourceConfig.REQUIRE_INCREMENTAL_ALTER_CONFIGS); + MirrorSourceConfig connectorConfig = new MirrorSourceConfig(props); + + MockAdminClient admin = spy(new MockAdminClient()); + ConnectorContext connectorContext = mock(ConnectorContext.class); + MirrorSourceConnector connector = spy(new MirrorSourceConnector(new SourceAndTarget("source", "target"), + new DefaultReplicationPolicy(), connectorConfig, new DefaultConfigPropertyFilter(), admin)); + connector.initialize(connectorContext); + final String topic = "testtopic"; + List<ConfigEntry> entries = Collections.singletonList(new ConfigEntry("name-1", "value-1")); + Config config = new Config(entries); + doReturn(Collections.singletonMap(topic, config)).when(connector).describeTopicConfigs(any()); + doReturn(alterConfigsResult(new ConfigResource(ConfigResource.Type.TOPIC, topic), new UnsupportedVersionException("Unsupported API"))).when(admin).incrementalAlterConfigs(any()); + doNothing().when(connector).deprecatedAlterConfigs(any()); + + connector.syncTopicConfigs(); + verify(connectorContext).raiseError(isA(ConnectException.class)); + } + + + @Test + @Deprecated + public void testIncrementalAlterConfigsNeverUsed() throws Exception { + Map<String, String> props = makeProps(); + props.put(MirrorSourceConfig.USE_INCREMENTAL_ALTER_CONFIGS, MirrorSourceConfig.NEVER_USE_INCREMENTAL_ALTER_CONFIGS); + MirrorSourceConfig connectorConfigs = new MirrorSourceConfig(props); + + MirrorSourceConnector connector = spy(new MirrorSourceConnector(new SourceAndTarget("source", "target"), + new DefaultReplicationPolicy(), connectorConfigs, new DefaultConfigPropertyFilter(), null)); + final String topic = "testtopic"; + List<ConfigEntry> entries = Collections.singletonList(new ConfigEntry("name-1", "value-1")); + Config config = new Config(entries); + doReturn(Collections.singletonMap(topic, config)).when(connector).describeTopicConfigs(any()); + doNothing().when(connector).deprecatedAlterConfigs(any()); + connector.syncTopicConfigs(); + Map<String, Config> topicConfigs = Collections.singletonMap("source." + topic, config); + verify(connector).deprecatedAlterConfigs(topicConfigs); Review Comment: Can we also verify that we never tried to use the incremental API? ```suggestion verify(connector).deprecatedAlterConfigs(topicConfigs); verify(connector, never()).incrementalAlterConfigs(any()); ``` ########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/DefaultConfigPropertyFilter.java: ########## @@ -30,6 +30,9 @@ public class DefaultConfigPropertyFilter implements ConfigPropertyFilter { public static final String CONFIG_PROPERTIES_EXCLUDE_CONFIG = "config.properties.exclude"; public static final String CONFIG_PROPERTIES_EXCLUDE_ALIAS_CONFIG = "config.properties.blacklist"; + public static final String USE_DEFAULTS_FROM = "use.defaults.from"; + private static final String USE_DEFAULTS_FROM_DOC = "Which cluster's defaults to use when syncing topic configurations."; + private static final String USE_DEFAULTS_FROM_DEFAULT = "target"; private static final String CONFIG_PROPERTIES_EXCLUDE_DOC = "List of topic configuration properties and/or regexes " + "that should not be replicated."; Review Comment: No worries! 
I can see how the GitHub UI could be misleading here. Looks good now 👍 ########## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java: ########## @@ -655,6 +657,135 @@ public void testRestartReplication() throws InterruptedException { assertMonotonicCheckpoints(backup, "primary.checkpoints.internal"); } + @Test + public void testSyncTopicConfigs() throws InterruptedException { + mm2Config = new MirrorMakerConfig(mm2Props); + + waitUntilMirrorMakerIsRunning(backup, CONNECTOR_LIST, mm2Config, PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS); + + // create topic with configuration to test: + final Map<String, String> topicConfig = new HashMap<>(); + topicConfig.put("delete.retention.ms", "1000"); // should be excluded (default value is 86400000) + topicConfig.put("retention.bytes", "1000"); // should be included, default value is -1 + + final String topic = "test-topic-with-config"; + final String backupTopic = remoteTopicName(topic, PRIMARY_CLUSTER_ALIAS); + + primary.kafka().createTopic(topic, NUM_PARTITIONS, 1, topicConfig); + waitForTopicCreated(backup, backupTopic); + + // alter configs on the target topic + ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, backupTopic); + Collection<AlterConfigOp> ops = new ArrayList<>(); + ops.add(new AlterConfigOp(new ConfigEntry("delete.retention.ms", "2000"), AlterConfigOp.OpType.SET)); + ops.add(new AlterConfigOp(new ConfigEntry("retention.bytes", "2000"), AlterConfigOp.OpType.SET)); + Map<ConfigResource, Collection<AlterConfigOp>> configOps = Collections.singletonMap(configResource, ops); + // alter configs on target cluster + backup.kafka().incrementalAlterConfigs(configOps); + + waitForCondition(() -> { + String primaryConfig, backupConfig; + primaryConfig = getTopicConfig(primary.kafka(), topic, "delete.retention.ms"); + backupConfig = getTopicConfig(backup.kafka(), backupTopic, "delete.retention.ms"); + assertNotEquals(primaryConfig, backupConfig, + "`delete.retention.ms` should be different, because it's in exclude filter! "); + assertEquals("2000", backupConfig, "`delete.retention.ms` should be 2000, because it's explicitly defined on the target topic! "); + + // regression test for the config that are still supposed to be replicated + primaryConfig = getTopicConfig(primary.kafka(), topic, "retention.bytes"); + backupConfig = getTopicConfig(backup.kafka(), backupTopic, "retention.bytes"); + assertEquals(primaryConfig, backupConfig, + "`retention.bytes` should be the same, because it isn't in exclude filter! "); + return true; + }, 3000, "Topic configurations were not synced"); Review Comment: We should use a much higher timeout here (30 or 60 seconds). Our CI machines are notoriously slow and this may lead to flakiness as-is.
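Separately, going back to the very first comment in this review (the `whenComplete` body running once per failed topic): below is a minimal sketch of one way to log the API-fallback message only once while keeping the usual per-topic warning for every other failure. It assumes a new `AtomicBoolean` field on `MirrorSourceConnector` plus the class's existing `log` and `targetAdminClient` fields, and imports for `java.util.concurrent.atomic.AtomicBoolean` and `org.apache.kafka.common.errors.UnsupportedVersionException`; the field name and log wording are illustrative only, not taken from the PR.

```java
// Illustrative sketch only, not the PR's implementation.
// The callback runs once per topic, so a flag guards the one-time message.
private final AtomicBoolean loggedIncrementalAlterConfigsUnsupported = new AtomicBoolean(false);

void incrementalAlterConfigs(Map<String, Config> topicConfigs) {
    Map<ConfigResource, Collection<AlterConfigOp>> configOps = new HashMap<>();
    // ... populate configOps with the SET/DELETE ops exactly as in the PR ...
    targetAdminClient.incrementalAlterConfigs(configOps).values().forEach((resource, future) ->
            future.whenComplete((ignored, e) -> {
                if (e == null) {
                    return;
                }
                if (e instanceof UnsupportedVersionException
                        && loggedIncrementalAlterConfigsUnsupported.compareAndSet(false, true)) {
                    // emitted at most once; this is also where a "requested" mode could
                    // switch over to the deprecated alterConfigs API
                    log.warn("Target cluster does not support incrementalAlterConfigs; "
                            + "falling back to the deprecated alterConfigs API", e);
                } else {
                    // every other failure keeps the usual per-topic message
                    log.warn("Could not alter configurations of topic {}.", resource.name(), e);
                }
            }));
}
```

`compareAndSet(false, true)` succeeds for exactly one callback, so even if several topics fail with `UnsupportedVersionException` in the same sync, the fallback warning (and any fallback handling) happens only once.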