gharris1727 commented on code in PR #16080:
URL: https://github.com/apache/kafka/pull/16080#discussion_r1622763556
##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConfig.java:
##########
@@ -101,7 +101,13 @@ public class MirrorSourceConfig extends MirrorConnectorConfig {
     public static final Class<?> CONFIG_PROPERTY_FILTER_CLASS_DEFAULT = DefaultConfigPropertyFilter.class;
     public static final String OFFSET_LAG_MAX = "offset.lag.max";
-    private static final String OFFSET_LAG_MAX_DOC = "How out-of-sync a remote partition can be before it is resynced.";
+    private static final String OFFSET_LAG_MAX_DOC = "Determines the maximum allowed lag between the source and remote partitions before MirrorMaker initiates a resync operation to catch up the remote partition. The lag is calculated as the difference between the latest offset in the source partition and the last committed offset in the remote partition.\n" +
+            "\n" +
+            "When the lag for a remote partition exceeds the <code>offset.lag.max</code> value, MirrorMaker will initiate a resync operation to catch up the remote partition with the source partition. This involves reading records from the source partition starting from the last committed offset in the remote partition and writing them to the remote partition.\n" +

Review Comment:
> This involves reading records from the source partition starting from the last committed offset in the remote partition and writing them to the remote partition.

This is incorrect, and I don't know where this information came from. When the lag for a remote partition exceeds offset.lag.max, MirrorMaker emits an offset sync to the offset syncs topic, which the MirrorCheckpointTask can then use to translate between upstream and downstream offsets.
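The behavior described in this comment can be sketched in Java, the language of the file under review. This is a minimal, hypothetical model assuming only the simplified rule stated in the review: remember the source offset of the last emitted offset sync, and emit a new sync once the lag exceeds offset.lag.max. Nothing is re-read or re-copied; the only output is a decision to write a sync. OffsetSyncTracker and shouldEmitSync are illustrative names, not MirrorMaker APIs, and the real MirrorSourceTask may check additional conditions.

```java
/**
 * Hypothetical, simplified model of the offset.lag.max check discussed in this review.
 * It only decides whether to emit an offset sync; it never re-reads or re-copies records.
 * This is not MirrorMaker's actual implementation.
 */
final class OffsetSyncTracker {
    private final long offsetLagMax;
    // Upstream (source) offset of the last emitted offset sync; -1 means no sync yet.
    private long lastSyncUpstreamOffset = -1L;

    OffsetSyncTracker(long offsetLagMax) {
        this.offsetLagMax = offsetLagMax;
    }

    /**
     * Called once per replicated record. Returns true if an offset sync should be
     * written to the offset syncs topic for this record.
     */
    boolean shouldEmitSync(long upstreamOffset) {
        boolean noSyncYet = lastSyncUpstreamOffset < 0;
        // Lag = this source offset minus the source offset of the last emitted sync.
        boolean lagExceedsMax = upstreamOffset - lastSyncUpstreamOffset > offsetLagMax;
        if (noSyncYet || lagExceedsMax) {
            lastSyncUpstreamOffset = upstreamOffset;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        OffsetSyncTracker tracker = new OffsetSyncTracker(100);
        for (long offset : new long[] {0, 50, 100, 101, 150}) {
            // Prints true only for 0 (first record) and 101 (lag 101 > 100).
            System.out.println(offset + " -> " + tracker.shouldEmitSync(offset));
        }
    }
}
```

With offset.lag.max set to 0 in this model, every record's lag is at least 1, so every record triggers a sync; larger values trade fewer writes to the offset syncs topic for a coarser offset mapping.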
##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConfig.java:
##########
@@ -101,7 +101,13 @@ public class MirrorSourceConfig extends MirrorConnectorConfig {
     public static final Class<?> CONFIG_PROPERTY_FILTER_CLASS_DEFAULT = DefaultConfigPropertyFilter.class;
     public static final String OFFSET_LAG_MAX = "offset.lag.max";
-    private static final String OFFSET_LAG_MAX_DOC = "How out-of-sync a remote partition can be before it is resynced.";
+    private static final String OFFSET_LAG_MAX_DOC = "Determines the maximum allowed lag between the source and remote partitions before MirrorMaker initiates a resync operation to catch up the remote partition. The lag is calculated as the difference between the latest offset in the source partition and the last committed offset in the remote partition.\n" +
+            "\n" +
+            "When the lag for a remote partition exceeds the <code>offset.lag.max</code> value, MirrorMaker will initiate a resync operation to catch up the remote partition with the source partition. This involves reading records from the source partition starting from the last committed offset in the remote partition and writing them to the remote partition.\n" +
+            "\n" +
+            "Setting <code>offset.lag.max</code> to a lower value can be beneficial in scenarios where records may not flow constantly or at a consistent rate, as it ensures the remote partitions stay more closely in sync with the source partitions during periods of low throughput or inactivity. On the other hand, setting it to a higher value can be useful when the source topic has high throughput and the remote partitions can tolerate a larger lag.\n" +

Review Comment:
> Setting <code>offset.lag.max</code> to a lower value can be beneficial in scenarios where records may not flow constantly or at a consistent rate

Lowering offset.lag.max to a nonzero value doesn't help when the flow is inconsistent.
If a pause in the flow happens between syncs, the records after the last sync aren't translated. That was addressed in a separate improvement: https://issues.apache.org/jira/browse/KAFKA-15906

Lowering offset.lag.max to a nonzero value is only necessary when the partition has consistently low throughput and a fixed sync every offset.flush.interval.ms isn't acceptable, which is a very contrived scenario.

##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConfig.java:
##########
@@ -101,7 +101,13 @@ public class MirrorSourceConfig extends MirrorConnectorConfig {
     public static final Class<?> CONFIG_PROPERTY_FILTER_CLASS_DEFAULT = DefaultConfigPropertyFilter.class;
     public static final String OFFSET_LAG_MAX = "offset.lag.max";
-    private static final String OFFSET_LAG_MAX_DOC = "How out-of-sync a remote partition can be before it is resynced.";
+    private static final String OFFSET_LAG_MAX_DOC = "Determines the maximum allowed lag between the source and remote partitions before MirrorMaker initiates a resync operation to catch up the remote partition. The lag is calculated as the difference between the latest offset in the source partition and the last committed offset in the remote partition.\n" +

Review Comment:
> The lag is calculated as the difference between the latest offset in the source partition and the last committed offset in the remote partition.

The lag is the difference between the latest offset in the source partition and the last sync emitted to the offset syncs topic.
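To make the lag definition and the pause scenario concrete, here is a small hypothetical Java simulation. It assumes the simplified "emit when lag exceeds offset.lag.max" rule from this review, not MirrorMaker's full logic, and it ignores the separate KAFKA-15906 improvement. It replays a burst of records followed by a pause, records which source offsets would get an offset sync, and reports the lag left when the flow stops: the latest source offset minus the offset of the last emitted sync. OffsetLagSimulation, Result and replay are illustrative names only.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical simulation (not MirrorMaker code): replay a burst of records, then pause,
 * and report which offsets got an offset sync under a simplified offset.lag.max rule.
 */
final class OffsetLagSimulation {

    /** Offsets that triggered a sync, plus the lag left behind when the flow paused. */
    static final class Result {
        final List<Long> syncedOffsets;
        final long lagAtPause;

        Result(List<Long> syncedOffsets, long lagAtPause) {
            this.syncedOffsets = syncedOffsets;
            this.lagAtPause = lagAtPause;
        }
    }

    /** Replays source offsets 0..recordCount-1, then stops producing (the "pause"). */
    static Result replay(long recordCount, long offsetLagMax) {
        List<Long> synced = new ArrayList<>();
        long lastSync = -1L;
        for (long offset = 0; offset < recordCount; offset++) {
            // Emit a sync for the first record, or when the lag since the last sync exceeds offset.lag.max.
            if (lastSync < 0 || offset - lastSync > offsetLagMax) {
                synced.add(offset);
                lastSync = offset;
            }
        }
        // Lag per the review: latest source offset minus the offset of the last emitted sync.
        long lagAtPause = lastSync < 0 ? 0 : (recordCount - 1) - lastSync;
        return new Result(synced, lagAtPause);
    }

    public static void main(String[] args) {
        for (long lagMax : new long[] {0, 100}) {
            Result r = replay(150, lagMax);
            System.out.printf("offset.lag.max=%d: %d syncs, lag at pause=%d%n",
                    lagMax, r.syncedOffsets.size(), r.lagAtPause);
        }
    }
}
```

Under these assumptions, main prints "offset.lag.max=0: 150 syncs, lag at pause=0" and "offset.lag.max=100: 2 syncs, lag at pause=48". With the threshold at 100, syncs land at offsets 0 and 101, so the 48 records at offsets 102 to 149 are not covered by any sync when the flow stops; a threshold of 0 closes that gap at the cost of one sync per record on the offset syncs topic.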
##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConfig.java:
##########
@@ -101,7 +101,13 @@ public class MirrorSourceConfig extends MirrorConnectorConfig {
     public static final Class<?> CONFIG_PROPERTY_FILTER_CLASS_DEFAULT = DefaultConfigPropertyFilter.class;
     public static final String OFFSET_LAG_MAX = "offset.lag.max";
-    private static final String OFFSET_LAG_MAX_DOC = "How out-of-sync a remote partition can be before it is resynced.";
+    private static final String OFFSET_LAG_MAX_DOC = "Determines the maximum allowed lag between the source and remote partitions before MirrorMaker initiates a resync operation to catch up the remote partition. The lag is calculated as the difference between the latest offset in the source partition and the last committed offset in the remote partition.\n" +
+            "\n" +

Review Comment:
Not sure about these double newlines and how they render. How are you previewing these changes?

##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConfig.java:
##########
@@ -101,7 +101,13 @@ public class MirrorSourceConfig extends MirrorConnectorConfig {
     public static final Class<?> CONFIG_PROPERTY_FILTER_CLASS_DEFAULT = DefaultConfigPropertyFilter.class;
     public static final String OFFSET_LAG_MAX = "offset.lag.max";
-    private static final String OFFSET_LAG_MAX_DOC = "How out-of-sync a remote partition can be before it is resynced.";
+    private static final String OFFSET_LAG_MAX_DOC = "Determines the maximum allowed lag between the source and remote partitions before MirrorMaker initiates a resync operation to catch up the remote partition. The lag is calculated as the difference between the latest offset in the source partition and the last committed offset in the remote partition.\n" +
+            "\n" +
+            "When the lag for a remote partition exceeds the <code>offset.lag.max</code> value, MirrorMaker will initiate a resync operation to catch up the remote partition with the source partition. This involves reading records from the source partition starting from the last committed offset in the remote partition and writing them to the remote partition.\n" +
+            "\n" +
+            "Setting <code>offset.lag.max</code> to a lower value can be beneficial in scenarios where records may not flow constantly or at a consistent rate, as it ensures the remote partitions stay more closely in sync with the source partitions during periods of low throughput or inactivity. On the other hand, setting it to a higher value can be useful when the source topic has high throughput and the remote partitions can tolerate a larger lag.\n" +
+            "\n" +
+            "It's also possible to set <code>offset.lag.max</code> to 0, which will cause MirrorMaker to initiate a resync operation for a remote partition as soon as it falls behind the source partition. This can be useful for strict synchronization requirements but may increase the load on the source cluster due to frequent resync operations.";

Review Comment:
Instead of saying "initiate a resync operation", we can say "emit an offset sync for every source record", and mention the additional load and extra throughput on the offset syncs topic specifically. The comment about increased load on the source cluster can be inaccurate, because the offset syncs topic may be on either the source or the target cluster, depending on the value of the offset-syncs.topic.location configuration.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org