mimaison commented on code in PR #13373:
URL: https://github.com/apache/kafka/pull/13373#discussion_r1135392026


##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java:
##########
@@ -539,11 +596,20 @@ Map<String, Config> describeTopicConfigs(Set<String> topics)
     }
 
     Config targetConfig(Config sourceConfig) {
-        List<ConfigEntry> entries = sourceConfig.entries().stream()
-            .filter(x -> !x.isDefault() && !x.isReadOnly() && !x.isSensitive())
-            .filter(x -> x.source() != ConfigEntry.ConfigSource.STATIC_BROKER_CONFIG)
-            .filter(x -> shouldReplicateTopicConfigurationProperty(x.name()))
-            .collect(Collectors.toList());
+        List<ConfigEntry> entries;
+        if (useIncrementalAlterConfigs == MirrorSourceConfig.NEVER_USE_INCREMENTAL_ALTER_CONFIG) {

Review Comment:
   We should use `equals()` to compare String objects
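   For example (a sketch; putting the constant first also guards against a null value):
   ```
   if (MirrorSourceConfig.NEVER_USE_INCREMENTAL_ALTER_CONFIG.equals(useIncrementalAlterConfigs)) {
   ```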



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java:
##########
@@ -117,6 +122,16 @@ public MirrorSourceConnector() {
         this.configPropertyFilter = configPropertyFilter;
     }
 
+    // visible for testing the deprecated setting "use.incremental.alter.configs"
+    // this constructor should be removed when the deprecated setting is removed in Kafka 4.0
+    MirrorSourceConnector(SourceAndTarget sourceAndTarget, ReplicationPolicy replicationPolicy,
+                          String useIncrementalAlterConfigs, ConfigPropertyFilter configPropertyFilter, Admin targetAdmin) {
+        this.sourceAndTarget = sourceAndTarget;
+        this.replicationPolicy = replicationPolicy;
+        this.configPropertyFilter = configPropertyFilter;
+        this.useIncrementalAlterConfigs = useIncrementalAlterConfigs;
+        this.targetAdminClient = targetAdmin;
+        

Review Comment:
   We're missing a closing bracket here!



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/DefaultConfigPropertyFilter.java:
##########
@@ -30,6 +31,9 @@ public class DefaultConfigPropertyFilter implements ConfigPropertyFilter {
     
     public static final String CONFIG_PROPERTIES_EXCLUDE_CONFIG = "config.properties.exclude";
     public static final String CONFIG_PROPERTIES_EXCLUDE_ALIAS_CONFIG = "config.properties.blacklist";
+    public static final String USE_DEFAULTS_FROM = "use.defaults.from";;

Review Comment:
   We can remove the trailing semicolon



##########
clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java:
##########
@@ -78,6 +80,19 @@ public static ListTopicsResult listTopicsResult(String topic) {
         return new ListTopicsResult(future);
     }
 
+    /**
+     * Helper to create a AlterConfigsResult instance for a given Throwable.
+     * AlterConfigsResult's constructor is only accessible from within the
+     * admin package.
+     */
+    public static AlterConfigsResult alterConfigsResult(ConfigResource cr, Throwable t) {
+        KafkaFutureImpl<Void> future = new KafkaFutureImpl<>();
+        Map<ConfigResource, KafkaFuture<Void>> futures = new HashMap<>();
+        futures.put(cr, future);

Review Comment:
   We can use singletonMap here:
   ```suggestion
   Map<ConfigResource, KafkaFuture<Void>> futures = Collections.singletonMap(cr, future);
   ```



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/DefaultConfigPropertyFilter.java:
##########
@@ -30,6 +31,9 @@ public class DefaultConfigPropertyFilter implements ConfigPropertyFilter {
     
     public static final String CONFIG_PROPERTIES_EXCLUDE_CONFIG = "config.properties.exclude";
     public static final String CONFIG_PROPERTIES_EXCLUDE_ALIAS_CONFIG = "config.properties.blacklist";
+    public static final String USE_DEFAULTS_FROM = "use.defaults.from";;
+    private static final String USE_DEFAULTS_FROM_DOC = "Which cluster's default to use when syncing topic configurations.";

Review Comment:
   Should it be `defaults` instead of `default`?



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java:
##########
@@ -64,6 +67,7 @@
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;

Review Comment:
   This is already imported just above



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/DefaultConfigPropertyFilter.java:
##########
@@ -40,11 +44,13 @@ public class DefaultConfigPropertyFilter implements ConfigPropertyFilter {
                                                                    + "unclean\\.leader\\.election\\.enable, "
                                                                    + "min\\.insync\\.replicas";
     private Pattern excludePattern = MirrorUtils.compilePatternList(CONFIG_PROPERTIES_EXCLUDE_DEFAULT);
+    private String useDefaultsFrom = USE_DEFAULTS_FROM_DEFAULT;

Review Comment:
   Do we need this field? I think we could remove it and do
   ```
   "source".equals(config.useDefaultsFrom())
   ```
   in `shouldReplicateSourceDefault()`



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConfig.java:
##########
@@ -73,6 +73,18 @@ public class MirrorSourceConfig extends MirrorConnectorConfig {
     public static final String SYNC_TOPIC_CONFIGS_INTERVAL_SECONDS = SYNC_TOPIC_CONFIGS + INTERVAL_SECONDS_SUFFIX;
     private static final String SYNC_TOPIC_CONFIGS_INTERVAL_SECONDS_DOC = "Frequency of topic config sync.";
     public static final long SYNC_TOPIC_CONFIGS_INTERVAL_SECONDS_DEFAULT = 10 * 60;
+    @Deprecated
+    public static final String USE_INCREMENTAL_ALTER_CONFIG = "use.incremental.alter.configs";
+    private static final String USE_INCREMENTAL_ALTER_CONFIG_DOC = "Deprecated. Which API to use for syncing topic configs. " +
+            "The valid values are \"requested\", \"required\" and \"never\". " +
+            "By default, set to \"requested\", which means the IncrementalAlterConfigs is being used for syncing topic configurations " +
+            "and if any request receives an error from an incompatible broker, it will fallback to using the deprecated AlterConfigs API. " +
+            "If explicitly set to \"required\", the IncrementalAlterConfigs is used without the fallback logic. " +
+            "If explicitly set to \"never\", the AlterConfig is always used." +
+            "This setting will be removed in Kafka 4.0, therefore users should ensure that target broker is at least 2.3.0";

Review Comment:
   I think we should mention that the behavior of `required` will be used from Kafka 4.0



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConfig.java:
##########
@@ -73,6 +73,18 @@ public class MirrorSourceConfig extends MirrorConnectorConfig {
     public static final String SYNC_TOPIC_CONFIGS_INTERVAL_SECONDS = SYNC_TOPIC_CONFIGS + INTERVAL_SECONDS_SUFFIX;
     private static final String SYNC_TOPIC_CONFIGS_INTERVAL_SECONDS_DOC = "Frequency of topic config sync.";
     public static final long SYNC_TOPIC_CONFIGS_INTERVAL_SECONDS_DEFAULT = 10 * 60;
+    @Deprecated
+    public static final String USE_INCREMENTAL_ALTER_CONFIG = "use.incremental.alter.configs";
+    private static final String USE_INCREMENTAL_ALTER_CONFIG_DOC = "Deprecated. Which API to use for syncing topic configs. " +
+            "The valid values are \"requested\", \"required\" and \"never\". " +
+            "By default, set to \"requested\", which means the IncrementalAlterConfigs is being used for syncing topic configurations " +

Review Comment:
   `IncrementalAlterConfigs` -> `IncrementalAlterConfigs API`



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConfig.java:
##########
@@ -73,6 +73,18 @@ public class MirrorSourceConfig extends MirrorConnectorConfig {
     public static final String SYNC_TOPIC_CONFIGS_INTERVAL_SECONDS = SYNC_TOPIC_CONFIGS + INTERVAL_SECONDS_SUFFIX;
     private static final String SYNC_TOPIC_CONFIGS_INTERVAL_SECONDS_DOC = "Frequency of topic config sync.";
     public static final long SYNC_TOPIC_CONFIGS_INTERVAL_SECONDS_DEFAULT = 10 * 60;
+    @Deprecated
+    public static final String USE_INCREMENTAL_ALTER_CONFIG = "use.incremental.alter.configs";
+    private static final String USE_INCREMENTAL_ALTER_CONFIG_DOC = "Deprecated. Which API to use for syncing topic configs. " +
+            "The valid values are \"requested\", \"required\" and \"never\". " +
+            "By default, set to \"requested\", which means the IncrementalAlterConfigs is being used for syncing topic configurations " +
+            "and if any request receives an error from an incompatible broker, it will fallback to using the deprecated AlterConfigs API. " +
+            "If explicitly set to \"required\", the IncrementalAlterConfigs is used without the fallback logic. " +
+            "If explicitly set to \"never\", the AlterConfig is always used." +
+            "This setting will be removed in Kafka 4.0, therefore users should ensure that target broker is at least 2.3.0";
+    public static final String USE_INCREMENTAL_ALTER_CONFIG_DEFAULT = "requested";

Review Comment:
   Should we rename this field to `REQUESTED_INCREMENTAL_ALTER_CONFIG`?



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java:
##########
@@ -209,11 +227,34 @@ public void testConfigPropertyFiltering() {
            new DefaultReplicationPolicy(), x -> true, new DefaultConfigPropertyFilter());
         ArrayList<ConfigEntry> entries = new ArrayList<>();
         entries.add(new ConfigEntry("name-1", "value-1"));
+        entries.add(new ConfigEntry("name-2", "value-2", ConfigEntry.ConfigSource.DEFAULT_CONFIG, false, false, Collections.emptyList(), ConfigEntry.ConfigType.STRING, ""));
+        entries.add(new ConfigEntry("min.insync.replicas", "2"));
+        Config config = new Config(entries);
+        Config targetConfig = connector.targetConfig(config);
+        assertTrue(targetConfig.entries().stream()
+            .anyMatch(x -> x.name().equals("name-1")), "should replicate properties");
+        assertTrue(targetConfig.entries().stream()
+            .anyMatch(x -> x.name().equals("name-2")), "should include default properties");
+        assertFalse(targetConfig.entries().stream()
+            .anyMatch(x -> x.name().equals("min.insync.replicas")), "should not replicate excluded properties");
+    }
+
+    @Test
+    @Deprecated
+    public void testConfigPropertyFilteringWithAlterConfigs() {
+        MirrorSourceConnector connector = new MirrorSourceConnector(new SourceAndTarget("source", "target"),
+                new DefaultReplicationPolicy(), MirrorSourceConfig.NEVER_USE_INCREMENTAL_ALTER_CONFIG, new DefaultConfigPropertyFilter(), null);
+        ArrayList<ConfigEntry> entries = new ArrayList<>();

Review Comment:
   Left side can be `List<ConfigEntry>`
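   e.g.
   ```
   List<ConfigEntry> entries = new ArrayList<>();
   ```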



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java:
##########
@@ -261,6 +302,54 @@ public void testNewTopicConfigs() throws Exception {
         verify(connector).createNewTopics(any(), any());
     }
 
+    @Test
+    public void testIncrementalAlterConfigsRequested() throws Exception {
+        MockAdminClient admin = spy(new MockAdminClient());
+        MirrorSourceConnector connector = spy(new MirrorSourceConnector(new SourceAndTarget("source", "target"),
+                new DefaultReplicationPolicy(), MirrorSourceConfig.USE_INCREMENTAL_ALTER_CONFIG_DEFAULT, new DefaultConfigPropertyFilter(), admin));
+        final String topic = "testtopic";
+        List<ConfigEntry> entries = new ArrayList<>();
+        entries.add(new ConfigEntry("name-1", "value-1"));
+        Config config = new Config(entries);
+        doReturn(Collections.singletonMap(topic, config)).when(connector).describeTopicConfigs(any());
+        doReturn(alterConfigsResult(new ConfigResource(ConfigResource.Type.TOPIC, topic), new UnsupportedVersionException("Unsupported API"))).when(admin).incrementalAlterConfigs(any());
+        doNothing().when(connector).alterConfigs(any());
+        connector.syncTopicConfigs();
+        verify(connector).syncTopicConfigs();
+        verify(connector).incrementalAlterConfigs(any());
+        verify(connector, times(1)).alterConfigs(any());
+    }
+
+    @Test
+    public void testIncrementalAlterConfigsRequired() throws Exception {

Review Comment:
   Can we add a test that causes `incrementalAlterConfig()` to emit an `OpType.DELETE` op?
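   A rough sketch of what such a test could look like (assuming, per KIP-894, that a property at its default value on the source is synced as a DELETE op when defaults are taken from the target; the stubbing of the admin client may need adjusting to the final implementation, and the usual static Mockito/JUnit imports are assumed):
   ```
   @Test
   public void testIncrementalAlterConfigsDeletesSourceDefaults() throws Exception {
       MockAdminClient admin = spy(new MockAdminClient());
       DefaultConfigPropertyFilter filter = new DefaultConfigPropertyFilter();
       // have the target cluster's defaults win, so a source-side default should become a DELETE op
       filter.configure(Collections.singletonMap(DefaultConfigPropertyFilter.USE_DEFAULTS_FROM, "target"));
       MirrorSourceConnector connector = spy(new MirrorSourceConnector(new SourceAndTarget("source", "target"),
               new DefaultReplicationPolicy(), MirrorSourceConfig.REQUIRE_INCREMENTAL_ALTER_CONFIG, filter, admin));
       final String topic = "testtopic";
       // a property reported with its default value on the source cluster
       ConfigEntry defaultEntry = new ConfigEntry("name-2", "value-2", ConfigEntry.ConfigSource.DEFAULT_CONFIG,
               false, false, Collections.emptyList(), ConfigEntry.ConfigType.STRING, "");
       doReturn(Collections.singletonMap(topic, new Config(Collections.singletonList(defaultEntry))))
               .when(connector).describeTopicConfigs(any());
       connector.syncTopicConfigs();
       // the request sent to the target admin client should carry a DELETE op for that property
       verify(admin).incrementalAlterConfigs(argThat(configs ->
               configs.get(new ConfigResource(ConfigResource.Type.TOPIC, topic)).stream()
                       .anyMatch(op -> op.opType() == AlterConfigOp.OpType.DELETE && op.configEntry().name().equals("name-2"))));
   }
   ```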



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java:
##########
@@ -261,6 +302,54 @@ public void testNewTopicConfigs() throws Exception {
         verify(connector).createNewTopics(any(), any());
     }
 
+    @Test
+    public void testIncrementalAlterConfigsRequested() throws Exception {
+        MockAdminClient admin = spy(new MockAdminClient());
+        MirrorSourceConnector connector = spy(new MirrorSourceConnector(new SourceAndTarget("source", "target"),
+                new DefaultReplicationPolicy(), MirrorSourceConfig.USE_INCREMENTAL_ALTER_CONFIG_DEFAULT, new DefaultConfigPropertyFilter(), admin));
+        final String topic = "testtopic";
+        List<ConfigEntry> entries = new ArrayList<>();
+        entries.add(new ConfigEntry("name-1", "value-1"));
+        Config config = new Config(entries);
+        doReturn(Collections.singletonMap(topic, config)).when(connector).describeTopicConfigs(any());
+        doReturn(alterConfigsResult(new ConfigResource(ConfigResource.Type.TOPIC, topic), new UnsupportedVersionException("Unsupported API"))).when(admin).incrementalAlterConfigs(any());
+        doNothing().when(connector).alterConfigs(any());

Review Comment:
   Can we verify the arguments passed to `incrementalAlterConfigs()` and `alterConfigs()` in these tests?
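   One option is to capture them with an `ArgumentCaptor` (a sketch; assumes the usual Mockito imports):
   ```
   @SuppressWarnings("unchecked")
   ArgumentCaptor<Map<ConfigResource, Collection<AlterConfigOp>>> captor = ArgumentCaptor.forClass(Map.class);
   verify(admin).incrementalAlterConfigs(captor.capture());
   Collection<AlterConfigOp> ops = captor.getValue().get(new ConfigResource(ConfigResource.Type.TOPIC, topic));
   assertTrue(ops.stream().anyMatch(op -> op.configEntry().name().equals("name-1")));
   ```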



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/IdentityReplicationIntegrationTest.java:
##########
@@ -140,6 +147,143 @@ public void testReplication() throws Exception {
                 "New topic was not replicated to backup cluster.");
     }
 
+    @Test
+    public void testSyncTopicConfigs() throws Exception {
+        mm2Config = new MirrorMakerConfig(mm2Props);
+
+        waitUntilMirrorMakerIsRunning(backup, CONNECTOR_LIST, mm2Config, PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS);
+        waitUntilMirrorMakerIsRunning(primary, Collections.singletonList(MirrorHeartbeatConnector.class), mm2Config, BACKUP_CLUSTER_ALIAS, PRIMARY_CLUSTER_ALIAS);
+
+        // create topic with configuration to test:
+        final Map<String, String> topicConfig = new HashMap<>();
+        topicConfig.put("delete.retention.ms", "1000"); // should be excluded (default value is 86400000)
+        topicConfig.put("retention.bytes", "1000"); // should be included, default value is -1
+
+        final String topic = "test-topic-with-config";
+        final String backupTopic = remoteTopicName(topic, PRIMARY_CLUSTER_ALIAS);
+
+        primary.kafka().createTopic(topic, NUM_PARTITIONS, 1, topicConfig);
+        waitForTopicCreated(backup, backupTopic);
+
+        // alter configs on the target topic
+        ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, backupTopic);
+        Collection<AlterConfigOp> ops = new ArrayList<>();
+        ops.add(new AlterConfigOp(new ConfigEntry("delete.retention.ms", "2000"), AlterConfigOp.OpType.SET));
+        ops.add(new AlterConfigOp(new ConfigEntry("retention.bytes", "2000"), AlterConfigOp.OpType.SET));
+        Map<ConfigResource, Collection<AlterConfigOp>> configOps = new HashMap<>(1);
+        configOps.put(configResource, ops);
+        // alter configs on target cluster
+        backup.kafka().incrementalAlterConfigs(configOps);
+
+        // sleep few seconds to have MM2 sync topic configuration
+        Thread.sleep(TimeUnit.SECONDS.toMillis(3));

Review Comment:
   Can we avoid using `sleep()` directly and instead use `waitForCondition()`?
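   For example, something along these lines (a sketch; `getTopicConfig()` is a hypothetical helper that reads a single config value from the backup cluster, and the timeout is illustrative):
   ```
   TestUtils.waitForCondition(
       () -> "1000".equals(getTopicConfig(backup.kafka(), backupTopic, "retention.bytes")),
       TimeUnit.SECONDS.toMillis(30),
       "Topic configuration was not synced to the backup cluster in time");
   ```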



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/ConfigPropertyFilter.java:
##########
@@ -27,6 +28,8 @@ public interface ConfigPropertyFilter extends Configurable, AutoCloseable {
 
     boolean shouldReplicateConfigProperty(String prop);
 
+    boolean shouldReplicateSourceDefault(ConfigEntry.ConfigSource source);

Review Comment:
   We need a default method here, otherwise all existing implementations of this interface will break.
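   For example (a sketch; keeping the current signature, and returning `false` so existing filters keep today's behaviour of not replicating defaults):
   ```
   default boolean shouldReplicateSourceDefault(ConfigEntry.ConfigSource source) {
       return false;
   }
   ```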



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java:
##########
@@ -261,6 +302,54 @@ public void testNewTopicConfigs() throws Exception {
         verify(connector).createNewTopics(any(), any());
     }
 
+    @Test
+    public void testIncrementalAlterConfigsRequested() throws Exception {
+        MockAdminClient admin = spy(new MockAdminClient());
+        MirrorSourceConnector connector = spy(new MirrorSourceConnector(new SourceAndTarget("source", "target"),
+                new DefaultReplicationPolicy(), MirrorSourceConfig.USE_INCREMENTAL_ALTER_CONFIG_DEFAULT, new DefaultConfigPropertyFilter(), admin));
+        final String topic = "testtopic";
+        List<ConfigEntry> entries = new ArrayList<>();
+        entries.add(new ConfigEntry("name-1", "value-1"));

Review Comment:
   Can be simplified with:
   ```suggestion
           List<ConfigEntry> entries = Collections.singletonList(new ConfigEntry("name-1", "value-1"));
   ```



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConfig.java:
##########
@@ -162,6 +174,15 @@ Duration syncTopicConfigsInterval() {
         }
     }
 
+    String useIncrementalAlterConfigs() {
+        String prop = getString(USE_INCREMENTAL_ALTER_CONFIG);
+        if (prop.equals(NEVER_USE_INCREMENTAL_ALTER_CONFIG) || prop.equals(REQUIRE_INCREMENTAL_ALTER_CONFIG)) {

Review Comment:
   Do we need this if/else block? The config is defined with a default value, so it should already return `USE_INCREMENTAL_ALTER_CONFIG_DEFAULT` if it's not set explicitly.
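   i.e. the method could probably be reduced to:
   ```
   String useIncrementalAlterConfigs() {
       return getString(USE_INCREMENTAL_ALTER_CONFIG);
   }
   ```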



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/IdentityReplicationIntegrationTest.java:
##########
@@ -140,6 +147,143 @@ public void testReplication() throws Exception {
                 "New topic was not replicated to backup cluster.");
     }
 
+    @Test
+    public void testSyncTopicConfigs() throws Exception {
+        mm2Config = new MirrorMakerConfig(mm2Props);
+
+        waitUntilMirrorMakerIsRunning(backup, CONNECTOR_LIST, mm2Config, PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS);
+        waitUntilMirrorMakerIsRunning(primary, Collections.singletonList(MirrorHeartbeatConnector.class), mm2Config, BACKUP_CLUSTER_ALIAS, PRIMARY_CLUSTER_ALIAS);

Review Comment:
   Do we need to start 2 instances of MirrorMaker for this test? A single instance would simplify this test a bit.



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/IdentityReplicationIntegrationTest.java:
##########
@@ -140,6 +147,143 @@ public void testReplication() throws Exception {
                 "New topic was not replicated to backup cluster.");
     }
 
+    @Test
+    public void testSyncTopicConfigs() throws Exception {
+        mm2Config = new MirrorMakerConfig(mm2Props);
+
+        waitUntilMirrorMakerIsRunning(backup, CONNECTOR_LIST, mm2Config, PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS);
+        waitUntilMirrorMakerIsRunning(primary, Collections.singletonList(MirrorHeartbeatConnector.class), mm2Config, BACKUP_CLUSTER_ALIAS, PRIMARY_CLUSTER_ALIAS);
+
+        // create topic with configuration to test:
+        final Map<String, String> topicConfig = new HashMap<>();
+        topicConfig.put("delete.retention.ms", "1000"); // should be excluded (default value is 86400000)
+        topicConfig.put("retention.bytes", "1000"); // should be included, default value is -1
+
+        final String topic = "test-topic-with-config";
+        final String backupTopic = remoteTopicName(topic, PRIMARY_CLUSTER_ALIAS);
+
+        primary.kafka().createTopic(topic, NUM_PARTITIONS, 1, topicConfig);
+        waitForTopicCreated(backup, backupTopic);
+
+        // alter configs on the target topic
+        ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, backupTopic);
+        Collection<AlterConfigOp> ops = new ArrayList<>();
+        ops.add(new AlterConfigOp(new ConfigEntry("delete.retention.ms", "2000"), AlterConfigOp.OpType.SET));
+        ops.add(new AlterConfigOp(new ConfigEntry("retention.bytes", "2000"), AlterConfigOp.OpType.SET));
+        Map<ConfigResource, Collection<AlterConfigOp>> configOps = new HashMap<>(1);
+        configOps.put(configResource, ops);

Review Comment:
   We can use:
   ```suggestion
           Map<ConfigResource, Collection<AlterConfigOp>> configOps = Collections.singletonMap(configResource, ops);
   ```



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java:
##########
@@ -539,11 +596,20 @@ Map<String, Config> describeTopicConfigs(Set<String> topics)
     }
 
     Config targetConfig(Config sourceConfig) {
-        List<ConfigEntry> entries = sourceConfig.entries().stream()
-            .filter(x -> !x.isDefault() && !x.isReadOnly() && !x.isSensitive())
-            .filter(x -> x.source() != ConfigEntry.ConfigSource.STATIC_BROKER_CONFIG)
-            .filter(x -> shouldReplicateTopicConfigurationProperty(x.name()))
-            .collect(Collectors.toList());
+        List<ConfigEntry> entries;
+        if (useIncrementalAlterConfigs == MirrorSourceConfig.NEVER_USE_INCREMENTAL_ALTER_CONFIG) {

Review Comment:
   Both branches are very similar, can we merge them and only conditionally do the `isDefault()` check?
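   Perhaps something along these lines (a sketch only — the incremental branch isn't visible in this hunk, so the exact set of filters is an assumption):
   ```
   boolean excludeDefaults = MirrorSourceConfig.NEVER_USE_INCREMENTAL_ALTER_CONFIG.equals(useIncrementalAlterConfigs);
   List<ConfigEntry> entries = sourceConfig.entries().stream()
       .filter(x -> !excludeDefaults || !x.isDefault())
       .filter(x -> !x.isReadOnly() && !x.isSensitive())
       .filter(x -> x.source() != ConfigEntry.ConfigSource.STATIC_BROKER_CONFIG)
       .filter(x -> shouldReplicateTopicConfigurationProperty(x.name()))
       .collect(Collectors.toList());
   ```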



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConfig.java:
##########
@@ -264,6 +285,13 @@ Duration consumerPollTimeout() {
                     SYNC_TOPIC_CONFIGS_INTERVAL_SECONDS_DEFAULT,
                     ConfigDef.Importance.LOW,
                     SYNC_TOPIC_CONFIGS_INTERVAL_SECONDS_DOC)
+            .define(
+                    USE_INCREMENTAL_ALTER_CONFIG,
+                    ConfigDef.Type.STRING,
+                    USE_INCREMENTAL_ALTER_CONFIG_DEFAULT,
+                    ConfigDef.Importance.LOW,
+                    USE_INCREMENTAL_ALTER_CONFIG_DOC
+            )

Review Comment:
   nit: All other configs put the closing parenthesis on the last line. Let's use the same formatting.



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/ConfigPropertyFilter.java:
##########
@@ -27,6 +28,8 @@ public interface ConfigPropertyFilter extends Configurable, AutoCloseable {
 
     boolean shouldReplicateConfigProperty(String prop);
 
+    boolean shouldReplicateSourceDefault(ConfigEntry.ConfigSource source);

Review Comment:
   In the KIP the argument was the config name as a String, why is it a ConfigEntry.ConfigSource object here?



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/ConfigPropertyFilter.java:
##########
@@ -27,6 +28,8 @@ public interface ConfigPropertyFilter extends Configurable, 
AutoCloseable {
 
     boolean shouldReplicateConfigProperty(String prop);
 
+    boolean shouldReplicateSourceDefault(ConfigEntry.ConfigSource source);

Review Comment:
   Can we also add some javadoc?
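   For example (a sketch; the wording is a suggestion and the parameter may still change per the comment about the KIP signature):
   ```
   /**
    * Specifies whether to replicate a topic configuration property that still has
    * its default value on the source cluster.
    *
    * @param source the source of the configuration entry
    * @return true if the property should be synced to the target topic
    */
   boolean shouldReplicateSourceDefault(ConfigEntry.ConfigSource source);
   ```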



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java:
##########
@@ -72,8 +76,22 @@
 import java.util.Optional;
 import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
+import org.mockito.Mockito;

Review Comment:
   Unused import



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/IdentityReplicationIntegrationTest.java:
##########
@@ -140,6 +147,143 @@ public void testReplication() throws Exception {
                 "New topic was not replicated to backup cluster.");
     }
 
+    @Test

Review Comment:
   Should this test be in `MirrorConnectorsIntegrationBaseTest` instead?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
