bdesert commented on a change in pull request #11401:
URL: https://github.com/apache/kafka/pull/11401#discussion_r744959364



##########
File path: 
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
##########
@@ -519,6 +521,47 @@ public void testOffsetSyncsTopicsOnTarget() throws 
Exception {
         assertFalse(primaryTopics.contains("mm2-offset-syncs." + 
BACKUP_CLUSTER_ALIAS + ".internal"));
     }
 
+    @Test
+    public void testTopicConfigPropertyFilteringExclude() throws Exception {
+        // create exclude filter configuration and start MM2:
+        mm2Props.put(BACKUP_CLUSTER_ALIAS + "->" + PRIMARY_CLUSTER_ALIAS + 
".enabled", "false");
+        
mm2Props.put(DefaultConfigPropertyFilter.CONFIG_PROPERTIES_EXCLUDE_CONFIG, 
"follower\\.replication\\.throttled\\.replicas, "
+                + "leader\\.replication\\.throttled\\.replicas, "
+                + "message\\.timestamp\\.difference\\.max\\.ms, "
+                + "message\\.timestamp\\.type, "
+                + "unclean\\.leader\\.election\\.enable, "
+                + "min\\.insync\\.replicas,"
+                + "delete\\.retention\\..*"); // this is in addition to the 
default exclude list
+
+        mm2Config = new MirrorMakerConfig(mm2Props);
+        waitUntilMirrorMakerIsRunning(backup, CONNECTOR_LIST, mm2Config, 
PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS);
+
+        // create topic with configuration to test:
+        final Map<String, String> topicConfig = new HashMap<>();
+        topicConfig.put("delete.retention.ms", "1000"); // should be excluded 
(default value is 86400000)
+        topicConfig.put("retention.bytes", "1000"); // should be included, 
default value is -1
+
+
+        final String topic = "test-topic-with-config";
+        primary.kafka().createTopic(topic, NUM_PARTITIONS, 1, topicConfig);
+        waitForTopicCreated(backup, PRIMARY_CLUSTER_ALIAS + "." + topic);
+
+        String primaryConfig, backupConfig;
+
+        primaryConfig = getTopicConfig(primary.kafka(), topic, 
"delete.retention.ms");
+        backupConfig = getTopicConfig(backup.kafka(), PRIMARY_CLUSTER_ALIAS + 
"." + topic, "delete.retention.ms");
+        assertNotEquals(primaryConfig, backupConfig,
+                "`delete.retention.ms` should be different, because it's in 
exclude filter! ");
+
+        // regression test for the config that are still supposed to be 
replicated
+        primaryConfig = getTopicConfig(primary.kafka(), topic, 
"retention.bytes");
+        backupConfig = getTopicConfig(backup.kafka(), PRIMARY_CLUSTER_ALIAS + 
"." + topic, "retention.bytes");
+        assertEquals(primaryConfig, backupConfig,
+                "`retention.bytes` should be the same, because it isn't in 
exclude filter! ");
+        assertEquals(backupConfig, "1000",
+                "`retention.bytes` should be the same, because it's explicitly 
defined! ");

Review comment:
       Updated per suggestions.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to