kfaraz commented on code in PR #17735:
URL: https://github.com/apache/druid/pull/17735#discussion_r1964866237


##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafka/KafkaTopicPartition.java:
##########
@@ -47,7 +47,7 @@
     KafkaTopicPartition.KafkaTopicPartitionKeySerializer.class)
 @JsonDeserialize(using = 
KafkaTopicPartition.KafkaTopicPartitionDeserializer.class, keyUsing =
     KafkaTopicPartition.KafkaTopicPartitionKeyDeserializer.class)
-public class KafkaTopicPartition
+public class KafkaTopicPartition implements Comparable<KafkaTopicPartition>

Review Comment:
   Why does this need to be `Comparable`? Does the new code need to sort partitions at any point?
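   If sorting does turn out to be necessary, a natural ordering could look roughly like the sketch below. Everything here is a hypothetical stand-in, not the PR's actual code: the real `KafkaTopicPartition` also carries a multi-topic flag and a nullable topic, which a real implementation would have to account for.

   ```java
   import java.util.Comparator;

   // Hypothetical stand-in for KafkaTopicPartition: orders by topic name
   // first, then by partition number.
   public class TopicPartitionKey implements Comparable<TopicPartitionKey> {
       final String topic;
       final int partition;

       public TopicPartitionKey(String topic, int partition) {
           this.topic = topic;
           this.partition = partition;
       }

       @Override
       public int compareTo(TopicPartitionKey other) {
           // Comparator.comparing + thenComparingInt keeps the ordering
           // definition declarative and consistent with equals-by-fields.
           return Comparator
               .comparing((TopicPartitionKey k) -> k.topic)
               .thenComparingInt(k -> k.partition)
               .compare(this, other);
       }

       public static void main(String[] args) {
           TopicPartitionKey a = new TopicPartitionKey("metrics", 0);
           TopicPartitionKey b = new TopicPartitionKey("metrics", 1);
           System.out.println(a.compareTo(b) < 0);
       }
   }
   ```

   If the ordering is only needed at a single call site, passing a standalone `Comparator` to that sort may be preferable to baking a natural order into the class.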



##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java:
##########
@@ -380,6 +382,104 @@ public LagStats computeLagStats()
     return computeLags(partitionRecordLag);
   }
 
+  /**
+   * This method is similar to updatePartitionLagFromStream
+   * but also determines time lag. Once this method has been
+   * tested, we can remove the older one.
+   */
+  protected void updatePartitionTimeAndRecordLagFromStream()
+  {
+    final Map<KafkaTopicPartition, Long> highestCurrentOffsets = getHighestCurrentOffsets();
+
+    getRecordSupplierLock().lock();
+    try {
+      Set<KafkaTopicPartition> partitionIds;
+      try {
+        partitionIds = recordSupplier.getPartitionIds(getIoConfig().getStream());
+      }
+      catch (Exception e) {
+        log.warn("Could not fetch partitions for topic/stream [%s]", getIoConfig().getStream());
+        throw new StreamException(e);
+      }
+
+      final Set<StreamPartition<KafkaTopicPartition>> partitions = partitionIds
+          .stream()
+          .map(e -> new StreamPartition<>(getIoConfig().getStream(), e))
+          .collect(Collectors.toSet());
+
+      final Set<KafkaTopicPartition> emptyPartitions = new HashSet<>();
+      for (Map.Entry<KafkaTopicPartition, Long> entry : highestCurrentOffsets.entrySet()) {
+        if (partitionIds.contains(entry.getKey())) {
+          if (highestCurrentOffsets.get(entry.getKey()) == null || highestCurrentOffsets.get(entry.getKey()) == 0) {
+            emptyPartitions.add(entry.getKey());

Review Comment:
   This is not necessarily an empty partition. Perhaps the tasks haven't ingested any records from it yet, or we haven't received an update from the tasks.
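   To illustrate the distinction (the names and types below are made up for this sketch, not from the PR): a null or zero highest offset only tells us that no progress has been reported for the partition, so a name along the lines of `partitionsWithoutProgress` would describe the set more accurately than `emptyPartitions`.

   ```java
   import java.util.HashMap;
   import java.util.HashSet;
   import java.util.Map;
   import java.util.Set;

   public class LagBookkeeping {
       // A null or 0 highest offset does NOT prove the partition is empty on
       // the stream: the tasks may simply not have ingested (or reported)
       // anything from it yet.
       public static Set<String> partitionsWithoutProgress(Map<String, Long> highestCurrentOffsets) {
           Set<String> result = new HashSet<>();
           for (Map.Entry<String, Long> e : highestCurrentOffsets.entrySet()) {
               Long offset = e.getValue();
               if (offset == null || offset == 0L) {
                   result.add(e.getKey());
               }
           }
           return result;
       }

       public static void main(String[] args) {
           Map<String, Long> offsets = new HashMap<>();
           offsets.put("p0", 42L);   // tasks have reported progress
           offsets.put("p1", 0L);    // no progress reported -- not necessarily empty
           offsets.put("p2", null);  // no update from the tasks at all
           System.out.println(partitionsWithoutProgress(offsets));
       }
   }
   ```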



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/OrderedPartitionableRecord.java:
##########
@@ -56,13 +68,23 @@ public OrderedPartitionableRecord(
     this.partitionId = partitionId;
     this.sequenceNumber = sequenceNumber;
     this.data = data == null ? ImmutableList.of() : data;
+    this.timestamp = timestamp;
   }
 
   public String getStream()
   {
     return stream;
   }
 
+  /**
+   *
+   * @return timestamp when the event was ingested by a stream

Review Comment:
   ```suggestion
      * @return Timestamp in millis when the record was published to the stream, -1 if not known
   ```



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/RecordSupplier.java:
##########
@@ -132,6 +133,13 @@ boolean isOffsetAvailable(StreamPartition<PartitionIdType> partition,
    */
   Set<PartitionIdType> getPartitionIds(String stream);
 
+  /**
+   * returns the end offsets for all the assigned partitions.
+   *
+   * @return set of partitions with their end offsets

Review Comment:
   ```suggestion
      * Returns the end offsets for all the assigned partitions.
      *
      * @return Map from Partition ID to the corresponding end offset
   ```



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/RecordSupplier.java:
##########
@@ -132,6 +133,13 @@ boolean isOffsetAvailable(StreamPartition<PartitionIdType> partition,
    */
   Set<PartitionIdType> getPartitionIds(String stream);
 
+  /**
+   * returns the end offsets for all the assigned partitions.
+   *
+   * @return set of partitions with their end offsets
+   */
+  Map<PartitionIdType, SequenceOffsetType> getEndOffsets(Set<StreamPartition<PartitionIdType>> partitions);

Review Comment:
   Some suggestions:
   - make this method a `default` method that throws an `UnsupportedOperationException`
   - remove the implementation from all record suppliers except `KafkaRecordSupplier`
   - rename the method to `getLatestSequenceNumbers()` to align with the other methods in this interface
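   A minimal sketch of the suggested shape (simplified generics and names; the real `RecordSupplier` interface has more type parameters and methods):

   ```java
   import java.util.Collections;
   import java.util.HashMap;
   import java.util.Map;
   import java.util.Set;

   public class DefaultMethodSketch {
       // Simplified stand-in for RecordSupplier<PartitionIdType, SequenceOffsetType, ...>
       interface SequenceSupplier<P, S> {
           // Suggested: default + throw, so only the Kafka supplier needs to
           // override it; every other supplier inherits the unsupported behavior.
           default Map<P, S> getLatestSequenceNumbers(Set<P> partitions) {
               throw new UnsupportedOperationException(
                   "getLatestSequenceNumbers is not supported by this record supplier");
           }
       }

       // A Kafka-like supplier overrides the default (dummy offsets here).
       static class KafkaLikeSupplier implements SequenceSupplier<Integer, Long> {
           @Override
           public Map<Integer, Long> getLatestSequenceNumbers(Set<Integer> partitions) {
               Map<Integer, Long> result = new HashMap<>();
               for (Integer p : partitions) {
                   result.put(p, 100L); // pretend every partition ends at offset 100
               }
               return result;
           }
       }

       // A supplier that keeps the default simply throws when called.
       static class OtherSupplier implements SequenceSupplier<Integer, Long> {
       }

       public static void main(String[] args) {
           System.out.println(new KafkaLikeSupplier().getLatestSequenceNumbers(Set.of(0, 1)));
           try {
               new OtherSupplier().getLatestSequenceNumbers(Collections.emptySet());
           } catch (UnsupportedOperationException e) {
               System.out.println("unsupported, as expected");
           }
       }
   }
   ```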



##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java:
##########
@@ -380,6 +383,94 @@ public LagStats computeLagStats()
     return computeLags(partitionRecordLag);
   }
 
+  /**
+   * This method is similar to updatePartitionLagFromStream
+   * but also determines time lag. Once this method has been
+   * tested, we can remove the older one.
+   */
+  protected void updatePartitionTimeAndRecordLagFromStream()
+  {
+    final Map<KafkaTopicPartition, Long> highestCurrentOffsets = getHighestCurrentOffsets();
+
+    getRecordSupplierLock().lock();
+    try {
+      Set<KafkaTopicPartition> partitionIds;
+      try {
+        partitionIds = recordSupplier.getPartitionIds(getIoConfig().getStream());
+      }
+      catch (Exception e) {
+        log.warn("Could not fetch partitions for topic/stream [%s]", getIoConfig().getStream());
+        throw new StreamException(e);
+      }
+
+      for (Map.Entry<KafkaTopicPartition, Long> entry : highestCurrentOffsets.entrySet()) {
+        if (partitionIds.contains(entry.getKey()) && highestCurrentOffsets.get(entry.getKey()) != null) {
+          // since we need to consider the last arrived record at that sequence do a `-1`
+          recordSupplier.seek(new StreamPartition<>(getIoConfig().getStream(), entry.getKey()), entry.getValue() - 1);
+        }
+      }
+
+      final Map<KafkaTopicPartition, Long> lastIngestedTimestamps =
+          CollectionUtils.mapValues(getRecordPerPartitionAtCurrentOffset(partitionIds),
+                                    OrderedPartitionableRecord::getTimestamp
+          );
+
+      Set<StreamPartition<KafkaTopicPartition>> partitions = partitionIds
+          .stream()
+          .map(e -> new StreamPartition<>(getIoConfig().getStream(), e))
+          .collect(Collectors.toSet());
+
+      recordSupplier.seekToLatest(partitions);
+
+      // this method isn't actually computing the lag, just fetching the latest offsets from the stream. This is
+      // because we currently only have record lag for kafka, which can be lazily computed by subtracting the highest
+      // task offsets from the latest offsets from the stream when it is needed
+      latestSequenceFromStream =
+          partitions.stream().collect(Collectors.toMap(StreamPartition::getPartitionId, p -> recordSupplier.getPosition(p)));
+
+      // .position() gives next value to read, and we need seek by -2 to get the current record in next poll()
+      for (Map.Entry<KafkaTopicPartition, Long> entry : latestSequenceFromStream.entrySet()) {
+        recordSupplier.seek(new StreamPartition<>(getIoConfig().getStream(), entry.getKey()), entry.getValue() - 2);
+      }
+
+      lastestTimestampsFromStream = getRecordPerPartitionAtCurrentOffset(partitionIds)
+          .entrySet().stream().filter(e -> lastIngestedTimestamps.containsKey(e.getKey()))
+          .collect(
+              Collectors.toMap(
+                  Entry::getKey,
+                  e -> e.getValue().getTimestamp() - lastIngestedTimestamps.get(e.getKey())
+              )
+          );
+    }
+    catch (InterruptedException e) {
+      throw new StreamException(e);
+    }
+    finally {
+      getRecordSupplierLock().unlock();
+    }
+  }
+
+  private Map<KafkaTopicPartition, OrderedPartitionableRecord<KafkaTopicPartition, Long, KafkaRecordEntity>> getRecordPerPartitionAtCurrentOffset(Set<KafkaTopicPartition> partitions)
+  {
+    Map<KafkaTopicPartition, OrderedPartitionableRecord<KafkaTopicPartition, Long, KafkaRecordEntity>> result = new HashMap<>();
+    int maxPolls = 10;
+    while (maxPolls-- > 0) {
+      for (OrderedPartitionableRecord<KafkaTopicPartition, Long, KafkaRecordEntity> record : recordSupplier.poll(getIoConfig().getPollTimeout())) {
+        if (!result.containsKey(record.getPartitionId())) {
+          result.put(record.getPartitionId(), record);
+          if (partitions.size() == result.size()) {
+            break;
+          }
+        }
+      }
+      if (partitions.size() == result.size()) {
+        break;
+      }
+    }
+
+    return result;

Review Comment:
   This still needs to be addressed.
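   For illustration, one possible shape for this loop (using simplified stand-ins, not the real `RecordSupplier` and `OrderedPartitionableRecord` types): only count records from the wanted partitions, keep the first record seen per partition, and bail out early when a poll returns nothing instead of burning the remaining poll budget.

   ```java
   import java.util.ArrayDeque;
   import java.util.Deque;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;
   import java.util.Set;

   public class PollLoopSketch {
       // Minimal stand-in for a polled record: partition id + payload.
       record Rec(int partition, String value) { }

       // Stand-in for recordSupplier.poll(timeout): drains one pre-queued batch per call.
       static class FakeSupplier {
           private final Deque<List<Rec>> batches = new ArrayDeque<>();
           FakeSupplier(List<List<Rec>> pending) { batches.addAll(pending); }
           List<Rec> poll() { return batches.isEmpty() ? List.of() : batches.poll(); }
       }

       // Capture the first record seen per wanted partition; stop once all wanted
       // partitions are covered, a poll comes back empty, or the budget runs out.
       static Map<Integer, Rec> firstRecordPerPartition(FakeSupplier supplier, Set<Integer> wanted, int maxPolls) {
           Map<Integer, Rec> result = new HashMap<>();
           for (int i = 0; i < maxPolls && result.size() < wanted.size(); i++) {
               List<Rec> batch = supplier.poll();
               if (batch.isEmpty()) {
                   break; // nothing more to read right now
               }
               for (Rec r : batch) {
                   if (wanted.contains(r.partition())) {
                       result.putIfAbsent(r.partition(), r);
                   }
               }
           }
           return result;
       }

       public static void main(String[] args) {
           FakeSupplier supplier = new FakeSupplier(List.of(
               List.of(new Rec(0, "a"), new Rec(0, "b")),
               List.of(new Rec(1, "c"))
           ));
           System.out.println(firstRecordPerPartition(supplier, Set.of(0, 1), 10));
       }
   }
   ```

   Filtering on `wanted` also avoids the size-equality check triggering (or never triggering) because of records from partitions outside the requested set.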



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

