chia7712 commented on code in PR #15999:
URL: https://github.com/apache/kafka/pull/15999#discussion_r1650089242


##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConfig.java:
##########
@@ -166,6 +170,25 @@ Duration consumerPollTimeout() {
         return Duration.ofMillis(getLong(CONSUMER_POLL_TIMEOUT_MILLIS));
     }
 
+    public boolean validate() {
+        Boolean emitCheckpointsValue = 
this.getBoolean(EMIT_CHECKPOINTS_ENABLED);
+        Boolean syncGroupOffsetsValue = 
this.getBoolean(SYNC_GROUP_OFFSETS_ENABLED);
+
+        if (!emitCheckpointsValue && !syncGroupOffsetsValue) {
+            LOG.warn("MirrorCheckpointConnector can't run without both " + 
SYNC_GROUP_OFFSETS_ENABLED + ", " +
+                    EMIT_CHECKPOINTS_ENABLED + " set to false");
+            return false;
+        }
+
+        boolean requireOffsetSyncs = emitCheckpointsValue || 
syncGroupOffsetsValue;

Review Comment:
   Since the method has already returned when `!emitCheckpointsValue && 
!syncGroupOffsetsValue` is true, `requireOffsetSyncs` is always true here, right? 



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConfig.java:
##########
@@ -166,6 +170,25 @@ Duration consumerPollTimeout() {
         return Duration.ofMillis(getLong(CONSUMER_POLL_TIMEOUT_MILLIS));
     }
 
+    public boolean validate() {
+        Boolean emitCheckpointsValue = 
this.getBoolean(EMIT_CHECKPOINTS_ENABLED);
+        Boolean syncGroupOffsetsValue = 
this.getBoolean(SYNC_GROUP_OFFSETS_ENABLED);
+
+        if (!emitCheckpointsValue && !syncGroupOffsetsValue) {
+            LOG.warn("MirrorCheckpointConnector can't run without both " + 
SYNC_GROUP_OFFSETS_ENABLED + ", " +
+                    EMIT_CHECKPOINTS_ENABLED + " set to false");
+            return false;
+        }
+
+        boolean requireOffsetSyncs = emitCheckpointsValue || 
syncGroupOffsetsValue;
+        if 
(!"true".equals(Optional.ofNullable(this.originals().get(EMIT_OFFSET_SYNCS_ENABLED)).orElse("true"))
 & requireOffsetSyncs) {

Review Comment:
   Maybe we should use `EMIT_OFFSET_SYNCS_ENABLED_DEFAULT` here? 



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java:
##########
@@ -366,6 +291,19 @@ boolean update(long upstreamOffset, long downstreamOffset) 
{
             return shouldSyncOffsets;
         }
 
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (!(o instanceof PartitionState)) return false;
+            PartitionState that = (PartitionState) o;
+            return previousUpstreamOffset == that.previousUpstreamOffset && 
previousDownstreamOffset == that.previousDownstreamOffset && 
lastSyncDownstreamOffset == that.lastSyncDownstreamOffset && maxOffsetLag == 
that.maxOffsetLag && shouldSyncOffsets == that.shouldSyncOffsets;

Review Comment:
   Could you please split this into multiple lines?



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConfig.java:
##########
@@ -166,6 +170,25 @@ Duration consumerPollTimeout() {
         return Duration.ofMillis(getLong(CONSUMER_POLL_TIMEOUT_MILLIS));
     }
 
+    public boolean validate() {
+        Boolean emitCheckpointsValue = 
this.getBoolean(EMIT_CHECKPOINTS_ENABLED);
+        Boolean syncGroupOffsetsValue = 
this.getBoolean(SYNC_GROUP_OFFSETS_ENABLED);
+
+        if (!emitCheckpointsValue && !syncGroupOffsetsValue) {
+            LOG.warn("MirrorCheckpointConnector can't run without both " + 
SYNC_GROUP_OFFSETS_ENABLED + ", " +
+                    EMIT_CHECKPOINTS_ENABLED + " set to false");
+            return false;
+        }
+
+        boolean requireOffsetSyncs = emitCheckpointsValue || 
syncGroupOffsetsValue;
+        if 
(!"true".equals(Optional.ofNullable(this.originals().get(EMIT_OFFSET_SYNCS_ENABLED)).orElse("true"))
 & requireOffsetSyncs) {
+            LOG.warn("MirrorCheckpointConnector can't run with " + 
EMIT_OFFSET_SYNCS_ENABLED + " set to false while, " +
+                    EMIT_CHECKPOINTS_ENABLED  + " and/o r" + 
SYNC_GROUP_OFFSETS_ENABLED + " set to true");

Review Comment:
   Typo: ` and/o r` should be ` and/or ` (with a trailing space, since a config 
name is appended right after it), right?



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java:
##########
@@ -234,6 +234,30 @@ public ConfigDef config() {
     @Override
     public org.apache.kafka.common.config.Config validate(Map<String, String> 
props) {
         List<ConfigValue> configValues = super.validate(props).configValues();
+        validateExactlyOnceConfigs(props, configValues);
+        validateEmitOffsetSyncConfigs(props, configValues);
+
+        return new org.apache.kafka.common.config.Config(configValues);
+    }
+
+    private static void validateEmitOffsetSyncConfigs(Map<String, String> 
props, List<ConfigValue> configValues) {
+        boolean offsetSyncsConfigured = configValues.stream()
+                .anyMatch(conf -> 
conf.name().startsWith(OFFSET_SYNCS_CLIENT_ROLE_PREFIX) || 
conf.name().startsWith(OFFSET_SYNCS_TOPIC_CONFIG_PREFIX));
+
+        if ("false".equals(props.get(EXACTLY_ONCE_SUPPORT_CONFIG)) && 
offsetSyncsConfigured) {

Review Comment:
   Should this check `EMIT_OFFSET_SYNCS_ENABLED` rather than 
`EXACTLY_ONCE_SUPPORT_CONFIG`?



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java:
##########
@@ -110,9 +110,19 @@ public abstract class MirrorConnectorConfig extends 
AbstractConfig {
     public static final String TOPIC_FILTER_CLASS_DOC = "TopicFilter to use. 
Selects topics to replicate.";
     public static final Class<?> TOPIC_FILTER_CLASS_DEFAULT = 
DefaultTopicFilter.class;
 
-    public static final String OFFSET_SYNCS_TOPIC_LOCATION = 
"offset-syncs.topic.location";
+    public static final String OFFSET_SYNCS_TOPIC_CONFIG_PREFIX = 
"offset-syncs.topic.";
+    public static final String OFFSET_SYNCS_TOPIC_LOCATION = 
OFFSET_SYNCS_TOPIC_CONFIG_PREFIX + "location";
     public static final String OFFSET_SYNCS_TOPIC_LOCATION_DEFAULT = 
SOURCE_CLUSTER_ALIAS_DEFAULT;
     public static final String OFFSET_SYNCS_TOPIC_LOCATION_DOC = "The location 
(source/target) of the offset-syncs topic.";
+
+    public static final String EMIT_OFFSET_SYNCS_ENABLED = "emit.offset-syncs" 
+ ENABLED_SUFFIX;
+    public static final String EMIT_OFFSET_SYNCS_ENABLED_DOC = "Whether to 
store the new offset of the replicated records in offset-syncs topic or not. " +
+            "MirrorCheckpointConnector will fail to start if 
emit.checkpoints.enabled and/or sync.group.offsets.enabled are enabled while " +

Review Comment:
   I guess these docs need to be revised, since the connector will not actually 
fail to start, right?



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConfig.java:
##########
@@ -166,6 +170,25 @@ Duration consumerPollTimeout() {
         return Duration.ofMillis(getLong(CONSUMER_POLL_TIMEOUT_MILLIS));
     }
 
+    public boolean validate() {
+        Boolean emitCheckpointsValue = 
this.getBoolean(EMIT_CHECKPOINTS_ENABLED);
+        Boolean syncGroupOffsetsValue = 
this.getBoolean(SYNC_GROUP_OFFSETS_ENABLED);
+
+        if (!emitCheckpointsValue && !syncGroupOffsetsValue) {
+            LOG.warn("MirrorCheckpointConnector can't run without both " + 
SYNC_GROUP_OFFSETS_ENABLED + ", " +
+                    EMIT_CHECKPOINTS_ENABLED + " set to false");
+            return false;
+        }
+
+        boolean requireOffsetSyncs = emitCheckpointsValue || 
syncGroupOffsetsValue;

Review Comment:
   Given the comment above, I feel we don't need the local variables 
`emitCheckpointsValue` and `syncGroupOffsetsValue`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to