Copilot commented on code in PR #18750:
URL: https://github.com/apache/druid/pull/18750#discussion_r2756867455


##########
indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/OffsetSnapshotTest.java:
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream.supervisor;
+
+import com.google.common.collect.ImmutableMap;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+public class OffsetSnapshotTest
+{
+  @Test
+  public void testOffsetSnapshot_emptyInputReturnsEmptyMap()
+  {
+    OffsetSnapshot<String, Long> snapshot = OffsetSnapshot.of(
+        Collections.emptyMap(),
+        Collections.emptyMap()
+    );
+
+    Assert.assertTrue(snapshot.getHighestIngestedOffsets().isEmpty());
+    Assert.assertSame(ImmutableMap.of(), snapshot.getHighestIngestedOffsets());
+    Assert.assertTrue(snapshot.getLatestOffsetsFromStream().isEmpty());
+    Assert.assertSame(ImmutableMap.of(), 
snapshot.getLatestOffsetsFromStream());
+  }
+
+  @Test
+  public void testOffsetSnapshot_nullInputsReturnEmptyMaps()
+  {
+    OffsetSnapshot<Integer, Long> snapshot = OffsetSnapshot.of(null, null);
+
+    Assert.assertTrue(snapshot.getHighestIngestedOffsets().isEmpty());
+    Assert.assertSame(ImmutableMap.of(), snapshot.getHighestIngestedOffsets());
+    Assert.assertTrue(snapshot.getLatestOffsetsFromStream().isEmpty());
+    Assert.assertSame(ImmutableMap.of(), 
snapshot.getLatestOffsetsFromStream());
+  }
+
+  @Test
+  public void testOffsetSnapshot_nullCurrentOffsetsReturnsEmptyCurrentMap()
+  {
+    Map<Integer, Long> endOffsets = ImmutableMap.of(0, 100L, 1, 200L);
+
+    OffsetSnapshot<Integer, Long> snapshot = OffsetSnapshot.of(null, 
endOffsets);
+
+    Assert.assertTrue(snapshot.getHighestIngestedOffsets().isEmpty());
+    Assert.assertSame(ImmutableMap.of(), snapshot.getHighestIngestedOffsets());
+    Assert.assertEquals(endOffsets, snapshot.getLatestOffsetsFromStream());

Review Comment:
   `assertSame(ImmutableMap.of(), ...)` depends on reference identity, which the 
`ImmutableMap` API contract does not guarantee for empty maps, so the test is 
brittle. Prefer `assertEquals(ImmutableMap.of(), ...)` (and/or `isEmpty`) 
instead.



##########
indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/OffsetSnapshotTest.java:
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream.supervisor;
+
+import com.google.common.collect.ImmutableMap;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+public class OffsetSnapshotTest
+{
+  @Test
+  public void testOffsetSnapshot_emptyInputReturnsEmptyMap()
+  {
+    OffsetSnapshot<String, Long> snapshot = OffsetSnapshot.of(
+        Collections.emptyMap(),
+        Collections.emptyMap()
+    );
+
+    Assert.assertTrue(snapshot.getHighestIngestedOffsets().isEmpty());
+    Assert.assertSame(ImmutableMap.of(), snapshot.getHighestIngestedOffsets());
+    Assert.assertTrue(snapshot.getLatestOffsetsFromStream().isEmpty());
+    Assert.assertSame(ImmutableMap.of(), 
snapshot.getLatestOffsetsFromStream());
+  }
+
+  @Test
+  public void testOffsetSnapshot_nullInputsReturnEmptyMaps()
+  {
+    OffsetSnapshot<Integer, Long> snapshot = OffsetSnapshot.of(null, null);
+
+    Assert.assertTrue(snapshot.getHighestIngestedOffsets().isEmpty());
+    Assert.assertSame(ImmutableMap.of(), snapshot.getHighestIngestedOffsets());
+    Assert.assertTrue(snapshot.getLatestOffsetsFromStream().isEmpty());
+    Assert.assertSame(ImmutableMap.of(), 
snapshot.getLatestOffsetsFromStream());
+  }
+
+  @Test
+  public void testOffsetSnapshot_nullCurrentOffsetsReturnsEmptyCurrentMap()
+  {
+    Map<Integer, Long> endOffsets = ImmutableMap.of(0, 100L, 1, 200L);
+
+    OffsetSnapshot<Integer, Long> snapshot = OffsetSnapshot.of(null, 
endOffsets);
+
+    Assert.assertTrue(snapshot.getHighestIngestedOffsets().isEmpty());
+    Assert.assertSame(ImmutableMap.of(), snapshot.getHighestIngestedOffsets());
+    Assert.assertEquals(endOffsets, snapshot.getLatestOffsetsFromStream());
+  }
+
+  @Test
+  public void testOffsetSnapshot_nullEndOffsetsReturnsEmptyEndMap()
+  {
+    Map<Integer, Long> currentOffsets = ImmutableMap.of(0, 50L, 1, 150L);
+
+    OffsetSnapshot<Integer, Long> snapshot = OffsetSnapshot.of(currentOffsets, 
null);
+
+    Assert.assertEquals(currentOffsets, snapshot.getHighestIngestedOffsets());
+    Assert.assertTrue(snapshot.getLatestOffsetsFromStream().isEmpty());
+    Assert.assertSame(ImmutableMap.of(), 
snapshot.getLatestOffsetsFromStream());
+  }

Review Comment:
   Using `assertSame` against `ImmutableMap.of()` is a brittle identity check: 
reference identity of empty `ImmutableMap` instances isn't guaranteed by the 
API contract. Use `assertEquals`/`isEmpty` instead.



##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java:
##########
@@ -506,6 +518,8 @@ protected void updatePartitionLagFromStream()
       return;
     }
 
+    Map<KafkaTopicPartition, Long> highestCurrentOffsets = 
getHighestCurrentOffsets();
+

Review Comment:
   The Javadoc immediately above `updatePartitionLagFromStream()` still 
mentions updating `latestSequenceFromStream`, but that field was removed in 
favor of the atomic `OffsetSnapshot`/`offsetSnapshotRef`. Please update the 
comment to avoid pointing readers to a nonexistent symbol and to reflect the 
new snapshot-based flow.



##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java:
##########
@@ -294,44 +300,48 @@ protected Map<KafkaTopicPartition, Long> 
getPartitionTimeLag()
   // suppress use of CollectionUtils.mapValues() since the valueMapper 
function is dependent on map key here
   @SuppressWarnings("SSBasedInspection")
   // Used while calculating cummulative lag for entire stream
-  private Map<KafkaTopicPartition, Long> 
getRecordLagPerPartitionInLatestSequences(Map<KafkaTopicPartition, Long> 
currentOffsets)
+  private Map<KafkaTopicPartition, Long> 
getRecordLagPerPartitionInLatestSequences(OffsetSnapshot<KafkaTopicPartition, 
Long> offsetSnapshot)
   {
-    if (latestSequenceFromStream == null) {
-      return Collections.emptyMap();
-    }
-
-    return latestSequenceFromStream
-        .entrySet()
-        .stream()
-        .collect(
-            Collectors.toMap(
-                Entry::getKey,
-                e -> e.getValue() != null
-                     ? e.getValue() - 
Optional.ofNullable(currentOffsets.get(e.getKey())).orElse(0L)
-                     : 0
-            )
-        );
+    return offsetSnapshot.getLatestOffsetsFromStream()
+                         .entrySet()
+                         .stream()
+                         .collect(Collectors.toMap(
+                             Entry::getKey,
+                             e -> e.getValue() - 
offsetSnapshot.getHighestIngestedOffsets().getOrDefault(e.getKey(), 0L)
+                         ));
   }
 
+  /**
+   * Computes the record lag for the specified partitions.
+   * <p>
+   * This method is called by the parent class to calculate lag only for the 
partitions present in
+   * {@code currentOffsets}. The values in {@code currentOffsets} are ignored; 
only the keys (partitions)
+   * are used to determine which partitions to compute lag for.
+   * <p>
+   * Lag is calculated as {@code latestOffsetFromStream - 
highestIngestedOffset} using the current atomic
+   * {@link OffsetSnapshot} to ensure consistency between the two offset maps.
+   *
+   * @param currentOffsets map whose keys indicate the partitions to compute 
lag for (values are ignored)
+   * @return map of partition to its record lag, or empty map if no valid data 
is available
+   */
   @Override
   protected Map<KafkaTopicPartition, Long> 
getRecordLagPerPartition(Map<KafkaTopicPartition, Long> currentOffsets)
   {
-    if (latestSequenceFromStream == null || currentOffsets == null) {
+    OffsetSnapshot<KafkaTopicPartition, Long> offsetSnapshot = 
offsetSnapshotRef.get();
+    Map<KafkaTopicPartition, Long> latestSequencesFromStream = 
offsetSnapshot.getLatestOffsetsFromStream();
+    Map<KafkaTopicPartition, Long> highestIngestedOffsets = 
offsetSnapshot.getHighestIngestedOffsets();
+
+    if (latestSequencesFromStream.isEmpty() || currentOffsets == null) {
       return Collections.emptyMap();
     }
 
-    return currentOffsets
-        .entrySet()
-        .stream()
-        .filter(e -> latestSequenceFromStream.get(e.getKey()) != null)
-        .collect(
-            Collectors.toMap(
-                Entry::getKey,
-                e -> e.getValue() != null
-                     ? latestSequenceFromStream.get(e.getKey()) - e.getValue()
-                     : 0
-            )
-        );
+    return currentOffsets.keySet().stream()
+                         .filter(latestSequencesFromStream::containsKey)
+                         .collect(Collectors.toMap(
+                             Function.identity(),
+                             // compute the lag using offsets from the 
snapshot.
+                             p -> latestSequencesFromStream.get(p) - 
highestIngestedOffsets.getOrDefault(p, 0L)
+                         ));

Review Comment:
   `getRecordLagPerPartition` is called by 
`SeekableStreamSupervisor.generateReport` to compute per-task lag (based on 
each task's `currentOffsets` values). This implementation ignores those values 
and instead subtracts the snapshot's *highest* ingested offsets, which can make 
every task report the same lag and hide lagging replicas. Compute lag using the 
provided `currentOffsets` values (still using the snapshot only for the latest 
stream offsets) and update the Javadoc accordingly (including null-value 
handling to preserve previous behavior).



##########
indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/OffsetSnapshotTest.java:
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream.supervisor;
+
+import com.google.common.collect.ImmutableMap;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+public class OffsetSnapshotTest
+{
+  @Test
+  public void testOffsetSnapshot_emptyInputReturnsEmptyMap()
+  {
+    OffsetSnapshot<String, Long> snapshot = OffsetSnapshot.of(
+        Collections.emptyMap(),
+        Collections.emptyMap()
+    );
+
+    Assert.assertTrue(snapshot.getHighestIngestedOffsets().isEmpty());
+    Assert.assertSame(ImmutableMap.of(), snapshot.getHighestIngestedOffsets());
+    Assert.assertTrue(snapshot.getLatestOffsetsFromStream().isEmpty());
+    Assert.assertSame(ImmutableMap.of(), 
snapshot.getLatestOffsetsFromStream());

Review Comment:
   These assertions rely on reference equality with `ImmutableMap.of()`. Even 
if it happens to be true today, it’s not part of the contract of 
`ImmutableMap.copyOf(...)` and makes the test brittle across Guava 
versions/implementations. Prefer `assertEquals(ImmutableMap.of(), ...)` and/or 
`assertTrue(...isEmpty())`.



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/OffsetSnapshot.java:
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream.supervisor;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+
+import javax.annotation.Nullable;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Immutable snapshot containing a consistent pair of offset maps: the highest 
ingested offsets
+ * reported by tasks and the latest end offsets fetched from the underlying 
stream.
+ *
+ * <p>
+ * The supervisor fetches task-reported ingested offsets first, then fetches 
end offsets from the stream.
+ * Because these two values are captured at slightly different instants, the 
reported lag
+ * (latestOffsetsFromStream - highestIngestedOffsets) may be slightly larger 
than the actual lag at any
+ * precise moment.
+ *
+ * <p>
+ * By publishing both maps together as a single atomic snapshot (using {@link 
java.util.concurrent.atomic.AtomicReference}),
+ * readers (such as lag metrics and supervisor status) always observe a 
coherent and consistent view.
+ * This produces stable and monotonic lag trends, avoiding artifacts like 
temporary negative lags.

Review Comment:
   The class Javadoc claims this “produces stable and monotonic lag trends”, 
but lag is not monotonic in general (it can increase when new records arrive 
and decrease when ingestion catches up). Consider rewording to focus on 
“consistent/non-negative snapshots” and avoiding transient negative artifacts, 
without implying monotonic behavior.
   ```suggestion
    * This provides consistent, non-negative lag snapshots and avoids transient 
artifacts such as temporary negative lags,
    * without implying that lag values themselves are monotonic over time.
   ```



##########
indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/OffsetSnapshotTest.java:
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream.supervisor;
+
+import com.google.common.collect.ImmutableMap;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+public class OffsetSnapshotTest
+{
+  @Test
+  public void testOffsetSnapshot_emptyInputReturnsEmptyMap()
+  {
+    OffsetSnapshot<String, Long> snapshot = OffsetSnapshot.of(
+        Collections.emptyMap(),
+        Collections.emptyMap()
+    );
+
+    Assert.assertTrue(snapshot.getHighestIngestedOffsets().isEmpty());
+    Assert.assertSame(ImmutableMap.of(), snapshot.getHighestIngestedOffsets());
+    Assert.assertTrue(snapshot.getLatestOffsetsFromStream().isEmpty());
+    Assert.assertSame(ImmutableMap.of(), 
snapshot.getLatestOffsetsFromStream());
+  }
+
+  @Test
+  public void testOffsetSnapshot_nullInputsReturnEmptyMaps()
+  {
+    OffsetSnapshot<Integer, Long> snapshot = OffsetSnapshot.of(null, null);
+
+    Assert.assertTrue(snapshot.getHighestIngestedOffsets().isEmpty());
+    Assert.assertSame(ImmutableMap.of(), snapshot.getHighestIngestedOffsets());
+    Assert.assertTrue(snapshot.getLatestOffsetsFromStream().isEmpty());
+    Assert.assertSame(ImmutableMap.of(), 
snapshot.getLatestOffsetsFromStream());
+  }

Review Comment:
   Same as above: relying on `assertSame(ImmutableMap.of(), ...)` makes the 
test brittle since reference identity for empty `ImmutableMap` instances isn’t 
guaranteed by the API contract. Prefer `assertEquals`/`isEmpty` checks here as 
well.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to