kfaraz commented on code in PR #18750:
URL: https://github.com/apache/druid/pull/18750#discussion_r2589875135


##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -177,6 +178,50 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
   // Internal data structures
   // --------------------------------------------------------
 
+  protected static class OffsetSnapshot<PartitionIdType, SequenceOffsetType>
+  {
+    private final ImmutableMap<PartitionIdType, SequenceOffsetType> currentOffsets;
+    private final ImmutableMap<PartitionIdType, SequenceOffsetType> endOffsets;
+
+    public OffsetSnapshot(
+        @Nullable Map<PartitionIdType, SequenceOffsetType> currentOffsets,
+        @Nullable Map<PartitionIdType, SequenceOffsetType> endOffsets

Review Comment:
   Please rename these fields to `highestIngestedOffsets` (currently `currentOffsets`) and `latestOffsetsFromStream` (currently `endOffsets`) to avoid confusion. `currentOffsets` and `endOffsets` are too similar.



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -177,6 +178,50 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
   // Internal data structures
   // --------------------------------------------------------
 
+  protected static class OffsetSnapshot<PartitionIdType, SequenceOffsetType>
+  {
+    private final ImmutableMap<PartitionIdType, SequenceOffsetType> currentOffsets;
+    private final ImmutableMap<PartitionIdType, SequenceOffsetType> endOffsets;
+
+    public OffsetSnapshot(
+        @Nullable Map<PartitionIdType, SequenceOffsetType> currentOffsets,
+        @Nullable Map<PartitionIdType, SequenceOffsetType> endOffsets
+    )
+    {
+      this.currentOffsets = toImmutableOffsetMap(currentOffsets);
+      this.endOffsets = toImmutableOffsetMap(endOffsets);
+    }
+
+    private ImmutableMap<PartitionIdType, SequenceOffsetType> toImmutableOffsetMap(
+        @Nullable Map<PartitionIdType, SequenceOffsetType> input
+    )
+    {
+      if (input == null || input.isEmpty()) {
+        return ImmutableMap.of();
+      }
+
+      return input.entrySet().stream()
+                  .filter(e -> e.getValue() != null)
+                  .collect(ImmutableMap.toImmutableMap(
+                      Map.Entry::getKey,
+                      Map.Entry::getValue
+                  ));
+    }
+
+    public ImmutableMap<PartitionIdType, SequenceOffsetType> getCurrentOffsets()
+    {
+      return currentOffsets;
+    }
+
+    public ImmutableMap<PartitionIdType, SequenceOffsetType> getEndOffsets()
+    {
+      return endOffsets;
+    }
+  }
+
+  protected final AtomicReference<OffsetSnapshot<PartitionIdType, SequenceOffsetType>> offsetSnapshotRef =
+      new AtomicReference<>(new OffsetSnapshot<>(Collections.emptyMap(), Collections.emptyMap()));

Review Comment:
   Nit: for brevity, it might be nice to have a static `of` factory method in `OffsetSnapshot`.
   
   For example:
   ```suggestion
         new AtomicReference<>(OffsetSnapshot.of(Map.of(), Map.of()));
   ```
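
A minimal sketch of what such a factory could look like. It is an illustration rather than the PR's code: it uses the field names proposed in the first comment, and JDK unmodifiable maps stand in for Guava's `ImmutableMap` so that it compiles without dependencies.

```java
import java.util.Map;
import java.util.stream.Collectors;

// Illustrative stand-in for the OffsetSnapshot class under review (not the PR's code).
final class OffsetSnapshot<PartitionIdType, SequenceOffsetType>
{
  private final Map<PartitionIdType, SequenceOffsetType> highestIngestedOffsets;
  private final Map<PartitionIdType, SequenceOffsetType> latestOffsetsFromStream;

  // Private constructor: callers go through the static factory below.
  private OffsetSnapshot(
      Map<PartitionIdType, SequenceOffsetType> highestIngestedOffsets,
      Map<PartitionIdType, SequenceOffsetType> latestOffsetsFromStream
  )
  {
    this.highestIngestedOffsets = highestIngestedOffsets;
    this.latestOffsetsFromStream = latestOffsetsFromStream;
  }

  // Static factory; either argument may be null, which yields an empty map.
  public static <P, O> OffsetSnapshot<P, O> of(
      Map<P, O> highestIngestedOffsets,
      Map<P, O> latestOffsetsFromStream
  )
  {
    return new OffsetSnapshot<>(
        toUnmodifiable(highestIngestedOffsets),
        toUnmodifiable(latestOffsetsFromStream)
    );
  }

  // Copies the input into an unmodifiable map, dropping entries with null values.
  private static <K, V> Map<K, V> toUnmodifiable(Map<K, V> input)
  {
    if (input == null || input.isEmpty()) {
      return Map.of();
    }
    return input.entrySet().stream()
                .filter(e -> e.getValue() != null)
                .collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, Map.Entry::getValue));
  }

  public Map<PartitionIdType, SequenceOffsetType> getHighestIngestedOffsets()
  {
    return highestIngestedOffsets;
  }

  public Map<PartitionIdType, SequenceOffsetType> getLatestOffsetsFromStream()
  {
    return latestOffsetsFromStream;
  }
}
```

With the constructor private, the null handling and defensive copying live in one place and every call site reads as `OffsetSnapshot.of(...)`.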



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -177,6 +178,50 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
   // Internal data structures
   // --------------------------------------------------------
 
+  protected static class OffsetSnapshot<PartitionIdType, SequenceOffsetType>

Review Comment:
   Please put this class in a separate file of its own since the supervisor 
class is already too bloated.



##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java:
##########
@@ -172,15 +172,15 @@ protected SeekableStreamSupervisorReportPayload<KafkaTopicPartition, Long> creat
   )
   {
     KafkaSupervisorIOConfig ioConfig = spec.getIoConfig();
-    Map<KafkaTopicPartition, Long> partitionLag = getRecordLagPerPartitionInLatestSequences(getHighestCurrentOffsets());
+    Map<KafkaTopicPartition, Long> partitionLag = getRecordLagPerPartitionInLatestSequences();
     return new KafkaSupervisorReportPayload(
         spec.getId(),
         spec.getDataSchema().getDataSource(),
         ioConfig.getStream(),
         numPartitions,
         ioConfig.getReplicas(),
         ioConfig.getTaskDuration().getMillis() / 1000,
-        includeOffsets ? latestSequenceFromStream : null,
+        includeOffsets ? offsetSnapshotRef.get().getEndOffsets() : null,

Review Comment:
   +1



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -177,6 +178,50 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
   // Internal data structures
   // --------------------------------------------------------
 
+  protected static class OffsetSnapshot<PartitionIdType, SequenceOffsetType>

Review Comment:
   +1 to the javadocs



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -177,6 +178,50 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
   // Internal data structures
   // --------------------------------------------------------
 
+  protected static class OffsetSnapshot<PartitionIdType, SequenceOffsetType>
+  {
+    private final ImmutableMap<PartitionIdType, SequenceOffsetType> currentOffsets;
+    private final ImmutableMap<PartitionIdType, SequenceOffsetType> endOffsets;
+
+    public OffsetSnapshot(
+        @Nullable Map<PartitionIdType, SequenceOffsetType> currentOffsets,
+        @Nullable Map<PartitionIdType, SequenceOffsetType> endOffsets
+    )
+    {
+      this.currentOffsets = toImmutableOffsetMap(currentOffsets);
+      this.endOffsets = toImmutableOffsetMap(endOffsets);
+    }
+
+    private ImmutableMap<PartitionIdType, SequenceOffsetType> toImmutableOffsetMap(
+        @Nullable Map<PartitionIdType, SequenceOffsetType> input
+    )
+    {
+      if (input == null || input.isEmpty()) {
+        return ImmutableMap.of();
+      }
+
+      return input.entrySet().stream()
+                  .filter(e -> e.getValue() != null)
+                  .collect(ImmutableMap.toImmutableMap(
+                      Map.Entry::getKey,
+                      Map.Entry::getValue
+                  ));
+    }
+
+    public ImmutableMap<PartitionIdType, SequenceOffsetType> getCurrentOffsets()
+    {
+      return currentOffsets;
+    }
+
+    public ImmutableMap<PartitionIdType, SequenceOffsetType> getEndOffsets()
+    {
+      return endOffsets;
+    }
+  }
+
+  protected final AtomicReference<OffsetSnapshot<PartitionIdType, SequenceOffsetType>> offsetSnapshotRef =

Review Comment:
   Let's move this field to `KafkaSupervisor` and use it only for Kafka for the 
time being.
   If it works fine there, we will adopt it later for the other supervisors too.
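
For reference, the pattern this field relies on can be sketched as follows. The names `SnapshotHolderSketch`, `Snapshot`, `publish` and `read` are hypothetical and not part of Druid. The point is only that a reader who calls `get()` once sees both maps from the same update, never one map from an older update and one from a newer one.

```java
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical illustration of publishing two related maps through one reference.
final class SnapshotHolderSketch
{
  // Immutable pair of offset maps; both are copied at construction time.
  static final class Snapshot
  {
    final Map<String, Long> highestIngestedOffsets;
    final Map<String, Long> latestOffsetsFromStream;

    Snapshot(Map<String, Long> highestIngestedOffsets, Map<String, Long> latestOffsetsFromStream)
    {
      // Map.copyOf rejects null keys and values; this sketch assumes neither is present.
      this.highestIngestedOffsets = Map.copyOf(highestIngestedOffsets);
      this.latestOffsetsFromStream = Map.copyOf(latestOffsetsFromStream);
    }
  }

  private final AtomicReference<Snapshot> ref =
      new AtomicReference<>(new Snapshot(Map.of(), Map.of()));

  // Writer side: build the complete snapshot first, then swap it in with a single set().
  void publish(Map<String, Long> highestIngestedOffsets, Map<String, Long> latestOffsetsFromStream)
  {
    ref.set(new Snapshot(highestIngestedOffsets, latestOffsetsFromStream));
  }

  // Reader side: one get() returns a pair that was published together.
  Snapshot read()
  {
    return ref.get();
  }
}
```

A reader should keep the returned `Snapshot` in a local variable and take both maps from it; calling `read()` twice could observe two different updates.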



##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java:
##########
@@ -454,6 +461,12 @@ private void updatePartitionTimeAndRecordLagFromStream()
                   e -> e.getValue() - lastIngestedTimestamps.get(e.getKey())
               )
           );
+
+      OffsetSnapshot<KafkaTopicPartition, Long> snapshot = new OffsetSnapshot<>(
+          highestCurrentOffsets,
+          latestSequenceFromStream
+      );
+      offsetSnapshotRef.set(snapshot);

Review Comment:
   Nit: this can be a single statement.
   
   ```suggestion
         offsetSnapshotRef.set(
             OffsetSnapshot.of(highestCurrentOffsets, latestSequenceFromStream)
         );
   ```



##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java:
##########
@@ -526,6 +539,12 @@ protected void updatePartitionLagFromStream()
 
       latestSequenceFromStream =
           partitions.stream().collect(Collectors.toMap(StreamPartition::getPartitionId, recordSupplier::getPosition));
+
+      OffsetSnapshot<KafkaTopicPartition, Long> snapshot = new OffsetSnapshot<>(

Review Comment:
   +1
   
   After `latestSequenceFromStream` has been fetched, another thread could have updated the metadata store with the latest offsets, which are then returned by `getHighestCurrentOffsets()`.
   
   If you just make the change that @cecemei suggests, I think we could ensure positive lag in this case as well.
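
To illustrate the ordering issue with plain maps: the helper below is hypothetical, is not Druid code, and is not necessarily the change @cecemei proposed. It only shows that when the ingested offsets are read after the stream offsets, the ingested value can be the newer of the two and the computed lag goes negative.

```java
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper: per-partition lag = latest offset in stream - highest ingested offset.
final class LagOrderingSketch
{
  static Map<String, Long> lag(Map<String, Long> latestFromStream, Map<String, Long> highestIngested)
  {
    return latestFromStream.entrySet().stream()
        // Only partitions present in both maps are compared (an assumption of this sketch).
        .filter(e -> highestIngested.containsKey(e.getKey()))
        .collect(Collectors.toMap(
            Map.Entry::getKey,
            e -> e.getValue() - highestIngested.get(e.getKey())
        ));
  }

  public static void main(String[] args)
  {
    // Stream offset read first (100), ingested offset read later after more progress (105).
    System.out.println(lag(Map.of("p0", 100L), Map.of("p0", 105L))); // prints {p0=-5}

    // Ingested offset read first (95), stream offset read later (110).
    System.out.println(lag(Map.of("p0", 110L), Map.of("p0", 95L))); // prints {p0=15}
  }
}
```

Since offsets only move forward, capturing the ingested offsets no later than the stream offsets keeps the difference from going below zero.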



##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java:
##########
@@ -172,15 +172,15 @@ protected SeekableStreamSupervisorReportPayload<KafkaTopicPartition, Long> creat
   )
   {
     KafkaSupervisorIOConfig ioConfig = spec.getIoConfig();
-    Map<KafkaTopicPartition, Long> partitionLag = getRecordLagPerPartitionInLatestSequences(getHighestCurrentOffsets());
+    Map<KafkaTopicPartition, Long> partitionLag = getRecordLagPerPartitionInLatestSequences();
     return new KafkaSupervisorReportPayload(
         spec.getId(),
         spec.getDataSchema().getDataSource(),
         ioConfig.getStream(),
         numPartitions,
         ioConfig.getReplicas(),
         ioConfig.getTaskDuration().getMillis() / 1000,
-        includeOffsets ? latestSequenceFromStream : null,
+        includeOffsets ? offsetSnapshotRef.get().getEndOffsets() : null,

Review Comment:
   Re-use the method instead:
   ```suggestion
           includeOffsets ? getLatestSequencesFromStream() : null,
   ```



