OmniaGM commented on code in PR #15999:
URL: https://github.com/apache/kafka/pull/15999#discussion_r1611972192


##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java:
##########
@@ -105,6 +108,49 @@ public void stop() {
         Utils.closeQuietly(targetAdminClient, "target admin client");
     }
 
+    @Override
+    public Config validate(Map<String, String> props) {
+        List<ConfigValue> configValues = super.validate(props).configValues();
+        String emitCheckpointsValue = Optional.ofNullable(props.get(MirrorCheckpointConfig.EMIT_CHECKPOINTS_ENABLED)).orElse(Boolean.toString(MirrorCheckpointConfig.EMIT_CHECKPOINTS_ENABLED_DEFAULT));
+        String syncGroupOffsetsValue = Optional.ofNullable(props.get(MirrorCheckpointConfig.SYNC_GROUP_OFFSETS_ENABLED)).orElse(Boolean.toString(MirrorCheckpointConfig.SYNC_GROUP_OFFSETS_ENABLED_DEFAULT));
+        String emitOffsetSyncsValue = Optional.ofNullable(props.get(MirrorConnectorConfig.EMIT_OFFSET_SYNCS_ENABLED)).orElse(Boolean.toString(MirrorConnectorConfig.EMIT_OFFSET_SYNCS_ENABLED_DEFAULT));
+
+        if ("false".equals(emitCheckpointsValue) && "false".equals(syncGroupOffsetsValue)) {
+            ConfigValue syncGroupOffsets = configValues.stream().filter(prop -> MirrorCheckpointConfig.SYNC_GROUP_OFFSETS_ENABLED.equals(prop.name()))
+                    .findAny()
+                    .orElseGet(() -> {
+                        ConfigValue result = new ConfigValue(MirrorCheckpointConfig.SYNC_GROUP_OFFSETS_ENABLED);
+                        configValues.add(result);
+                        return result;
+                    });
+
+            ConfigValue emitCheckpoints = configValues.stream().filter(prop -> MirrorCheckpointConfig.EMIT_CHECKPOINTS_ENABLED.equals(prop.name()))
+                    .findAny()
+                    .orElseGet(() -> {
+                        ConfigValue result = new ConfigValue(MirrorCheckpointConfig.EMIT_CHECKPOINTS_ENABLED);
+                        configValues.add(result);
+                        return result;
+                    });
+
+            String errorMessage = "MirrorCheckpointConnector can't run with both " +
+                    MirrorCheckpointConfig.SYNC_GROUP_OFFSETS_ENABLED + ", " + MirrorCheckpointConfig.EMIT_CHECKPOINTS_ENABLED + " set to false";
+            syncGroupOffsets.addErrorMessage(errorMessage);
+            emitCheckpoints.addErrorMessage(errorMessage);
+        }
+        if ("false".equals(emitOffsetSyncsValue) && ("true".equals(emitCheckpointsValue) || "true".equals(syncGroupOffsetsValue))) {
+            ConfigValue emitOffsetSyncs = configValues.stream().filter(prop -> MirrorConnectorConfig.EMIT_OFFSET_SYNCS_ENABLED.equals(prop.name()))
+                    .findAny()
+                    .orElseGet(() -> {
+                        ConfigValue result = new ConfigValue(MirrorConnectorConfig.EMIT_OFFSET_SYNCS_ENABLED);
+                        configValues.add(result);
+                        return result;
+                    });
+
+            emitOffsetSyncs.addErrorMessage("MirrorCheckpointConnector can't run while MirrorSourceConnector is configured with " +
+                    MirrorConnectorConfig.EMIT_OFFSET_SYNCS_ENABLED + " set to false");
+        }
+        return new Config(configValues);

Review Comment:
   > Can these validations live in the config class? In this case that would be 
MirrorConnectConfig.
   
   `MirrorConnectConfig` is more generic and is shared between multiple connectors, so we shouldn't move this validation there. `MirrorCheckpointConfig` might work, but not `MirrorConnectConfig`.
   
   > Besides being in line with config validation it has the added benefit that 
we can use .getBoolean(...) instead of  
props.get(..prop.)).orElse(Boolean.toString(...default...).
   
   I think we should leverage `Connector::validate` here, as it gets called before `start` and lets us fail faster. If we want to use `getBoolean`, maybe we should initialise `MirrorCheckpointConfig` within `validate`, but we shouldn't move this out of `Connector::validate`.
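
To make the suggestion concrete, here is a rough, dependency-free sketch of what the checks could look like once the three flags are parsed into booleans up front instead of being compared as strings. Everything in it is a stand-in and not code from this PR: `CheckpointValidationSketch`, its `getBoolean` helper, and the returned error map are hypothetical, and the property names and default values are assumptions made for the illustration. In the connector itself the parsing would presumably come from a `MirrorCheckpointConfig` built inside `validate`, and the errors would be attached to `ConfigValue`s.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the connector's validate(); it does not use Kafka classes.
final class CheckpointValidationSketch {
    // Property names and defaults are assumptions made for this sketch.
    static final String EMIT_CHECKPOINTS_ENABLED = "emit.checkpoints.enabled";
    static final String SYNC_GROUP_OFFSETS_ENABLED = "sync.group.offsets.enabled";
    static final String EMIT_OFFSET_SYNCS_ENABLED = "emit.offset.syncs.enabled";

    // Parse a flag once, falling back to the default when the property is absent.
    static boolean getBoolean(Map<String, String> props, String key, boolean defaultValue) {
        String raw = props.get(key);
        return raw == null ? defaultValue : Boolean.parseBoolean(raw.trim());
    }

    // Returns property name -> error messages; an empty map means the config is acceptable.
    static Map<String, List<String>> validate(Map<String, String> props) {
        boolean emitCheckpoints = getBoolean(props, EMIT_CHECKPOINTS_ENABLED, true);
        boolean syncGroupOffsets = getBoolean(props, SYNC_GROUP_OFFSETS_ENABLED, false);
        boolean emitOffsetSyncs = getBoolean(props, EMIT_OFFSET_SYNCS_ENABLED, true);

        Map<String, List<String>> errors = new LinkedHashMap<>();

        // Rule 1: the connector has nothing to do if both features are disabled.
        if (!emitCheckpoints && !syncGroupOffsets) {
            String message = "MirrorCheckpointConnector can't run with both "
                    + SYNC_GROUP_OFFSETS_ENABLED + " and " + EMIT_CHECKPOINTS_ENABLED + " set to false";
            errors.computeIfAbsent(SYNC_GROUP_OFFSETS_ENABLED, k -> new ArrayList<>()).add(message);
            errors.computeIfAbsent(EMIT_CHECKPOINTS_ENABLED, k -> new ArrayList<>()).add(message);
        }

        // Rule 2: checkpoints and group offset sync both depend on offset syncs being emitted.
        if (!emitOffsetSyncs && (emitCheckpoints || syncGroupOffsets)) {
            String message = "MirrorCheckpointConnector can't run while MirrorSourceConnector is configured with "
                    + EMIT_OFFSET_SYNCS_ENABLED + " set to false";
            errors.computeIfAbsent(EMIT_OFFSET_SYNCS_ENABLED, k -> new ArrayList<>()).add(message);
        }
        return errors;
    }
}
```

One behavioural difference to keep in mind: `Boolean.parseBoolean` treats any value other than "true" (case-insensitive) as false, whereas the string comparisons in the diff skip both checks for a malformed value. In the real connector that case would presumably already be reported by the per-property validation in `super.validate(props)`.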


