gharris1727 commented on code in PR #16080:
URL: https://github.com/apache/kafka/pull/16080#discussion_r1622763556


##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConfig.java:
##########
@@ -101,7 +101,13 @@ public class MirrorSourceConfig extends 
MirrorConnectorConfig {
     public static final Class<?> CONFIG_PROPERTY_FILTER_CLASS_DEFAULT = 
DefaultConfigPropertyFilter.class;
 
     public static final String OFFSET_LAG_MAX = "offset.lag.max";
-    private static final String OFFSET_LAG_MAX_DOC = "How out-of-sync a remote 
partition can be before it is resynced.";
+    private static final String OFFSET_LAG_MAX_DOC = "Determines the maximum 
allowed lag between the source and remote partitions before MirrorMaker 
initiates a resync operation to catch up the remote partition. The lag is 
calculated as the difference between the latest offset in the source partition 
and the last committed offset in the remote partition.\n" +
+            "\n" +
+            "When the lag for a remote partition exceeds the 
<code>offset.lag.max</code> value, MirrorMaker will initiate a resync operation 
to catch up the remote partition with the source partition. This involves 
reading records from the source partition starting from the last committed 
offset in the remote partition and writing them to the remote partition.\n" +

Review Comment:
   > This involves reading records from the source partition starting from the 
last committed offset in the remote partition and writing them to the remote 
partition.
   
   This is incorrect, and I don't know where this information came from.
   
   When the lag for a remote partition exceeds offset.lag.max, MirrorMaker 
emits an offset sync to the offset syncs topic, which the MirrorCheckpointTask 
can then use to translate upstream offsets to downstream offsets.
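
   To illustrate why the syncs matter, here is a minimal sketch of how a stored 
offset sync could be used to translate a consumer group offset. The class and 
method names are hypothetical, and the "+ 1" rule for offsets past the sync is 
an assumed conservative choice for illustration, not a copy of the real 
MirrorCheckpointTask logic.

```java
import java.util.Map;
import java.util.TreeMap;

// Simplified model of offset translation; all names here are hypothetical.
final class OffsetTranslationSketch {
    // Upstream offset -> downstream offset, one entry per emitted offset sync.
    private final TreeMap<Long, Long> syncs = new TreeMap<>();

    void recordSync(long upstreamOffset, long downstreamOffset) {
        syncs.put(upstreamOffset, downstreamOffset);
    }

    // Returns -1 when no sync at or before the given upstream offset is known.
    long translate(long upstreamGroupOffset) {
        Map.Entry<Long, Long> sync = syncs.floorEntry(upstreamGroupOffset);
        if (sync == null) {
            return -1L;
        }
        // Assumption: an exact match maps directly. Otherwise point just past
        // the synced record, which may re-read records but never skips any.
        return sync.getKey() == upstreamGroupOffset
                ? sync.getValue()
                : sync.getValue() + 1;
    }

    public static void main(String[] args) {
        OffsetTranslationSketch store = new OffsetTranslationSketch();
        store.recordSync(100L, 40L);
        System.out.println(store.translate(100L));
        System.out.println(store.translate(150L));
    }
}
```

   In this model, fewer syncs mean coarser translation: every upstream offset 
between two syncs maps to the same downstream position.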



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConfig.java:
##########
@@ -101,7 +101,13 @@ public class MirrorSourceConfig extends 
MirrorConnectorConfig {
     public static final Class<?> CONFIG_PROPERTY_FILTER_CLASS_DEFAULT = 
DefaultConfigPropertyFilter.class;
 
     public static final String OFFSET_LAG_MAX = "offset.lag.max";
-    private static final String OFFSET_LAG_MAX_DOC = "How out-of-sync a remote 
partition can be before it is resynced.";
+    private static final String OFFSET_LAG_MAX_DOC = "Determines the maximum 
allowed lag between the source and remote partitions before MirrorMaker 
initiates a resync operation to catch up the remote partition. The lag is 
calculated as the difference between the latest offset in the source partition 
and the last committed offset in the remote partition.\n" +
+            "\n" +
+            "When the lag for a remote partition exceeds the 
<code>offset.lag.max</code> value, MirrorMaker will initiate a resync operation 
to catch up the remote partition with the source partition. This involves 
reading records from the source partition starting from the last committed 
offset in the remote partition and writing them to the remote partition.\n" +
+            "\n" +
+            "Setting <code>offset.lag.max</code> to a lower value can be 
beneficial in scenarios where records may not flow constantly or at a 
consistent rate, as it ensures the remote partitions stay more closely in sync 
with the source partitions during periods of low throughput or inactivity. On 
the other hand, setting it to a higher value can be useful when the source 
topic has high throughput and the remote partitions can tolerate a larger 
lag.\n" +

Review Comment:
   > Setting <code>offset.lag.max</code> to a lower value can be beneficial in 
scenarios where records may not flow constantly or at a consistent rate
   
   Lowering offset.lag.max to a nonzero value doesn't help when the flow is 
inconsistent. If the flow pauses between syncs, the records after the last 
sync aren't translated. That was addressed in a separate improvement: 
https://issues.apache.org/jira/browse/KAFKA-15906
   
   Lowering offset.lag.max to a nonzero value is only necessary when the 
partition has a consistently low throughput and a fixed sync every 
offset.flush.interval.ms isn't acceptable, which is a very contrived scenario.



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConfig.java:
##########
@@ -101,7 +101,13 @@ public class MirrorSourceConfig extends 
MirrorConnectorConfig {
     public static final Class<?> CONFIG_PROPERTY_FILTER_CLASS_DEFAULT = 
DefaultConfigPropertyFilter.class;
 
     public static final String OFFSET_LAG_MAX = "offset.lag.max";
-    private static final String OFFSET_LAG_MAX_DOC = "How out-of-sync a remote 
partition can be before it is resynced.";
+    private static final String OFFSET_LAG_MAX_DOC = "Determines the maximum 
allowed lag between the source and remote partitions before MirrorMaker 
initiates a resync operation to catch up the remote partition. The lag is 
calculated as the difference between the latest offset in the source partition 
and the last committed offset in the remote partition.\n" +

Review Comment:
   > The lag is calculated as the difference between the latest offset in the 
source partition and the last committed offset in the remote partition.
   
   The lag is the difference between the latest offset in the source partition 
and the offset of the last sync emitted to the offset syncs topic.
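
   As a rough sketch of that definition (hypothetical names, and assuming a 
strict "greater than" comparison against offset.lag.max, which may differ from 
the real check in MirrorSourceTask):

```java
// Simplified per-partition model of the lag described above; names are hypothetical.
final class OffsetLagSketch {
    private final long maxOffsetLag;
    private long lastSyncUpstreamOffset = -1L; // -1 means no sync emitted yet

    OffsetLagSketch(long maxOffsetLag) {
        this.maxOffsetLag = maxOffsetLag;
    }

    // Lag = latest source offset minus the upstream offset of the last emitted sync.
    // Only meaningful once at least one sync has been emitted.
    long lag(long latestUpstreamOffset) {
        return latestUpstreamOffset - lastSyncUpstreamOffset;
    }

    // Returns true, and remembers the offset, when a new sync should be emitted.
    boolean shouldEmitSync(long latestUpstreamOffset) {
        boolean emit = lastSyncUpstreamOffset < 0
                || lag(latestUpstreamOffset) > maxOffsetLag; // assumed strict comparison
        if (emit) {
            lastSyncUpstreamOffset = latestUpstreamOffset;
        }
        return emit;
    }

    public static void main(String[] args) {
        OffsetLagSketch partition = new OffsetLagSketch(100L);
        System.out.println(partition.shouldEmitSync(0L));
        System.out.println(partition.shouldEmitSync(50L));
        System.out.println(partition.shouldEmitSync(101L));
    }
}
```

   Note that nothing here refers to committed offsets in the remote partition; 
the only state is the last sync that was emitted.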



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConfig.java:
##########
@@ -101,7 +101,13 @@ public class MirrorSourceConfig extends 
MirrorConnectorConfig {
     public static final Class<?> CONFIG_PROPERTY_FILTER_CLASS_DEFAULT = 
DefaultConfigPropertyFilter.class;
 
     public static final String OFFSET_LAG_MAX = "offset.lag.max";
-    private static final String OFFSET_LAG_MAX_DOC = "How out-of-sync a remote 
partition can be before it is resynced.";
+    private static final String OFFSET_LAG_MAX_DOC = "Determines the maximum 
allowed lag between the source and remote partitions before MirrorMaker 
initiates a resync operation to catch up the remote partition. The lag is 
calculated as the difference between the latest offset in the source partition 
and the last committed offset in the remote partition.\n" +
+            "\n" +

Review Comment:
   I'm not sure how these double newlines render. How are you previewing these 
changes?



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConfig.java:
##########
@@ -101,7 +101,13 @@ public class MirrorSourceConfig extends 
MirrorConnectorConfig {
     public static final Class<?> CONFIG_PROPERTY_FILTER_CLASS_DEFAULT = 
DefaultConfigPropertyFilter.class;
 
     public static final String OFFSET_LAG_MAX = "offset.lag.max";
-    private static final String OFFSET_LAG_MAX_DOC = "How out-of-sync a remote 
partition can be before it is resynced.";
+    private static final String OFFSET_LAG_MAX_DOC = "Determines the maximum 
allowed lag between the source and remote partitions before MirrorMaker 
initiates a resync operation to catch up the remote partition. The lag is 
calculated as the difference between the latest offset in the source partition 
and the last committed offset in the remote partition.\n" +
+            "\n" +
+            "When the lag for a remote partition exceeds the 
<code>offset.lag.max</code> value, MirrorMaker will initiate a resync operation 
to catch up the remote partition with the source partition. This involves 
reading records from the source partition starting from the last committed 
offset in the remote partition and writing them to the remote partition.\n" +
+            "\n" +
+            "Setting <code>offset.lag.max</code> to a lower value can be 
beneficial in scenarios where records may not flow constantly or at a 
consistent rate, as it ensures the remote partitions stay more closely in sync 
with the source partitions during periods of low throughput or inactivity. On 
the other hand, setting it to a higher value can be useful when the source 
topic has high throughput and the remote partitions can tolerate a larger 
lag.\n" +
+            "\n" +
+            "It's also possible to set <code>offset.lag.max</code> to 0, which 
will cause MirrorMaker to initiate a resync operation for a remote partition as 
soon as it falls behind the source partition. This can be useful for strict 
synchronization requirements but may increase the load on the source cluster 
due to frequent resync operations.";

Review Comment:
   Instead of saying "initiate a resync operation" we can say "emit an offset 
sync for every source record", and mention the additional load and extra 
throughput on the offset syncs topic specifically.
   
   The comment about increased load on the source cluster can be inaccurate, 
because the syncs topic may be present on the source or target cluster 
depending on the value of the offset-syncs.topic.location configuration.
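
   To make the throughput point concrete, here is a small simulation sketch 
that counts how many offset syncs would be written for a run of contiguous 
source records. The method name and the strict "greater than" threshold rule 
are assumptions for illustration, not the actual MirrorMaker implementation.

```java
// Counts offset syncs under a simplified threshold rule; names are hypothetical.
final class SyncVolumeSketch {
    static long countSyncs(long recordCount, long maxOffsetLag) {
        long syncs = 0;
        long lastSyncedOffset = -1L; // -1 means no sync emitted yet
        for (long offset = 0; offset < recordCount; offset++) {
            // Assumed rule: sync on the first record, then whenever the lag
            // since the last sync exceeds maxOffsetLag.
            if (lastSyncedOffset < 0 || offset - lastSyncedOffset > maxOffsetLag) {
                syncs++;
                lastSyncedOffset = offset;
            }
        }
        return syncs;
    }

    public static void main(String[] args) {
        System.out.println(countSyncs(1000L, 0L));
        System.out.println(countSyncs(1000L, 100L));
    }
}
```

   Under this model, 1000 records produce 1000 syncs with offset.lag.max set 
to 0, versus 10 with a value of 100. Whichever cluster hosts the syncs topic 
absorbs that extra write load.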



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
