kfaraz commented on code in PR #18750:
URL: https://github.com/apache/druid/pull/18750#discussion_r2589875135
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -177,6 +178,50 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
// Internal data structures
// --------------------------------------------------------
+ protected static class OffsetSnapshot<PartitionIdType, SequenceOffsetType>
+ {
+ private final ImmutableMap<PartitionIdType, SequenceOffsetType> currentOffsets;
+ private final ImmutableMap<PartitionIdType, SequenceOffsetType> endOffsets;
+
+ public OffsetSnapshot(
+ @Nullable Map<PartitionIdType, SequenceOffsetType> currentOffsets,
+ @Nullable Map<PartitionIdType, SequenceOffsetType> endOffsets
Review Comment:
Please rename the fields to `highestIngestedOffsets` (instead of `currentOffsets`) and `latestOffsetsFromStream` (instead of `endOffsets`) to avoid confusion; `currentOffsets` and `endOffsets` are too similar.
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -177,6 +178,50 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
// Internal data structures
// --------------------------------------------------------
+ protected static class OffsetSnapshot<PartitionIdType, SequenceOffsetType>
+ {
+ private final ImmutableMap<PartitionIdType, SequenceOffsetType> currentOffsets;
+ private final ImmutableMap<PartitionIdType, SequenceOffsetType> endOffsets;
+
+ public OffsetSnapshot(
+ @Nullable Map<PartitionIdType, SequenceOffsetType> currentOffsets,
+ @Nullable Map<PartitionIdType, SequenceOffsetType> endOffsets
+ )
+ {
+ this.currentOffsets = toImmutableOffsetMap(currentOffsets);
+ this.endOffsets = toImmutableOffsetMap(endOffsets);
+ }
+
+ private ImmutableMap<PartitionIdType, SequenceOffsetType> toImmutableOffsetMap(
+ @Nullable Map<PartitionIdType, SequenceOffsetType> input
+ )
+ {
+ if (input == null || input.isEmpty()) {
+ return ImmutableMap.of();
+ }
+
+ return input.entrySet().stream()
+ .filter(e -> e.getValue() != null)
+ .collect(ImmutableMap.toImmutableMap(
+ Map.Entry::getKey,
+ Map.Entry::getValue
+ ));
+ }
+
+ public ImmutableMap<PartitionIdType, SequenceOffsetType> getCurrentOffsets()
+ {
+ return currentOffsets;
+ }
+
+ public ImmutableMap<PartitionIdType, SequenceOffsetType> getEndOffsets()
+ {
+ return endOffsets;
+ }
+ }
+
+ protected final AtomicReference<OffsetSnapshot<PartitionIdType, SequenceOffsetType>> offsetSnapshotRef =
+ new AtomicReference<>(new OffsetSnapshot<>(Collections.emptyMap(), Collections.emptyMap()));
Review Comment:
Nit: for brevity, it might be nice to have a static `of` method in `OffsetSnapshot`, for example:
```suggestion
new AtomicReference<>(OffsetSnapshot.of(Map.of(), Map.of()));
```
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -177,6 +178,50 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
// Internal data structures
// --------------------------------------------------------
+ protected static class OffsetSnapshot<PartitionIdType, SequenceOffsetType>
Review Comment:
Please move this class into its own file, since the supervisor class is already too bloated.
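Pulling together the renaming, `of` factory and separate-file comments, a standalone `OffsetSnapshot.java` might look roughly like the sketch below. It is an assumption, not the PR's code: to stay self-contained it uses the JDK's `Collectors.toUnmodifiableMap` (Java 10+) instead of Guava's `ImmutableMap`, drops the `@Nullable` annotations, and makes the constructor private so that callers go through `of`.

```java
import java.util.Map;
import java.util.stream.Collectors;

/**
 * Immutable, point-in-time view of the offsets a stream supervisor uses to compute lag.
 * Both maps are captured together so readers never mix offsets from different updates.
 * (Sketch only: JDK unmodifiable maps stand in for Guava's ImmutableMap.)
 *
 * @param <PartitionIdType>    type of the stream partition identifier
 * @param <SequenceOffsetType> type of a per-partition offset
 */
final class OffsetSnapshot<PartitionIdType, SequenceOffsetType>
{
  /** Highest offsets ingested so far, keyed by partition. */
  private final Map<PartitionIdType, SequenceOffsetType> highestIngestedOffsets;

  /** Latest offsets available in the stream, keyed by partition. */
  private final Map<PartitionIdType, SequenceOffsetType> latestOffsetsFromStream;

  private OffsetSnapshot(
      Map<PartitionIdType, SequenceOffsetType> highestIngestedOffsets,
      Map<PartitionIdType, SequenceOffsetType> latestOffsetsFromStream
  )
  {
    this.highestIngestedOffsets = toImmutableOffsetMap(highestIngestedOffsets);
    this.latestOffsetsFromStream = toImmutableOffsetMap(latestOffsetsFromStream);
  }

  /**
   * Creates a snapshot. Either argument may be null (treated as empty);
   * entries whose offset is null are dropped.
   */
  static <P, S> OffsetSnapshot<P, S> of(Map<P, S> highestIngestedOffsets, Map<P, S> latestOffsetsFromStream)
  {
    return new OffsetSnapshot<>(highestIngestedOffsets, latestOffsetsFromStream);
  }

  /** Copies the input into an unmodifiable map, skipping null offsets. */
  private static <P, S> Map<P, S> toImmutableOffsetMap(Map<P, S> input)
  {
    if (input == null || input.isEmpty()) {
      return Map.of();
    }
    return input.entrySet()
                .stream()
                .filter(e -> e.getValue() != null)
                .collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, Map.Entry::getValue));
  }

  Map<PartitionIdType, SequenceOffsetType> getHighestIngestedOffsets()
  {
    return highestIngestedOffsets;
  }

  Map<PartitionIdType, SequenceOffsetType> getLatestOffsetsFromStream()
  {
    return latestOffsetsFromStream;
  }
}
```

Call sites would then shrink to `offsetSnapshotRef.set(OffsetSnapshot.of(highestCurrentOffsets, latestSequenceFromStream));`, and the initial value to `OffsetSnapshot.of(Map.of(), Map.of())`.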
##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java:
##########
@@ -172,15 +172,15 @@ protected SeekableStreamSupervisorReportPayload<KafkaTopicPartition, Long> creat
)
{
KafkaSupervisorIOConfig ioConfig = spec.getIoConfig();
- Map<KafkaTopicPartition, Long> partitionLag = getRecordLagPerPartitionInLatestSequences(getHighestCurrentOffsets());
+ Map<KafkaTopicPartition, Long> partitionLag = getRecordLagPerPartitionInLatestSequences();
return new KafkaSupervisorReportPayload(
spec.getId(),
spec.getDataSchema().getDataSource(),
ioConfig.getStream(),
numPartitions,
ioConfig.getReplicas(),
ioConfig.getTaskDuration().getMillis() / 1000,
- includeOffsets ? latestSequenceFromStream : null,
+ includeOffsets ? offsetSnapshotRef.get().getEndOffsets() : null,
Review Comment:
+1
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -177,6 +178,50 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
// Internal data structures
// --------------------------------------------------------
+ protected static class OffsetSnapshot<PartitionIdType, SequenceOffsetType>
Review Comment:
+1 to the javadocs
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -177,6 +178,50 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
// Internal data structures
// --------------------------------------------------------
+ protected static class OffsetSnapshot<PartitionIdType, SequenceOffsetType>
+ {
+ private final ImmutableMap<PartitionIdType, SequenceOffsetType> currentOffsets;
+ private final ImmutableMap<PartitionIdType, SequenceOffsetType> endOffsets;
+
+ public OffsetSnapshot(
+ @Nullable Map<PartitionIdType, SequenceOffsetType> currentOffsets,
+ @Nullable Map<PartitionIdType, SequenceOffsetType> endOffsets
+ )
+ {
+ this.currentOffsets = toImmutableOffsetMap(currentOffsets);
+ this.endOffsets = toImmutableOffsetMap(endOffsets);
+ }
+
+ private ImmutableMap<PartitionIdType, SequenceOffsetType> toImmutableOffsetMap(
+ @Nullable Map<PartitionIdType, SequenceOffsetType> input
+ )
+ {
+ if (input == null || input.isEmpty()) {
+ return ImmutableMap.of();
+ }
+
+ return input.entrySet().stream()
+ .filter(e -> e.getValue() != null)
+ .collect(ImmutableMap.toImmutableMap(
+ Map.Entry::getKey,
+ Map.Entry::getValue
+ ));
+ }
+
+ public ImmutableMap<PartitionIdType, SequenceOffsetType> getCurrentOffsets()
+ {
+ return currentOffsets;
+ }
+
+ public ImmutableMap<PartitionIdType, SequenceOffsetType> getEndOffsets()
+ {
+ return endOffsets;
+ }
+ }
+
+ protected final AtomicReference<OffsetSnapshot<PartitionIdType, SequenceOffsetType>> offsetSnapshotRef =
Review Comment:
Let's move this field to `KafkaSupervisor` and use it only for Kafka for the time being.
If it works well there, we can adopt it for the other supervisors later.
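As a rough illustration of that layout, the sketch below keeps the snapshot reference in the Kafka-specific subclass and leaves the shared base class untouched. All class names are hypothetical stand-ins (not Druid's classes), partitions are plain `Integer`s, and the snapshot is a Java 16+ record built with `Map.copyOf`, which, unlike the PR's helper, rejects null offsets instead of filtering them out.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, heavily simplified stand-in for the shared supervisor base class.
abstract class StreamSupervisorSketch
{
  // No offset-snapshot field here: other supervisors are unaffected for now.
}

// Hypothetical stand-in for the Kafka supervisor, which alone owns the snapshot.
class KafkaSupervisorSketch extends StreamSupervisorSketch
{
  /** Immutable pair of offset maps that is always published as a unit. */
  record Snapshot(Map<Integer, Long> highestIngestedOffsets, Map<Integer, Long> latestOffsetsFromStream)
  {
    Snapshot
    {
      // Defensive copies; Map.copyOf throws on null keys or values.
      highestIngestedOffsets = Map.copyOf(highestIngestedOffsets);
      latestOffsetsFromStream = Map.copyOf(latestOffsetsFromStream);
    }
  }

  private final AtomicReference<Snapshot> offsetSnapshotRef =
      new AtomicReference<>(new Snapshot(Map.of(), Map.of()));

  /** Writer side: both maps become visible to readers in one atomic step. */
  void onOffsetsRefreshed(Map<Integer, Long> ingested, Map<Integer, Long> latest)
  {
    offsetSnapshotRef.set(new Snapshot(ingested, latest));
  }

  /** Reader side: read the reference once, then derive everything from that snapshot. */
  Map<Integer, Long> lagPerPartition()
  {
    Snapshot snapshot = offsetSnapshotRef.get();
    Map<Integer, Long> lag = new HashMap<>();
    snapshot.latestOffsetsFromStream().forEach((partition, latest) -> {
      Long ingested = snapshot.highestIngestedOffsets().get(partition);
      if (ingested != null) {
        lag.put(partition, latest - ingested);
      }
    });
    return lag;
  }
}
```

Reading `offsetSnapshotRef.get()` once per report keeps the lag and any reported offsets consistent with each other; calling `get()` separately for each value would allow them to come from different updates.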
##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java:
##########
@@ -454,6 +461,12 @@ private void updatePartitionTimeAndRecordLagFromStream()
e -> e.getValue() - lastIngestedTimestamps.get(e.getKey())
)
);
+
+ OffsetSnapshot<KafkaTopicPartition, Long> snapshot = new OffsetSnapshot<>(
+ highestCurrentOffsets,
+ latestSequenceFromStream
+ );
+ offsetSnapshotRef.set(snapshot);
Review Comment:
Nit: this can be a single statement:
```suggestion
offsetSnapshotRef.set(
OffsetSnapshot.of(highestCurrentOffsets, latestSequenceFromStream)
);
```
##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java:
##########
@@ -526,6 +539,12 @@ protected void updatePartitionLagFromStream()
latestSequenceFromStream =
partitions.stream().collect(Collectors.toMap(StreamPartition::getPartitionId, recordSupplier::getPosition));
+
+ OffsetSnapshot<KafkaTopicPartition, Long> snapshot = new OffsetSnapshot<>(
Review Comment:
+1
After `latestSequenceFromStream` has been fetched, another thread could update the metadata store with newer offsets, which would then be returned by `getHighestCurrentOffsets()`.
If you make the change @cecemei suggests, I think we can ensure non-negative lag in this case as well.
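The race can be reproduced deterministically with a toy single-partition model. It assumes offsets only move forward and that the ingested offset never exceeds the stream's end offset; `concurrentProgress()` is a hypothetical stand-in for another thread running between the two reads. Whether @cecemei's suggestion (not quoted here) is exactly this reordering is an assumption; the sketch only shows why reading the ingested offsets first rules out negative lag.

```java
// Hypothetical single-partition model, used only to illustrate read ordering.
class LagReadOrderSketch
{
  static final class Partition
  {
    long streamEnd;        // latest offset available in the stream
    long highestIngested;  // highest offset committed to the metadata store

    Partition(long streamEnd, long highestIngested)
    {
      this.streamEnd = streamEnd;
      this.highestIngested = highestIngested;
    }

    /** Simulates another thread: 20 new records arrive and tasks commit all of them. */
    void concurrentProgress()
    {
      streamEnd += 20;
      highestIngested = streamEnd;
    }

    /** Stream offset first, ingested offset second: the ordering the comment warns about. */
    long lagStreamFirst()
    {
      long latest = streamEnd;
      concurrentProgress();  // interleaving between the two reads
      long ingested = highestIngested;
      return latest - ingested;
    }

    /** Ingested offset first: ingested(t1) <= streamEnd(t1) <= streamEnd(t2), so lag >= 0. */
    long lagIngestedFirst()
    {
      long ingested = highestIngested;
      concurrentProgress();
      long latest = streamEnd;
      return latest - ingested;
    }
  }

  public static void main(String[] args)
  {
    System.out.println(new Partition(100, 80).lagStreamFirst());   // -20
    System.out.println(new Partition(100, 80).lagIngestedFirst()); // 40
  }
}
```

With the stream read first, the stale end offset (100) is compared against an ingested offset that has already moved past it (120); reading the ingested offset first can only overestimate lag, never make it negative.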
##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java:
##########
@@ -172,15 +172,15 @@ protected SeekableStreamSupervisorReportPayload<KafkaTopicPartition, Long> creat
)
{
KafkaSupervisorIOConfig ioConfig = spec.getIoConfig();
- Map<KafkaTopicPartition, Long> partitionLag = getRecordLagPerPartitionInLatestSequences(getHighestCurrentOffsets());
+ Map<KafkaTopicPartition, Long> partitionLag = getRecordLagPerPartitionInLatestSequences();
return new KafkaSupervisorReportPayload(
spec.getId(),
spec.getDataSchema().getDataSource(),
ioConfig.getStream(),
numPartitions,
ioConfig.getReplicas(),
ioConfig.getTaskDuration().getMillis() / 1000,
- includeOffsets ? latestSequenceFromStream : null,
+ includeOffsets ? offsetSnapshotRef.get().getEndOffsets() : null,
Review Comment:
Re-use the existing method instead:
```suggestion
includeOffsets ? getLatestSequencesFromStream() : null,
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]