C0urante commented on code in PR #13373:
URL: https://github.com/apache/kafka/pull/13373#discussion_r1143623680


##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java:
##########
@@ -514,6 +540,37 @@ private void updateTopicConfigs(Map<String, Config> topicConfigs) {
         }));
     }
 
+    // visible for testing
+    void incrementalAlterConfigs(Map<String, Config> topicConfigs) {
+        Map<ConfigResource, Collection<AlterConfigOp>> configOps = new HashMap<>();
+        for (Map.Entry<String, Config> topicConfig : topicConfigs.entrySet()) {
+            Collection<AlterConfigOp> ops = new ArrayList<>();
+            ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, topicConfig.getKey());
+            for (ConfigEntry config : topicConfig.getValue().entries()) {
+                if (config.isDefault() && !shouldReplicateSourceDefault(config.source())) {
+                    ops.add(new AlterConfigOp(config, AlterConfigOp.OpType.DELETE));
+                } else {
+                    ops.add(new AlterConfigOp(config, AlterConfigOp.OpType.SET));
+                }
+            }
+            configOps.put(configResource, ops);
+        }
+        log.trace("Syncing configs for {} topics.", configOps.size());
+        targetAdminClient.incrementalAlterConfigs(configOps).values().forEach((k, v) -> v.whenComplete((x, e) -> {
+            if (e != null) {
+                if (useIncrementalAlterConfigs.equals(MirrorSourceConfig.USE_INCREMENTAL_ALTER_CONFIG_DEFAULT)

Review Comment:
   We can mock the connector's context and ensure that the correct method has 
been invoked:
   ```java
       @Test
       public void testIncrementalAlterConfigsRequiredButUnsupported() throws Exception {
           MockAdminClient admin = spy(new MockAdminClient());
           ConnectorContext connectorContext = mock(ConnectorContext.class);
           MirrorSourceConnector connector = spy(new MirrorSourceConnector(new SourceAndTarget("source", "target"),
                   new DefaultReplicationPolicy(), MirrorSourceConfig.REQUIRE_INCREMENTAL_ALTER_CONFIG, new DefaultConfigPropertyFilter(), admin));
           connector.initialize(connectorContext);
           final String topic = "testtopic";
           List<ConfigEntry> entries = Collections.singletonList(new ConfigEntry("name-1", "value-1"));
           Config config = new Config(entries);
           doReturn(Collections.singletonMap(topic, config)).when(connector).describeTopicConfigs(any());
           doReturn(alterConfigsResult(new ConfigResource(ConfigResource.Type.TOPIC, topic), new UnsupportedVersionException("Unsupported API"))).when(admin).incrementalAlterConfigs(any());
           doNothing().when(connector).deprecatedAlterConfigs(any());
   
           connector.syncTopicConfigs();
           verify(connectorContext).raiseError(isA(UnsupportedVersionException.class));
       }
   ```


