[GitHub] [kafka] hudeqi commented on a diff in pull request #14077: KAFKA-14112: Expose replication-offset-lag Mirror metric

2023-09-04 Thread via GitHub


hudeqi commented on code in PR #14077:
URL: https://github.com/apache/kafka/pull/14077#discussion_r1314592488


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceMetrics.java:
##
@@ -104,12 +108,25 @@ class MirrorSourceMetrics implements AutoCloseable {
         replicationLatencyAvg = new MetricNameTemplate(
                 "replication-latency-ms-avg", SOURCE_CONNECTOR_GROUP,
                 "Average time it takes records to replicate from source to target cluster.", partitionTags);
+        replicationOffsetLag = new MetricNameTemplate(

Review Comment:
   I think we disagree on what lag means here. My understanding is that lag is
the real LEO (log end offset) of the source cluster partition minus the offset
that has already been written to the target cluster. Your definition seems to
be the gap between what the mirror task has consumed and what it has written
to the target cluster. Is that right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hudeqi commented on a diff in pull request #14077: KAFKA-14112: Expose replication-offset-lag Mirror metric

2023-09-04 Thread via GitHub


hudeqi commented on code in PR #14077:
URL: https://github.com/apache/kafka/pull/14077#discussion_r1314583243


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java:
##
@@ -282,6 +285,38 @@ SourceRecord convertRecord(ConsumerRecord<byte[], byte[]> record) {
                 record.timestamp(), headers);
     }
 
+    //visible for testing
+    void reportReplicationOffsetLag(ConsumerRecords<byte[], byte[]> lastPolledRecords) {
+        Set<TopicPartition> partitions = lastPolledRecords.partitions();
+        partitions.forEach(p -> {
+            try {
+                long replicationOffsetLag = getReplicationOffsetLagForPartition(p, lastPolledRecords.records(p));
+                if (replicationOffsetLag < 0) {
+                    log.warn("Replication offset lag for partition {} is negative({}) - " +
+                            "skipping metric reporting for this partition.", p, replicationOffsetLag);
+                    return;
+                }
+                metrics.replicationOffsetLag(p, replicationOffsetLag + 1); //+1 to account for zero-based offset numbering
+            } catch (UnsupportedOperationException e) {
+                log.error("Failed to calculate replication offset lag for partition {}.", p, e);
+            }
+        });
+    }
+
+    private long getReplicationOffsetLagForPartition(TopicPartition partition,
+                                                     List<ConsumerRecord<byte[], byte[]>> lastPolledRecordsForPartition) {
+        ConsumerRecord<byte[], byte[]> lastPolledRecord =
+                lastPolledRecordsForPartition.get(lastPolledRecordsForPartition.size() - 1);
+        if (!lastPolledRecord.topic().equals(partition.topic()) || lastPolledRecord.partition() != partition.partition()) {
+            String error = String.format(
+                    "Unexpected topic/partition mismatch while calculating replication-offset-lag. Expected: %s, got: %s-%s.",
+                    partition, lastPolledRecord.topic(), lastPolledRecord.partition());
+            throw new UnsupportedOperationException(error);
+        }
+        long endOffsetForPartition = lastPolledRecord.offset();

Review Comment:
   My concern is here. I think the LEO of the partition should be the log end
offset of the partition in the source cluster, but `lastPolledRecord.offset()`
is only the source-cluster offset of the last record the task has polled from
that partition. For example, the source log end offset may have reached 100
while the task, because of poor consumer performance, has only polled up to
offset 80. The lag must then be greater than 20: it exceeds 20 because the
polled data has not yet been written to the target cluster (I think we agree
on this part). With the logic in this PR, however, LEO would be 80, even
though the source cluster has already been written up to offset 100.
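
To make the difference concrete, here is a minimal sketch using the numbers
above. It is not code from the PR: the class name, the method names, and the
value 75 for the last offset written to the target cluster are all made up for
illustration, and it ignores the off-by-one between record offsets and LEO.

```java
public class LagDefinitions {

    // Lag measured against the source partition's log end offset (LEO).
    // Hypothetical helper, not part of MirrorSourceTask.
    static long lagAgainstSourceLeo(long sourceLogEndOffset, long lastReplicatedOffset) {
        return sourceLogEndOffset - lastReplicatedOffset;
    }

    // Lag measured against the last record the task has polled.
    // Hypothetical helper, not part of MirrorSourceTask.
    static long lagAgainstLastPolled(long lastPolledOffset, long lastReplicatedOffset) {
        return lastPolledOffset - lastReplicatedOffset;
    }

    public static void main(String[] args) {
        long sourceLogEndOffset = 100L;   // source LEO from the example above
        long lastPolledOffset = 80L;      // last polled offset from the example above
        long lastReplicatedOffset = 75L;  // assumed value, chosen only for illustration

        System.out.println("lag against source LEO:  "
                + lagAgainstSourceLeo(sourceLogEndOffset, lastReplicatedOffset));
        System.out.println("lag against last polled: "
                + lagAgainstLastPolled(lastPolledOffset, lastReplicatedOffset));
    }
}
```

With these assumed numbers the first definition gives 25 and the second gives
5, so a metric based on the last polled record would hide the 20 records the
task has not consumed yet.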


