cecemei commented on code in PR #18750:
URL: https://github.com/apache/druid/pull/18750#discussion_r2587006387
##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java:
##########
@@ -172,15 +172,15 @@ protected SeekableStreamSupervisorReportPayload<KafkaTopicPartition, Long> creat
)
{
KafkaSupervisorIOConfig ioConfig = spec.getIoConfig();
- Map<KafkaTopicPartition, Long> partitionLag = getRecordLagPerPartitionInLatestSequences(getHighestCurrentOffsets());
+ Map<KafkaTopicPartition, Long> partitionLag = getRecordLagPerPartitionInLatestSequences();
Review Comment:
Maybe pass `OffsetSnapshot` as a parameter, so that the snapshot used in the lag calculation is the same one whose end offsets are reported on line 183?
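A minimal sketch of this suggestion, under simplified assumptions: `String` partition ids, `Long` offsets, and hypothetical names (`SnapshotOnceSketch`, `Snapshot`, `recordLag`, `buildReport`) that stand in for the Druid classes rather than reproduce them. The report reads `offsetSnapshotRef` once and hands that single snapshot to both the lag calculation and the reported end offsets, so a concurrent snapshot update between the two reads cannot make them disagree.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch: these names illustrate the idea and are not Druid's classes.
class SnapshotOnceSketch
{
  // Simplified stand-in for OffsetSnapshot: an immutable pair of offset maps.
  static final class Snapshot
  {
    final Map<String, Long> currentOffsets;
    final Map<String, Long> endOffsets;

    Snapshot(Map<String, Long> currentOffsets, Map<String, Long> endOffsets)
    {
      this.currentOffsets = Map.copyOf(currentOffsets);
      this.endOffsets = Map.copyOf(endOffsets);
    }
  }

  final AtomicReference<Snapshot> offsetSnapshotRef =
      new AtomicReference<>(new Snapshot(Map.of(), Map.of()));

  // Computes lag only from the snapshot it is given; it never re-reads offsetSnapshotRef.
  // Assumption: a partition with no current offset is treated as starting from 0.
  static Map<String, Long> recordLag(Snapshot snapshot)
  {
    Map<String, Long> lag = new HashMap<>();
    snapshot.endOffsets.forEach(
        (partition, end) -> lag.put(partition, end - snapshot.currentOffsets.getOrDefault(partition, 0L))
    );
    return lag;
  }

  // Reads the reference exactly once, so the lag and the reported end offsets share one snapshot.
  Map<String, Object> buildReport(boolean includeOffsets)
  {
    Snapshot snapshot = offsetSnapshotRef.get();
    Map<String, Object> report = new HashMap<>();
    report.put("partitionLag", recordLag(snapshot));
    report.put("latestOffsets", includeOffsets ? snapshot.endOffsets : null);
    return report;
  }
}
```

With this shape, `getRecordLagPerPartitionInLatestSequences` would take the snapshot as a parameter, much as the old signature took `currentOffsets`.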
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -177,6 +178,50 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
// Internal data structures
// --------------------------------------------------------
+ protected static class OffsetSnapshot<PartitionIdType, SequenceOffsetType>
+ {
+ private final ImmutableMap<PartitionIdType, SequenceOffsetType> currentOffsets;
+ private final ImmutableMap<PartitionIdType, SequenceOffsetType> endOffsets;
+
+ public OffsetSnapshot(
+ @Nullable Map<PartitionIdType, SequenceOffsetType> currentOffsets,
+ @Nullable Map<PartitionIdType, SequenceOffsetType> endOffsets
+ )
+ {
+ this.currentOffsets = toImmutableOffsetMap(currentOffsets);
Review Comment:
It’s not clear to me under what circumstances the inputs could be null.
Could you document the scenarios where this might occur?
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -177,6 +178,50 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
// Internal data structures
// --------------------------------------------------------
+ protected static class OffsetSnapshot<PartitionIdType, SequenceOffsetType>
+ {
+ private final ImmutableMap<PartitionIdType, SequenceOffsetType> currentOffsets;
+ private final ImmutableMap<PartitionIdType, SequenceOffsetType> endOffsets;
+
+ public OffsetSnapshot(
+ @Nullable Map<PartitionIdType, SequenceOffsetType> currentOffsets,
+ @Nullable Map<PartitionIdType, SequenceOffsetType> endOffsets
+ )
+ {
+ this.currentOffsets = toImmutableOffsetMap(currentOffsets);
+ this.endOffsets = toImmutableOffsetMap(endOffsets);
+ }
+
+ private ImmutableMap<PartitionIdType, SequenceOffsetType> toImmutableOffsetMap(
+ @Nullable Map<PartitionIdType, SequenceOffsetType> input
+ )
+ {
+ if (input == null || input.isEmpty()) {
+ return ImmutableMap.of();
+ }
+
+ return input.entrySet().stream()
Review Comment:
nit: it looks like you just want to remove null values from the map, in which case `Maps.filterValues` would be cleaner.
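Since the helper already depends on Guava's `ImmutableMap`, the suggested version would likely be `ImmutableMap.copyOf(Maps.filterValues(input, Objects::nonNull))`, which copies a filtered view of the input; the existing null guard would still be needed. The sketch below expresses the same filtering with only the JDK so it can run standalone. `OffsetMapNormalizer` is a hypothetical name, and the result is an unmodifiable copy rather than a Guava `ImmutableMap`.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical JDK-only equivalent of ImmutableMap.copyOf(Maps.filterValues(input, Objects::nonNull)).
class OffsetMapNormalizer
{
  static <K, V> Map<K, V> toImmutableOffsetMap(Map<K, V> input)
  {
    if (input == null || input.isEmpty()) {
      return Collections.emptyMap();
    }
    // Copy while preserving iteration order, then drop every entry whose value is null.
    Map<K, V> copy = new LinkedHashMap<>(input);
    copy.values().removeIf(Objects::isNull);
    return Collections.unmodifiableMap(copy);
  }
}
```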
##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java:
##########
@@ -316,19 +321,21 @@ private Map<KafkaTopicPartition, Long> getRecordLagPerPartitionInLatestSequences
@Override
protected Map<KafkaTopicPartition, Long> getRecordLagPerPartition(Map<KafkaTopicPartition, Long> currentOffsets)
{
- if (latestSequenceFromStream == null || currentOffsets == null) {
+ Map<KafkaTopicPartition, Long> endOffsets = offsetSnapshotRef.get().getEndOffsets();
+
+ if (endOffsets == null || currentOffsets == null) {
Review Comment:
same here, can this be null?
##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java:
##########
@@ -294,13 +295,17 @@ protected Map<KafkaTopicPartition, Long> getPartitionTimeLag()
// suppress use of CollectionUtils.mapValues() since the valueMapper function is dependent on map key here
@SuppressWarnings("SSBasedInspection")
// Used while calculating cummulative lag for entire stream
- private Map<KafkaTopicPartition, Long> getRecordLagPerPartitionInLatestSequences(Map<KafkaTopicPartition, Long> currentOffsets)
+ private Map<KafkaTopicPartition, Long> getRecordLagPerPartitionInLatestSequences()
{
- if (latestSequenceFromStream == null) {
+ OffsetSnapshot<KafkaTopicPartition, Long> offsetSnapshot = offsetSnapshotRef.get();
+ Map<KafkaTopicPartition, Long> currentOffsets = offsetSnapshot.getCurrentOffsets();
+ Map<KafkaTopicPartition, Long> endOffsets = offsetSnapshot.getEndOffsets();
+
+ if (endOffsets == null) {
Review Comment:
Can `offsetSnapshot.getEndOffsets()` be null? I thought you had already replaced null with an empty map.
##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java:
##########
@@ -172,15 +172,15 @@ protected SeekableStreamSupervisorReportPayload<KafkaTopicPartition, Long> creat
)
{
KafkaSupervisorIOConfig ioConfig = spec.getIoConfig();
- Map<KafkaTopicPartition, Long> partitionLag = getRecordLagPerPartitionInLatestSequences(getHighestCurrentOffsets());
+ Map<KafkaTopicPartition, Long> partitionLag = getRecordLagPerPartitionInLatestSequences();
return new KafkaSupervisorReportPayload(
spec.getId(),
spec.getDataSchema().getDataSource(),
ioConfig.getStream(),
numPartitions,
ioConfig.getReplicas(),
ioConfig.getTaskDuration().getMillis() / 1000,
- includeOffsets ? latestSequenceFromStream : null,
+ includeOffsets ? offsetSnapshotRef.get().getEndOffsets() : null,
Review Comment:
We're replacing `latestSequenceFromStream` with `offsetSnapshotRef`, right? If so, maybe remove the `latestSequenceFromStream` class-level field?
##########
indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java:
##########
Review Comment:
Have you considered adding some more complex test cases to `KafkaSupervisorTest`? I'm not sure it's feasible, but maybe we could use `recordSupplier` to mock the end_offset and `indexerMetadataStorageCoordinator` to mock the current_offset, then test the lags the system emits. I'm not very familiar with this part of the code base, so I could be wrong.
##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java:
##########
@@ -526,6 +539,12 @@ protected void updatePartitionLagFromStream()
latestSequenceFromStream = partitions.stream().collect(Collectors.toMap(StreamPartition::getPartitionId, recordSupplier::getPosition));
+
+ OffsetSnapshot<KafkaTopicPartition, Long> snapshot = new OffsetSnapshot<>(
Review Comment:
IIUC, we might want to fetch the current offsets before fetching the end offsets, to minimize the chance of current_offset > end_offset, i.e. move line 544 to line 521. I wouldn't call this an atomic update, since the two values come from separate processes, but it may be worth calling out in the javadoc for `OffsetSnapshot` that this ordering should ensure end_offset >= current_offset and reduce the negative-lag issue...
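A sketch of the suggested ordering, using hypothetical `Supplier`s in place of the real task-offset and `recordSupplier` calls (`SnapshotUpdateSketch` and `updateOffsetSnapshot` are illustrative names). Current offsets are captured first, end offsets second, and both are published together with a single `AtomicReference.set`. The two reads remain separate, so this narrows the window rather than making the pair atomic with respect to the stream, and it assumes end offsets only move forward.

```java
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical sketch of the update path; names are illustrative only.
class SnapshotUpdateSketch
{
  // Immutable pair of offset maps, standing in for OffsetSnapshot.
  static final class Snapshot
  {
    final Map<String, Long> currentOffsets;
    final Map<String, Long> endOffsets;

    Snapshot(Map<String, Long> currentOffsets, Map<String, Long> endOffsets)
    {
      this.currentOffsets = Map.copyOf(currentOffsets);
      this.endOffsets = Map.copyOf(endOffsets);
    }
  }

  final AtomicReference<Snapshot> offsetSnapshotRef =
      new AtomicReference<>(new Snapshot(Map.of(), Map.of()));

  void updateOffsetSnapshot(
      Supplier<Map<String, Long>> currentOffsetsSource,
      Supplier<Map<String, Long>> endOffsetsSource
  )
  {
    // 1. Read the current offsets first...
    Map<String, Long> currentOffsets = currentOffsetsSource.get();
    // 2. ...then the end offsets. If end offsets never decrease, each value read here is
    //    at least as large as the stream's end offset was when step 1 ran.
    Map<String, Long> endOffsets = endOffsetsSource.get();
    // 3. Publish both maps at once; readers see either the old pair or the new pair, never a mix.
    offsetSnapshotRef.set(new Snapshot(currentOffsets, endOffsets));
  }
}
```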
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -177,6 +178,50 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
// Internal data structures
// --------------------------------------------------------
+ protected static class OffsetSnapshot<PartitionIdType, SequenceOffsetType>
+ {
+ private final ImmutableMap<PartitionIdType, SequenceOffsetType> currentOffsets;
+ private final ImmutableMap<PartitionIdType, SequenceOffsetType> endOffsets;
+
+ public OffsetSnapshot(
+ @Nullable Map<PartitionIdType, SequenceOffsetType> currentOffsets,
+ @Nullable Map<PartitionIdType, SequenceOffsetType> endOffsets
+ )
+ {
+ this.currentOffsets = toImmutableOffsetMap(currentOffsets);
+ this.endOffsets = toImmutableOffsetMap(endOffsets);
+ }
+
+ private ImmutableMap<PartitionIdType, SequenceOffsetType> toImmutableOffsetMap(
+ @Nullable Map<PartitionIdType, SequenceOffsetType> input
+ )
+ {
+ if (input == null || input.isEmpty()) {
+ return ImmutableMap.of();
+ }
+
+ return input.entrySet().stream()
+ .filter(e -> e.getValue() != null)
+ .collect(ImmutableMap.toImmutableMap(
+ Map.Entry::getKey,
+ Map.Entry::getValue
+ ));
+ }
+
+ public ImmutableMap<PartitionIdType, SequenceOffsetType> getCurrentOffsets()
+ {
+ return currentOffsets;
+ }
+
+ public ImmutableMap<PartitionIdType, SequenceOffsetType> getEndOffsets()
+ {
+ return endOffsets;
+ }
+ }
+
+ protected final AtomicReference<OffsetSnapshot<PartitionIdType, SequenceOffsetType>> offsetSnapshotRef =
Review Comment:
Is there a specific reason you put `offsetSnapshotRef` in `SeekableStreamSupervisor` rather than `KafkaSupervisor`? It might be more suitable in `KafkaSupervisor` for the sake of encapsulation.
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -177,6 +178,50 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
// Internal data structures
// --------------------------------------------------------
+ protected static class OffsetSnapshot<PartitionIdType, SequenceOffsetType>
Review Comment:
It's worth adding some javadoc explaining the decision to put current_offset and end_offset in one class, and how that affects the lag metrics, etc.
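One possible shape for that javadoc, drafted only from points raised in this review (pairing the two maps, publishing them through one `AtomicReference`, fetch order, negative lag) and assuming the reordering suggested in the earlier comment is adopted. `OffsetSnapshotDocSketch` is a hypothetical, trimmed stand-in for `OffsetSnapshot` using JDK maps instead of `ImmutableMap`; the null-input scenarios asked about earlier are left as a TODO because the review doesn't state them.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Immutable pair of per-partition offsets captured together for lag reporting: current offsets
 * (how far ingestion has read) and end offsets (the latest offsets in the stream).
 *
 * <p>Keeping both maps in one object, published through a single {@code AtomicReference}, means
 * lag metrics and supervisor reports always compute lag from a matching pair instead of mixing
 * values from different update cycles.
 *
 * <p>The pair is not read atomically with respect to the stream. Current offsets are fetched
 * before end offsets, which keeps end offsets at or above current offsets in practice and
 * avoids reporting negative lag.
 *
 * <p>Null inputs become empty maps and null values are dropped, so getters never return null.
 * TODO: document when the inputs can be null.
 */
final class OffsetSnapshotDocSketch<PartitionIdType, SequenceOffsetType>
{
  private final Map<PartitionIdType, SequenceOffsetType> currentOffsets;
  private final Map<PartitionIdType, SequenceOffsetType> endOffsets;

  OffsetSnapshotDocSketch(
      Map<PartitionIdType, SequenceOffsetType> currentOffsets,
      Map<PartitionIdType, SequenceOffsetType> endOffsets
  )
  {
    this.currentOffsets = normalize(currentOffsets);
    this.endOffsets = normalize(endOffsets);
  }

  // Copies the non-null values into an unmodifiable map; a null input yields an empty map.
  private static <K, V> Map<K, V> normalize(Map<K, V> input)
  {
    Map<K, V> copy = new LinkedHashMap<>();
    if (input != null) {
      input.forEach((k, v) -> {
        if (v != null) {
          copy.put(k, v);
        }
      });
    }
    return Collections.unmodifiableMap(copy);
  }

  /** Never null; may be empty. */
  Map<PartitionIdType, SequenceOffsetType> getCurrentOffsets()
  {
    return currentOffsets;
  }

  /** Never null; may be empty. */
  Map<PartitionIdType, SequenceOffsetType> getEndOffsets()
  {
    return endOffsets;
  }
}
```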
##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java:
##########
@@ -261,14 +261,15 @@ protected List<SeekableStreamIndexTask<KafkaTopicPartition, Long, KafkaRecordEnt
@Override
protected Map<KafkaTopicPartition, Long> getPartitionRecordLag()
{
- Map<KafkaTopicPartition, Long> highestCurrentOffsets = getHighestCurrentOffsets();
+ OffsetSnapshot<KafkaTopicPartition, Long> offsetSnapshot = offsetSnapshotRef.get();
+ Map<KafkaTopicPartition, Long> endOffsets = offsetSnapshot.getEndOffsets();
- if (latestSequenceFromStream == null) {
+ if (endOffsets == null) {
Review Comment:
Same here: can this be null?
##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java:
##########
@@ -294,13 +295,17 @@ protected Map<KafkaTopicPartition, Long> getPartitionTimeLag()
// suppress use of CollectionUtils.mapValues() since the valueMapper function is dependent on map key here
@SuppressWarnings("SSBasedInspection")
// Used while calculating cummulative lag for entire stream
- private Map<KafkaTopicPartition, Long> getRecordLagPerPartitionInLatestSequences(Map<KafkaTopicPartition, Long> currentOffsets)
+ private Map<KafkaTopicPartition, Long> getRecordLagPerPartitionInLatestSequences()
{
- if (latestSequenceFromStream == null) {
+ OffsetSnapshot<KafkaTopicPartition, Long> offsetSnapshot = offsetSnapshotRef.get();
+ Map<KafkaTopicPartition, Long> currentOffsets = offsetSnapshot.getCurrentOffsets();
+ Map<KafkaTopicPartition, Long> endOffsets = offsetSnapshot.getEndOffsets();
+
+ if (endOffsets == null) {
return Collections.emptyMap();
}
- return latestSequenceFromStream
+ return endOffsets
Review Comment:
Can `e.getValue()` be null? I thought you had removed all null values from the map. Also, we could use `currentOffsets.getOrDefault` instead of the nullable handling.
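A sketch of the simplification, assuming (as the comments above suggest) that the snapshot getters never return null and never contain null values. The default of 0 for a partition with no current offset is an assumption here and should match whatever the existing nullable branch does. `LatestSequenceLagSketch` is a hypothetical name, and the partition type is simplified to `String`.

```java
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical sketch of the simplified method; not the actual KafkaSupervisor code.
class LatestSequenceLagSketch
{
  static Map<String, Long> recordLagPerPartition(Map<String, Long> currentOffsets, Map<String, Long> endOffsets)
  {
    // No null check: the snapshot turns a null map into an empty one, and an empty
    // endOffsets map simply produces an empty result.
    // No per-entry null branch either: null values were dropped when the snapshot was built.
    // Assumption: a partition missing from currentOffsets has a current offset of 0.
    return endOffsets.entrySet()
                     .stream()
                     .collect(Collectors.toMap(
                         Map.Entry::getKey,
                         e -> e.getValue() - currentOffsets.getOrDefault(e.getKey(), 0L)
                     ));
  }
}
```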