hudeqi commented on code in PR #14077:
URL: https://github.com/apache/kafka/pull/14077#discussion_r1314583243
##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java:
##
@@ -282,6 +285,38 @@ SourceRecord convertRecord(ConsumerRecord<byte[], byte[]> record) {
                 record.timestamp(), headers);
     }
+    //visible for testing
+    void reportReplicationOffsetLag(ConsumerRecords<byte[], byte[]> lastPolledRecords) {
+        Set<TopicPartition> partitions = lastPolledRecords.partitions();
+        partitions.forEach(p -> {
+            try {
+                long replicationOffsetLag = getReplicationOffsetLagForPartition(p, lastPolledRecords.records(p));
+                if (replicationOffsetLag < 0) {
+                    log.warn("Replication offset lag for partition {} is negative({}) - " +
+                            "skipping metric reporting for this partition.", p, replicationOffsetLag);
+                    return;
+                }
+                metrics.replicationOffsetLag(p, replicationOffsetLag + 1); //+1 to account for zero-based offset numbering
+            } catch (UnsupportedOperationException e) {
+                log.error("Failed to calculate replication offset lag for partition {}.", p, e);
+            }
+        });
+    }
+
+    private long getReplicationOffsetLagForPartition(TopicPartition partition,
+                                                     List<ConsumerRecord<byte[], byte[]>> lastPolledRecordsForPartition) {
+        ConsumerRecord<byte[], byte[]> lastPolledRecord =
+                lastPolledRecordsForPartition.get(lastPolledRecordsForPartition.size() - 1);
+        if (!lastPolledRecord.topic().equals(partition.topic()) || lastPolledRecord.partition() != partition.partition()) {
+            String error = String.format(
+                    "Unexpected topic/partition mismatch while calculating replication-offset-lag. Expected: %s, got: %s-%s.",
+                    partition, lastPolledRecord.topic(), lastPolledRecord.partition());
+            throw new UnsupportedOperationException(error);
+        }
+        long endOffsetForPartition = lastPolledRecord.offset();
Review Comment:
My doubt is here: I think the end offset used for the lag should be the log end
offset (LEO) of the partition in the source cluster. However,
`lastPolledRecord.offset()` is only the source-cluster offset of the last record
the task polled from that partition. For example, the LEO in the source cluster
may already have reached 100, but because the task's consumer is slow, it has only
polled up to offset 80. The real lag must then be greater than 20. It is greater
because the records that were just polled have not yet been written to the target
cluster, and I think we agree on this. With the logic in this PR, though, the end
offset would be taken as 80, even though the source partition has actually been
written up to offset 100.
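Below is a minimal Java sketch that uses the numbers from the example above to show the difference. The hunk stops before the actual lag arithmetic, so the sketch models only the choice of end offset and does not reproduce the PR's exact formula. It treats lag as "source records not yet written to the target". It also introduces a hypothetical last-replicated offset of 75. The class `ReplicationLagSketch` and its `lag` helper are illustrative names and are not part of the PR or of MirrorMaker.

```java
/**
 * Hypothetical sketch (not the PR's code): compares two choices of "end offset"
 * for a replication-offset-lag metric, using the numbers from the review comment.
 */
public final class ReplicationLagSketch {

    /**
     * Lag as the number of source records not yet replicated.
     *
     * @param sourceLogEndOffset   offset the next record in the source partition would get
     *                             (log end offset semantics: records 0..LEO-1 exist)
     * @param lastReplicatedOffset source offset of the last record known to be written to
     *                             the target cluster, or -1 if nothing has been replicated
     */
    static long lag(long sourceLogEndOffset, long lastReplicatedOffset) {
        // Outstanding records are lastReplicatedOffset+1 .. sourceLogEndOffset-1.
        return sourceLogEndOffset - (lastReplicatedOffset + 1);
    }

    public static void main(String[] args) {
        long sourceLeo = 100;        // source partition holds records 0..99
        long lastPolledOffset = 80;  // the slow task has only polled up to offset 80
        long lastReplicated = 75;    // hypothetical: records up to 75 are already in the target

        // End taken from the last polled record (the PR's choice): the partition is
        // treated as if it ended right after offset 80, i.e. an LEO of 81.
        long lagFromLastPolled = lag(lastPolledOffset + 1, lastReplicated);
        // End taken from the source cluster's actual LEO, as suggested in this review.
        long lagFromSourceLeo = lag(sourceLeo, lastReplicated);

        System.out.println("lag using last polled offset: " + lagFromLastPolled); // 5
        System.out.println("lag using source LEO:         " + lagFromSourceLeo);  // 24
    }
}
```

In a real task, the source LEO would have to be fetched separately. One way is `Consumer#endOffsets(Collection<TopicPartition>)`, which issues a request to the source brokers. Whether that extra cost is acceptable here is a design question for the PR.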
--
This is an automated message from the Apache Git Service.