C0urante commented on code in PR #13373:
URL: https://github.com/apache/kafka/pull/13373#discussion_r1151262807


##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java:
##########
@@ -514,6 +539,43 @@ private void updateTopicConfigs(Map<String, Config> 
topicConfigs) {
         }));
     }
 
+    // visible for testing
+    void incrementalAlterConfigs(Map<String, Config> topicConfigs) {
+        Map<ConfigResource, Collection<AlterConfigOp>> configOps = new 
HashMap<>();
+        for (Map.Entry<String, Config> topicConfig : topicConfigs.entrySet()) {
+            Collection<AlterConfigOp> ops = new ArrayList<>();
+            ConfigResource configResource = new 
ConfigResource(ConfigResource.Type.TOPIC, topicConfig.getKey());
+            for (ConfigEntry config : topicConfig.getValue().entries()) {
+                if (config.isDefault() && 
!shouldReplicateSourceDefault(config.name())) {
+                    ops.add(new AlterConfigOp(config, 
AlterConfigOp.OpType.DELETE));
+                } else {
+                    ops.add(new AlterConfigOp(config, 
AlterConfigOp.OpType.SET));
+                }
+            }
+            configOps.put(configResource, ops);
+        }
+        log.trace("Syncing configs for {} topics.", configOps.size());
+        
targetAdminClient.incrementalAlterConfigs(configOps).values().forEach((k, v) -> 
v.whenComplete((x, e) -> {

Review Comment:
   Because we're using 
[AlterConfigsResult::values](https://kafka.apache.org/34/javadoc/org/apache/kafka/clients/admin/AlterConfigsResult.html#values())
 instead of 
[AlterConfigsResult::all](https://kafka.apache.org/34/javadoc/org/apache/kafka/clients/admin/AlterConfigsResult.html#all())
 here, the body after this line (starting with `if (e != null)` may be executed 
multiple times for a single invocation of 
`MirrorSourceConnector::incrementalAlterConfigs`.
   
   Since we use the topic name in our error messages, it seems acceptable to 
use this specific API from the results class. But we should probably avoid 
logging the warning messages about falling back on the deprecated API or the 
error-level message about failing to sync the configs for the topic more than 
once.
   
   Can we only log those messages once, and if there are any more failures, log 
the usual warning "Could not alter configurations of topic &lt;topic&gt;" 
message instead?



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/DefaultConfigPropertyFilter.java:
##########
@@ -30,21 +30,29 @@ public class DefaultConfigPropertyFilter implements 
ConfigPropertyFilter {
     
     public static final String CONFIG_PROPERTIES_EXCLUDE_CONFIG = 
"config.properties.exclude";
     public static final String CONFIG_PROPERTIES_EXCLUDE_ALIAS_CONFIG = 
"config.properties.blacklist";
+    public static final String USE_DEFAULTS_FROM = "use.defaults.from";
+    private static final String USE_DEFAULTS_FROM_DOC = "Which cluster's 
defaults (source or target) to use "
+                                                        + "when syncing topic 
configurations that have default values.";
+    private static final String USE_DEFAULTS_FROM_DEFAULT = "target";
 
     private static final String CONFIG_PROPERTIES_EXCLUDE_DOC = "List of topic 
configuration properties and/or regexes "
-                                                                + "that should 
not be replicated.";
+                                                                + "that should 
not be replicated."
+                                                                + "This 
setting also applies to the properties with default values"
+                                                                + "if 
use.defaults.from is set to 'source'.";

Review Comment:
   I think we should remove this part since this property now affects the 
behavior for all properties, regardless of the value of `use.defaults.from`.
   ```suggestion
                                                                   + "that 
should not be replicated.";
   ```



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/ConfigPropertyFilter.java:
##########
@@ -25,8 +25,20 @@
 @InterfaceStability.Evolving
 public interface ConfigPropertyFilter extends Configurable, AutoCloseable {
 
+    /**
+     * Specifies whether to replicate the given topic configuration.
+     * This does not handle replication of configurations with default values.
+     * The configurations with default values are controlled by {@link 
#shouldReplicateSourceDefault(String)}.

Review Comment:
   This part should be updated now. Maybe something like this?
   
   ```suggestion
        * Note that if a property has a default value on the source cluster,
        * {@link #shouldReplicateSourceDefault(String)} will also be called to
        * determine how that property should be synced.
   ```



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java:
##########
@@ -538,9 +600,13 @@ Map<String, Config> describeTopicConfigs(Set<String> 
topics)
             .collect(Collectors.toMap(x -> x.getKey().name(), 
Entry::getValue));
     }
 
-    Config targetConfig(Config sourceConfig) {
+    Config targetConfig(Config sourceConfig, boolean incremental) {
+// If using incrementalAlterConfigs API, sync the default property with either 
SET or DELETE action determined by 
ConfigPropertyFilter::shouldReplicateSourceDefault later.
+// If not using incrementalAlterConfigs API, sync the default property if 
ConfigPropertyFilter::shouldReplicateSourceDefault returns true.
+// If ConfigPropertyFilter::shouldReplicateConfigProperty returns false, we do 
not sync the property at all.

Review Comment:
   Nit: can we align this with the next line of the method body?
   ```suggestion
           // If using incrementalAlterConfigs API, sync the default property 
with either SET or DELETE action determined by 
ConfigPropertyFilter::shouldReplicateSourceDefault later.
           // If not using incrementalAlterConfigs API, sync the default 
property if ConfigPropertyFilter::shouldReplicateSourceDefault returns true.
           // If ConfigPropertyFilter::shouldReplicateConfigProperty returns 
false, we do not sync the property at all.
   ```



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java:
##########
@@ -538,9 +600,13 @@ Map<String, Config> describeTopicConfigs(Set<String> 
topics)
             .collect(Collectors.toMap(x -> x.getKey().name(), 
Entry::getValue));
     }
 
-    Config targetConfig(Config sourceConfig) {
+    Config targetConfig(Config sourceConfig, boolean incremental) {
+// If using incrementalAlterConfigs API, sync the default property with either 
SET or DELETE action determined by 
ConfigPropertyFilter::shouldReplicateSourceDefault later.
+// If not using incrementalAlterConfigs API, sync the default property if 
ConfigPropertyFilter::shouldReplicateSourceDefault returns true.
+// If ConfigPropertyFilter::shouldReplicateConfigProperty returns false, we do 
not sync the property at all.
         List<ConfigEntry> entries = sourceConfig.entries().stream()
-            .filter(x -> !x.isDefault() && !x.isReadOnly() && !x.isSensitive())
+            .filter(x -> incremental || x.isDefault() && 
shouldReplicateSourceDefault(x.name()) || !x.isDefault())

Review Comment:
   Nit: a pair of parentheses here, though not strictly necessary, could help 
illustrate the behavior of this boolean expression a bit more clearly:
   ```suggestion
               .filter(x -> incremental || (x.isDefault() && 
shouldReplicateSourceDefault(x.name())) || !x.isDefault())
   ```



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java:
##########
@@ -209,15 +229,62 @@ public void testConfigPropertyFiltering() {
             new DefaultReplicationPolicy(), x -> true, new 
DefaultConfigPropertyFilter());
         ArrayList<ConfigEntry> entries = new ArrayList<>();
         entries.add(new ConfigEntry("name-1", "value-1"));
+        entries.add(new ConfigEntry("name-2", "value-2", 
ConfigEntry.ConfigSource.DEFAULT_CONFIG, false, false, Collections.emptyList(), 
ConfigEntry.ConfigType.STRING, ""));
+        entries.add(new ConfigEntry("min.insync.replicas", "2"));
+        Config config = new Config(entries);
+        Config targetConfig = connector.targetConfig(config, true);
+        assertTrue(targetConfig.entries().stream()
+            .anyMatch(x -> x.name().equals("name-1")), "should replicate 
properties");
+        assertTrue(targetConfig.entries().stream()
+            .anyMatch(x -> x.name().equals("name-2")), "should include default 
properties");
+        assertFalse(targetConfig.entries().stream()
+            .anyMatch(x -> x.name().equals("min.insync.replicas")), "should 
not replicate excluded properties");
+    }
+
+    @Test
+    @Deprecated
+    public void testConfigPropertyFilteringWithAlterConfigs() {
+        MirrorSourceConnector connector = new MirrorSourceConnector(new 
SourceAndTarget("source", "target"),
+                new DefaultReplicationPolicy(), x -> true, new 
DefaultConfigPropertyFilter());
+        List<ConfigEntry> entries = new ArrayList<>();
+        entries.add(new ConfigEntry("name-1", "value-1"));
+        // When "use.defaults.from" set to "target" by default, the config 
with default value should be excluded
+        entries.add(new ConfigEntry("name-2", "value-2", 
ConfigEntry.ConfigSource.DEFAULT_CONFIG, false, false, Collections.emptyList(), 
ConfigEntry.ConfigType.STRING, ""));
         entries.add(new ConfigEntry("min.insync.replicas", "2"));
         Config config = new Config(entries);
-        Config targetConfig = connector.targetConfig(config);
+        Config targetConfig = connector.targetConfig(config, false);
         assertTrue(targetConfig.entries().stream()
             .anyMatch(x -> x.name().equals("name-1")), "should replicate 
properties");
+        assertFalse(targetConfig.entries().stream()
+            .anyMatch(x -> x.name().equals("name-2")), "should not replicate 
default properties");
         assertFalse(targetConfig.entries().stream()
             .anyMatch(x -> x.name().equals("min.insync.replicas")), "should 
not replicate excluded properties");
     }
 
+    @Test
+    @Deprecated
+    public void testConfigPropertyFilteringWithAlterConfigsAndSourceDefault() {
+        Map<String, Object> filterConfig = 
Collections.singletonMap(DefaultConfigPropertyFilter.USE_DEFAULTS_FROM, 
"source");
+        DefaultConfigPropertyFilter filter = new DefaultConfigPropertyFilter();
+        filter.configure(filterConfig);
+
+        MirrorSourceConnector connector = new MirrorSourceConnector(new 
SourceAndTarget("source", "target"),
+                new DefaultReplicationPolicy(),  x -> true, filter);
+        List<ConfigEntry> entries = new ArrayList<>();
+        entries.add(new ConfigEntry("name-1", "value-1"));
+        // When "use.defaults.from" explicitly set to "source", the config 
with default value should be replicated
+        entries.add(new ConfigEntry("name-2", "value-2", 
ConfigEntry.ConfigSource.DEFAULT_CONFIG, false, false, Collections.emptyList(), 
ConfigEntry.ConfigType.STRING, ""));
+        entries.add(new ConfigEntry("min.insync.replicas", "2"));
+        Config config = new Config(entries);
+        Config targetConfig = connector.targetConfig(config, false);
+        assertTrue(targetConfig.entries().stream()
+                .anyMatch(x -> x.name().equals("name-1")), "should replicate 
properties");
+        assertTrue(targetConfig.entries().stream()
+                .anyMatch(x -> x.name().equals("name-2")), "should not 
replicate default properties");

Review Comment:
   ```suggestion
                   .anyMatch(x -> x.name().equals("name-2")), "should include 
default properties");
   ```



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java:
##########
@@ -261,6 +328,114 @@ public void testNewTopicConfigs() throws Exception {
         verify(connector).createNewTopics(any(), any());
     }
 
+    @Test
+    @Deprecated
+    public void testIncrementalAlterConfigsRequested() throws Exception {
+        Map<String, String> props = makeProps();
+        props.put(MirrorSourceConfig.USE_INCREMENTAL_ALTER_CONFIGS, 
MirrorSourceConfig.REQUEST_INCREMENTAL_ALTER_CONFIGS);
+        MirrorSourceConfig connectorConfig = new MirrorSourceConfig(props);
+
+        MockAdminClient admin = spy(new MockAdminClient());
+        MirrorSourceConnector connector = spy(new MirrorSourceConnector(new 
SourceAndTarget("source", "target"),
+                new DefaultReplicationPolicy(), connectorConfig, new 
DefaultConfigPropertyFilter(), admin));
+        final String topic = "testtopic";
+        List<ConfigEntry> entries = Collections.singletonList(new 
ConfigEntry("name-1", "value-1"));
+        Config config = new Config(entries);
+        doReturn(Collections.singletonMap(topic, 
config)).when(connector).describeTopicConfigs(any());
+        doReturn(alterConfigsResult(new 
ConfigResource(ConfigResource.Type.TOPIC, topic), new 
UnsupportedVersionException("Unsupported 
API"))).when(admin).incrementalAlterConfigs(any());
+        doNothing().when(connector).deprecatedAlterConfigs(any());
+        connector.syncTopicConfigs();
+        Map<String, Config> topicConfigs = Collections.singletonMap("source." 
+ topic, config);
+        verify(connector).incrementalAlterConfigs(topicConfigs);
+
+        // the next time we sync topic configurations, expect to use the 
deprecated API
+        connector.syncTopicConfigs();
+        verify(connector, times(1)).deprecatedAlterConfigs(topicConfigs);
+    }
+
+    @Test
+    @Deprecated
+    public void testIncrementalAlterConfigsRequired() throws Exception {
+        Map<String, String> props = makeProps();
+        props.put(MirrorSourceConfig.USE_INCREMENTAL_ALTER_CONFIGS, 
MirrorSourceConfig.REQUIRE_INCREMENTAL_ALTER_CONFIGS);
+        MirrorSourceConfig connectorConfig = new MirrorSourceConfig(props);
+
+        MockAdminClient admin = spy(new MockAdminClient());
+        MirrorSourceConnector connector = spy(new MirrorSourceConnector(new 
SourceAndTarget("source", "target"),
+                new DefaultReplicationPolicy(), connectorConfig, new 
DefaultConfigPropertyFilter(), admin));
+        final String topic = "testtopic";
+        List<ConfigEntry> entries = new ArrayList<>();
+        ConfigEntry entryWithNonDefaultValue = new ConfigEntry("name-1", 
"value-1");
+        ConfigEntry entryWithDefaultValue = new ConfigEntry("name-2", 
"value-2", ConfigEntry.ConfigSource.DEFAULT_CONFIG, false, false,
+                Collections.emptyList(), ConfigEntry.ConfigType.STRING, "");
+        entries.add(entryWithNonDefaultValue);
+        entries.add(entryWithDefaultValue);
+        Config config = new Config(entries);
+        doReturn(Collections.singletonMap(topic, 
config)).when(connector).describeTopicConfigs(any());
+
+        doAnswer(invocation -> {
+            Map<ConfigResource, Collection<AlterConfigOp>> configOps = 
invocation.getArgument(0);
+            assertNotNull(configOps);
+            assertEquals(1, configOps.size());
+
+            ConfigResource configResource = new 
ConfigResource(ConfigResource.Type.TOPIC, "source." + topic);
+            Collection<AlterConfigOp> ops = new ArrayList<>();
+            ops.add(new AlterConfigOp(entryWithNonDefaultValue, 
AlterConfigOp.OpType.SET));
+            ops.add(new AlterConfigOp(entryWithDefaultValue, 
AlterConfigOp.OpType.DELETE));
+
+            assertEquals(configOps.get(configResource), ops);

Review Comment:
   Nit: the ordering for this `assertEquals` method is expected first, then 
actual (this matters because the messages generated for assertion failures can 
be confusing otherwise):
   ```suggestion
               assertEquals(ops, configOps.get(configResource));
   ```



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java:
##########
@@ -261,6 +328,114 @@ public void testNewTopicConfigs() throws Exception {
         verify(connector).createNewTopics(any(), any());
     }
 
+    @Test
+    @Deprecated
+    public void testIncrementalAlterConfigsRequested() throws Exception {
+        Map<String, String> props = makeProps();
+        props.put(MirrorSourceConfig.USE_INCREMENTAL_ALTER_CONFIGS, 
MirrorSourceConfig.REQUEST_INCREMENTAL_ALTER_CONFIGS);
+        MirrorSourceConfig connectorConfig = new MirrorSourceConfig(props);
+
+        MockAdminClient admin = spy(new MockAdminClient());

Review Comment:
   We don't actually have to use the `MockAdminClient` class at all and can 
just mock the `Admin` interface for this test:
   ```suggestion
           Admin admin = mock(Admin.class);
   ```



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/ConfigPropertyFilter.java:
##########
@@ -25,8 +25,20 @@
 @InterfaceStability.Evolving
 public interface ConfigPropertyFilter extends Configurable, AutoCloseable {
 
+    /**
+     * Specifies whether to replicate the given topic configuration.
+     * This does not handle replication of configurations with default values.
+     * The configurations with default values are controlled by {@link 
#shouldReplicateSourceDefault(String)}.
+     */
     boolean shouldReplicateConfigProperty(String prop);
 
+    /**
+     * Specifies whether to replicate the given topic configuration that has a 
default value

Review Comment:
   I think we can give more detail on the exact behavior for this API:
   
   ```suggestion
        * Specifies how to replicate the given topic configuration property
        * that has a default value on the source cluster. Only invoked for 
properties
        * that {@link #shouldReplicateConfigProperty(String)} has returned
        * {@code true} for.
        *
        * @return {@code true} if the default value from the source topic 
should be synced
        * to the target topic, and {@code false} if the default value for the 
target topic
        * should be used instead
   ```



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java:
##########
@@ -261,6 +328,114 @@ public void testNewTopicConfigs() throws Exception {
         verify(connector).createNewTopics(any(), any());
     }
 
+    @Test
+    @Deprecated
+    public void testIncrementalAlterConfigsRequested() throws Exception {
+        Map<String, String> props = makeProps();
+        props.put(MirrorSourceConfig.USE_INCREMENTAL_ALTER_CONFIGS, 
MirrorSourceConfig.REQUEST_INCREMENTAL_ALTER_CONFIGS);
+        MirrorSourceConfig connectorConfig = new MirrorSourceConfig(props);
+
+        MockAdminClient admin = spy(new MockAdminClient());
+        MirrorSourceConnector connector = spy(new MirrorSourceConnector(new 
SourceAndTarget("source", "target"),
+                new DefaultReplicationPolicy(), connectorConfig, new 
DefaultConfigPropertyFilter(), admin));
+        final String topic = "testtopic";
+        List<ConfigEntry> entries = Collections.singletonList(new 
ConfigEntry("name-1", "value-1"));
+        Config config = new Config(entries);
+        doReturn(Collections.singletonMap(topic, 
config)).when(connector).describeTopicConfigs(any());
+        doReturn(alterConfigsResult(new 
ConfigResource(ConfigResource.Type.TOPIC, topic), new 
UnsupportedVersionException("Unsupported 
API"))).when(admin).incrementalAlterConfigs(any());
+        doNothing().when(connector).deprecatedAlterConfigs(any());
+        connector.syncTopicConfigs();
+        Map<String, Config> topicConfigs = Collections.singletonMap("source." 
+ topic, config);
+        verify(connector).incrementalAlterConfigs(topicConfigs);
+
+        // the next time we sync topic configurations, expect to use the 
deprecated API
+        connector.syncTopicConfigs();
+        verify(connector, times(1)).deprecatedAlterConfigs(topicConfigs);
+    }
+
+    @Test
+    @Deprecated
+    public void testIncrementalAlterConfigsRequired() throws Exception {
+        Map<String, String> props = makeProps();
+        props.put(MirrorSourceConfig.USE_INCREMENTAL_ALTER_CONFIGS, 
MirrorSourceConfig.REQUIRE_INCREMENTAL_ALTER_CONFIGS);
+        MirrorSourceConfig connectorConfig = new MirrorSourceConfig(props);
+
+        MockAdminClient admin = spy(new MockAdminClient());

Review Comment:
   (Same thought RE just mocking the `Admin` interface):
   ```suggestion
           Admin admin = mock(Admin.class);
   ```



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java:
##########
@@ -261,6 +328,114 @@ public void testNewTopicConfigs() throws Exception {
         verify(connector).createNewTopics(any(), any());
     }
 
+    @Test
+    @Deprecated
+    public void testIncrementalAlterConfigsRequested() throws Exception {
+        Map<String, String> props = makeProps();
+        props.put(MirrorSourceConfig.USE_INCREMENTAL_ALTER_CONFIGS, 
MirrorSourceConfig.REQUEST_INCREMENTAL_ALTER_CONFIGS);
+        MirrorSourceConfig connectorConfig = new MirrorSourceConfig(props);
+
+        MockAdminClient admin = spy(new MockAdminClient());
+        MirrorSourceConnector connector = spy(new MirrorSourceConnector(new 
SourceAndTarget("source", "target"),
+                new DefaultReplicationPolicy(), connectorConfig, new 
DefaultConfigPropertyFilter(), admin));
+        final String topic = "testtopic";
+        List<ConfigEntry> entries = Collections.singletonList(new 
ConfigEntry("name-1", "value-1"));
+        Config config = new Config(entries);
+        doReturn(Collections.singletonMap(topic, 
config)).when(connector).describeTopicConfigs(any());
+        doReturn(alterConfigsResult(new 
ConfigResource(ConfigResource.Type.TOPIC, topic), new 
UnsupportedVersionException("Unsupported 
API"))).when(admin).incrementalAlterConfigs(any());
+        doNothing().when(connector).deprecatedAlterConfigs(any());
+        connector.syncTopicConfigs();
+        Map<String, Config> topicConfigs = Collections.singletonMap("source." 
+ topic, config);
+        verify(connector).incrementalAlterConfigs(topicConfigs);
+
+        // the next time we sync topic configurations, expect to use the 
deprecated API
+        connector.syncTopicConfigs();
+        verify(connector, times(1)).deprecatedAlterConfigs(topicConfigs);
+    }
+
+    @Test
+    @Deprecated
+    public void testIncrementalAlterConfigsRequired() throws Exception {
+        Map<String, String> props = makeProps();
+        props.put(MirrorSourceConfig.USE_INCREMENTAL_ALTER_CONFIGS, 
MirrorSourceConfig.REQUIRE_INCREMENTAL_ALTER_CONFIGS);
+        MirrorSourceConfig connectorConfig = new MirrorSourceConfig(props);
+
+        MockAdminClient admin = spy(new MockAdminClient());
+        MirrorSourceConnector connector = spy(new MirrorSourceConnector(new 
SourceAndTarget("source", "target"),
+                new DefaultReplicationPolicy(), connectorConfig, new 
DefaultConfigPropertyFilter(), admin));
+        final String topic = "testtopic";
+        List<ConfigEntry> entries = new ArrayList<>();
+        ConfigEntry entryWithNonDefaultValue = new ConfigEntry("name-1", 
"value-1");
+        ConfigEntry entryWithDefaultValue = new ConfigEntry("name-2", 
"value-2", ConfigEntry.ConfigSource.DEFAULT_CONFIG, false, false,
+                Collections.emptyList(), ConfigEntry.ConfigType.STRING, "");
+        entries.add(entryWithNonDefaultValue);
+        entries.add(entryWithDefaultValue);
+        Config config = new Config(entries);
+        doReturn(Collections.singletonMap(topic, 
config)).when(connector).describeTopicConfigs(any());
+
+        doAnswer(invocation -> {
+            Map<ConfigResource, Collection<AlterConfigOp>> configOps = 
invocation.getArgument(0);
+            assertNotNull(configOps);
+            assertEquals(1, configOps.size());
+
+            ConfigResource configResource = new 
ConfigResource(ConfigResource.Type.TOPIC, "source." + topic);
+            Collection<AlterConfigOp> ops = new ArrayList<>();
+            ops.add(new AlterConfigOp(entryWithNonDefaultValue, 
AlterConfigOp.OpType.SET));
+            ops.add(new AlterConfigOp(entryWithDefaultValue, 
AlterConfigOp.OpType.DELETE));
+
+            assertEquals(configOps.get(configResource), ops);
+
+            return alterConfigsResult(configResource);
+        }).when(admin).incrementalAlterConfigs(any());
+
+        connector.syncTopicConfigs();
+        Map<String, Config> topicConfigs = Collections.singletonMap("source." 
+ topic, config);
+        verify(connector).incrementalAlterConfigs(topicConfigs);
+    }
+
+    @Test
+    @Deprecated
+    public void testIncrementalAlterConfigsRequiredButUnsupported() throws 
Exception {
+        Map<String, String> props = makeProps();
+        props.put(MirrorSourceConfig.USE_INCREMENTAL_ALTER_CONFIGS, 
MirrorSourceConfig.REQUIRE_INCREMENTAL_ALTER_CONFIGS);
+        MirrorSourceConfig connectorConfig = new MirrorSourceConfig(props);
+
+        MockAdminClient admin = spy(new MockAdminClient());

Review Comment:
   ```suggestion
           Admin admin = mock(Admin.class);
   ```



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java:
##########
@@ -540,10 +601,13 @@ Map<String, Config> describeTopicConfigs(Set<String> 
topics)
 
     Config targetConfig(Config sourceConfig) {
         List<ConfigEntry> entries = sourceConfig.entries().stream()
-            .filter(x -> !x.isDefault() && !x.isReadOnly() && !x.isSensitive())
-            .filter(x -> x.source() != 
ConfigEntry.ConfigSource.STATIC_BROKER_CONFIG)
-            .filter(x -> shouldReplicateTopicConfigurationProperty(x.name()))
-            .collect(Collectors.toList());
+                    .filter(x -> 
(!useIncrementalAlterConfigs.equals(MirrorSourceConfig.NEVER_USE_INCREMENTAL_ALTER_CONFIG)
+                            || (x.isDefault() && 
shouldReplicateSourceDefault(x.name())))
+                            || (!x.isDefault() && 
useIncrementalAlterConfigs.equals(MirrorSourceConfig.NEVER_USE_INCREMENTAL_ALTER_CONFIG)))
+                    .filter(x -> !x.isReadOnly() && !x.isSensitive())
+                    .filter(x -> x.source() != 
ConfigEntry.ConfigSource.STATIC_BROKER_CONFIG)
+                    .filter(x -> 
shouldReplicateTopicConfigurationProperty(x.name()))

Review Comment:
   Looks good, thanks!



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java:
##########
@@ -655,6 +657,135 @@ public void testRestartReplication() throws 
InterruptedException {
         assertMonotonicCheckpoints(backup, "primary.checkpoints.internal");
     }
 
+    @Test
+    public void testSyncTopicConfigs() throws InterruptedException {
+        mm2Config = new MirrorMakerConfig(mm2Props);
+
+        waitUntilMirrorMakerIsRunning(backup, CONNECTOR_LIST, mm2Config, 
PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS);
+
+        // create topic with configuration to test:
+        final Map<String, String> topicConfig = new HashMap<>();
+        topicConfig.put("delete.retention.ms", "1000"); // should be excluded 
(default value is 86400000)
+        topicConfig.put("retention.bytes", "1000"); // should be included, 
default value is -1
+
+        final String topic = "test-topic-with-config";
+        final String backupTopic = remoteTopicName(topic, 
PRIMARY_CLUSTER_ALIAS);
+
+        primary.kafka().createTopic(topic, NUM_PARTITIONS, 1, topicConfig);
+        waitForTopicCreated(backup, backupTopic);
+
+        // alter configs on the target topic
+        ConfigResource configResource = new 
ConfigResource(ConfigResource.Type.TOPIC, backupTopic);
+        Collection<AlterConfigOp> ops = new ArrayList<>();
+        ops.add(new AlterConfigOp(new ConfigEntry("delete.retention.ms", 
"2000"), AlterConfigOp.OpType.SET));
+        ops.add(new AlterConfigOp(new ConfigEntry("retention.bytes", "2000"), 
AlterConfigOp.OpType.SET));
+        Map<ConfigResource, Collection<AlterConfigOp>> configOps = 
Collections.singletonMap(configResource, ops);
+        // alter configs on target cluster
+        backup.kafka().incrementalAlterConfigs(configOps);
+
+        waitForCondition(() -> {
+            String primaryConfig, backupConfig;
+            primaryConfig = getTopicConfig(primary.kafka(), topic, 
"delete.retention.ms");
+            backupConfig = getTopicConfig(backup.kafka(), backupTopic, 
"delete.retention.ms");
+            assertNotEquals(primaryConfig, backupConfig,
+                    "`delete.retention.ms` should be different, because it's 
in exclude filter! ");
+            assertEquals("2000", backupConfig, "`delete.retention.ms` should 
be 2000, because it's explicitly defined on the target topic! ");
+
+            // regression test for the config that are still supposed to be 
replicated
+            primaryConfig = getTopicConfig(primary.kafka(), topic, 
"retention.bytes");
+            backupConfig = getTopicConfig(backup.kafka(), backupTopic, 
"retention.bytes");
+            assertEquals(primaryConfig, backupConfig,
+                    "`retention.bytes` should be the same, because it isn't in 
exclude filter! ");
+            return true;
+        }, 3000, "Topic configurations were not synced");
+    }
+
+    @Test
+    public void testReplicateSourceDefault() throws Exception {
+        mm2Props.put(DefaultConfigPropertyFilter.USE_DEFAULTS_FROM, "source");
+        mm2Config = new MirrorMakerConfig(mm2Props);
+
+        waitUntilMirrorMakerIsRunning(backup, CONNECTOR_LIST, mm2Config, 
PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS);
+
+        // create topic with default configurations to test
+        final String topic = "test-topic-with-config";
+        final String backupTopic = remoteTopicName(topic, 
PRIMARY_CLUSTER_ALIAS);
+
+        primary.kafka().createTopic(topic, NUM_PARTITIONS, 1, new HashMap<>());
+        waitForTopicCreated(backup, backupTopic);
+
+        // alter target topic configurations
+        ConfigResource configResource = new 
ConfigResource(ConfigResource.Type.TOPIC, backupTopic);
+        Collection<AlterConfigOp> ops = new ArrayList<>();
+        ops.add(new AlterConfigOp(new ConfigEntry("delete.retention.ms", 
"2000"), AlterConfigOp.OpType.SET));
+        ops.add(new AlterConfigOp(new ConfigEntry("retention.bytes", "2000"), 
AlterConfigOp.OpType.SET));
+        Map<ConfigResource, Collection<AlterConfigOp>> configOps = 
Collections.singletonMap(configResource, ops);
+        backup.kafka().incrementalAlterConfigs(configOps);
+
+        waitForCondition(() -> {
+            String primaryConfig, backupConfig;
+            // altered configuration of the target topic should be synced with 
the source cluster's default
+            primaryConfig = getTopicConfig(primary.kafka(), topic, 
"retention.bytes");
+            backupConfig = getTopicConfig(backup.kafka(), backupTopic, 
"retention.bytes");
+            assertEquals(primaryConfig, backupConfig,
+                    "`retention.bytes` should be the same, because the source 
cluster default is being used! ");
+            assertEquals("-1", backupConfig,
+                    "`retention.bytes` should be synced with default value!");
+
+            // when using the source cluster's default, the excluded 
configuration should still not be changed
+            primaryConfig = getTopicConfig(primary.kafka(), topic, 
"delete.retention.ms");
+            backupConfig = getTopicConfig(backup.kafka(), backupTopic, 
"delete.retention.ms");
+            assertNotEquals(primaryConfig, backupConfig,
+                    "`delete.retention.ms` should be different, because it's 
in exclude filter! ");
+            assertEquals("2000", backupConfig, "`delete.retention.ms` should 
be 2000, because it's explicitly defined on the target topic! ");
+            return true;
+        }, 3000, "Topic configurations were not synced");
+    }
+
+    @Test
+    public void testReplicateTargetDefault() throws Exception {
+        mm2Config = new MirrorMakerConfig(mm2Props);
+
+        waitUntilMirrorMakerIsRunning(backup, CONNECTOR_LIST, mm2Config, 
PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS);
+
+        // create topic with configuration to test:
+        final Map<String, String> topicConfig = new HashMap<>();
+        topicConfig.put("retention.bytes", "1000");
+
+        final String topic = "test-topic-with-config";
+        final String backupTopic = remoteTopicName(topic, 
PRIMARY_CLUSTER_ALIAS);
+
+        primary.kafka().createTopic(topic, NUM_PARTITIONS, 1, topicConfig);
+        waitForTopicCreated(backup, backupTopic);
+
+        waitForCondition(() -> {
+            String primaryConfig, backupConfig;
+            primaryConfig = getTopicConfig(primary.kafka(), topic, 
"retention.bytes");
+            backupConfig = getTopicConfig(backup.kafka(), backupTopic, 
"retention.bytes");
+            assertEquals(primaryConfig, backupConfig,
+                    "`retention.bytes` should be the same");
+            assertEquals("1000", backupConfig,
+                    "`retention.bytes` should be synced with default value!");
+
+            // delete the previously altered configuration of the source topic
+            ConfigResource configResource = new 
ConfigResource(ConfigResource.Type.TOPIC, topic);
+            Collection<AlterConfigOp> ops = new ArrayList<>();
+            ops.add(new AlterConfigOp(new ConfigEntry("retention.bytes", 
"1000"), AlterConfigOp.OpType.DELETE));
+            Map<ConfigResource, Collection<AlterConfigOp>> configOps = 
Collections.singletonMap(configResource, ops);
+            primary.kafka().incrementalAlterConfigs(configOps);

Review Comment:
   Nit: any reason this has to take place inside the call to 
`waitForCondition`? Would it make more sense to move this part to start at line 
778?



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java:
##########
@@ -261,6 +328,114 @@ public void testNewTopicConfigs() throws Exception {
         verify(connector).createNewTopics(any(), any());
     }
 
+    @Test
+    @Deprecated
+    public void testIncrementalAlterConfigsRequested() throws Exception {
+        Map<String, String> props = makeProps();
+        props.put(MirrorSourceConfig.USE_INCREMENTAL_ALTER_CONFIGS, 
MirrorSourceConfig.REQUEST_INCREMENTAL_ALTER_CONFIGS);
+        MirrorSourceConfig connectorConfig = new MirrorSourceConfig(props);
+
+        MockAdminClient admin = spy(new MockAdminClient());
+        MirrorSourceConnector connector = spy(new MirrorSourceConnector(new 
SourceAndTarget("source", "target"),
+                new DefaultReplicationPolicy(), connectorConfig, new 
DefaultConfigPropertyFilter(), admin));
+        final String topic = "testtopic";
+        List<ConfigEntry> entries = Collections.singletonList(new 
ConfigEntry("name-1", "value-1"));
+        Config config = new Config(entries);
+        doReturn(Collections.singletonMap(topic, 
config)).when(connector).describeTopicConfigs(any());
+        doReturn(alterConfigsResult(new 
ConfigResource(ConfigResource.Type.TOPIC, topic), new 
UnsupportedVersionException("Unsupported 
API"))).when(admin).incrementalAlterConfigs(any());
+        doNothing().when(connector).deprecatedAlterConfigs(any());
+        connector.syncTopicConfigs();
+        Map<String, Config> topicConfigs = Collections.singletonMap("source." 
+ topic, config);
+        verify(connector).incrementalAlterConfigs(topicConfigs);
+
+        // the next time we sync topic configurations, expect to use the 
deprecated API
+        connector.syncTopicConfigs();
+        verify(connector, times(1)).deprecatedAlterConfigs(topicConfigs);
+    }
+
+    @Test
+    @Deprecated
+    public void testIncrementalAlterConfigsRequired() throws Exception {
+        Map<String, String> props = makeProps();
+        props.put(MirrorSourceConfig.USE_INCREMENTAL_ALTER_CONFIGS, 
MirrorSourceConfig.REQUIRE_INCREMENTAL_ALTER_CONFIGS);
+        MirrorSourceConfig connectorConfig = new MirrorSourceConfig(props);
+
+        MockAdminClient admin = spy(new MockAdminClient());
+        MirrorSourceConnector connector = spy(new MirrorSourceConnector(new 
SourceAndTarget("source", "target"),
+                new DefaultReplicationPolicy(), connectorConfig, new 
DefaultConfigPropertyFilter(), admin));
+        final String topic = "testtopic";
+        List<ConfigEntry> entries = new ArrayList<>();
+        ConfigEntry entryWithNonDefaultValue = new ConfigEntry("name-1", 
"value-1");
+        ConfigEntry entryWithDefaultValue = new ConfigEntry("name-2", 
"value-2", ConfigEntry.ConfigSource.DEFAULT_CONFIG, false, false,
+                Collections.emptyList(), ConfigEntry.ConfigType.STRING, "");
+        entries.add(entryWithNonDefaultValue);
+        entries.add(entryWithDefaultValue);
+        Config config = new Config(entries);
+        doReturn(Collections.singletonMap(topic, 
config)).when(connector).describeTopicConfigs(any());
+
+        doAnswer(invocation -> {
+            Map<ConfigResource, Collection<AlterConfigOp>> configOps = 
invocation.getArgument(0);
+            assertNotNull(configOps);
+            assertEquals(1, configOps.size());
+
+            ConfigResource configResource = new 
ConfigResource(ConfigResource.Type.TOPIC, "source." + topic);
+            Collection<AlterConfigOp> ops = new ArrayList<>();
+            ops.add(new AlterConfigOp(entryWithNonDefaultValue, 
AlterConfigOp.OpType.SET));
+            ops.add(new AlterConfigOp(entryWithDefaultValue, 
AlterConfigOp.OpType.DELETE));
+
+            assertEquals(configOps.get(configResource), ops);
+
+            return alterConfigsResult(configResource);
+        }).when(admin).incrementalAlterConfigs(any());
+
+        connector.syncTopicConfigs();
+        Map<String, Config> topicConfigs = Collections.singletonMap("source." 
+ topic, config);
+        verify(connector).incrementalAlterConfigs(topicConfigs);
+    }
+
+    @Test
+    @Deprecated
+    public void testIncrementalAlterConfigsRequiredButUnsupported() throws 
Exception {
+        Map<String, String> props = makeProps();
+        props.put(MirrorSourceConfig.USE_INCREMENTAL_ALTER_CONFIGS, 
MirrorSourceConfig.REQUIRE_INCREMENTAL_ALTER_CONFIGS);
+        MirrorSourceConfig connectorConfig = new MirrorSourceConfig(props);
+
+        MockAdminClient admin = spy(new MockAdminClient());
+        ConnectorContext connectorContext = mock(ConnectorContext.class);
+        MirrorSourceConnector connector = spy(new MirrorSourceConnector(new 
SourceAndTarget("source", "target"),
+                new DefaultReplicationPolicy(), connectorConfig, new 
DefaultConfigPropertyFilter(), admin));
+        connector.initialize(connectorContext);
+        final String topic = "testtopic";
+        List<ConfigEntry> entries = Collections.singletonList(new 
ConfigEntry("name-1", "value-1"));
+        Config config = new Config(entries);
+        doReturn(Collections.singletonMap(topic, 
config)).when(connector).describeTopicConfigs(any());
+        doReturn(alterConfigsResult(new 
ConfigResource(ConfigResource.Type.TOPIC, topic), new 
UnsupportedVersionException("Unsupported 
API"))).when(admin).incrementalAlterConfigs(any());
+        doNothing().when(connector).deprecatedAlterConfigs(any());

Review Comment:
   We no longer need this part:
   ```suggestion
   ```



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java:
##########
@@ -261,6 +328,114 @@ public void testNewTopicConfigs() throws Exception {
         verify(connector).createNewTopics(any(), any());
     }
 
+    @Test
+    @Deprecated
+    public void testIncrementalAlterConfigsRequested() throws Exception {
+        Map<String, String> props = makeProps();
+        props.put(MirrorSourceConfig.USE_INCREMENTAL_ALTER_CONFIGS, 
MirrorSourceConfig.REQUEST_INCREMENTAL_ALTER_CONFIGS);
+        MirrorSourceConfig connectorConfig = new MirrorSourceConfig(props);
+
+        MockAdminClient admin = spy(new MockAdminClient());
+        MirrorSourceConnector connector = spy(new MirrorSourceConnector(new 
SourceAndTarget("source", "target"),
+                new DefaultReplicationPolicy(), connectorConfig, new 
DefaultConfigPropertyFilter(), admin));
+        final String topic = "testtopic";
+        List<ConfigEntry> entries = Collections.singletonList(new 
ConfigEntry("name-1", "value-1"));
+        Config config = new Config(entries);
+        doReturn(Collections.singletonMap(topic, 
config)).when(connector).describeTopicConfigs(any());
+        doReturn(alterConfigsResult(new 
ConfigResource(ConfigResource.Type.TOPIC, topic), new 
UnsupportedVersionException("Unsupported 
API"))).when(admin).incrementalAlterConfigs(any());
+        doNothing().when(connector).deprecatedAlterConfigs(any());
+        connector.syncTopicConfigs();
+        Map<String, Config> topicConfigs = Collections.singletonMap("source." 
+ topic, config);
+        verify(connector).incrementalAlterConfigs(topicConfigs);
+
+        // the next time we sync topic configurations, expect to use the 
deprecated API
+        connector.syncTopicConfigs();
+        verify(connector, times(1)).deprecatedAlterConfigs(topicConfigs);
+    }
+
+    @Test
+    @Deprecated
+    public void testIncrementalAlterConfigsRequired() throws Exception {
+        Map<String, String> props = makeProps();
+        props.put(MirrorSourceConfig.USE_INCREMENTAL_ALTER_CONFIGS, 
MirrorSourceConfig.REQUIRE_INCREMENTAL_ALTER_CONFIGS);
+        MirrorSourceConfig connectorConfig = new MirrorSourceConfig(props);
+
+        MockAdminClient admin = spy(new MockAdminClient());
+        MirrorSourceConnector connector = spy(new MirrorSourceConnector(new 
SourceAndTarget("source", "target"),
+                new DefaultReplicationPolicy(), connectorConfig, new 
DefaultConfigPropertyFilter(), admin));
+        final String topic = "testtopic";
+        List<ConfigEntry> entries = new ArrayList<>();
+        ConfigEntry entryWithNonDefaultValue = new ConfigEntry("name-1", 
"value-1");
+        ConfigEntry entryWithDefaultValue = new ConfigEntry("name-2", 
"value-2", ConfigEntry.ConfigSource.DEFAULT_CONFIG, false, false,
+                Collections.emptyList(), ConfigEntry.ConfigType.STRING, "");
+        entries.add(entryWithNonDefaultValue);
+        entries.add(entryWithDefaultValue);
+        Config config = new Config(entries);
+        doReturn(Collections.singletonMap(topic, 
config)).when(connector).describeTopicConfigs(any());
+
+        doAnswer(invocation -> {
+            Map<ConfigResource, Collection<AlterConfigOp>> configOps = 
invocation.getArgument(0);
+            assertNotNull(configOps);
+            assertEquals(1, configOps.size());
+
+            ConfigResource configResource = new 
ConfigResource(ConfigResource.Type.TOPIC, "source." + topic);
+            Collection<AlterConfigOp> ops = new ArrayList<>();
+            ops.add(new AlterConfigOp(entryWithNonDefaultValue, 
AlterConfigOp.OpType.SET));
+            ops.add(new AlterConfigOp(entryWithDefaultValue, 
AlterConfigOp.OpType.DELETE));
+
+            assertEquals(configOps.get(configResource), ops);
+
+            return alterConfigsResult(configResource);
+        }).when(admin).incrementalAlterConfigs(any());
+
+        connector.syncTopicConfigs();
+        Map<String, Config> topicConfigs = Collections.singletonMap("source." 
+ topic, config);
+        verify(connector).incrementalAlterConfigs(topicConfigs);
+    }
+
+    @Test
+    @Deprecated
+    public void testIncrementalAlterConfigsRequiredButUnsupported() throws 
Exception {
+        Map<String, String> props = makeProps();
+        props.put(MirrorSourceConfig.USE_INCREMENTAL_ALTER_CONFIGS, 
MirrorSourceConfig.REQUIRE_INCREMENTAL_ALTER_CONFIGS);
+        MirrorSourceConfig connectorConfig = new MirrorSourceConfig(props);
+
+        MockAdminClient admin = spy(new MockAdminClient());
+        ConnectorContext connectorContext = mock(ConnectorContext.class);
+        MirrorSourceConnector connector = spy(new MirrorSourceConnector(new 
SourceAndTarget("source", "target"),
+                new DefaultReplicationPolicy(), connectorConfig, new 
DefaultConfigPropertyFilter(), admin));
+        connector.initialize(connectorContext);
+        final String topic = "testtopic";
+        List<ConfigEntry> entries = Collections.singletonList(new 
ConfigEntry("name-1", "value-1"));
+        Config config = new Config(entries);
+        doReturn(Collections.singletonMap(topic, 
config)).when(connector).describeTopicConfigs(any());
+        doReturn(alterConfigsResult(new 
ConfigResource(ConfigResource.Type.TOPIC, topic), new 
UnsupportedVersionException("Unsupported 
API"))).when(admin).incrementalAlterConfigs(any());
+        doNothing().when(connector).deprecatedAlterConfigs(any());
+
+        connector.syncTopicConfigs();
+        verify(connectorContext).raiseError(isA(ConnectException.class));
+    }
+
+
+    @Test
+    @Deprecated
+    public void testIncrementalAlterConfigsNeverUsed() throws Exception {
+        Map<String, String> props = makeProps();
+        props.put(MirrorSourceConfig.USE_INCREMENTAL_ALTER_CONFIGS, 
MirrorSourceConfig.NEVER_USE_INCREMENTAL_ALTER_CONFIGS);
+        MirrorSourceConfig connectorConfigs = new MirrorSourceConfig(props);
+
+        MirrorSourceConnector connector = spy(new MirrorSourceConnector(new 
SourceAndTarget("source", "target"),
+                new DefaultReplicationPolicy(), connectorConfigs, new 
DefaultConfigPropertyFilter(), null));
+        final String topic = "testtopic";
+        List<ConfigEntry> entries = Collections.singletonList(new 
ConfigEntry("name-1", "value-1"));
+        Config config = new Config(entries);
+        doReturn(Collections.singletonMap(topic, 
config)).when(connector).describeTopicConfigs(any());
+        doNothing().when(connector).deprecatedAlterConfigs(any());
+        connector.syncTopicConfigs();
+        Map<String, Config> topicConfigs = Collections.singletonMap("source." 
+ topic, config);
+        verify(connector).deprecatedAlterConfigs(topicConfigs);

Review Comment:
   Can we also verify that we never tried to use the incremental API?
   ```suggestion
           verify(connector).deprecatedAlterConfigs(topicConfigs);
           verify(connector, never()).incrementalAlterConfigs(any());
   ```



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/DefaultConfigPropertyFilter.java:
##########
@@ -30,6 +30,9 @@ public class DefaultConfigPropertyFilter implements 
ConfigPropertyFilter {
     
     public static final String CONFIG_PROPERTIES_EXCLUDE_CONFIG = 
"config.properties.exclude";
     public static final String CONFIG_PROPERTIES_EXCLUDE_ALIAS_CONFIG = 
"config.properties.blacklist";
+    public static final String USE_DEFAULTS_FROM = "use.defaults.from";
+    private static final String USE_DEFAULTS_FROM_DOC = "Which cluster's 
defaults to use when syncing topic configurations.";
+    private static final String USE_DEFAULTS_FROM_DEFAULT = "target";
 
     private static final String CONFIG_PROPERTIES_EXCLUDE_DOC = "List of topic 
configuration properties and/or regexes "
                                                                 + "that should 
not be replicated.";

Review Comment:
   No worries! I can see how the GitHub UI could be misleading here. Looks good 
now 👍



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java:
##########
@@ -655,6 +657,135 @@ public void testRestartReplication() throws 
InterruptedException {
         assertMonotonicCheckpoints(backup, "primary.checkpoints.internal");
     }
 
+    @Test
+    public void testSyncTopicConfigs() throws InterruptedException {
+        mm2Config = new MirrorMakerConfig(mm2Props);
+
+        waitUntilMirrorMakerIsRunning(backup, CONNECTOR_LIST, mm2Config, 
PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS);
+
+        // create topic with configuration to test:
+        final Map<String, String> topicConfig = new HashMap<>();
+        topicConfig.put("delete.retention.ms", "1000"); // should be excluded 
(default value is 86400000)
+        topicConfig.put("retention.bytes", "1000"); // should be included, 
default value is -1
+
+        final String topic = "test-topic-with-config";
+        final String backupTopic = remoteTopicName(topic, 
PRIMARY_CLUSTER_ALIAS);
+
+        primary.kafka().createTopic(topic, NUM_PARTITIONS, 1, topicConfig);
+        waitForTopicCreated(backup, backupTopic);
+
+        // alter configs on the target topic
+        ConfigResource configResource = new 
ConfigResource(ConfigResource.Type.TOPIC, backupTopic);
+        Collection<AlterConfigOp> ops = new ArrayList<>();
+        ops.add(new AlterConfigOp(new ConfigEntry("delete.retention.ms", 
"2000"), AlterConfigOp.OpType.SET));
+        ops.add(new AlterConfigOp(new ConfigEntry("retention.bytes", "2000"), 
AlterConfigOp.OpType.SET));
+        Map<ConfigResource, Collection<AlterConfigOp>> configOps = 
Collections.singletonMap(configResource, ops);
+        // alter configs on target cluster
+        backup.kafka().incrementalAlterConfigs(configOps);
+
+        waitForCondition(() -> {
+            String primaryConfig, backupConfig;
+            primaryConfig = getTopicConfig(primary.kafka(), topic, 
"delete.retention.ms");
+            backupConfig = getTopicConfig(backup.kafka(), backupTopic, 
"delete.retention.ms");
+            assertNotEquals(primaryConfig, backupConfig,
+                    "`delete.retention.ms` should be different, because it's 
in exclude filter! ");
+            assertEquals("2000", backupConfig, "`delete.retention.ms` should 
be 2000, because it's explicitly defined on the target topic! ");
+
+            // regression test for the config that are still supposed to be 
replicated
+            primaryConfig = getTopicConfig(primary.kafka(), topic, 
"retention.bytes");
+            backupConfig = getTopicConfig(backup.kafka(), backupTopic, 
"retention.bytes");
+            assertEquals(primaryConfig, backupConfig,
+                    "`retention.bytes` should be the same, because it isn't in 
exclude filter! ");
+            return true;
+        }, 3000, "Topic configurations were not synced");

Review Comment:
   We should use a much higher timeout here (30 or 60 seconds). Our CI machines 
are notoriously slow and this may lead to flakiness as-is.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to