soarez commented on code in PR #15999: URL: https://github.com/apache/kafka/pull/15999#discussion_r1619521840
########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java: ########## @@ -105,6 +108,49 @@ public void stop() { Utils.closeQuietly(targetAdminClient, "target admin client"); } + @Override + public Config validate(Map<String, String> props) { + List<ConfigValue> configValues = super.validate(props).configValues(); + String emitCheckpointsValue = Optional.ofNullable(props.get(MirrorCheckpointConfig.EMIT_CHECKPOINTS_ENABLED)).orElse(Boolean.toString(MirrorCheckpointConfig.EMIT_CHECKPOINTS_ENABLED_DEFAULT)); + String syncGroupOffsetsValue = Optional.ofNullable(props.get(MirrorCheckpointConfig.SYNC_GROUP_OFFSETS_ENABLED)).orElse(Boolean.toString(MirrorCheckpointConfig.SYNC_GROUP_OFFSETS_ENABLED_DEFAULT)); + String emitOffsetSyncsValue = Optional.ofNullable(props.get(MirrorConnectorConfig.EMIT_OFFSET_SYNCS_ENABLED)).orElse(Boolean.toString(MirrorConnectorConfig.EMIT_OFFSET_SYNCS_ENABLED_DEFAULT)); + + if ("false".equals(emitCheckpointsValue) && "false".equals(syncGroupOffsetsValue)) { + ConfigValue syncGroupOffsets = configValues.stream().filter(prop -> MirrorCheckpointConfig.SYNC_GROUP_OFFSETS_ENABLED.equals(prop.name())) + .findAny() + .orElseGet(() -> { + ConfigValue result = new ConfigValue(MirrorCheckpointConfig.SYNC_GROUP_OFFSETS_ENABLED); + configValues.add(result); + return result; + }); + + ConfigValue emitCheckpoints = configValues.stream().filter(prop -> MirrorCheckpointConfig.EMIT_CHECKPOINTS_ENABLED.equals(prop.name())) + .findAny() + .orElseGet(() -> { + ConfigValue result = new ConfigValue(MirrorCheckpointConfig.EMIT_CHECKPOINTS_ENABLED); + configValues.add(result); + return result; + }); + + String errorMessage = "MirrorCheckpointConnector can't run with both" + + MirrorCheckpointConfig.SYNC_GROUP_OFFSETS_ENABLED + ", " + MirrorCheckpointConfig.EMIT_CHECKPOINTS_ENABLED + "set to false"; + syncGroupOffsets.addErrorMessage(errorMessage); + emitCheckpoints.addErrorMessage(errorMessage); + } + if 
("false".equals(emitOffsetSyncsValue) && ("true".equals(emitCheckpointsValue) || "true".equals(syncGroupOffsetsValue))) { + ConfigValue emitOffsetSyncs = configValues.stream().filter(prop -> MirrorConnectorConfig.EMIT_OFFSET_SYNCS_ENABLED.equals(prop.name())) + .findAny() + .orElseGet(() -> { + ConfigValue result = new ConfigValue(MirrorConnectorConfig.EMIT_OFFSET_SYNCS_ENABLED); + configValues.add(result); + return result; + }); + + emitOffsetSyncs.addErrorMessage("MirrorCheckpointConnector can't run while MirrorSourceConnector configured with" + + MirrorConnectorConfig.EMIT_OFFSET_SYNCS_ENABLED + "set to false"); + } + return new Config(configValues); Review Comment: Thanks for the explanation, Omnia. That makes sense. Could `MirrorCheckpointConfig` have a new method that performs these validations, and we call that method from `MirrorCheckpointConnector#validate`? Maybe the method could return `Optional<ConfigValue>` if there is any validation issue? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org