cecemei commented on code in PR #18750:
URL: https://github.com/apache/druid/pull/18750#discussion_r2604952312
##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java:
##########
@@ -294,44 +298,48 @@ protected Map<KafkaTopicPartition, Long> getPartitionTimeLag()
// suppress use of CollectionUtils.mapValues() since the valueMapper function is dependent on map key here
@SuppressWarnings("SSBasedInspection")
// Used while calculating cummulative lag for entire stream
- private Map<KafkaTopicPartition, Long> getRecordLagPerPartitionInLatestSequences(Map<KafkaTopicPartition, Long> currentOffsets)
+ private Map<KafkaTopicPartition, Long> getRecordLagPerPartitionInLatestSequences()
{
- if (latestSequenceFromStream == null) {
+ Map<KafkaTopicPartition, Long> highestIngestedOffsets = getHighestIngestedOffsets();
+ Map<KafkaTopicPartition, Long> latestSequencesFromStream = getLatestSequencesFromStream();
+
+ if (latestSequencesFromStream.isEmpty()) {
return Collections.emptyMap();
}
- return latestSequenceFromStream
- .entrySet()
- .stream()
- .collect(
- Collectors.toMap(
- Entry::getKey,
- e -> e.getValue() != null
- ? e.getValue() - Optional.ofNullable(currentOffsets.get(e.getKey())).orElse(0L)
- : 0
- )
- );
+ return latestSequencesFromStream.entrySet()
+ .stream()
+ .collect(
+ Collectors.toMap(
+ Entry::getKey,
+ e ->
+ e.getValue() - highestIngestedOffsets.getOrDefault(e.getKey(), 0L)
+ )
+ );
}
+ // This function is defined and called by the parent class to compute the lag for specific partitions.
Review Comment:
A few notes:
- Please use Javadoc-style documentation for the method.
- It's worth noting that the offset values in `currentOffsets` are ignored now that we adopt the `OffsetSnapshot` approach; only its keys, which name the partitions to query, matter. This means the offsets in `currentOffsets` are no longer used in the lag calculation in `TaskReportData`.
##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java:
##########
@@ -294,44 +298,48 @@ protected Map<KafkaTopicPartition, Long> getPartitionTimeLag()
// suppress use of CollectionUtils.mapValues() since the valueMapper function is dependent on map key here
@SuppressWarnings("SSBasedInspection")
// Used while calculating cummulative lag for entire stream
- private Map<KafkaTopicPartition, Long> getRecordLagPerPartitionInLatestSequences(Map<KafkaTopicPartition, Long> currentOffsets)
+ private Map<KafkaTopicPartition, Long> getRecordLagPerPartitionInLatestSequences()
{
- if (latestSequenceFromStream == null) {
+ Map<KafkaTopicPartition, Long> highestIngestedOffsets = getHighestIngestedOffsets();
+ Map<KafkaTopicPartition, Long> latestSequencesFromStream = getLatestSequencesFromStream();
+
+ if (latestSequencesFromStream.isEmpty()) {
return Collections.emptyMap();
}
- return latestSequenceFromStream
- .entrySet()
- .stream()
- .collect(
- Collectors.toMap(
- Entry::getKey,
- e -> e.getValue() != null
- ? e.getValue() - Optional.ofNullable(currentOffsets.get(e.getKey())).orElse(0L)
- : 0
- )
- );
+ return latestSequencesFromStream.entrySet()
+ .stream()
+ .collect(
+ Collectors.toMap(
+ Entry::getKey,
+ e ->
+ e.getValue() - highestIngestedOffsets.getOrDefault(e.getKey(), 0L)
+ )
+ );
}
+ // This function is defined and called by the parent class to compute the lag for specific partitions.
+ // The `currentOffsets` parameter is provided by the parent class and indicates the partitions to query.
+ // Note: This function differs from `getRecordLagPerPartitionInLatestSequences()`:
+ // `getRecordLagPerPartitionInLatestSequences()` queries lag for all partitions,
+ // whereas `getRecordLagPerPartition()` queries lag only for the specified partitions.
@Override
protected Map<KafkaTopicPartition, Long> getRecordLagPerPartition(Map<KafkaTopicPartition, Long> currentOffsets)
{
- if (latestSequenceFromStream == null || currentOffsets == null) {
+ Map<KafkaTopicPartition, Long> latestSequencesFromStream = getLatestSequencesFromStream();
Review Comment:
Same here: don't break atomicity. Read the snapshot once and take both offset maps from it:
```
OffsetSnapshot<KafkaTopicPartition, Long> offsetSnapshot = offsetSnapshotRef.get();
```
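One way to apply that, sketched under assumptions: `SupervisorLagSketch` is a hypothetical fragment of the supervisor, `OffsetSnapshot` is simplified with `Integer` partition ids, and the reference starts from an empty snapshot so it is never null. The important line is the single `offsetSnapshotRef.get()` into a local:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical, simplified stand-in for the PR's OffsetSnapshot (Integer partition ids). */
final class OffsetSnapshot
{
  private final Map<Integer, Long> highestIngestedOffsets;
  private final Map<Integer, Long> latestOffsetsFromStream;

  OffsetSnapshot(Map<Integer, Long> highestIngestedOffsets, Map<Integer, Long> latestOffsetsFromStream)
  {
    this.highestIngestedOffsets = Map.copyOf(highestIngestedOffsets);
    this.latestOffsetsFromStream = Map.copyOf(latestOffsetsFromStream);
  }

  Map<Integer, Long> getHighestIngestedOffsets()
  {
    return highestIngestedOffsets;
  }

  Map<Integer, Long> getLatestOffsetsFromStream()
  {
    return latestOffsetsFromStream;
  }
}

/** Hypothetical fragment of a supervisor that owns the snapshot reference. */
final class SupervisorLagSketch
{
  // Assumption: starts from an empty snapshot so readers never observe null.
  private final AtomicReference<OffsetSnapshot> offsetSnapshotRef =
      new AtomicReference<>(new OffsetSnapshot(Map.of(), Map.of()));

  void updateOffsetSnapshot(Map<Integer, Long> highestIngestedOffsets, Map<Integer, Long> latestOffsetsFromStream)
  {
    // Both maps are published together in one set() call.
    offsetSnapshotRef.set(new OffsetSnapshot(highestIngestedOffsets, latestOffsetsFromStream));
  }

  Map<Integer, Long> getRecordLagPerPartition(Map<Integer, Long> currentOffsets)
  {
    // Single read: every offset used below belongs to the same published snapshot.
    OffsetSnapshot offsetSnapshot = offsetSnapshotRef.get();
    Map<Integer, Long> lag = new HashMap<>();
    if (currentOffsets == null) {
      return lag;
    }
    for (Integer partition : currentOffsets.keySet()) {
      Long endOffset = offsetSnapshot.getLatestOffsetsFromStream().get(partition);
      if (endOffset != null) {
        lag.put(partition, endOffset - offsetSnapshot.getHighestIngestedOffsets().getOrDefault(partition, 0L));
      }
    }
    return lag;
  }
}
```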
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/OffsetSnapshot.java:
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream.supervisor;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+
+import javax.annotation.Nullable;
+import java.util.Map;
+import java.util.Objects;
+
+/**
Review Comment:
A few notes:
- Use `<p>` as the paragraph tag.
- There's usually no need to include much detail on the previous behavior or to link the PR, since documentation should be forward-looking. Optionally, you could add a release note section to your PR to document this change. For the Javadoc, you could just say something like:
> The supervisor fetches task-reported ingested offsets first, then fetches end offsets from the stream. Because these two values are captured at different instants, the reported delay (latestOffsetsFromStream - highestIngestedOffsets) may be slightly larger than the actual delay at any precise moment. However, publishing both maps together as a single atomic snapshot ensures that readers always observe a consistent view, producing stable and monotonic delay trends.
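A minimal sketch of how the class and its Javadoc might look with `<p>` paragraphs and forward-looking wording. The generic parameter names, the `of` factory treating a null map as empty, and the defensive copies via `Collections.unmodifiableMap` are assumptions, not the PR's actual implementation:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/**
 * Immutable pair of offset maps that the supervisor publishes together.
 * <p>
 * The supervisor fetches task-reported ingested offsets first, then fetches end offsets from the
 * stream. Because these two values are captured at different instants, the reported lag
 * (latestOffsetsFromStream - highestIngestedOffsets) may be slightly larger than the actual lag
 * at any precise moment.
 * <p>
 * Publishing both maps together as a single atomic snapshot ensures that readers always observe
 * a consistent view.
 */
final class OffsetSnapshot<PartitionIdType, SequenceOffsetType>
{
  private final Map<PartitionIdType, SequenceOffsetType> highestIngestedOffsets;
  private final Map<PartitionIdType, SequenceOffsetType> latestOffsetsFromStream;

  private OffsetSnapshot(
      Map<PartitionIdType, SequenceOffsetType> highestIngestedOffsets,
      Map<PartitionIdType, SequenceOffsetType> latestOffsetsFromStream
  )
  {
    this.highestIngestedOffsets = immutableCopy(highestIngestedOffsets);
    this.latestOffsetsFromStream = immutableCopy(latestOffsetsFromStream);
  }

  /** Creates a snapshot; a null map is treated as empty (assumption). */
  static <P, O> OffsetSnapshot<P, O> of(Map<P, O> highestIngestedOffsets, Map<P, O> latestOffsetsFromStream)
  {
    return new OffsetSnapshot<>(highestIngestedOffsets, latestOffsetsFromStream);
  }

  Map<PartitionIdType, SequenceOffsetType> getHighestIngestedOffsets()
  {
    return highestIngestedOffsets;
  }

  Map<PartitionIdType, SequenceOffsetType> getLatestOffsetsFromStream()
  {
    return latestOffsetsFromStream;
  }

  // Defensive, read-only copy so later changes to the caller's map cannot leak into a published snapshot.
  private static <K, V> Map<K, V> immutableCopy(Map<K, V> map)
  {
    if (map == null) {
      return Collections.emptyMap();
    }
    return Collections.unmodifiableMap(new HashMap<>(map));
  }
}
```

The copies matter because the snapshot is shared across threads through an `AtomicReference`: if the caller kept mutating the map it passed in, readers could still see a changing view even though the reference itself is swapped atomically.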
##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java:
##########
@@ -294,44 +298,48 @@ protected Map<KafkaTopicPartition, Long> getPartitionTimeLag()
// suppress use of CollectionUtils.mapValues() since the valueMapper function is dependent on map key here
@SuppressWarnings("SSBasedInspection")
// Used while calculating cummulative lag for entire stream
- private Map<KafkaTopicPartition, Long> getRecordLagPerPartitionInLatestSequences(Map<KafkaTopicPartition, Long> currentOffsets)
+ private Map<KafkaTopicPartition, Long> getRecordLagPerPartitionInLatestSequences()
{
- if (latestSequenceFromStream == null) {
+ Map<KafkaTopicPartition, Long> highestIngestedOffsets = getHighestIngestedOffsets();
+ Map<KafkaTopicPartition, Long> latestSequencesFromStream = getLatestSequencesFromStream();
Review Comment:
Don't fetch the offsets separately; that breaks atomicity. Instead, pass in `OffsetSnapshot<KafkaTopicPartition, Long> offsetSnapshot` as a param:
```
return offsetSnapshot.getLatestOffsetsFromStream()
                     .entrySet()
                     .stream()
                     .collect(Collectors.toMap(
                         Entry::getKey,
                         e -> e.getValue() - offsetSnapshot.getHighestIngestedOffsets().getOrDefault(e.getKey(), 0L)
                     ));
```
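The suggested body, made self-contained as a sketch. `StreamLag` and the minimal generic `OffsetSnapshot` are hypothetical, and `Integer` stands in for `KafkaTopicPartition`:

```java
import java.util.Map;
import java.util.Map.Entry;
import java.util.stream.Collectors;

/** Hypothetical minimal stand-in for the PR's OffsetSnapshot. */
final class OffsetSnapshot<P, O>
{
  private final Map<P, O> highestIngestedOffsets;
  private final Map<P, O> latestOffsetsFromStream;

  OffsetSnapshot(Map<P, O> highestIngestedOffsets, Map<P, O> latestOffsetsFromStream)
  {
    this.highestIngestedOffsets = Map.copyOf(highestIngestedOffsets);
    this.latestOffsetsFromStream = Map.copyOf(latestOffsetsFromStream);
  }

  Map<P, O> getHighestIngestedOffsets()
  {
    return highestIngestedOffsets;
  }

  Map<P, O> getLatestOffsetsFromStream()
  {
    return latestOffsetsFromStream;
  }
}

final class StreamLag
{
  /** Lag for every partition the stream reported, computed from one snapshot. */
  static Map<Integer, Long> getRecordLagPerPartitionInLatestSequences(OffsetSnapshot<Integer, Long> offsetSnapshot)
  {
    Map<Integer, Long> ingested = offsetSnapshot.getHighestIngestedOffsets();
    return offsetSnapshot.getLatestOffsetsFromStream()
                         .entrySet()
                         .stream()
                         .collect(Collectors.toMap(
                             Entry::getKey,
                             // Partitions with nothing ingested yet count from offset 0.
                             e -> e.getValue() - ingested.getOrDefault(e.getKey(), 0L)
                         ));
  }
}
```

Streaming an empty end-offset map yields an empty result, so the separate `isEmpty()` early return is no longer needed.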
##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java:
##########
@@ -294,44 +298,48 @@ protected Map<KafkaTopicPartition, Long> getPartitionTimeLag()
// suppress use of CollectionUtils.mapValues() since the valueMapper function is dependent on map key here
@SuppressWarnings("SSBasedInspection")
// Used while calculating cummulative lag for entire stream
- private Map<KafkaTopicPartition, Long> getRecordLagPerPartitionInLatestSequences(Map<KafkaTopicPartition, Long> currentOffsets)
+ private Map<KafkaTopicPartition, Long> getRecordLagPerPartitionInLatestSequences()
{
- if (latestSequenceFromStream == null) {
+ Map<KafkaTopicPartition, Long> highestIngestedOffsets = getHighestIngestedOffsets();
+ Map<KafkaTopicPartition, Long> latestSequencesFromStream = getLatestSequencesFromStream();
+
+ if (latestSequencesFromStream.isEmpty()) {
return Collections.emptyMap();
}
- return latestSequenceFromStream
- .entrySet()
- .stream()
- .collect(
- Collectors.toMap(
- Entry::getKey,
- e -> e.getValue() != null
- ? e.getValue() - Optional.ofNullable(currentOffsets.get(e.getKey())).orElse(0L)
- : 0
- )
- );
+ return latestSequencesFromStream.entrySet()
+ .stream()
+ .collect(
+ Collectors.toMap(
+ Entry::getKey,
+ e ->
+ e.getValue() - highestIngestedOffsets.getOrDefault(e.getKey(), 0L)
+ )
+ );
}
+ // This function is defined and called by the parent class to compute the lag for specific partitions.
+ // The `currentOffsets` parameter is provided by the parent class and indicates the partitions to query.
+ // Note: This function differs from `getRecordLagPerPartitionInLatestSequences()`:
+ // `getRecordLagPerPartitionInLatestSequences()` queries lag for all partitions,
+ // whereas `getRecordLagPerPartition()` queries lag only for the specified partitions.
@Override
protected Map<KafkaTopicPartition, Long> getRecordLagPerPartition(Map<KafkaTopicPartition, Long> currentOffsets)
{
- if (latestSequenceFromStream == null || currentOffsets == null) {
+ Map<KafkaTopicPartition, Long> latestSequencesFromStream = getLatestSequencesFromStream();
+ Map<KafkaTopicPartition, Long> highestIngestedOffsets = getHighestIngestedOffsets();
+
+ if (latestSequencesFromStream.isEmpty() || highestIngestedOffsets.isEmpty() || currentOffsets == null) {
Review Comment:
`highestIngestedOffsets` can legitimately be empty, right? That would mean ingestion hasn't started yet, so the offsets should default to 0 instead of short-circuiting to an empty lag map.
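A small sketch of the defaulting, assuming both maps were taken from the same snapshot; `EmptyIngestedOffsets`, `lagFor`, and `noLagAvailable` are hypothetical names and `Integer` stands in for `KafkaTopicPartition`. With nothing ingested yet, the lag for a partition is simply its end offset:

```java
import java.util.Map;

final class EmptyIngestedOffsets
{
  /** Lag for one partition; a missing ingested offset (ingestion not started) defaults to 0. */
  static long lagFor(
      Map<Integer, Long> latestOffsetsFromStream,
      Map<Integer, Long> highestIngestedOffsets,
      int partition
  )
  {
    return latestOffsetsFromStream.get(partition) - highestIngestedOffsets.getOrDefault(partition, 0L);
  }

  /** Early-return guard that deliberately omits highestIngestedOffsets.isEmpty(). */
  static boolean noLagAvailable(Map<Integer, Long> latestOffsetsFromStream, Map<Integer, Long> currentOffsets)
  {
    return latestOffsetsFromStream.isEmpty() || currentOffsets == null;
  }
}
```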
##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java:
##########
@@ -535,10 +549,24 @@ protected void updatePartitionLagFromStream()
}
}
+ private void updateOffsetSnapshot(
+ Map<KafkaTopicPartition, Long> highestIngestedOffsets,
+ Map<KafkaTopicPartition, Long> latestOffsetsFromStream
+ )
+ {
+ offsetSnapshotRef.set(
+ OffsetSnapshot.of(highestIngestedOffsets, latestOffsetsFromStream));
+ }
+
@Override
protected Map<KafkaTopicPartition, Long> getLatestSequencesFromStream()
{
- return latestSequenceFromStream != null ? latestSequenceFromStream : new HashMap<>();
+ return offsetSnapshotRef.get().getLatestOffsetsFromStream();
+ }
+
+ private Map<KafkaTopicPartition, Long> getHighestIngestedOffsets()
Review Comment:
The methods `getLatestSequencesFromStream` and `getHighestIngestedOffsets` break the atomicity of the `OffsetSnapshot` class. Imagine you call `getHighestIngestedOffsets`, then `offsetSnapshotRef` gets updated, and then you call `getLatestSequencesFromStream`: now you're literally comparing offsets captured at different times!
It's okay to keep `getLatestSequencesFromStream` for compatibility, but please try not to use it in `KafkaSupervisor`. Everywhere, just call `offsetSnapshotRef.get()` once and read the offsets from that snapshot directly.
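To make that interleaving concrete, here is a deterministic sketch: the `Runnable` hook stands in for a concurrent `updateOffsetSnapshot()` call landing between two reads, which with real threads would happen only occasionally. `SnapshotReads` and the simplified `OffsetSnapshot` (with `Integer` partition ids) are hypothetical:

```java
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical, simplified stand-in for the PR's OffsetSnapshot (Integer partition ids). */
final class OffsetSnapshot
{
  private final Map<Integer, Long> highestIngestedOffsets;
  private final Map<Integer, Long> latestOffsetsFromStream;

  OffsetSnapshot(Map<Integer, Long> highestIngestedOffsets, Map<Integer, Long> latestOffsetsFromStream)
  {
    this.highestIngestedOffsets = Map.copyOf(highestIngestedOffsets);
    this.latestOffsetsFromStream = Map.copyOf(latestOffsetsFromStream);
  }

  Map<Integer, Long> getHighestIngestedOffsets()
  {
    return highestIngestedOffsets;
  }

  Map<Integer, Long> getLatestOffsetsFromStream()
  {
    return latestOffsetsFromStream;
  }
}

final class SnapshotReads
{
  private final AtomicReference<OffsetSnapshot> offsetSnapshotRef;

  SnapshotReads(OffsetSnapshot initial)
  {
    offsetSnapshotRef = new AtomicReference<>(initial);
  }

  void updateOffsetSnapshot(OffsetSnapshot snapshot)
  {
    offsetSnapshotRef.set(snapshot);
  }

  /** Anti-pattern: each getter re-reads the reference, so an update can land between them. */
  long lagWithSeparateReads(int partition, Runnable betweenReads)
  {
    long ingested = offsetSnapshotRef.get().getHighestIngestedOffsets().getOrDefault(partition, 0L);
    betweenReads.run(); // simulates a concurrent updateOffsetSnapshot() call
    long latest = offsetSnapshotRef.get().getLatestOffsetsFromStream().get(partition);
    return latest - ingested;
  }

  /** Reads the reference once; both offsets come from the same snapshot. */
  long lagWithSingleRead(int partition, Runnable betweenReads)
  {
    OffsetSnapshot snapshot = offsetSnapshotRef.get();
    betweenReads.run(); // the update still happens, but this method no longer sees it
    return snapshot.getLatestOffsetsFromStream().get(partition)
           - snapshot.getHighestIngestedOffsets().getOrDefault(partition, 0L);
  }
}
```

With an older snapshot of ingested 90 / end 100 and a newer one of ingested 190 / end 200, the separate reads mix the two and report a lag of 110, while the single read reports 10, which matches either snapshot on its own.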
##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java:
##########
@@ -172,15 +175,15 @@ protected SeekableStreamSupervisorReportPayload<KafkaTopicPartition, Long> creat
)
{
KafkaSupervisorIOConfig ioConfig = spec.getIoConfig();
- Map<KafkaTopicPartition, Long> partitionLag = getRecordLagPerPartitionInLatestSequences(getHighestCurrentOffsets());
+ Map<KafkaTopicPartition, Long> partitionLag = getRecordLagPerPartitionInLatestSequences();
return new KafkaSupervisorReportPayload(
spec.getId(),
spec.getDataSchema().getDataSource(),
ioConfig.getStream(),
numPartitions,
ioConfig.getReplicas(),
ioConfig.getTaskDuration().getMillis() / 1000,
- includeOffsets ? latestSequenceFromStream : null,
+ includeOffsets ? getLatestSequencesFromStream() : null,
Review Comment:
This also breaks atomicity. Please make sure the same `OffsetSnapshot<KafkaTopicPartition, Long>` is used for both the lag and the reported offsets, i.e. pass the `OffsetSnapshot<KafkaTopicPartition, Long>` as a param to `getRecordLagPerPartitionInLatestSequences`.