mimaison commented on a change in pull request #11401:
URL: https://github.com/apache/kafka/pull/11401#discussion_r742002383



##########
File path: 
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
##########
@@ -519,6 +521,47 @@ public void testOffsetSyncsTopicsOnTarget() throws 
Exception {
         assertFalse(primaryTopics.contains("mm2-offset-syncs." + 
BACKUP_CLUSTER_ALIAS + ".internal"));
     }
 
+    @Test

Review comment:
       Instead of an integration test, what about adding a test like this to 
`MirrorSourceConnectorTest`:
   
       @Test
       public void testNewTopicConfigs() throws Exception {
           Map<String, Object> mmConfig = new HashMap<>();
           
mmConfig.put(DefaultConfigPropertyFilter.CONFIG_PROPERTIES_EXCLUDE_CONFIG, 
"follower\\.replication\\.throttled\\.replicas, "
                   + "leader\\.replication\\.throttled\\.replicas, "
                   + "message\\.timestamp\\.difference\\.max\\.ms, "
                   + "message\\.timestamp\\.type, "
                   + "unclean\\.leader\\.election\\.enable, "
                   + "min\\.insync\\.replicas,"
                   + "exclude_param.*");
           DefaultConfigPropertyFilter filter = new 
DefaultConfigPropertyFilter();
           filter.configure(mmConfig);
   
           MirrorSourceConnector connector = spy(new MirrorSourceConnector(new 
SourceAndTarget("source", "target"),
                   new DefaultReplicationPolicy(), x -> true, filter));
   
           String topic = "testtopic";
           List<ConfigEntry> entries = new ArrayList<>();
           entries.add(new ConfigEntry("name-1", "value-1"));
           entries.add(new ConfigEntry("exclude_param.param1", "value-param1"));
           entries.add(new ConfigEntry("min.insync.replicas", "2"));
           Config config = new Config(entries);
           doReturn(Collections.singletonMap(topic, 
config)).when(connector).describeTopicConfigs(any());
           doAnswer(invocation -> {
               Map<String, NewTopic> newTopics = invocation.getArgument(0);
               assertNotNull(newTopics.get("source." + topic));
               Map<String, String> targetConfig = newTopics.get("source." + 
topic).configs();
   
               // property 'name-1' isn't defined in the exclude filter -> should be replicated
               assertNotNull(targetConfig.get("name-1"), "should replicate properties");
   
               // this property is in default list, just double check it:
               String prop1 = "min.insync.replicas";
               assertNull(targetConfig.get(prop1), "should not replicate excluded properties " + prop1);
               // this property is only in exclude filter custom parameter, also tests regex on the way:
               String prop2 = "exclude_param.param1";
               assertNull(targetConfig.get(prop2), "should not replicate excluded properties " + prop2);
               return null;
           }).when(connector).createNewTopics(any());
           connector.createNewTopics(Collections.singleton(topic), 
Collections.singletonMap(topic, 1L));
           verify(connector).targetConfig(config);
       }
   
   What do you think?

##########
File path: 
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
##########
@@ -519,6 +521,47 @@ public void testOffsetSyncsTopicsOnTarget() throws 
Exception {
         assertFalse(primaryTopics.contains("mm2-offset-syncs." + 
BACKUP_CLUSTER_ALIAS + ".internal"));
     }
 
+    @Test

Review comment:
       Instead of an integration test, what about adding a test like this to 
`MirrorSourceConnectorTest`:
   
       @Test
       public void testNewTopicConfigs() throws Exception {
           Map<String, Object> mmConfig = new HashMap<>();
           
mmConfig.put(DefaultConfigPropertyFilter.CONFIG_PROPERTIES_EXCLUDE_CONFIG, 
"follower\\.replication\\.throttled\\.replicas, "
                   + "leader\\.replication\\.throttled\\.replicas, "
                   + "message\\.timestamp\\.difference\\.max\\.ms, "
                   + "message\\.timestamp\\.type, "
                   + "unclean\\.leader\\.election\\.enable, "
                   + "min\\.insync\\.replicas,"
                   + "exclude_param.*");
           DefaultConfigPropertyFilter filter = new 
DefaultConfigPropertyFilter();
           filter.configure(mmConfig);
   
           MirrorSourceConnector connector = spy(new MirrorSourceConnector(new 
SourceAndTarget("source", "target"),
                   new DefaultReplicationPolicy(), x -> true, filter));
   
           String topic = "testtopic";
           List<ConfigEntry> entries = new ArrayList<>();
           entries.add(new ConfigEntry("name-1", "value-1"));
           entries.add(new ConfigEntry("exclude_param.param1", "value-param1"));
           entries.add(new ConfigEntry("min.insync.replicas", "2"));
           Config config = new Config(entries);
           doReturn(Collections.singletonMap(topic, 
config)).when(connector).describeTopicConfigs(any());
           doAnswer(invocation -> {
               Map<String, NewTopic> newTopics = invocation.getArgument(0);
               assertNotNull(newTopics.get("source." + topic));
               Map<String, String> targetConfig = newTopics.get("source." + 
topic).configs();
   
               // property 'name-1' isn't defined in the exclude filter -> should be replicated
               assertNotNull(targetConfig.get("name-1"), "should replicate properties");
   
               // this property is in default list, just double check it:
               String prop1 = "min.insync.replicas";
               assertNull(targetConfig.get(prop1), "should not replicate excluded properties " + prop1);
               // this property is only in exclude filter custom parameter, also tests regex on the way:
               String prop2 = "exclude_param.param1";
               assertNull(targetConfig.get(prop2), "should not replicate excluded properties " + prop2);
               return null;
           }).when(connector).createNewTopics(any());
           connector.createNewTopics(Collections.singleton(topic), 
Collections.singletonMap(topic, 1L));
           verify(connector).targetConfig(config);
       }
   
   What do you think?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to