This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 454bbb725a8 Emit time lag from Kafka supervisor (#17735)
454bbb725a8 is described below
commit 454bbb725a843341633b0c3c1cc3b6d3741a71a9
Author: Adithya Chakilam <[email protected]>
AuthorDate: Wed Mar 5 10:40:43 2025 -0600
Emit time lag from Kafka supervisor (#17735)
Changes
---------
- Emit time lag from Kafka, similar to Kinesis, as the metrics
  `ingest/kafka/lag/time`, `ingest/kafka/maxLag/time`, `ingest/kafka/avgLag/time`
- Add a new method in `KafkaSupervisor` to fetch the timestamps of the latest
  records in the stream, used to compute time lag
- Add a new field `emitTimeLagMetrics` in `KafkaSupervisorIOConfig` to toggle
  emission of the new metrics
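
As a rough illustration of how the new toggle would be used (the topic name and broker address below are made-up placeholders; `emitTimeLagMetrics` is the field added by this change), a Kafka supervisor spec enabling the time-lag metrics might look like:

```json
{
  "type": "kafka",
  "spec": {
    "ioConfig": {
      "topic": "example-topic",
      "consumerProperties": {
        "bootstrap.servers": "localhost:9092"
      },
      "emitTimeLagMetrics": true
    }
  }
}
```

When the flag is true, the supervisor takes the new `updatePartitionTimeAndRecordLagFromStream()` path instead of the offset-only lag path, emitting `ingest/kafka/lag/time`, `ingest/kafka/maxLag/time`, and `ingest/kafka/avgLag/time` alongside the existing offset-based lag metrics.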
---
docs/operations/metrics.md | 5 +
.../rabbitstream/RabbitStreamRecordSupplier.java | 1 -
.../druid/indexing/kafka/KafkaRecordSupplier.java | 19 +++-
.../indexing/kafka/supervisor/KafkaSupervisor.java | 110 ++++++++++++++++++++-
.../kafka/supervisor/KafkaSupervisorIOConfig.java | 15 ++-
.../supervisor/KafkaSupervisorReportPayload.java | 3 +-
.../druid/indexing/kafka/KafkaSamplerSpecTest.java | 18 ++--
.../supervisor/KafkaSupervisorIOConfigTest.java | 6 +-
.../kafka/supervisor/KafkaSupervisorTest.java | 23 ++++-
.../common/OrderedPartitionableRecord.java | 22 +++++
.../seekablestream/common/RecordSupplier.java | 11 +++
.../supervisor/SeekableStreamSupervisor.java | 19 ++++
12 files changed, 233 insertions(+), 19 deletions(-)
diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md
index ba74079de39..926740b5fb4 100644
--- a/docs/operations/metrics.md
+++ b/docs/operations/metrics.md
@@ -223,6 +223,10 @@ These metrics apply to the [Kafka indexing
service](../ingestion/kafka-ingestion
|`ingest/kafka/maxLag`|Max lag between the offsets consumed by the Kafka
indexing tasks and latest offsets in Kafka brokers across all partitions.
Minimum emission period for this metric is a minute.|`dataSource`, `stream`,
`tags`|Greater than 0, should not be a very high number. |
|`ingest/kafka/avgLag`|Average lag between the offsets consumed by the Kafka
indexing tasks and latest offsets in Kafka brokers across all partitions.
Minimum emission period for this metric is a minute.|`dataSource`, `stream`,
`tags`|Greater than 0, should not be a very high number. |
|`ingest/kafka/partitionLag`|Partition-wise lag between the offsets consumed
by the Kafka indexing tasks and latest offsets in Kafka brokers. Minimum
emission period for this metric is a minute.|`dataSource`, `stream`,
`partition`, `tags`|Greater than 0, should not be a very high number. |
+|`ingest/kafka/updateOffsets/time`|Total time (in milliseconds) taken to fetch the latest offsets from the Kafka stream and the ingestion tasks.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Generally a few seconds at most.|
+|`ingest/kafka/lag/time`|Total lag time in milliseconds between the current message sequence number consumed by the Kafka indexing tasks and the latest sequence number in Kafka across all partitions. Minimum emission period for this metric is a minute. Emitted only when `emitTimeLagMetrics` is set to true in the supervisor `ioConfig`.|`dataSource`, `stream`, `tags`|Greater than 0, up to the max Kafka retention period in milliseconds. |
+|`ingest/kafka/maxLag/time`|Max lag time in milliseconds between the current message sequence number consumed by the Kafka indexing tasks and the latest sequence number in Kafka across all partitions. Minimum emission period for this metric is a minute. Emitted only when `emitTimeLagMetrics` is set to true in the supervisor `ioConfig`.|`dataSource`, `stream`, `tags`|Greater than 0, up to the max Kafka retention period in milliseconds. |
+|`ingest/kafka/avgLag/time`|Average lag time in milliseconds between the current message sequence number consumed by the Kafka indexing tasks and the latest sequence number in Kafka across all partitions. Minimum emission period for this metric is a minute. Emitted only when `emitTimeLagMetrics` is set to true in the supervisor `ioConfig`.|`dataSource`, `stream`, `tags`|Greater than 0, up to the max Kafka retention period in milliseconds. |
### Ingestion metrics for Kinesis
@@ -234,6 +238,7 @@ These metrics apply to the [Kinesis indexing
service](../ingestion/kinesis-inges
|`ingest/kinesis/maxLag/time`|Max lag time in milliseconds between the current
message sequence number consumed by the Kinesis indexing tasks and latest
sequence number in Kinesis across all shards. Minimum emission period for this
metric is a minute.|`dataSource`, `stream`, `tags`|Greater than 0, up to max
Kinesis retention period in milliseconds. |
|`ingest/kinesis/avgLag/time`|Average lag time in milliseconds between the
current message sequence number consumed by the Kinesis indexing tasks and
latest sequence number in Kinesis across all shards. Minimum emission period
for this metric is a minute.|`dataSource`, `stream`, `tags`|Greater than 0, up
to max Kinesis retention period in milliseconds. |
|`ingest/kinesis/partitionLag/time`|Partition-wise lag time in milliseconds
between the current message sequence number consumed by the Kinesis indexing
tasks and latest sequence number in Kinesis. Minimum emission period for this
metric is a minute.|`dataSource`, `stream`, `partition`, `tags`|Greater than 0,
up to max Kinesis retention period in milliseconds. |
+|`ingest/kinesis/updateOffsets/time`|Total time (in milliseconds) taken to fetch the latest offsets from the Kinesis stream and the ingestion tasks.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Generally a few seconds at most.|
### Compaction metrics
diff --git
a/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/RabbitStreamRecordSupplier.java
b/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/RabbitStreamRecordSupplier.java
index 56d7cf44ea1..b2a2432fc9c 100644
---
a/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/RabbitStreamRecordSupplier.java
+++
b/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/RabbitStreamRecordSupplier.java
@@ -43,7 +43,6 @@ import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.metadata.DynamicConfigProvider;
import javax.annotation.Nonnull;
-
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
diff --git
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
index a88300552e2..ac7683a25ef 100644
---
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
+++
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
@@ -38,6 +38,7 @@ import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.metrics.Monitor;
import org.apache.druid.metadata.DynamicConfigProvider;
import org.apache.druid.metadata.PasswordProvider;
+import org.apache.druid.utils.CollectionUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
@@ -166,7 +167,8 @@ public class KafkaRecordSupplier implements
RecordSupplier<KafkaTopicPartition,
record.topic(),
new KafkaTopicPartition(multiTopic, record.topic(),
record.partition()),
record.offset(),
- record.value() == null ? null : ImmutableList.of(new
KafkaRecordEntity(record))
+ record.value() == null ? null : ImmutableList.of(new
KafkaRecordEntity(record)),
+ record.timestamp()
));
}
return polledRecords;
@@ -206,6 +208,21 @@ public class KafkaRecordSupplier implements
RecordSupplier<KafkaTopicPartition,
return wrapExceptions(() ->
consumer.position(partition.getPartitionId().asTopicPartition(partition.getStream())));
}
+ @Override
+ public Map<KafkaTopicPartition, Long>
getLatestSequenceNumbers(Set<StreamPartition<KafkaTopicPartition>> partitions)
+ {
+ return wrapExceptions(() -> CollectionUtils.mapKeys(
+ consumer.endOffsets(
+ partitions
+ .stream()
+ .map(e -> e.getPartitionId().asTopicPartition(e.getStream()))
+ .collect(Collectors.toList()
+ )
+ ),
+ p -> new KafkaTopicPartition(multiTopic, p.topic(), p.partition())
+ ));
+ }
+
@Override
public Set<KafkaTopicPartition> getPartitionIds(String stream)
{
diff --git
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index a0c7494e3bb..b990c8fa3fc 100644
---
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -46,6 +46,7 @@ import
org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskIOConfig;
import
org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig;
import org.apache.druid.indexing.seekablestream.SeekableStreamSequenceNumbers;
import
org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
+import
org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
import org.apache.druid.indexing.seekablestream.common.StreamException;
@@ -91,6 +92,7 @@ public class KafkaSupervisor extends
SeekableStreamSupervisor<KafkaTopicPartitio
private final Pattern pattern;
private volatile Map<KafkaTopicPartition, Long> latestSequenceFromStream;
+ private volatile Map<KafkaTopicPartition, Long> partitionToTimeLag;
private final KafkaSupervisorSpec spec;
@@ -171,6 +173,7 @@ public class KafkaSupervisor extends
SeekableStreamSupervisor<KafkaTopicPartitio
ioConfig.getTaskDuration().getMillis() / 1000,
includeOffsets ? latestSequenceFromStream : null,
includeOffsets ? partitionLag : null,
+ includeOffsets ? getPartitionTimeLag() : null,
includeOffsets ? partitionLag.values().stream().mapToLong(x ->
Math.max(x, 0)).sum() : null,
includeOffsets ? sequenceLastUpdated : null,
spec.isSuspended(),
@@ -273,8 +276,7 @@ public class KafkaSupervisor extends
SeekableStreamSupervisor<KafkaTopicPartitio
@Override
protected Map<KafkaTopicPartition, Long> getPartitionTimeLag()
{
- // time lag not currently support with kafka
- return null;
+ return partitionToTimeLag;
}
// suppress use of CollectionUtils.mapValues() since the valueMapper
function is dependent on map key here
@@ -380,6 +382,105 @@ public class KafkaSupervisor extends
SeekableStreamSupervisor<KafkaTopicPartitio
return computeLags(partitionRecordLag);
}
+ /**
+ * This method is similar to updatePartitionLagFromStream
+ * but also determines time lag. Once this method has been
+ * tested, we can remove the older one.
+ */
+ private void updatePartitionTimeAndRecordLagFromStream()
+ {
+ final Map<KafkaTopicPartition, Long> highestCurrentOffsets =
getHighestCurrentOffsets();
+
+ getRecordSupplierLock().lock();
+ try {
+ Set<KafkaTopicPartition> partitionIds;
+ try {
+ partitionIds =
recordSupplier.getPartitionIds(getIoConfig().getStream());
+ }
+ catch (Exception e) {
+ log.warn("Could not fetch partitions for topic/stream [%s]",
getIoConfig().getStream());
+ throw new StreamException(e);
+ }
+
+ final Set<StreamPartition<KafkaTopicPartition>> partitions = partitionIds
+ .stream()
+ .map(e -> new StreamPartition<>(getIoConfig().getStream(), e))
+ .collect(Collectors.toSet());
+
+      // Since we cannot compute the current timestamp for partitions for which
+      // we haven't started reading yet, explicitly set them.
+ final Set<KafkaTopicPartition> yetToReadPartitions = new HashSet<>();
+ for (KafkaTopicPartition partition : partitionIds) {
+ Long highestCurrentOffset = highestCurrentOffsets.get(partition);
+ if (highestCurrentOffset == null || highestCurrentOffset == 0) {
+ yetToReadPartitions.add(partition);
+ } else {
+ recordSupplier.seek(new StreamPartition<>(getIoConfig().getStream(),
partition), highestCurrentOffset - 1);
+ }
+ }
+
+ final Map<KafkaTopicPartition, Long> lastIngestedTimestamps =
getTimestampPerPartitionAtCurrentOffset(partitions);
+ // Note: this might give weird values for lag when the tasks are yet to
start processing
+ yetToReadPartitions.forEach(p -> lastIngestedTimestamps.put(p, 0L));
+
+ recordSupplier.seekToLatest(partitions);
+ latestSequenceFromStream =
recordSupplier.getLatestSequenceNumbers(partitions);
+
+ for (Map.Entry<KafkaTopicPartition, Long> entry :
latestSequenceFromStream.entrySet()) {
+ // if there are no messages .getEndOffset would return 0, but if there
are n msgs it would return n+1
+ // and hence we need to seek to n - 2 to get the nth msg in the next
poll.
+ if (entry.getValue() != 0) {
+ recordSupplier.seek(new StreamPartition<>(getIoConfig().getStream(),
entry.getKey()), entry.getValue() - 2);
+ }
+ }
+
+ partitionToTimeLag = getTimestampPerPartitionAtCurrentOffset(partitions)
+ .entrySet().stream().filter(e ->
lastIngestedTimestamps.containsKey(e.getKey()))
+ .collect(
+ Collectors.toMap(
+ Entry::getKey,
+ e -> e.getValue() - lastIngestedTimestamps.get(e.getKey())
+ )
+ );
+ }
+ catch (InterruptedException e) {
+ throw new StreamException(e);
+ }
+ finally {
+ getRecordSupplierLock().unlock();
+ }
+ }
+
+ private Map<KafkaTopicPartition, Long>
getTimestampPerPartitionAtCurrentOffset(Set<StreamPartition<KafkaTopicPartition>>
allPartitions)
+ {
+ Map<KafkaTopicPartition, Long> result = new HashMap<>();
+ Set<StreamPartition<KafkaTopicPartition>> remainingPartitions = new
HashSet<>(allPartitions);
+
+ try {
+ int maxPolls = 5;
+ while (!remainingPartitions.isEmpty() && maxPolls-- > 0) {
+ for (OrderedPartitionableRecord<KafkaTopicPartition, Long,
KafkaRecordEntity> record :
recordSupplier.poll(getIoConfig().getPollTimeout())) {
+ if (!result.containsKey(record.getPartitionId())) {
+ result.put(record.getPartitionId(), record.getTimestamp());
+ remainingPartitions.remove(new
StreamPartition<>(getIoConfig().getStream(), record.getPartitionId()));
+ if (remainingPartitions.isEmpty()) {
+ break;
+ }
+ }
+ recordSupplier.assign(remainingPartitions);
+ }
+ }
+ }
+ finally {
+ recordSupplier.assign(allPartitions);
+ }
+
+ if (!remainingPartitions.isEmpty()) {
+ log.info("Could not fetch the latest timestamp for partitions [%s].",
remainingPartitions);
+ }
+ return result;
+ }
+
/**
* Fetches the latest offsets from the Kafka stream and updates the map
* {@link #latestSequenceFromStream}. The actual lag is computed lazily in
@@ -388,6 +489,11 @@ public class KafkaSupervisor extends
SeekableStreamSupervisor<KafkaTopicPartitio
@Override
protected void updatePartitionLagFromStream()
{
+ if (getIoConfig().isEmitTimeLagMetrics()) {
+ updatePartitionTimeAndRecordLagFromStream();
+ return;
+ }
+
getRecordSupplierLock().lock();
try {
Set<KafkaTopicPartition> partitionIds;
diff --git
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java
index fbf55f4ab5e..be4f6cfb196 100644
---
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java
+++
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java
@@ -22,6 +22,7 @@ package org.apache.druid.indexing.kafka.supervisor;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
+import org.apache.druid.common.config.Configs;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.indexing.seekablestream.extension.KafkaConfigOverrides;
@@ -51,6 +52,7 @@ public class KafkaSupervisorIOConfig extends
SeekableStreamSupervisorIOConfig
private final KafkaConfigOverrides configOverrides;
private final String topic;
private final String topicPattern;
+ private final boolean emitTimeLagMetrics;
@JsonCreator
public KafkaSupervisorIOConfig(
@@ -72,7 +74,8 @@ public class KafkaSupervisorIOConfig extends
SeekableStreamSupervisorIOConfig
@JsonProperty("lateMessageRejectionStartDateTime") DateTime
lateMessageRejectionStartDateTime,
@JsonProperty("configOverrides") KafkaConfigOverrides configOverrides,
@JsonProperty("idleConfig") IdleConfig idleConfig,
- @JsonProperty("stopTaskCount") Integer stopTaskCount
+ @JsonProperty("stopTaskCount") Integer stopTaskCount,
+ @Nullable @JsonProperty("emitTimeLagMetrics") Boolean emitTimeLagMetrics
)
{
super(
@@ -102,6 +105,7 @@ public class KafkaSupervisorIOConfig extends
SeekableStreamSupervisorIOConfig
this.configOverrides = configOverrides;
this.topic = topic;
this.topicPattern = topicPattern;
+ this.emitTimeLagMetrics = Configs.valueOrDefault(emitTimeLagMetrics,
false);
}
/**
@@ -151,6 +155,15 @@ public class KafkaSupervisorIOConfig extends
SeekableStreamSupervisorIOConfig
return topicPattern != null;
}
+ /**
+ * @return true if supervisor needs to publish the time lag.
+ */
+ @JsonProperty
+ public boolean isEmitTimeLagMetrics()
+ {
+ return emitTimeLagMetrics;
+ }
+
@Override
public String toString()
{
diff --git
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorReportPayload.java
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorReportPayload.java
index 6a88dc16ec8..c64c4426d69 100644
---
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorReportPayload.java
+++
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorReportPayload.java
@@ -38,6 +38,7 @@ public class KafkaSupervisorReportPayload extends
SeekableStreamSupervisorReport
long durationSeconds,
@Nullable Map<KafkaTopicPartition, Long> latestOffsets,
@Nullable Map<KafkaTopicPartition, Long> minimumLag,
+ @Nullable Map<KafkaTopicPartition, Long> minimumLagMillis,
@Nullable Long aggregateLag,
@Nullable DateTime offsetsLastUpdated,
boolean suspended,
@@ -56,7 +57,7 @@ public class KafkaSupervisorReportPayload extends
SeekableStreamSupervisorReport
latestOffsets,
minimumLag,
aggregateLag,
- null,
+ minimumLagMillis,
null,
offsetsLastUpdated,
suspended,
diff --git
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java
index 9ed5f068e0b..dd7efcdea51 100644
---
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java
+++
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java
@@ -165,7 +165,8 @@ public class KafkaSamplerSpecTest extends
InitializedNullHandlingTest
null,
null,
null,
- null
+ null,
+ false
),
null,
null,
@@ -218,7 +219,8 @@ public class KafkaSamplerSpecTest extends
InitializedNullHandlingTest
null,
null,
null,
- null
+ null,
+ false
),
null,
null,
@@ -280,7 +282,8 @@ public class KafkaSamplerSpecTest extends
InitializedNullHandlingTest
null,
null,
null,
- null
+ null,
+ false
),
null,
null,
@@ -384,7 +387,8 @@ public class KafkaSamplerSpecTest extends
InitializedNullHandlingTest
null,
null,
null,
- null
+ null,
+ false
),
null,
null,
@@ -568,7 +572,8 @@ public class KafkaSamplerSpecTest extends
InitializedNullHandlingTest
null,
null,
null,
- null
+ null,
+ false
),
null,
null,
@@ -624,7 +629,8 @@ public class KafkaSamplerSpecTest extends
InitializedNullHandlingTest
null,
null,
null,
- null
+ null,
+ false
),
null,
null,
diff --git
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
index 64ac05865d6..cb2956afd3b 100644
---
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
+++
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
@@ -337,7 +337,8 @@ public class KafkaSupervisorIOConfigTest
null,
null,
null,
- null
+ null,
+ false
);
String ioConfig = mapper.writeValueAsString(kafkaSupervisorIOConfig);
KafkaSupervisorIOConfig kafkaSupervisorIOConfig1 =
mapper.readValue(ioConfig, KafkaSupervisorIOConfig.class);
@@ -380,7 +381,8 @@ public class KafkaSupervisorIOConfigTest
null,
null,
mapper.convertValue(idleConfig, IdleConfig.class),
- null
+ null,
+ false
);
String ioConfig = mapper.writeValueAsString(kafkaSupervisorIOConfig);
KafkaSupervisorIOConfig kafkaSupervisorIOConfig1 =
mapper.readValue(ioConfig, KafkaSupervisorIOConfig.class);
diff --git
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index f330c97f0ea..3eb0d2a8df8 100644
---
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -119,6 +119,8 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.IOException;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -319,7 +321,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
null,
null,
new IdleConfig(true, 1000L),
- 1
+ 1,
+ false
);
final KafkaSupervisorTuningConfig tuningConfigOri = new
KafkaSupervisorTuningConfig(
@@ -2093,7 +2096,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null);
final KafkaSupervisorTuningConfig tuningConfig =
supervisor.getTuningConfig();
- addSomeEvents(1);
+ addSomeEvents(30);
Task task = createKafkaIndexTask(
"id1",
@@ -2324,6 +2327,10 @@ public class KafkaSupervisorTest extends EasyMockSupport
Assert.assertEquals(1, payload.getPublishingTasks().size());
Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING,
payload.getDetailedState());
Assert.assertEquals(0, payload.getRecentErrors().size());
+ Assert.assertEquals(
+ singlePartitionMap(topic, 0, 10000, 1, 5000, 2, 0),
+ payload.getMinimumLagMillis()
+ );
TaskReportData activeReport = payload.getActiveTasks().get(0);
TaskReportData publishingReport = payload.getPublishingTasks().get(0);
@@ -5115,16 +5122,19 @@ public class KafkaSupervisorTest extends EasyMockSupport
try (final KafkaProducer<byte[], byte[]> kafkaProducer =
kafkaServer.newProducer()) {
kafkaProducer.initTransactions();
kafkaProducer.beginTransaction();
+ Instant time = Instant.now();
for (int i = 0; i < NUM_PARTITIONS; i++) {
for (int j = 0; j < numEventsPerPartition; j++) {
kafkaProducer.send(
new ProducerRecord<>(
topic,
i,
+ time.toEpochMilli(),
null,
StringUtils.toUtf8(StringUtils.format("event-%d", j))
)
).get();
+ time = time.plus(5, ChronoUnit.SECONDS);
}
}
kafkaProducer.commitTransaction();
@@ -5286,7 +5296,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
null,
null,
idleConfig,
- null
+ null,
+ true
);
KafkaIndexTaskClientFactory taskClientFactory = new
KafkaIndexTaskClientFactory(
@@ -5400,7 +5411,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
null,
null,
null,
- null
+ null,
+ false
);
KafkaIndexTaskClientFactory taskClientFactory = new
KafkaIndexTaskClientFactory(
@@ -5517,7 +5529,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
null,
null,
null,
- null
+ null,
+ false
);
KafkaIndexTaskClientFactory taskClientFactory = new
KafkaIndexTaskClientFactory(
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/OrderedPartitionableRecord.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/OrderedPartitionableRecord.java
index 5d052585ba9..17a7ab3dd5d 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/OrderedPartitionableRecord.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/OrderedPartitionableRecord.java
@@ -41,6 +41,7 @@ public class OrderedPartitionableRecord<PartitionIdType,
SequenceOffsetType, Rec
private final PartitionIdType partitionId;
private final SequenceOffsetType sequenceNumber;
private final List<RecordType> data;
+ private final Long timestamp;
public OrderedPartitionableRecord(
String stream,
@@ -48,6 +49,17 @@ public class OrderedPartitionableRecord<PartitionIdType,
SequenceOffsetType, Rec
SequenceOffsetType sequenceNumber,
List<RecordType> data
)
+ {
+ this(stream, partitionId, sequenceNumber, data, null);
+ }
+
+ public OrderedPartitionableRecord(
+ String stream,
+ PartitionIdType partitionId,
+ SequenceOffsetType sequenceNumber,
+ List<RecordType> data,
+ Long timestamp
+ )
{
Preconditions.checkNotNull(stream, "stream");
Preconditions.checkNotNull(partitionId, "partitionId");
@@ -56,6 +68,7 @@ public class OrderedPartitionableRecord<PartitionIdType,
SequenceOffsetType, Rec
this.partitionId = partitionId;
this.sequenceNumber = sequenceNumber;
this.data = data == null ? ImmutableList.of() : data;
+ this.timestamp = timestamp;
}
public String getStream()
@@ -63,6 +76,15 @@ public class OrderedPartitionableRecord<PartitionIdType,
SequenceOffsetType, Rec
return stream;
}
+  /**
+   * @return Timestamp in millis when the record was published to the stream, -1 if not known
+   */
+  public long getTimestamp()
+  {
+    return timestamp == null ? -1 : timestamp;
+  }
+
public PartitionIdType getPartitionId()
{
return partitionId;
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/RecordSupplier.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/RecordSupplier.java
index b9db4202a8f..1c7e3248025 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/RecordSupplier.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/RecordSupplier.java
@@ -27,6 +27,7 @@ import javax.validation.constraints.NotNull;
import java.io.Closeable;
import java.util.Collection;
import java.util.List;
+import java.util.Map;
import java.util.Set;
/**
@@ -132,6 +133,16 @@ public interface RecordSupplier<PartitionIdType,
SequenceOffsetType, RecordType
*/
Set<PartitionIdType> getPartitionIds(String stream);
+ /**
+ * Returns the end offsets for all the assigned partitions.
+ *
+ * @return Map from Partition ID to the corresponding end offset
+ */
+  default Map<PartitionIdType, SequenceOffsetType> getLatestSequenceNumbers(Set<StreamPartition<PartitionIdType>> partitions)
+  {
+    throw new UnsupportedOperationException("getLatestSequenceNumbers is not supported by this RecordSupplier");
+  }
+
/**
* close the RecordSupplier
*/
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index faa68f3d8ea..47781826265 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -4093,6 +4093,7 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
// healthy go ahead and try anyway to try if possible to provide insight
into how much time is left to fix the
// issue for cluster operators since this feeds the lag metrics
if (stateManager.isIdle() || stateManager.isSteadyState() ||
!stateManager.isHealthy()) {
+ final Stopwatch runTime = Stopwatch.createStarted();
try {
updateCurrentOffsets();
updatePartitionLagFromStream();
@@ -4101,6 +4102,9 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
catch (Exception e) {
log.warn(e, "Exception while getting current/latest sequences");
}
+ finally {
+ emitUpdateOffsetsTime(runTime.millisElapsed());
+ }
}
}
@@ -4506,6 +4510,21 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
}
}
+ private void emitUpdateOffsetsTime(long timeInMillis)
+ {
+ try {
+      emitter.emit(
+          ServiceMetricEvent.builder()
+              .setDimension(DruidMetrics.DATASOURCE, dataSource)
+              .setDimensionIfNotNull(DruidMetrics.TAGS, spec.getContextValue(DruidMetrics.TAGS))
+              .setMetric(StringUtils.format("ingest/%s/updateOffsets/time", spec.getType()), timeInMillis)
+      );
+ }
+ catch (Exception e) {
+ log.warn(e, "Unable to emit updateOffsets time");
+ }
+ }
+
protected void emitNoticesQueueSize()
{
if (spec.isSuspended()) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]