Copilot commented on code in PR #18750: URL: https://github.com/apache/druid/pull/18750#discussion_r2756867455
########## indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/OffsetSnapshotTest.java: ########## @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.seekablestream.supervisor; + +import com.google.common.collect.ImmutableMap; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +public class OffsetSnapshotTest +{ + @Test + public void testOffsetSnapshot_emptyInputReturnsEmptyMap() + { + OffsetSnapshot<String, Long> snapshot = OffsetSnapshot.of( + Collections.emptyMap(), + Collections.emptyMap() + ); + + Assert.assertTrue(snapshot.getHighestIngestedOffsets().isEmpty()); + Assert.assertSame(ImmutableMap.of(), snapshot.getHighestIngestedOffsets()); + Assert.assertTrue(snapshot.getLatestOffsetsFromStream().isEmpty()); + Assert.assertSame(ImmutableMap.of(), snapshot.getLatestOffsetsFromStream()); + } + + @Test + public void testOffsetSnapshot_nullInputsReturnEmptyMaps() + { + OffsetSnapshot<Integer, Long> snapshot = OffsetSnapshot.of(null, null); + + Assert.assertTrue(snapshot.getHighestIngestedOffsets().isEmpty()); + 
Assert.assertSame(ImmutableMap.of(), snapshot.getHighestIngestedOffsets()); + Assert.assertTrue(snapshot.getLatestOffsetsFromStream().isEmpty()); + Assert.assertSame(ImmutableMap.of(), snapshot.getLatestOffsetsFromStream()); + } + + @Test + public void testOffsetSnapshot_nullCurrentOffsetsReturnsEmptyCurrentMap() + { + Map<Integer, Long> endOffsets = ImmutableMap.of(0, 100L, 1, 200L); + + OffsetSnapshot<Integer, Long> snapshot = OffsetSnapshot.of(null, endOffsets); + + Assert.assertTrue(snapshot.getHighestIngestedOffsets().isEmpty()); + Assert.assertSame(ImmutableMap.of(), snapshot.getHighestIngestedOffsets()); + Assert.assertEquals(endOffsets, snapshot.getLatestOffsetsFromStream()); Review Comment: Same issue here: `assertSame(ImmutableMap.of(), ...)` depends on reference identity. Prefer `assertEquals(ImmutableMap.of(), ...)` (and/or `isEmpty`) instead. ########## indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/OffsetSnapshotTest.java: ########## @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.indexing.seekablestream.supervisor; + +import com.google.common.collect.ImmutableMap; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +public class OffsetSnapshotTest +{ + @Test + public void testOffsetSnapshot_emptyInputReturnsEmptyMap() + { + OffsetSnapshot<String, Long> snapshot = OffsetSnapshot.of( + Collections.emptyMap(), + Collections.emptyMap() + ); + + Assert.assertTrue(snapshot.getHighestIngestedOffsets().isEmpty()); + Assert.assertSame(ImmutableMap.of(), snapshot.getHighestIngestedOffsets()); + Assert.assertTrue(snapshot.getLatestOffsetsFromStream().isEmpty()); + Assert.assertSame(ImmutableMap.of(), snapshot.getLatestOffsetsFromStream()); + } + + @Test + public void testOffsetSnapshot_nullInputsReturnEmptyMaps() + { + OffsetSnapshot<Integer, Long> snapshot = OffsetSnapshot.of(null, null); + + Assert.assertTrue(snapshot.getHighestIngestedOffsets().isEmpty()); + Assert.assertSame(ImmutableMap.of(), snapshot.getHighestIngestedOffsets()); + Assert.assertTrue(snapshot.getLatestOffsetsFromStream().isEmpty()); + Assert.assertSame(ImmutableMap.of(), snapshot.getLatestOffsetsFromStream()); + } + + @Test + public void testOffsetSnapshot_nullCurrentOffsetsReturnsEmptyCurrentMap() + { + Map<Integer, Long> endOffsets = ImmutableMap.of(0, 100L, 1, 200L); + + OffsetSnapshot<Integer, Long> snapshot = OffsetSnapshot.of(null, endOffsets); + + Assert.assertTrue(snapshot.getHighestIngestedOffsets().isEmpty()); + Assert.assertSame(ImmutableMap.of(), snapshot.getHighestIngestedOffsets()); + Assert.assertEquals(endOffsets, snapshot.getLatestOffsetsFromStream()); + } + + @Test + public void testOffsetSnapshot_nullEndOffsetsReturnsEmptyEndMap() + { + Map<Integer, Long> currentOffsets = ImmutableMap.of(0, 50L, 1, 150L); + + OffsetSnapshot<Integer, Long> snapshot = OffsetSnapshot.of(currentOffsets, null); + + Assert.assertEquals(currentOffsets, 
snapshot.getHighestIngestedOffsets()); + Assert.assertTrue(snapshot.getLatestOffsetsFromStream().isEmpty()); + Assert.assertSame(ImmutableMap.of(), snapshot.getLatestOffsetsFromStream()); + } Review Comment: Same issue here: using `assertSame` against `ImmutableMap.of()` is a brittle identity check; use `assertEquals`/`isEmpty` instead. ########## extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java: ########## @@ -506,6 +518,8 @@ protected void updatePartitionLagFromStream() return; } + Map<KafkaTopicPartition, Long> highestCurrentOffsets = getHighestCurrentOffsets(); + Review Comment: The Javadoc immediately above `updatePartitionLagFromStream()` still mentions updating `latestSequenceFromStream`, but that field was removed in favor of the atomic `OffsetSnapshot`/`offsetSnapshotRef`. Please update the comment to avoid pointing readers to a nonexistent symbol and to reflect the new snapshot-based flow. ########## extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java: ########## @@ -294,44 +300,48 @@ protected Map<KafkaTopicPartition, Long> getPartitionTimeLag() // suppress use of CollectionUtils.mapValues() since the valueMapper function is dependent on map key here @SuppressWarnings("SSBasedInspection") // Used while calculating cummulative lag for entire stream - private Map<KafkaTopicPartition, Long> getRecordLagPerPartitionInLatestSequences(Map<KafkaTopicPartition, Long> currentOffsets) + private Map<KafkaTopicPartition, Long> getRecordLagPerPartitionInLatestSequences(OffsetSnapshot<KafkaTopicPartition, Long> offsetSnapshot) { - if (latestSequenceFromStream == null) { - return Collections.emptyMap(); - } - - return latestSequenceFromStream - .entrySet() - .stream() - .collect( - Collectors.toMap( - Entry::getKey, - e -> e.getValue() != null - ? 
e.getValue() - Optional.ofNullable(currentOffsets.get(e.getKey())).orElse(0L) - : 0 - ) - ); + return offsetSnapshot.getLatestOffsetsFromStream() + .entrySet() + .stream() + .collect(Collectors.toMap( + Entry::getKey, + e -> e.getValue() - offsetSnapshot.getHighestIngestedOffsets().getOrDefault(e.getKey(), 0L) + )); } + /** + * Computes the record lag for the specified partitions. + * <p> + * This method is called by the parent class to calculate lag only for the partitions present in + * {@code currentOffsets}. The values in {@code currentOffsets} are ignored; only the keys (partitions) + * are used to determine which partitions to compute lag for. + * <p> + * Lag is calculated as {@code latestOffsetFromStream - highestIngestedOffset} using the current atomic + * {@link OffsetSnapshot} to ensure consistency between the two offset maps. + * + * @param currentOffsets map whose keys indicate the partitions to compute lag for (values are ignored) + * @return map of partition to its record lag, or empty map if no valid data is available + */ @Override protected Map<KafkaTopicPartition, Long> getRecordLagPerPartition(Map<KafkaTopicPartition, Long> currentOffsets) { - if (latestSequenceFromStream == null || currentOffsets == null) { + OffsetSnapshot<KafkaTopicPartition, Long> offsetSnapshot = offsetSnapshotRef.get(); + Map<KafkaTopicPartition, Long> latestSequencesFromStream = offsetSnapshot.getLatestOffsetsFromStream(); + Map<KafkaTopicPartition, Long> highestIngestedOffsets = offsetSnapshot.getHighestIngestedOffsets(); + + if (latestSequencesFromStream.isEmpty() || currentOffsets == null) { return Collections.emptyMap(); } - return currentOffsets - .entrySet() - .stream() - .filter(e -> latestSequenceFromStream.get(e.getKey()) != null) - .collect( - Collectors.toMap( - Entry::getKey, - e -> e.getValue() != null - ? 
latestSequenceFromStream.get(e.getKey()) - e.getValue() - : 0 - ) - ); + return currentOffsets.keySet().stream() + .filter(latestSequencesFromStream::containsKey) + .collect(Collectors.toMap( + Function.identity(), + // compute the lag using offsets from the snapshot. + p -> latestSequencesFromStream.get(p) - highestIngestedOffsets.getOrDefault(p, 0L) + )); Review Comment: `getRecordLagPerPartition` is called by `SeekableStreamSupervisor.generateReport` to compute per-task lag (based on each task's `currentOffsets` values). This implementation ignores those values and instead subtracts the snapshot's *highest* ingested offsets, which can make every task report the same lag and hide lagging replicas. Compute lag using the provided `currentOffsets` values (still using the snapshot only for the latest stream offsets) and update the Javadoc accordingly (including null-value handling to preserve previous behavior). ########## indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/OffsetSnapshotTest.java: ########## @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.indexing.seekablestream.supervisor; + +import com.google.common.collect.ImmutableMap; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +public class OffsetSnapshotTest +{ + @Test + public void testOffsetSnapshot_emptyInputReturnsEmptyMap() + { + OffsetSnapshot<String, Long> snapshot = OffsetSnapshot.of( + Collections.emptyMap(), + Collections.emptyMap() + ); + + Assert.assertTrue(snapshot.getHighestIngestedOffsets().isEmpty()); + Assert.assertSame(ImmutableMap.of(), snapshot.getHighestIngestedOffsets()); + Assert.assertTrue(snapshot.getLatestOffsetsFromStream().isEmpty()); + Assert.assertSame(ImmutableMap.of(), snapshot.getLatestOffsetsFromStream()); Review Comment: These assertions rely on reference equality with `ImmutableMap.of()`. Even if it happens to be true today, it’s not part of the contract of `ImmutableMap.copyOf(...)` and makes the test brittle across Guava versions/implementations. Prefer `assertEquals(ImmutableMap.of(), ...)` and/or `assertTrue(...isEmpty())`. ########## indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/OffsetSnapshot.java: ########## @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.seekablestream.supervisor; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; + +import javax.annotation.Nullable; +import java.util.Map; +import java.util.Objects; + +/** + * Immutable snapshot containing a consistent pair of offset maps: the highest ingested offsets + * reported by tasks and the latest end offsets fetched from the underlying stream. + * + * <p> + * The supervisor fetches task-reported ingested offsets first, then fetches end offsets from the stream. + * Because these two values are captured at slightly different instants, the reported lag + * (latestOffsetsFromStream - highestIngestedOffsets) may be slightly larger than the actual lag at any + * precise moment. + * + * <p> + * By publishing both maps together as a single atomic snapshot (using {@link java.util.concurrent.atomic.AtomicReference}), + * readers (such as lag metrics and supervisor status) always observe a coherent and consistent view. + * This produces stable and monotonic lag trends, avoiding artifacts like temporary negative lags. Review Comment: The class Javadoc claims this “produces stable and monotonic lag trends”, but lag is not monotonic in general (it can increase when new records arrive and decrease when ingestion catches up). Consider rewording to focus on “consistent/non-negative snapshots” and avoiding transient negative artifacts, without implying monotonic behavior. ```suggestion * This provides consistent, non-negative lag snapshots and avoids transient artifacts such as temporary negative lags, * without implying that lag values themselves are monotonic over time. 
``` ########## indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/OffsetSnapshotTest.java: ########## @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.seekablestream.supervisor; + +import com.google.common.collect.ImmutableMap; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +public class OffsetSnapshotTest +{ + @Test + public void testOffsetSnapshot_emptyInputReturnsEmptyMap() + { + OffsetSnapshot<String, Long> snapshot = OffsetSnapshot.of( + Collections.emptyMap(), + Collections.emptyMap() + ); + + Assert.assertTrue(snapshot.getHighestIngestedOffsets().isEmpty()); + Assert.assertSame(ImmutableMap.of(), snapshot.getHighestIngestedOffsets()); + Assert.assertTrue(snapshot.getLatestOffsetsFromStream().isEmpty()); + Assert.assertSame(ImmutableMap.of(), snapshot.getLatestOffsetsFromStream()); + } + + @Test + public void testOffsetSnapshot_nullInputsReturnEmptyMaps() + { + OffsetSnapshot<Integer, Long> snapshot = OffsetSnapshot.of(null, null); + + Assert.assertTrue(snapshot.getHighestIngestedOffsets().isEmpty()); + 
Assert.assertSame(ImmutableMap.of(), snapshot.getHighestIngestedOffsets()); + Assert.assertTrue(snapshot.getLatestOffsetsFromStream().isEmpty()); + Assert.assertSame(ImmutableMap.of(), snapshot.getLatestOffsetsFromStream()); + } Review Comment: Same as above: relying on `assertSame(ImmutableMap.of(), ...)` makes the test brittle since reference identity for empty `ImmutableMap` instances isn’t guaranteed by the API contract. Prefer `assertEquals`/`isEmpty` checks here as well. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org For additional commands, e-mail: commits-help@druid.apache.org
