kfaraz commented on code in PR #17735:
URL: https://github.com/apache/druid/pull/17735#discussion_r1976265230
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/OrderedPartitionableRecord.java:
##########
@@ -41,13 +41,25 @@ public class OrderedPartitionableRecord<PartitionIdType,
SequenceOffsetType, Rec
private final PartitionIdType partitionId;
private final SequenceOffsetType sequenceNumber;
private final List<RecordType> data;
+ private final long timestamp;
Review Comment:
Thinking about this, I think it would be better to just use a boxed `Long`
and return `null` if it has not been set.
##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java:
##########
@@ -380,6 +382,108 @@ public LagStats computeLagStats()
return computeLags(partitionRecordLag);
}
+ /**
+ * This method is similar to updatePartitionLagFromStream
+ * but also determines time lag. Once this method has been
+ * tested, we can remove the older one.
+ */
+ private void updatePartitionTimeAndRecordLagFromStream()
+ {
+ final Map<KafkaTopicPartition, Long> highestCurrentOffsets =
getHighestCurrentOffsets();
+
+ getRecordSupplierLock().lock();
+ try {
+ Set<KafkaTopicPartition> partitionIds;
+ try {
+ partitionIds =
recordSupplier.getPartitionIds(getIoConfig().getStream());
+ }
+ catch (Exception e) {
+ log.warn("Could not fetch partitions for topic/stream [%s]",
getIoConfig().getStream());
+ throw new StreamException(e);
+ }
+
+ final Set<StreamPartition<KafkaTopicPartition>> partitions = partitionIds
+ .stream()
+ .map(e -> new StreamPartition<>(getIoConfig().getStream(), e))
+ .collect(Collectors.toSet());
+
+ final Set<KafkaTopicPartition> yetToReadPartitions = new HashSet<>();
+ for (Map.Entry<KafkaTopicPartition, Long> entry :
highestCurrentOffsets.entrySet()) {
+ if (partitionIds.contains(entry.getKey())) {
+ if (highestCurrentOffsets.get(entry.getKey()) == null ||
highestCurrentOffsets.get(entry.getKey()) == 0) {
+ yetToReadPartitions.add(entry.getKey());
+ continue;
+ }
+
+ recordSupplier.seek(new StreamPartition<>(getIoConfig().getStream(),
entry.getKey()), entry.getValue() - 1);
Review Comment:
```suggestion
if (highestCurrentOffsets.get(entry.getKey()) == null ||
highestCurrentOffsets.get(entry.getKey()) == 0) {
yetToReadPartitions.add(entry.getKey());
} else {
recordSupplier.seek(new
StreamPartition<>(getIoConfig().getStream(), entry.getKey()), entry.getValue()
- 1);
}
```
##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java:
##########
@@ -380,6 +382,108 @@ public LagStats computeLagStats()
return computeLags(partitionRecordLag);
}
+ /**
+ * This method is similar to updatePartitionLagFromStream
+ * but also determines time lag. Once this method has been
+ * tested, we can remove the older one.
+ */
+ private void updatePartitionTimeAndRecordLagFromStream()
+ {
+ final Map<KafkaTopicPartition, Long> highestCurrentOffsets =
getHighestCurrentOffsets();
+
+ getRecordSupplierLock().lock();
+ try {
+ Set<KafkaTopicPartition> partitionIds;
+ try {
+ partitionIds =
recordSupplier.getPartitionIds(getIoConfig().getStream());
+ }
+ catch (Exception e) {
+ log.warn("Could not fetch partitions for topic/stream [%s]",
getIoConfig().getStream());
+ throw new StreamException(e);
+ }
+
+ final Set<StreamPartition<KafkaTopicPartition>> partitions = partitionIds
+ .stream()
+ .map(e -> new StreamPartition<>(getIoConfig().getStream(), e))
+ .collect(Collectors.toSet());
+
+ final Set<KafkaTopicPartition> yetToReadPartitions = new HashSet<>();
+ for (Map.Entry<KafkaTopicPartition, Long> entry :
highestCurrentOffsets.entrySet()) {
+ if (partitionIds.contains(entry.getKey())) {
+ if (highestCurrentOffsets.get(entry.getKey()) == null ||
highestCurrentOffsets.get(entry.getKey()) == 0) {
+ yetToReadPartitions.add(entry.getKey());
+ continue;
+ }
+
+ recordSupplier.seek(new StreamPartition<>(getIoConfig().getStream(),
entry.getKey()), entry.getValue() - 1);
+ }
+ }
+
+ final Map<KafkaTopicPartition, Long> lastIngestedTimestamps =
getTimestampPerPartitionAtCurrentOffset(partitions);
+ // TODO: this might give weird values for lag when the tasks are yet to
start processing
+ yetToReadPartitions.forEach(p -> lastIngestedTimestamps.put(p, 0L));
+
+ recordSupplier.seekToLatest(partitions);
+ // this method isn't actually computing the lag, just fetching the
latest offsets from the stream. This is
+ // because we currently only have record lag for kafka, which can be
lazily computed by subtracting the highest
+ // task offsets from the latest offsets from the stream when it is needed
+ latestSequenceFromStream =
recordSupplier.getLatestSequenceNumbers(partitions);
+
+ for (Map.Entry<KafkaTopicPartition, Long> entry :
latestSequenceFromStream.entrySet()) {
+ // if there are no messages .getEndOffset would return 0, but if there
are n msgs it would return n+1
+ // and hence we need to seek to n - 2 to get the nth msg in the next
poll.
+ if (entry.getValue() != 0) {
+ recordSupplier.seek(new StreamPartition<>(getIoConfig().getStream(),
entry.getKey()), entry.getValue() - 2);
+ }
+ }
+
+ partitionToTimeLag = getTimestampPerPartitionAtCurrentOffset(partitions)
+ .entrySet().stream().filter(e ->
lastIngestedTimestamps.containsKey(e.getKey()))
+ .collect(
+ Collectors.toMap(
+ Entry::getKey,
+ e -> e.getValue() - lastIngestedTimestamps.get(e.getKey())
+ )
+ );
+ }
+ catch (InterruptedException e) {
+ throw new StreamException(e);
+ }
+ finally {
+ getRecordSupplierLock().unlock();
+ }
+ }
+
+ private Map<KafkaTopicPartition, Long>
getTimestampPerPartitionAtCurrentOffset(Set<StreamPartition<KafkaTopicPartition>>
allPartitions)
+ {
+ Map<KafkaTopicPartition, Long> result = new HashMap<>();
+ Set<StreamPartition<KafkaTopicPartition>> remainingPartitions = new
HashSet<>(allPartitions);
+
+ try {
+ int maxPolls = 5;
+ while (!remainingPartitions.isEmpty() && maxPolls-- > 0) {
+ for (OrderedPartitionableRecord<KafkaTopicPartition, Long,
KafkaRecordEntity> record :
recordSupplier.poll(getIoConfig().getPollTimeout())) {
+ if (!result.containsKey(record.getPartitionId())) {
+ result.put(record.getPartitionId(), record.getTimestamp());
+ remainingPartitions.remove(new
StreamPartition<>(getIoConfig().getStream(), record.getPartitionId()));
+ if (remainingPartitions.isEmpty()) {
+ break;
+ }
+ }
+ recordSupplier.assign(remainingPartitions);
+ }
+ }
+ }
+ finally {
+ recordSupplier.assign(allPartitions);
+ }
+
+ if (!remainingPartitions.isEmpty()) {
+ log.info("Couldn't fetch the latest timestamp for the following
partitions: [%s]", remainingPartitions);
Review Comment:
```suggestion
log.info("Could not fetch the latest timestamp for partitions[%s].",
remainingPartitions);
```
##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java:
##########
@@ -380,6 +382,108 @@ public LagStats computeLagStats()
return computeLags(partitionRecordLag);
}
+ /**
+ * This method is similar to updatePartitionLagFromStream
+ * but also determines time lag. Once this method has been
+ * tested, we can remove the older one.
+ */
+ private void updatePartitionTimeAndRecordLagFromStream()
+ {
+ final Map<KafkaTopicPartition, Long> highestCurrentOffsets =
getHighestCurrentOffsets();
+
+ getRecordSupplierLock().lock();
+ try {
+ Set<KafkaTopicPartition> partitionIds;
+ try {
+ partitionIds =
recordSupplier.getPartitionIds(getIoConfig().getStream());
+ }
+ catch (Exception e) {
+ log.warn("Could not fetch partitions for topic/stream [%s]",
getIoConfig().getStream());
+ throw new StreamException(e);
+ }
+
+ final Set<StreamPartition<KafkaTopicPartition>> partitions = partitionIds
+ .stream()
+ .map(e -> new StreamPartition<>(getIoConfig().getStream(), e))
+ .collect(Collectors.toSet());
+
+ final Set<KafkaTopicPartition> yetToReadPartitions = new HashSet<>();
+ for (Map.Entry<KafkaTopicPartition, Long> entry :
highestCurrentOffsets.entrySet()) {
+ if (partitionIds.contains(entry.getKey())) {
+ if (highestCurrentOffsets.get(entry.getKey()) == null ||
highestCurrentOffsets.get(entry.getKey()) == 0) {
+ yetToReadPartitions.add(entry.getKey());
+ continue;
+ }
+
+ recordSupplier.seek(new StreamPartition<>(getIoConfig().getStream(),
entry.getKey()), entry.getValue() - 1);
+ }
+ }
+
+ final Map<KafkaTopicPartition, Long> lastIngestedTimestamps =
getTimestampPerPartitionAtCurrentOffset(partitions);
+ // TODO: this might give weird values for lag when the tasks are yet to
start processing
+ yetToReadPartitions.forEach(p -> lastIngestedTimestamps.put(p, 0L));
+
+ recordSupplier.seekToLatest(partitions);
+ // this method isn't actually computing the lag, just fetching the
latest offsets from the stream. This is
+ // because we currently only have record lag for kafka, which can be
lazily computed by subtracting the highest
+ // task offsets from the latest offsets from the stream when it is needed
+ latestSequenceFromStream =
recordSupplier.getLatestSequenceNumbers(partitions);
+
+ for (Map.Entry<KafkaTopicPartition, Long> entry :
latestSequenceFromStream.entrySet()) {
+ // if there are no messages .getEndOffset would return 0, but if there
are n msgs it would return n+1
+ // and hence we need to seek to n - 2 to get the nth msg in the next
poll.
+ if (entry.getValue() != 0) {
+ recordSupplier.seek(new StreamPartition<>(getIoConfig().getStream(),
entry.getKey()), entry.getValue() - 2);
+ }
+ }
+
+ partitionToTimeLag = getTimestampPerPartitionAtCurrentOffset(partitions)
+ .entrySet().stream().filter(e ->
lastIngestedTimestamps.containsKey(e.getKey()))
+ .collect(
+ Collectors.toMap(
+ Entry::getKey,
+ e -> e.getValue() - lastIngestedTimestamps.get(e.getKey())
+ )
+ );
Review Comment:
The older method used to end with `seekToLatest`, I wonder if we should do
that here too.
##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java:
##########
@@ -380,6 +382,108 @@ public LagStats computeLagStats()
return computeLags(partitionRecordLag);
}
+ /**
+ * This method is similar to updatePartitionLagFromStream
+ * but also determines time lag. Once this method has been
+ * tested, we can remove the older one.
+ */
+ private void updatePartitionTimeAndRecordLagFromStream()
+ {
+ final Map<KafkaTopicPartition, Long> highestCurrentOffsets =
getHighestCurrentOffsets();
+
+ getRecordSupplierLock().lock();
+ try {
+ Set<KafkaTopicPartition> partitionIds;
+ try {
+ partitionIds =
recordSupplier.getPartitionIds(getIoConfig().getStream());
+ }
+ catch (Exception e) {
+ log.warn("Could not fetch partitions for topic/stream [%s]",
getIoConfig().getStream());
+ throw new StreamException(e);
+ }
+
+ final Set<StreamPartition<KafkaTopicPartition>> partitions = partitionIds
+ .stream()
+ .map(e -> new StreamPartition<>(getIoConfig().getStream(), e))
+ .collect(Collectors.toSet());
+
+ final Set<KafkaTopicPartition> yetToReadPartitions = new HashSet<>();
+ for (Map.Entry<KafkaTopicPartition, Long> entry :
highestCurrentOffsets.entrySet()) {
+ if (partitionIds.contains(entry.getKey())) {
Review Comment:
Rather than this, it would be more straightforward to iterate over the
contents of `partitionIds`.
##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java:
##########
@@ -51,6 +52,7 @@ public class KafkaSupervisorIOConfig extends
SeekableStreamSupervisorIOConfig
private final KafkaConfigOverrides configOverrides;
private final String topic;
private final String topicPattern;
+ private final boolean publishTimeLag;
Review Comment:
```suggestion
private final boolean emitTimeLagMetrics;
```
##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java:
##########
@@ -380,6 +382,108 @@ public LagStats computeLagStats()
return computeLags(partitionRecordLag);
}
+ /**
+ * This method is similar to updatePartitionLagFromStream
+ * but also determines time lag. Once this method has been
+ * tested, we can remove the older one.
+ */
+ private void updatePartitionTimeAndRecordLagFromStream()
+ {
+ final Map<KafkaTopicPartition, Long> highestCurrentOffsets =
getHighestCurrentOffsets();
+
+ getRecordSupplierLock().lock();
+ try {
+ Set<KafkaTopicPartition> partitionIds;
+ try {
+ partitionIds =
recordSupplier.getPartitionIds(getIoConfig().getStream());
+ }
+ catch (Exception e) {
+ log.warn("Could not fetch partitions for topic/stream [%s]",
getIoConfig().getStream());
+ throw new StreamException(e);
+ }
+
+ final Set<StreamPartition<KafkaTopicPartition>> partitions = partitionIds
+ .stream()
+ .map(e -> new StreamPartition<>(getIoConfig().getStream(), e))
+ .collect(Collectors.toSet());
+
+ final Set<KafkaTopicPartition> yetToReadPartitions = new HashSet<>();
+ for (Map.Entry<KafkaTopicPartition, Long> entry :
highestCurrentOffsets.entrySet()) {
+ if (partitionIds.contains(entry.getKey())) {
+ if (highestCurrentOffsets.get(entry.getKey()) == null ||
highestCurrentOffsets.get(entry.getKey()) == 0) {
+ yetToReadPartitions.add(entry.getKey());
+ continue;
+ }
+
+ recordSupplier.seek(new StreamPartition<>(getIoConfig().getStream(),
entry.getKey()), entry.getValue() - 1);
+ }
+ }
+
+ final Map<KafkaTopicPartition, Long> lastIngestedTimestamps =
getTimestampPerPartitionAtCurrentOffset(partitions);
+ // TODO: this might give weird values for lag when the tasks are yet to
start processing
Review Comment:
Maybe remove the todo for now, it is enough to call out the problem.
```suggestion
// Note: this might give weird values for lag when the tasks are yet
to start processing
```
##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java:
##########
@@ -380,6 +382,108 @@ public LagStats computeLagStats()
return computeLags(partitionRecordLag);
}
+ /**
+ * This method is similar to updatePartitionLagFromStream
+ * but also determines time lag. Once this method has been
+ * tested, we can remove the older one.
+ */
+ private void updatePartitionTimeAndRecordLagFromStream()
+ {
+ final Map<KafkaTopicPartition, Long> highestCurrentOffsets =
getHighestCurrentOffsets();
+
+ getRecordSupplierLock().lock();
+ try {
+ Set<KafkaTopicPartition> partitionIds;
+ try {
+ partitionIds =
recordSupplier.getPartitionIds(getIoConfig().getStream());
+ }
+ catch (Exception e) {
+ log.warn("Could not fetch partitions for topic/stream [%s]",
getIoConfig().getStream());
+ throw new StreamException(e);
+ }
+
+ final Set<StreamPartition<KafkaTopicPartition>> partitions = partitionIds
+ .stream()
+ .map(e -> new StreamPartition<>(getIoConfig().getStream(), e))
+ .collect(Collectors.toSet());
+
+ final Set<KafkaTopicPartition> yetToReadPartitions = new HashSet<>();
Review Comment:
Please add a comment before this line denoting what is being done here.
##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java:
##########
@@ -380,6 +382,108 @@ public LagStats computeLagStats()
return computeLags(partitionRecordLag);
}
+ /**
+ * This method is similar to updatePartitionLagFromStream
+ * but also determines time lag. Once this method has been
+ * tested, we can remove the older one.
+ */
+ private void updatePartitionTimeAndRecordLagFromStream()
+ {
+ final Map<KafkaTopicPartition, Long> highestCurrentOffsets =
getHighestCurrentOffsets();
+
+ getRecordSupplierLock().lock();
+ try {
+ Set<KafkaTopicPartition> partitionIds;
+ try {
+ partitionIds =
recordSupplier.getPartitionIds(getIoConfig().getStream());
+ }
+ catch (Exception e) {
+ log.warn("Could not fetch partitions for topic/stream [%s]",
getIoConfig().getStream());
+ throw new StreamException(e);
+ }
+
+ final Set<StreamPartition<KafkaTopicPartition>> partitions = partitionIds
+ .stream()
+ .map(e -> new StreamPartition<>(getIoConfig().getStream(), e))
+ .collect(Collectors.toSet());
+
+ final Set<KafkaTopicPartition> yetToReadPartitions = new HashSet<>();
+ for (Map.Entry<KafkaTopicPartition, Long> entry :
highestCurrentOffsets.entrySet()) {
+ if (partitionIds.contains(entry.getKey())) {
+ if (highestCurrentOffsets.get(entry.getKey()) == null ||
highestCurrentOffsets.get(entry.getKey()) == 0) {
+ yetToReadPartitions.add(entry.getKey());
+ continue;
+ }
+
+ recordSupplier.seek(new StreamPartition<>(getIoConfig().getStream(),
entry.getKey()), entry.getValue() - 1);
+ }
+ }
+
+ final Map<KafkaTopicPartition, Long> lastIngestedTimestamps =
getTimestampPerPartitionAtCurrentOffset(partitions);
+ // TODO: this might give weird values for lag when the tasks are yet to
start processing
+ yetToReadPartitions.forEach(p -> lastIngestedTimestamps.put(p, 0L));
+
+ recordSupplier.seekToLatest(partitions);
+ // this method isn't actually computing the lag, just fetching the
latest offsets from the stream. This is
+ // because we currently only have record lag for kafka, which can be
lazily computed by subtracting the highest
+ // task offsets from the latest offsets from the stream when it is needed
+ latestSequenceFromStream =
recordSupplier.getLatestSequenceNumbers(partitions);
+
+ for (Map.Entry<KafkaTopicPartition, Long> entry :
latestSequenceFromStream.entrySet()) {
+ // if there are no messages .getEndOffset would return 0, but if there
are n msgs it would return n+1
+ // and hence we need to seek to n - 2 to get the nth msg in the next
poll.
+ if (entry.getValue() != 0) {
+ recordSupplier.seek(new StreamPartition<>(getIoConfig().getStream(),
entry.getKey()), entry.getValue() - 2);
Review Comment:
I forget the details of the discussion, but I think we had come to the
conclusion that this should be -1, right?
##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java:
##########
@@ -380,6 +382,108 @@ public LagStats computeLagStats()
return computeLags(partitionRecordLag);
}
+ /**
+ * This method is similar to updatePartitionLagFromStream
+ * but also determines time lag. Once this method has been
+ * tested, we can remove the older one.
+ */
+ private void updatePartitionTimeAndRecordLagFromStream()
+ {
+ final Map<KafkaTopicPartition, Long> highestCurrentOffsets =
getHighestCurrentOffsets();
+
+ getRecordSupplierLock().lock();
+ try {
+ Set<KafkaTopicPartition> partitionIds;
+ try {
+ partitionIds =
recordSupplier.getPartitionIds(getIoConfig().getStream());
+ }
+ catch (Exception e) {
+ log.warn("Could not fetch partitions for topic/stream [%s]",
getIoConfig().getStream());
+ throw new StreamException(e);
+ }
+
+ final Set<StreamPartition<KafkaTopicPartition>> partitions = partitionIds
+ .stream()
+ .map(e -> new StreamPartition<>(getIoConfig().getStream(), e))
+ .collect(Collectors.toSet());
+
+ final Set<KafkaTopicPartition> yetToReadPartitions = new HashSet<>();
+ for (Map.Entry<KafkaTopicPartition, Long> entry :
highestCurrentOffsets.entrySet()) {
+ if (partitionIds.contains(entry.getKey())) {
+ if (highestCurrentOffsets.get(entry.getKey()) == null ||
highestCurrentOffsets.get(entry.getKey()) == 0) {
+ yetToReadPartitions.add(entry.getKey());
+ continue;
+ }
+
+ recordSupplier.seek(new StreamPartition<>(getIoConfig().getStream(),
entry.getKey()), entry.getValue() - 1);
+ }
+ }
+
+ final Map<KafkaTopicPartition, Long> lastIngestedTimestamps =
getTimestampPerPartitionAtCurrentOffset(partitions);
+ // TODO: this might give weird values for lag when the tasks are yet to
start processing
+ yetToReadPartitions.forEach(p -> lastIngestedTimestamps.put(p, 0L));
+
+ recordSupplier.seekToLatest(partitions);
+ // this method isn't actually computing the lag, just fetching the
latest offsets from the stream. This is
+ // because we currently only have record lag for kafka, which can be
lazily computed by subtracting the highest
+ // task offsets from the latest offsets from the stream when it is needed
Review Comment:
this comment is not needed
```suggestion
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]