cecemei commented on code in PR #18750:
URL: https://github.com/apache/druid/pull/18750#discussion_r2604952312


##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java:
##########
@@ -294,44 +298,48 @@ protected Map<KafkaTopicPartition, Long> getPartitionTimeLag()
   // suppress use of CollectionUtils.mapValues() since the valueMapper function is dependent on map key here
   @SuppressWarnings("SSBasedInspection")
   // Used while calculating cummulative lag for entire stream
-  private Map<KafkaTopicPartition, Long> getRecordLagPerPartitionInLatestSequences(Map<KafkaTopicPartition, Long> currentOffsets)
+  private Map<KafkaTopicPartition, Long> getRecordLagPerPartitionInLatestSequences()
   {
-    if (latestSequenceFromStream == null) {
+    Map<KafkaTopicPartition, Long> highestIngestedOffsets = getHighestIngestedOffsets();
+    Map<KafkaTopicPartition, Long> latestSequencesFromStream = getLatestSequencesFromStream();
+
+    if (latestSequencesFromStream.isEmpty()) {
       return Collections.emptyMap();
     }
 
-    return latestSequenceFromStream
-        .entrySet()
-        .stream()
-        .collect(
-            Collectors.toMap(
-                Entry::getKey,
-                e -> e.getValue() != null
-                     ? e.getValue() - Optional.ofNullable(currentOffsets.get(e.getKey())).orElse(0L)
-                     : 0
-            )
-        );
+    return latestSequencesFromStream.entrySet()
+                                    .stream()
+                                    .collect(
+                                        Collectors.toMap(
+                                            Entry::getKey,
+                                            e ->
+                                                e.getValue() - highestIngestedOffsets.getOrDefault(e.getKey(), 0L)
+                                        )
+                                    );
   }
 
+  // This function is defined and called by the parent class to compute the lag for specific partitions.

Review Comment:
   A few notes:
   - Please use Javadoc-style documentation for the method.
   - Worth noting that the offset in `currentOffsets` is ignored since we adopt the `OffsetSnapshot` approach; this means `currentOffsets` is no longer used in the lag calculation in `TaskReportData`.
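   A rough sketch of what the requested Javadoc might look like (the wording is illustrative only, not the required text):

   ```java
   /**
    * Computes the per-partition record lag for the entire stream, as the difference
    * between the latest stream offsets and the highest task-ingested offsets.
    *
    * <p>Note: with the {@code OffsetSnapshot} approach, per-task {@code currentOffsets}
    * are no longer consulted here, so they do not feed the lag reported in
    * {@code TaskReportData}.
    */
   private Map<KafkaTopicPartition, Long> getRecordLagPerPartitionInLatestSequences()
   ```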



##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java:
##########
@@ -294,44 +298,48 @@ protected Map<KafkaTopicPartition, Long> getPartitionTimeLag()
   // suppress use of CollectionUtils.mapValues() since the valueMapper function is dependent on map key here
   @SuppressWarnings("SSBasedInspection")
   // Used while calculating cummulative lag for entire stream
-  private Map<KafkaTopicPartition, Long> getRecordLagPerPartitionInLatestSequences(Map<KafkaTopicPartition, Long> currentOffsets)
+  private Map<KafkaTopicPartition, Long> getRecordLagPerPartitionInLatestSequences()
   {
-    if (latestSequenceFromStream == null) {
+    Map<KafkaTopicPartition, Long> highestIngestedOffsets = getHighestIngestedOffsets();
+    Map<KafkaTopicPartition, Long> latestSequencesFromStream = getLatestSequencesFromStream();
+
+    if (latestSequencesFromStream.isEmpty()) {
       return Collections.emptyMap();
     }
 
-    return latestSequenceFromStream
-        .entrySet()
-        .stream()
-        .collect(
-            Collectors.toMap(
-                Entry::getKey,
-                e -> e.getValue() != null
-                     ? e.getValue() - Optional.ofNullable(currentOffsets.get(e.getKey())).orElse(0L)
-                     : 0
-            )
-        );
+    return latestSequencesFromStream.entrySet()
+                                    .stream()
+                                    .collect(
+                                        Collectors.toMap(
+                                            Entry::getKey,
+                                            e ->
+                                                e.getValue() - highestIngestedOffsets.getOrDefault(e.getKey(), 0L)
+                                        )
+                                    );
   }
 
+  // This function is defined and called by the parent class to compute the lag for specific partitions.
+  // The `currentOffsets` parameter is provided by the parent class and indicates the partitions to query.
+  // Note: This function differs from `getRecordLagPerPartitionInLatestSequences()`:
+  //      `getRecordLagPerPartitionInLatestSequences()` queries lag for all partitions,
+  //      whereas `getRecordLagPerPartition()` queries lag only for the specified partitions.
   @Override
   protected Map<KafkaTopicPartition, Long> getRecordLagPerPartition(Map<KafkaTopicPartition, Long> currentOffsets)
   {
-    if (latestSequenceFromStream == null || currentOffsets == null) {
+    Map<KafkaTopicPartition, Long> latestSequencesFromStream = getLatestSequencesFromStream();

Review Comment:
   Same here, don't break atomicity:
   
   ```
   OffsetSnapshot<KafkaTopicPartition, Long> offsetSnapshot = offsetSnapshotRef.get();
   ```



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/OffsetSnapshot.java:
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream.supervisor;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+
+import javax.annotation.Nullable;
+import java.util.Map;
+import java.util.Objects;
+
+/**

Review Comment:
   A few notes:
   - Use `<p>` as the paragraph tag.
   - There's no need to include too many details about previous behavior, or to link the PR; the documentation should be forward-looking. Optionally, you could add a release-note section to your PR to document this change. For the Javadoc, maybe just say something like:
   > The supervisor fetches task-reported ingested offsets first, then fetches end offsets from the stream. Because these two values are captured at different instants, the reported delay (latestOffsetsFromStream - highestIngestedOffsets) may be slightly larger than the actual delay at any precise moment. However, publishing both maps together as a single atomic snapshot ensures that readers always observe a consistent view, producing stable and monotonic delay trends.
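   Rendered as class-level Javadoc with `<p>` paragraph tags, the suggested wording could look like the following (the opening summary sentence is an illustrative placeholder):

   ```java
   /**
    * An atomically published snapshot of ingested and stream-end offsets.
    *
    * <p>The supervisor fetches task-reported ingested offsets first, then fetches
    * end offsets from the stream. Because these two values are captured at
    * different instants, the reported delay (latestOffsetsFromStream -
    * highestIngestedOffsets) may be slightly larger than the actual delay at any
    * precise moment.
    *
    * <p>However, publishing both maps together as a single atomic snapshot ensures
    * that readers always observe a consistent view, producing stable and monotonic
    * delay trends.
    */
   ```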



##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java:
##########
@@ -294,44 +298,48 @@ protected Map<KafkaTopicPartition, Long> getPartitionTimeLag()
   // suppress use of CollectionUtils.mapValues() since the valueMapper function is dependent on map key here
   @SuppressWarnings("SSBasedInspection")
   // Used while calculating cummulative lag for entire stream
-  private Map<KafkaTopicPartition, Long> getRecordLagPerPartitionInLatestSequences(Map<KafkaTopicPartition, Long> currentOffsets)
+  private Map<KafkaTopicPartition, Long> getRecordLagPerPartitionInLatestSequences()
   {
-    if (latestSequenceFromStream == null) {
+    Map<KafkaTopicPartition, Long> highestIngestedOffsets = getHighestIngestedOffsets();
+    Map<KafkaTopicPartition, Long> latestSequencesFromStream = getLatestSequencesFromStream();

Review Comment:
   Don't fetch the offsets separately; that breaks atomicity. Instead, pass `OffsetSnapshot<KafkaTopicPartition, Long> offsetSnapshot` as a param:
   ```
   offsetSnapshot.getLatestOffsetsFromStream()
                 .entrySet()
                 .stream()
                 .collect(Collectors.toMap(
                     Entry::getKey,
                     e -> e.getValue() - offsetSnapshot.getHighestIngestedOffsets().getOrDefault(e.getKey(), 0L)
                 ));
   ```



##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java:
##########
@@ -294,44 +298,48 @@ protected Map<KafkaTopicPartition, Long> getPartitionTimeLag()
   // suppress use of CollectionUtils.mapValues() since the valueMapper function is dependent on map key here
   @SuppressWarnings("SSBasedInspection")
   // Used while calculating cummulative lag for entire stream
-  private Map<KafkaTopicPartition, Long> getRecordLagPerPartitionInLatestSequences(Map<KafkaTopicPartition, Long> currentOffsets)
+  private Map<KafkaTopicPartition, Long> getRecordLagPerPartitionInLatestSequences()
   {
-    if (latestSequenceFromStream == null) {
+    Map<KafkaTopicPartition, Long> highestIngestedOffsets = getHighestIngestedOffsets();
+    Map<KafkaTopicPartition, Long> latestSequencesFromStream = getLatestSequencesFromStream();
+
+    if (latestSequencesFromStream.isEmpty()) {
       return Collections.emptyMap();
     }
 
-    return latestSequenceFromStream
-        .entrySet()
-        .stream()
-        .collect(
-            Collectors.toMap(
-                Entry::getKey,
-                e -> e.getValue() != null
-                     ? e.getValue() - Optional.ofNullable(currentOffsets.get(e.getKey())).orElse(0L)
-                     : 0
-            )
-        );
+    return latestSequencesFromStream.entrySet()
+                                    .stream()
+                                    .collect(
+                                        Collectors.toMap(
+                                            Entry::getKey,
+                                            e ->
+                                                e.getValue() - highestIngestedOffsets.getOrDefault(e.getKey(), 0L)
+                                        )
+                                    );
   }
 
+  // This function is defined and called by the parent class to compute the lag for specific partitions.
+  // The `currentOffsets` parameter is provided by the parent class and indicates the partitions to query.
+  // Note: This function differs from `getRecordLagPerPartitionInLatestSequences()`:
+  //      `getRecordLagPerPartitionInLatestSequences()` queries lag for all partitions,
+  //      whereas `getRecordLagPerPartition()` queries lag only for the specified partitions.
   @Override
   protected Map<KafkaTopicPartition, Long> getRecordLagPerPartition(Map<KafkaTopicPartition, Long> currentOffsets)
   {
-    if (latestSequenceFromStream == null || currentOffsets == null) {
+    Map<KafkaTopicPartition, Long> latestSequencesFromStream = getLatestSequencesFromStream();
+    Map<KafkaTopicPartition, Long> highestIngestedOffsets = getHighestIngestedOffsets();
+
+    if (latestSequencesFromStream.isEmpty() || highestIngestedOffsets.isEmpty() || currentOffsets == null) {

Review Comment:
   `highestIngestedOffsets` can be empty, right? Meaning ingestion hasn't started, so the offset defaults to 0.
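   To illustrate that point, a minimal sketch (using made-up `String` partition keys instead of Druid's actual types) showing that an empty ingested-offset map is a legitimate state: with `getOrDefault(..., 0L)`, the lag simply defaults to the full latest stream offset, so the empty map need not short-circuit the calculation.

   ```java
   import java.util.Map;
   import java.util.stream.Collectors;

   public class EmptyIngestedDemo
   {
     // Lag per partition: latest stream offset minus ingested offset, defaulting to 0
     // for partitions that have not reported any ingested offset yet.
     static Map<String, Long> lag(Map<String, Long> latest, Map<String, Long> ingested)
     {
       return latest.entrySet().stream().collect(Collectors.toMap(
           Map.Entry::getKey,
           e -> e.getValue() - ingested.getOrDefault(e.getKey(), 0L)
       ));
     }

     public static void main(String[] args)
     {
       // Ingestion has not started: the ingested map is empty, not null.
       System.out.println(lag(Map.of("p0", 42L), Map.of())); // prints {p0=42}
     }
   }
   ```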



##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java:
##########
@@ -535,10 +549,24 @@ protected void updatePartitionLagFromStream()
     }
   }
 
+  private void updateOffsetSnapshot(
+      Map<KafkaTopicPartition, Long> highestIngestedOffsets,
+      Map<KafkaTopicPartition, Long> latestOffsetsFromStream
+  )
+  {
+    offsetSnapshotRef.set(OffsetSnapshot.of(highestIngestedOffsets, latestOffsetsFromStream));
+  }
+
   @Override
   protected Map<KafkaTopicPartition, Long> getLatestSequencesFromStream()
   {
-    return latestSequenceFromStream != null ? latestSequenceFromStream : new HashMap<>();
+    return offsetSnapshotRef.get().getLatestOffsetsFromStream();
+  }
+
+  private Map<KafkaTopicPartition, Long> getHighestIngestedOffsets()

Review Comment:
   The methods `getLatestSequencesFromStream` and `getHighestIngestedOffsets` break the atomicity of the `OffsetSnapshot` class. Imagine: you call `getHighestIngestedOffsets`, then `offsetSnapshotRef` gets updated, then you call `getLatestSequencesFromStream`; now you're literally comparing offsets taken at different times!
   
   It's okay to keep `getLatestSequencesFromStream` for compatibility, but please try not to use it in `KafkaSupervisor`; everywhere else, just call `offsetSnapshotRef.get()` for the offsets directly.
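   The atomic-read pattern being asked for can be sketched as follows. The `Snapshot` class, the `String` partition keys, and the method names are illustrative stand-ins, not the PR's actual `OffsetSnapshot` API; the key idea is reading the reference exactly once so both maps come from the same publish.

   ```java
   import java.util.HashMap;
   import java.util.Map;
   import java.util.concurrent.atomic.AtomicReference;

   public class SnapshotDemo
   {
     // Immutable pair of offset maps; both are replaced together, never separately.
     static final class Snapshot
     {
       final Map<String, Long> highestIngested;
       final Map<String, Long> latestFromStream;

       Snapshot(Map<String, Long> ingested, Map<String, Long> latest)
       {
         this.highestIngested = Map.copyOf(ingested);
         this.latestFromStream = Map.copyOf(latest);
       }
     }

     static final AtomicReference<Snapshot> snapshotRef =
         new AtomicReference<>(new Snapshot(Map.of(), Map.of()));

     // Reads the reference once, so the lag compares offsets captured together,
     // even if a writer publishes a new snapshot while we compute.
     static Map<String, Long> recordLag()
     {
       Snapshot snap = snapshotRef.get();
       Map<String, Long> lag = new HashMap<>();
       for (Map.Entry<String, Long> e : snap.latestFromStream.entrySet()) {
         lag.put(e.getKey(), e.getValue() - snap.highestIngested.getOrDefault(e.getKey(), 0L));
       }
       return lag;
     }

     public static void main(String[] args)
     {
       snapshotRef.set(new Snapshot(Map.of("p0", 90L), Map.of("p0", 100L)));
       System.out.println(recordLag()); // prints {p0=10}
     }
   }
   ```

   Calling two separate getters instead would allow a writer to swap the snapshot between the two reads, which is exactly the inconsistency the single `snapshotRef.get()` avoids.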



##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java:
##########
@@ -172,15 +175,15 @@ protected SeekableStreamSupervisorReportPayload<KafkaTopicPartition, Long> creat
   )
   {
     KafkaSupervisorIOConfig ioConfig = spec.getIoConfig();
-    Map<KafkaTopicPartition, Long> partitionLag = getRecordLagPerPartitionInLatestSequences(getHighestCurrentOffsets());
+    Map<KafkaTopicPartition, Long> partitionLag = getRecordLagPerPartitionInLatestSequences();
     return new KafkaSupervisorReportPayload(
         spec.getId(),
         spec.getDataSchema().getDataSource(),
         ioConfig.getStream(),
         numPartitions,
         ioConfig.getReplicas(),
         ioConfig.getTaskDuration().getMillis() / 1000,
-        includeOffsets ? latestSequenceFromStream : null,
+        includeOffsets ? getLatestSequencesFromStream() : null,

Review Comment:
   This sort of breaks atomicity; please make sure the same `OffsetSnapshot<KafkaTopicPartition, Long>` is used, i.e. pass an `OffsetSnapshot<KafkaTopicPartition, Long>` as a param to `getRecordLagPerPartitionInLatestSequences`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

