mimaison commented on a change in pull request #11401: URL: https://github.com/apache/kafka/pull/11401#discussion_r744779966
########## File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java ########## @@ -519,6 +521,47 @@ public void testOffsetSyncsTopicsOnTarget() throws Exception { assertFalse(primaryTopics.contains("mm2-offset-syncs." + BACKUP_CLUSTER_ALIAS + ".internal")); } + @Test + public void testTopicConfigPropertyFilteringExclude() throws Exception { + // create exclude filter configuration and start MM2: + mm2Props.put(BACKUP_CLUSTER_ALIAS + "->" + PRIMARY_CLUSTER_ALIAS + ".enabled", "false"); + mm2Props.put(DefaultConfigPropertyFilter.CONFIG_PROPERTIES_EXCLUDE_CONFIG, "follower\\.replication\\.throttled\\.replicas, " Review comment: To keep the test as simple as needed, what about simply using: ``` mm2Props.put(DefaultConfigPropertyFilter.CONFIG_PROPERTIES_EXCLUDE_CONFIG, "delete.retention.ms"); ``` ########## File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java ########## @@ -519,6 +521,47 @@ public void testOffsetSyncsTopicsOnTarget() throws Exception { assertFalse(primaryTopics.contains("mm2-offset-syncs." + BACKUP_CLUSTER_ALIAS + ".internal")); } + @Test + public void testTopicConfigPropertyFilteringExclude() throws Exception { + // create exclude filter configuration and start MM2: + mm2Props.put(BACKUP_CLUSTER_ALIAS + "->" + PRIMARY_CLUSTER_ALIAS + ".enabled", "false"); + mm2Props.put(DefaultConfigPropertyFilter.CONFIG_PROPERTIES_EXCLUDE_CONFIG, "follower\\.replication\\.throttled\\.replicas, " + + "leader\\.replication\\.throttled\\.replicas, " + + "message\\.timestamp\\.difference\\.max\\.ms, " + + "message\\.timestamp\\.type, " + + "unclean\\.leader\\.election\\.enable, " + + "min\\.insync\\.replicas," + + "delete\\.retention\\..*"); // this is in addition to the default exclude list + + mm2Config = new MirrorMakerConfig(mm2Props); + waitUntilMirrorMakerIsRunning(backup, CONNECTOR_LIST, mm2Config, PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS); + + // create topic with configuration to test: + final Map<String, String> topicConfig = new HashMap<>(); + topicConfig.put("delete.retention.ms", "1000"); // should be excluded (default value is 86400000) + topicConfig.put("retention.bytes", "1000"); // should be included, default value is -1 + + + final String topic = "test-topic-with-config"; + primary.kafka().createTopic(topic, NUM_PARTITIONS, 1, topicConfig); + waitForTopicCreated(backup, PRIMARY_CLUSTER_ALIAS + "." + topic); + + String primaryConfig, backupConfig; + + primaryConfig = getTopicConfig(primary.kafka(), topic, "delete.retention.ms"); + backupConfig = getTopicConfig(backup.kafka(), PRIMARY_CLUSTER_ALIAS + "." + topic, "delete.retention.ms"); + assertNotEquals(primaryConfig, backupConfig, + "`delete.retention.ms` should be different, because it's in exclude filter! "); + + // regression test for the config that are still supposed to be replicated + primaryConfig = getTopicConfig(primary.kafka(), topic, "retention.bytes"); + backupConfig = getTopicConfig(backup.kafka(), PRIMARY_CLUSTER_ALIAS + "." + topic, "retention.bytes"); + assertEquals(primaryConfig, backupConfig, + "`retention.bytes` should be the same, because it isn't in exclude filter! "); + assertEquals(backupConfig, "1000", + "`retention.bytes` should be the same, because it's explicitly defined! "); Review comment: `should be the same` -> `should be 1000` Also let's swap the arguments as the expected value should come first: ``` assertEquals("1000", backupConfig, ... ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org