This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 454bbb725a8 Emit time lag from Kafka supervisor (#17735)
454bbb725a8 is described below

commit 454bbb725a843341633b0c3c1cc3b6d3741a71a9
Author: Adithya Chakilam <[email protected]>
AuthorDate: Wed Mar 5 10:40:43 2025 -0600

    Emit time lag from Kafka supervisor (#17735)
    
    Changes
    ---------
    - Emit time lag from Kafka, similar to Kinesis, as metrics `ingest/kafka/lag/time`,
      `ingest/kafka/maxLag/time`, and `ingest/kafka/avgLag/time`
    - Add a new method in `KafkaSupervisor` to fetch timestamps of the latest records
      in the stream to compute time lag
    - Add a new field `emitTimeLagMetrics` in `KafkaSupervisorIOConfig` to toggle
      emission of the new metrics
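    The per-partition time lag described above (timestamp of the latest record in the stream minus the timestamp of the last record the tasks have ingested) can be sketched as a standalone helper. The class and method names here are hypothetical illustrations, not part of this commit:

    ```java
    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical sketch: per-partition time lag is the timestamp of the
    // latest record in the stream minus the timestamp of the last record the
    // indexing tasks have ingested. Partitions missing from the ingested map
    // are skipped, mirroring the filter in the supervisor's lag computation.
    public class TimeLagSketch
    {
      public static Map<Integer, Long> computeTimeLag(
          Map<Integer, Long> latestStreamTimestamps,
          Map<Integer, Long> lastIngestedTimestamps
      )
      {
        final Map<Integer, Long> lag = new HashMap<>();
        for (Map.Entry<Integer, Long> entry : latestStreamTimestamps.entrySet()) {
          final Long ingested = lastIngestedTimestamps.get(entry.getKey());
          if (ingested != null) {
            lag.put(entry.getKey(), entry.getValue() - ingested);
          }
        }
        return lag;
      }
    }
    ```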
---
 docs/operations/metrics.md                         |   5 +
 .../rabbitstream/RabbitStreamRecordSupplier.java   |   1 -
 .../druid/indexing/kafka/KafkaRecordSupplier.java  |  19 +++-
 .../indexing/kafka/supervisor/KafkaSupervisor.java | 110 ++++++++++++++++++++-
 .../kafka/supervisor/KafkaSupervisorIOConfig.java  |  15 ++-
 .../supervisor/KafkaSupervisorReportPayload.java   |   3 +-
 .../druid/indexing/kafka/KafkaSamplerSpecTest.java |  18 ++--
 .../supervisor/KafkaSupervisorIOConfigTest.java    |   6 +-
 .../kafka/supervisor/KafkaSupervisorTest.java      |  23 ++++-
 .../common/OrderedPartitionableRecord.java         |  22 +++++
 .../seekablestream/common/RecordSupplier.java      |  11 +++
 .../supervisor/SeekableStreamSupervisor.java       |  19 ++++
 12 files changed, 233 insertions(+), 19 deletions(-)
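The new `maxLag/time` and `avgLag/time` metrics aggregate the per-partition lag map; assuming they are aggregated the same way as the existing record-lag metrics (max and mean over partitions, negatives clamped to zero), a minimal sketch with a hypothetical helper class:

```java
import java.util.Collection;

// Sketch (an assumption mirroring the existing record-lag aggregation):
// maxLag/time is the maximum and avgLag/time the mean of the per-partition
// lag values, with negative values clamped to zero.
public class LagAggregates
{
  public static long maxLag(Collection<Long> partitionLags)
  {
    return partitionLags.stream().mapToLong(x -> Math.max(x, 0)).max().orElse(0);
  }

  public static long avgLag(Collection<Long> partitionLags)
  {
    return partitionLags.isEmpty()
           ? 0
           : partitionLags.stream().mapToLong(x -> Math.max(x, 0)).sum() / partitionLags.size();
  }
}
```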

diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md
index ba74079de39..926740b5fb4 100644
--- a/docs/operations/metrics.md
+++ b/docs/operations/metrics.md
@@ -223,6 +223,10 @@ These metrics apply to the [Kafka indexing 
service](../ingestion/kafka-ingestion
 |`ingest/kafka/maxLag`|Max lag between the offsets consumed by the Kafka 
indexing tasks and latest offsets in Kafka brokers across all partitions. 
Minimum emission period for this metric is a minute.|`dataSource`, `stream`, 
`tags`|Greater than 0, should not be a very high number. |
 |`ingest/kafka/avgLag`|Average lag between the offsets consumed by the Kafka 
indexing tasks and latest offsets in Kafka brokers across all partitions. 
Minimum emission period for this metric is a minute.|`dataSource`, `stream`, 
`tags`|Greater than 0, should not be a very high number. |
 |`ingest/kafka/partitionLag`|Partition-wise lag between the offsets consumed 
by the Kafka indexing tasks and latest offsets in Kafka brokers. Minimum 
emission period for this metric is a minute.|`dataSource`, `stream`, 
`partition`, `tags`|Greater than 0, should not be a very high number. |
+|`ingest/kafka/updateOffsets/time`|Total time (in milliseconds) taken to fetch the latest offsets from the Kafka stream and from the ingestion tasks.|`dataSource`, `tags`|Generally a few seconds at most.|
+|`ingest/kafka/lag/time`|Total lag time in milliseconds between the current message sequence number consumed by the Kafka indexing tasks and the latest sequence number in Kafka across all partitions. Minimum emission period for this metric is a minute. Emitted only when `emitTimeLagMetrics` is set to true in the supervisor config.|`dataSource`, `stream`, `tags`|Greater than 0, up to the max Kafka retention period in milliseconds. |
+|`ingest/kafka/maxLag/time`|Max lag time in milliseconds between the current message sequence number consumed by the Kafka indexing tasks and the latest sequence number in Kafka across all partitions. Minimum emission period for this metric is a minute. Emitted only when `emitTimeLagMetrics` is set to true in the supervisor config.|`dataSource`, `stream`, `tags`|Greater than 0, up to the max Kafka retention period in milliseconds. |
+|`ingest/kafka/avgLag/time`|Average lag time in milliseconds between the current message sequence number consumed by the Kafka indexing tasks and the latest sequence number in Kafka across all partitions. Minimum emission period for this metric is a minute. Emitted only when `emitTimeLagMetrics` is set to true in the supervisor config.|`dataSource`, `stream`, `tags`|Greater than 0, up to the max Kafka retention period in milliseconds. |
 
 ### Ingestion metrics for Kinesis
 
@@ -234,6 +238,7 @@ These metrics apply to the [Kinesis indexing 
service](../ingestion/kinesis-inges
 |`ingest/kinesis/maxLag/time`|Max lag time in milliseconds between the current 
message sequence number consumed by the Kinesis indexing tasks and latest 
sequence number in Kinesis across all shards. Minimum emission period for this 
metric is a minute.|`dataSource`, `stream`, `tags`|Greater than 0, up to max 
Kinesis retention period in milliseconds. |
 |`ingest/kinesis/avgLag/time`|Average lag time in milliseconds between the 
current message sequence number consumed by the Kinesis indexing tasks and 
latest sequence number in Kinesis across all shards. Minimum emission period 
for this metric is a minute.|`dataSource`, `stream`, `tags`|Greater than 0, up 
to max Kinesis retention period in milliseconds. |
 |`ingest/kinesis/partitionLag/time`|Partition-wise lag time in milliseconds 
between the current message sequence number consumed by the Kinesis indexing 
tasks and latest sequence number in Kinesis. Minimum emission period for this 
metric is a minute.|`dataSource`, `stream`, `partition`, `tags`|Greater than 0, 
up to max Kinesis retention period in milliseconds. |
+|`ingest/kinesis/updateOffsets/time`|Total time (in milliseconds) taken to fetch the latest offsets from the Kinesis stream and from the ingestion tasks.|`dataSource`, `tags`|Generally a few seconds at most.|
 
 ### Compaction metrics
 
diff --git 
a/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/RabbitStreamRecordSupplier.java
 
b/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/RabbitStreamRecordSupplier.java
index 56d7cf44ea1..b2a2432fc9c 100644
--- 
a/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/RabbitStreamRecordSupplier.java
+++ 
b/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/RabbitStreamRecordSupplier.java
@@ -43,7 +43,6 @@ import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.metadata.DynamicConfigProvider;
 
 import javax.annotation.Nonnull;
-
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
diff --git 
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
 
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
index a88300552e2..ac7683a25ef 100644
--- 
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
+++ 
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
@@ -38,6 +38,7 @@ import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.metrics.Monitor;
 import org.apache.druid.metadata.DynamicConfigProvider;
 import org.apache.druid.metadata.PasswordProvider;
+import org.apache.druid.utils.CollectionUtils;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.PartitionInfo;
@@ -166,7 +167,8 @@ public class KafkaRecordSupplier implements 
RecordSupplier<KafkaTopicPartition,
           record.topic(),
           new KafkaTopicPartition(multiTopic, record.topic(), 
record.partition()),
           record.offset(),
-          record.value() == null ? null : ImmutableList.of(new 
KafkaRecordEntity(record))
+          record.value() == null ? null : ImmutableList.of(new 
KafkaRecordEntity(record)),
+          record.timestamp()
       ));
     }
     return polledRecords;
@@ -206,6 +208,21 @@ public class KafkaRecordSupplier implements 
RecordSupplier<KafkaTopicPartition,
     return wrapExceptions(() -> 
consumer.position(partition.getPartitionId().asTopicPartition(partition.getStream())));
   }
 
+  @Override
+  public Map<KafkaTopicPartition, Long> 
getLatestSequenceNumbers(Set<StreamPartition<KafkaTopicPartition>> partitions)
+  {
+    return wrapExceptions(() -> CollectionUtils.mapKeys(
+      consumer.endOffsets(
+        partitions
+            .stream()
+            .map(e -> e.getPartitionId().asTopicPartition(e.getStream()))
+            .collect(Collectors.toList()
+        )
+      ),
+      p -> new KafkaTopicPartition(multiTopic, p.topic(), p.partition())
+    ));
+  }
+
   @Override
   public Set<KafkaTopicPartition> getPartitionIds(String stream)
   {
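The `getLatestSequenceNumbers` implementation above re-keys Kafka's `endOffsets()` result (keyed by `TopicPartition`) into a map keyed by `KafkaTopicPartition` via `CollectionUtils.mapKeys`. A self-contained, generic sketch of that transformation (reimplemented here so it runs without Druid on the classpath):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Generic sketch of the mapKeys transformation: apply keyMapper to every key
// while keeping the values unchanged, as getLatestSequenceNumbers does when
// converting TopicPartition keys to KafkaTopicPartition.
public class MapKeysSketch
{
  public static <K1, K2, V> Map<K2, V> mapKeys(Map<K1, V> input, Function<K1, K2> keyMapper)
  {
    final Map<K2, V> output = new HashMap<>();
    for (Map.Entry<K1, V> entry : input.entrySet()) {
      output.put(keyMapper.apply(entry.getKey()), entry.getValue());
    }
    return output;
  }
}
```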
diff --git 
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
 
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index a0c7494e3bb..b990c8fa3fc 100644
--- 
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++ 
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -46,6 +46,7 @@ import 
org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskIOConfig;
 import 
org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig;
 import org.apache.druid.indexing.seekablestream.SeekableStreamSequenceNumbers;
 import 
org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
+import 
org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
 import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
 import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
 import org.apache.druid.indexing.seekablestream.common.StreamException;
@@ -91,6 +92,7 @@ public class KafkaSupervisor extends 
SeekableStreamSupervisor<KafkaTopicPartitio
 
   private final Pattern pattern;
   private volatile Map<KafkaTopicPartition, Long> latestSequenceFromStream;
+  private volatile Map<KafkaTopicPartition, Long> partitionToTimeLag;
 
   private final KafkaSupervisorSpec spec;
 
@@ -171,6 +173,7 @@ public class KafkaSupervisor extends 
SeekableStreamSupervisor<KafkaTopicPartitio
         ioConfig.getTaskDuration().getMillis() / 1000,
         includeOffsets ? latestSequenceFromStream : null,
         includeOffsets ? partitionLag : null,
+        includeOffsets ? getPartitionTimeLag() : null,
         includeOffsets ? partitionLag.values().stream().mapToLong(x -> 
Math.max(x, 0)).sum() : null,
         includeOffsets ? sequenceLastUpdated : null,
         spec.isSuspended(),
@@ -273,8 +276,7 @@ public class KafkaSupervisor extends 
SeekableStreamSupervisor<KafkaTopicPartitio
   @Override
   protected Map<KafkaTopicPartition, Long> getPartitionTimeLag()
   {
-    // time lag not currently support with kafka
-    return null;
+    return partitionToTimeLag;
   }
 
   // suppress use of CollectionUtils.mapValues() since the valueMapper 
function is dependent on map key here
@@ -380,6 +382,105 @@ public class KafkaSupervisor extends 
SeekableStreamSupervisor<KafkaTopicPartitio
     return computeLags(partitionRecordLag);
   }
 
+  /**
+   * This method is similar to updatePartitionLagFromStream
+   * but also determines time lag. Once this method has been
+   * tested, we can remove the older one.
+   */
+  private void updatePartitionTimeAndRecordLagFromStream()
+  {
+    final Map<KafkaTopicPartition, Long> highestCurrentOffsets = 
getHighestCurrentOffsets();
+
+    getRecordSupplierLock().lock();
+    try {
+      Set<KafkaTopicPartition> partitionIds;
+      try {
+        partitionIds = 
recordSupplier.getPartitionIds(getIoConfig().getStream());
+      }
+      catch (Exception e) {
+        log.warn("Could not fetch partitions for topic/stream [%s]", 
getIoConfig().getStream());
+        throw new StreamException(e);
+      }
+
+      final Set<StreamPartition<KafkaTopicPartition>> partitions = partitionIds
+          .stream()
+          .map(e -> new StreamPartition<>(getIoConfig().getStream(), e))
+          .collect(Collectors.toSet());
+
+      // Since we cannot compute the current timestamp for partitions that
+      // we haven't started reading yet, set them explicitly.
+      final Set<KafkaTopicPartition> yetToReadPartitions = new HashSet<>();
+      for (KafkaTopicPartition partition : partitionIds) {
+        Long highestCurrentOffset = highestCurrentOffsets.get(partition);
+        if (highestCurrentOffset == null || highestCurrentOffset == 0) {
+          yetToReadPartitions.add(partition);
+        } else {
+          recordSupplier.seek(new StreamPartition<>(getIoConfig().getStream(), 
partition), highestCurrentOffset - 1);
+        }
+      }
+
+      final Map<KafkaTopicPartition, Long> lastIngestedTimestamps = 
getTimestampPerPartitionAtCurrentOffset(partitions);
+      // Note: this might give weird values for lag when the tasks are yet to 
start processing
+      yetToReadPartitions.forEach(p -> lastIngestedTimestamps.put(p, 0L));
+
+      recordSupplier.seekToLatest(partitions);
+      latestSequenceFromStream = 
recordSupplier.getLatestSequenceNumbers(partitions);
+
+      for (Map.Entry<KafkaTopicPartition, Long> entry : 
latestSequenceFromStream.entrySet()) {
+        // If there are no messages, getEndOffset would return 0; but if there
+        // are n messages, it would return n + 1, and hence we need to seek to
+        // n - 2 to get the nth message in the next poll.
+        if (entry.getValue() != 0) {
+          recordSupplier.seek(new StreamPartition<>(getIoConfig().getStream(), 
entry.getKey()), entry.getValue() - 2);
+        }
+      }
+
+      partitionToTimeLag = getTimestampPerPartitionAtCurrentOffset(partitions)
+          .entrySet().stream().filter(e -> 
lastIngestedTimestamps.containsKey(e.getKey()))
+          .collect(
+              Collectors.toMap(
+                  Entry::getKey,
+                  e -> e.getValue() - lastIngestedTimestamps.get(e.getKey())
+              )
+          );
+    }
+    catch (InterruptedException e) {
+      throw new StreamException(e);
+    }
+    finally {
+      getRecordSupplierLock().unlock();
+    }
+  }
+
+  private Map<KafkaTopicPartition, Long> 
getTimestampPerPartitionAtCurrentOffset(Set<StreamPartition<KafkaTopicPartition>>
 allPartitions)
+  {
+    Map<KafkaTopicPartition, Long> result = new HashMap<>();
+    Set<StreamPartition<KafkaTopicPartition>> remainingPartitions = new 
HashSet<>(allPartitions);
+
+    try {
+      int maxPolls = 5;
+      while (!remainingPartitions.isEmpty() && maxPolls-- > 0) {
+        for (OrderedPartitionableRecord<KafkaTopicPartition, Long, 
KafkaRecordEntity> record : 
recordSupplier.poll(getIoConfig().getPollTimeout())) {
+          if (!result.containsKey(record.getPartitionId())) {
+            result.put(record.getPartitionId(), record.getTimestamp());
+            remainingPartitions.remove(new 
StreamPartition<>(getIoConfig().getStream(), record.getPartitionId()));
+            if (remainingPartitions.isEmpty()) {
+              break;
+            }
+          }
+          recordSupplier.assign(remainingPartitions);
+        }
+      }
+    }
+    finally {
+      recordSupplier.assign(allPartitions);
+    }
+
+    if (!remainingPartitions.isEmpty()) {
+      log.info("Could not fetch the latest timestamp for partitions [%s].", 
remainingPartitions);
+    }
+    return result;
+  }
+
   /**
    * Fetches the latest offsets from the Kafka stream and updates the map
    * {@link #latestSequenceFromStream}. The actual lag is computed lazily in
@@ -388,6 +489,11 @@ public class KafkaSupervisor extends 
SeekableStreamSupervisor<KafkaTopicPartitio
   @Override
   protected void updatePartitionLagFromStream()
   {
+    if (getIoConfig().isEmitTimeLagMetrics()) {
+      updatePartitionTimeAndRecordLagFromStream();
+      return;
+    }
+
     getRecordSupplierLock().lock();
     try {
       Set<KafkaTopicPartition> partitionIds;
diff --git 
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java
 
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java
index fbf55f4ab5e..be4f6cfb196 100644
--- 
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java
+++ 
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java
@@ -22,6 +22,7 @@ package org.apache.druid.indexing.kafka.supervisor;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
+import org.apache.druid.common.config.Configs;
 import org.apache.druid.data.input.InputFormat;
 import org.apache.druid.error.InvalidInput;
 import org.apache.druid.indexing.seekablestream.extension.KafkaConfigOverrides;
@@ -51,6 +52,7 @@ public class KafkaSupervisorIOConfig extends 
SeekableStreamSupervisorIOConfig
   private final KafkaConfigOverrides configOverrides;
   private final String topic;
   private final String topicPattern;
+  private final boolean emitTimeLagMetrics;
 
   @JsonCreator
   public KafkaSupervisorIOConfig(
@@ -72,7 +74,8 @@ public class KafkaSupervisorIOConfig extends 
SeekableStreamSupervisorIOConfig
       @JsonProperty("lateMessageRejectionStartDateTime") DateTime 
lateMessageRejectionStartDateTime,
       @JsonProperty("configOverrides") KafkaConfigOverrides configOverrides,
       @JsonProperty("idleConfig") IdleConfig idleConfig,
-      @JsonProperty("stopTaskCount") Integer stopTaskCount
+      @JsonProperty("stopTaskCount") Integer stopTaskCount,
+      @Nullable @JsonProperty("emitTimeLagMetrics") Boolean emitTimeLagMetrics
   )
   {
     super(
@@ -102,6 +105,7 @@ public class KafkaSupervisorIOConfig extends 
SeekableStreamSupervisorIOConfig
     this.configOverrides = configOverrides;
     this.topic = topic;
     this.topicPattern = topicPattern;
+    this.emitTimeLagMetrics = Configs.valueOrDefault(emitTimeLagMetrics, 
false);
   }
 
   /**
@@ -151,6 +155,15 @@ public class KafkaSupervisorIOConfig extends 
SeekableStreamSupervisorIOConfig
     return topicPattern != null;
   }
 
+  /**
+   * @return true if supervisor needs to publish the time lag.
+   */
+  @JsonProperty
+  public boolean isEmitTimeLagMetrics()
+  {
+    return emitTimeLagMetrics;
+  }
+
   @Override
   public String toString()
   {
diff --git 
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorReportPayload.java
 
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorReportPayload.java
index 6a88dc16ec8..c64c4426d69 100644
--- 
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorReportPayload.java
+++ 
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorReportPayload.java
@@ -38,6 +38,7 @@ public class KafkaSupervisorReportPayload extends 
SeekableStreamSupervisorReport
       long durationSeconds,
       @Nullable Map<KafkaTopicPartition, Long> latestOffsets,
       @Nullable Map<KafkaTopicPartition, Long> minimumLag,
+      @Nullable Map<KafkaTopicPartition, Long> minimumLagMillis,
       @Nullable Long aggregateLag,
       @Nullable DateTime offsetsLastUpdated,
       boolean suspended,
@@ -56,7 +57,7 @@ public class KafkaSupervisorReportPayload extends 
SeekableStreamSupervisorReport
         latestOffsets,
         minimumLag,
         aggregateLag,
-        null,
+        minimumLagMillis,
         null,
         offsetsLastUpdated,
         suspended,
diff --git 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java
 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java
index 9ed5f068e0b..dd7efcdea51 100644
--- 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java
+++ 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java
@@ -165,7 +165,8 @@ public class KafkaSamplerSpecTest extends 
InitializedNullHandlingTest
             null,
             null,
             null,
-            null
+            null,
+            false
         ),
         null,
         null,
@@ -218,7 +219,8 @@ public class KafkaSamplerSpecTest extends 
InitializedNullHandlingTest
             null,
             null,
             null,
-            null
+            null,
+            false
         ),
         null,
         null,
@@ -280,7 +282,8 @@ public class KafkaSamplerSpecTest extends 
InitializedNullHandlingTest
             null,
             null,
             null,
-            null
+            null,
+            false
         ),
         null,
         null,
@@ -384,7 +387,8 @@ public class KafkaSamplerSpecTest extends 
InitializedNullHandlingTest
             null,
             null,
             null,
-            null
+            null,
+            false
         ),
         null,
         null,
@@ -568,7 +572,8 @@ public class KafkaSamplerSpecTest extends 
InitializedNullHandlingTest
             null,
             null,
             null,
-            null
+            null,
+            false
         ),
         null,
         null,
@@ -624,7 +629,8 @@ public class KafkaSamplerSpecTest extends 
InitializedNullHandlingTest
             null,
             null,
             null,
-            null
+            null,
+            false
         ),
         null,
         null,
diff --git 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
index 64ac05865d6..cb2956afd3b 100644
--- 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
+++ 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
@@ -337,7 +337,8 @@ public class KafkaSupervisorIOConfigTest
         null,
         null,
         null,
-        null
+        null,
+        false
     );
     String ioConfig = mapper.writeValueAsString(kafkaSupervisorIOConfig);
     KafkaSupervisorIOConfig kafkaSupervisorIOConfig1 = 
mapper.readValue(ioConfig, KafkaSupervisorIOConfig.class);
@@ -380,7 +381,8 @@ public class KafkaSupervisorIOConfigTest
         null,
         null,
         mapper.convertValue(idleConfig, IdleConfig.class),
-        null
+        null,
+        false
     );
     String ioConfig = mapper.writeValueAsString(kafkaSupervisorIOConfig);
     KafkaSupervisorIOConfig kafkaSupervisorIOConfig1 = 
mapper.readValue(ioConfig, KafkaSupervisorIOConfig.class);
diff --git 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index f330c97f0ea..3eb0d2a8df8 100644
--- 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++ 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -119,6 +119,8 @@ import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
 import java.io.IOException;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -319,7 +321,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
         null,
         null,
         new IdleConfig(true, 1000L),
-        1
+        1,
+        false
     );
 
     final KafkaSupervisorTuningConfig tuningConfigOri = new 
KafkaSupervisorTuningConfig(
@@ -2093,7 +2096,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
 
     supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null);
     final KafkaSupervisorTuningConfig tuningConfig = 
supervisor.getTuningConfig();
-    addSomeEvents(1);
+    addSomeEvents(30);
 
     Task task = createKafkaIndexTask(
         "id1",
@@ -2324,6 +2327,10 @@ public class KafkaSupervisorTest extends EasyMockSupport
     Assert.assertEquals(1, payload.getPublishingTasks().size());
     Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, 
payload.getDetailedState());
     Assert.assertEquals(0, payload.getRecentErrors().size());
+    Assert.assertEquals(
+        singlePartitionMap(topic, 0, 10000, 1, 5000, 2, 0),
+        payload.getMinimumLagMillis()
+    );
 
     TaskReportData activeReport = payload.getActiveTasks().get(0);
     TaskReportData publishingReport = payload.getPublishingTasks().get(0);
@@ -5115,16 +5122,19 @@ public class KafkaSupervisorTest extends EasyMockSupport
     try (final KafkaProducer<byte[], byte[]> kafkaProducer = 
kafkaServer.newProducer()) {
       kafkaProducer.initTransactions();
       kafkaProducer.beginTransaction();
+      Instant time = Instant.now();
       for (int i = 0; i < NUM_PARTITIONS; i++) {
         for (int j = 0; j < numEventsPerPartition; j++) {
           kafkaProducer.send(
               new ProducerRecord<>(
                   topic,
                   i,
+                  time.toEpochMilli(),
                   null,
                   StringUtils.toUtf8(StringUtils.format("event-%d", j))
               )
           ).get();
+          time = time.plus(5, ChronoUnit.SECONDS);
         }
       }
       kafkaProducer.commitTransaction();
@@ -5286,7 +5296,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
         null,
         null,
         idleConfig,
-        null
+        null,
+        true
     );
 
     KafkaIndexTaskClientFactory taskClientFactory = new 
KafkaIndexTaskClientFactory(
@@ -5400,7 +5411,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
         null,
         null,
         null,
-        null
+        null,
+        false
     );
 
     KafkaIndexTaskClientFactory taskClientFactory = new 
KafkaIndexTaskClientFactory(
@@ -5517,7 +5529,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
         null,
         null,
         null,
-        null
+        null,
+        false
     );
 
     KafkaIndexTaskClientFactory taskClientFactory = new 
KafkaIndexTaskClientFactory(
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/OrderedPartitionableRecord.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/OrderedPartitionableRecord.java
index 5d052585ba9..17a7ab3dd5d 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/OrderedPartitionableRecord.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/OrderedPartitionableRecord.java
@@ -41,6 +41,7 @@ public class OrderedPartitionableRecord<PartitionIdType, 
SequenceOffsetType, Rec
   private final PartitionIdType partitionId;
   private final SequenceOffsetType sequenceNumber;
   private final List<RecordType> data;
+  private final Long timestamp;
 
   public OrderedPartitionableRecord(
       String stream,
@@ -48,6 +49,17 @@ public class OrderedPartitionableRecord<PartitionIdType, 
SequenceOffsetType, Rec
       SequenceOffsetType sequenceNumber,
       List<RecordType> data
   )
+  {
+    this(stream, partitionId, sequenceNumber, data, null);
+  }
+
+  public OrderedPartitionableRecord(
+      String stream,
+      PartitionIdType partitionId,
+      SequenceOffsetType sequenceNumber,
+      List<RecordType> data,
+      Long timestamp
+  )
   {
     Preconditions.checkNotNull(stream, "stream");
     Preconditions.checkNotNull(partitionId, "partitionId");
@@ -56,6 +68,7 @@ public class OrderedPartitionableRecord<PartitionIdType, 
SequenceOffsetType, Rec
     this.partitionId = partitionId;
     this.sequenceNumber = sequenceNumber;
     this.data = data == null ? ImmutableList.of() : data;
+    this.timestamp = timestamp;
   }
 
   public String getStream()
@@ -63,6 +76,15 @@ public class OrderedPartitionableRecord<PartitionIdType, 
SequenceOffsetType, Rec
     return stream;
   }
 
+  /**
+   * @return Timestamp in millis when the record was published to the stream, or -1 if not known.
+   */
+  public long getTimestamp()
+  {
+    return timestamp == null ? -1 : timestamp;
+  }
+
   public PartitionIdType getPartitionId()
   {
     return partitionId;
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/RecordSupplier.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/RecordSupplier.java
index b9db4202a8f..1c7e3248025 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/RecordSupplier.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/RecordSupplier.java
@@ -27,6 +27,7 @@ import javax.validation.constraints.NotNull;
 import java.io.Closeable;
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 /**
@@ -132,6 +133,16 @@ public interface RecordSupplier<PartitionIdType, 
SequenceOffsetType, RecordType
    */
   Set<PartitionIdType> getPartitionIds(String stream);
 
+  /**
+   * Returns the end offsets for all the assigned partitions.
+   *
+   * @return Map from Partition ID to the corresponding end offset
+   */
+  default Map<PartitionIdType, SequenceOffsetType> 
getLatestSequenceNumbers(Set<StreamPartition<PartitionIdType>> partitions)
+  {
+    throw new UnsupportedOperationException();
+  }
+
   /**
    * close the RecordSupplier
    */
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index faa68f3d8ea..47781826265 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -4093,6 +4093,7 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
     // healthy go ahead and try anyway to try if possible to provide insight 
into how much time is left to fix the
     // issue for cluster operators since this feeds the lag metrics
     if (stateManager.isIdle() || stateManager.isSteadyState() || 
!stateManager.isHealthy()) {
+      final Stopwatch runTime = Stopwatch.createStarted();
       try {
         updateCurrentOffsets();
         updatePartitionLagFromStream();
@@ -4101,6 +4102,9 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
       catch (Exception e) {
         log.warn(e, "Exception while getting current/latest sequences");
       }
+      finally {
+        emitUpdateOffsetsTime(runTime.millisElapsed());
+      }
     }
   }
 
@@ -4506,6 +4510,21 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
     }
   }
 
+  private void emitUpdateOffsetsTime(long timeInMillis)
+  {
+    try {
+      emitter.emit(
+          ServiceMetricEvent.builder()
+                            .setDimension(DruidMetrics.DATASOURCE, dataSource)
+                            .setDimensionIfNotNull(DruidMetrics.TAGS, spec.getContextValue(DruidMetrics.TAGS))
+                            .setMetric(StringUtils.format("ingest/%s/updateOffsets/time", spec.getType()), timeInMillis)
+      );
+    }
+    catch (Exception e) {
+      log.warn(e, "Unable to emit updateOffsets time");
+    }
+  }
+
   protected void emitNoticesQueueSize()
   {
     if (spec.isSuspended()) {
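
The `Stopwatch` plus try/finally shape added around `updateCurrentOffsets()` above ensures the elapsed time is recorded even when the update throws. A sketch of the same pattern using plain `System.nanoTime()`; `TimedCall` is a hypothetical stand-in, not Druid code:

```java
// Hypothetical sketch of the try/finally stopwatch pattern used to time the
// offset update: the elapsed time is recorded in finally, so it is captured
// even if the timed task throws, matching how the supervisor can still emit
// updateOffsets/time on failure.
public class TimedCall
{
  public static long lastElapsedMillis = -1;

  public static void runAndRecordMillis(Runnable task)
  {
    final long start = System.nanoTime();
    try {
      task.run();
    }
    finally {
      // In the supervisor, the elapsed millis are emitted as a metric here.
      lastElapsedMillis = (System.nanoTime() - start) / 1_000_000L;
    }
  }
}
```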


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
