noob-se7en commented on code in PR #15641:
URL: https://github.com/apache/pinot/pull/15641#discussion_r2060572177
##########
pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java:
##########
@@ -97,45 +100,40 @@ public Set<Integer> fetchPartitionIds(long timeoutMillis) {
@Override
public StreamPartitionMsgOffset fetchStreamPartitionOffset(OffsetCriteria
offsetCriteria, long timeoutMillis) {
Preconditions.checkNotNull(offsetCriteria);
- long offset;
try {
+ // Build the offset spec request for this partition
+ Map<TopicPartition, OffsetSpec> request = new HashMap<>();
if (offsetCriteria.isLargest()) {
- offset =
_consumer.endOffsets(Collections.singletonList(_topicPartition),
Duration.ofMillis(timeoutMillis))
- .get(_topicPartition);
+ request.put(_topicPartition, OffsetSpec.latest());
} else if (offsetCriteria.isSmallest()) {
- offset =
-
_consumer.beginningOffsets(Collections.singletonList(_topicPartition),
Duration.ofMillis(timeoutMillis))
- .get(_topicPartition);
+ request.put(_topicPartition, OffsetSpec.earliest());
} else if (offsetCriteria.isPeriod()) {
- OffsetAndTimestamp offsetAndTimestamp =
_consumer.offsetsForTimes(Collections.singletonMap(_topicPartition,
- Clock.systemUTC().millis() -
TimeUtils.convertPeriodToMillis(offsetCriteria.getOffsetString())))
- .get(_topicPartition);
- if (offsetAndTimestamp == null) {
- offset =
_consumer.endOffsets(Collections.singletonList(_topicPartition),
Duration.ofMillis(timeoutMillis))
- .get(_topicPartition);
- LOGGER.warn(
- "initial offset type is period and its value evaluates to null
hence proceeding with offset {} for "
- + "topic {} partition {}", offset, _topicPartition.topic(),
_topicPartition.partition());
- } else {
- offset = offsetAndTimestamp.offset();
- }
+ long ts = Clock.systemUTC().millis() -
TimeUtils.convertPeriodToMillis(offsetCriteria.getOffsetString());
+ request.put(_topicPartition, OffsetSpec.forTimestamp(ts));
} else if (offsetCriteria.isTimestamp()) {
- OffsetAndTimestamp offsetAndTimestamp =
_consumer.offsetsForTimes(Collections.singletonMap(_topicPartition,
-
TimeUtils.convertTimestampToMillis(offsetCriteria.getOffsetString()))).get(_topicPartition);
- if (offsetAndTimestamp == null) {
- offset =
_consumer.endOffsets(Collections.singletonList(_topicPartition),
Duration.ofMillis(timeoutMillis))
- .get(_topicPartition);
- LOGGER.warn(
- "initial offset type is timestamp and its value evaluates to
null hence proceeding with offset {} for "
- + "topic {} partition {}", offset, _topicPartition.topic(),
_topicPartition.partition());
- } else {
- offset = offsetAndTimestamp.offset();
- }
+ long ts =
TimeUtils.convertTimestampToMillis(offsetCriteria.getOffsetString());
+ request.put(_topicPartition, OffsetSpec.forTimestamp(ts));
} else {
- throw new IllegalArgumentException("Unknown initial offset value " +
offsetCriteria);
+ throw new IllegalArgumentException("Unknown offset criteria: " +
offsetCriteria);
}
- return new LongMsgOffset(offset);
- } catch (TimeoutException e) {
+ // Query via AdminClient (thread-safe)
+ ListOffsetsResult result = _adminClient.listOffsets(request);
+ Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> offsets =
+ result.all().get(timeoutMillis, TimeUnit.MILLISECONDS);
+ if (offsets == null || offsets.isEmpty() ||
!offsets.containsKey(_topicPartition)
+ || offsets.get(_topicPartition).offset() < 0) {
+ // fetch endOffsets as fallback
+ request.put(_topicPartition, OffsetSpec.latest());
Review Comment:
I don't think we were doing a fallback for the `offsetCriteria.isSmallest()` case before this change.
##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java:
##########
@@ -1851,6 +1854,10 @@ private StreamPartitionMsgOffset
fetchStreamOffset(OffsetCriteria offsetCriteria
}
try {
return
_partitionMetadataProvider.fetchStreamPartitionOffset(offsetCriteria,
maxWaitTimeMs);
+ } catch (IllegalStateException ise) {
Review Comment:
Is this the correct exception type for consumer closed?
##########
pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java:
##########
@@ -97,45 +100,40 @@ public Set<Integer> fetchPartitionIds(long timeoutMillis) {
@Override
public StreamPartitionMsgOffset fetchStreamPartitionOffset(OffsetCriteria
offsetCriteria, long timeoutMillis) {
Preconditions.checkNotNull(offsetCriteria);
- long offset;
try {
+ // Build the offset spec request for this partition
+ Map<TopicPartition, OffsetSpec> request = new HashMap<>();
if (offsetCriteria.isLargest()) {
- offset =
_consumer.endOffsets(Collections.singletonList(_topicPartition),
Duration.ofMillis(timeoutMillis))
- .get(_topicPartition);
+ request.put(_topicPartition, OffsetSpec.latest());
} else if (offsetCriteria.isSmallest()) {
- offset =
-
_consumer.beginningOffsets(Collections.singletonList(_topicPartition),
Duration.ofMillis(timeoutMillis))
- .get(_topicPartition);
+ request.put(_topicPartition, OffsetSpec.earliest());
} else if (offsetCriteria.isPeriod()) {
- OffsetAndTimestamp offsetAndTimestamp =
_consumer.offsetsForTimes(Collections.singletonMap(_topicPartition,
- Clock.systemUTC().millis() -
TimeUtils.convertPeriodToMillis(offsetCriteria.getOffsetString())))
- .get(_topicPartition);
- if (offsetAndTimestamp == null) {
- offset =
_consumer.endOffsets(Collections.singletonList(_topicPartition),
Duration.ofMillis(timeoutMillis))
- .get(_topicPartition);
- LOGGER.warn(
- "initial offset type is period and its value evaluates to null
hence proceeding with offset {} for "
- + "topic {} partition {}", offset, _topicPartition.topic(),
_topicPartition.partition());
- } else {
- offset = offsetAndTimestamp.offset();
- }
+ long ts = Clock.systemUTC().millis() -
TimeUtils.convertPeriodToMillis(offsetCriteria.getOffsetString());
+ request.put(_topicPartition, OffsetSpec.forTimestamp(ts));
} else if (offsetCriteria.isTimestamp()) {
- OffsetAndTimestamp offsetAndTimestamp =
_consumer.offsetsForTimes(Collections.singletonMap(_topicPartition,
-
TimeUtils.convertTimestampToMillis(offsetCriteria.getOffsetString()))).get(_topicPartition);
- if (offsetAndTimestamp == null) {
- offset =
_consumer.endOffsets(Collections.singletonList(_topicPartition),
Duration.ofMillis(timeoutMillis))
- .get(_topicPartition);
- LOGGER.warn(
- "initial offset type is timestamp and its value evaluates to
null hence proceeding with offset {} for "
- + "topic {} partition {}", offset, _topicPartition.topic(),
_topicPartition.partition());
- } else {
- offset = offsetAndTimestamp.offset();
- }
+ long ts =
TimeUtils.convertTimestampToMillis(offsetCriteria.getOffsetString());
+ request.put(_topicPartition, OffsetSpec.forTimestamp(ts));
} else {
- throw new IllegalArgumentException("Unknown initial offset value " +
offsetCriteria);
+ throw new IllegalArgumentException("Unknown offset criteria: " +
offsetCriteria);
}
- return new LongMsgOffset(offset);
- } catch (TimeoutException e) {
+ // Query via AdminClient (thread-safe)
+ ListOffsetsResult result = _adminClient.listOffsets(request);
+ Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> offsets =
+ result.all().get(timeoutMillis, TimeUnit.MILLISECONDS);
+ if (offsets == null || offsets.isEmpty() ||
!offsets.containsKey(_topicPartition)
+ || offsets.get(_topicPartition).offset() < 0) {
+ // fetch endOffsets as fallback
Review Comment:
Same as above — the previous code did not apply this endOffsets fallback for the `offsetCriteria.isSmallest()` case.
##########
pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java:
##########
@@ -97,45 +100,40 @@ public Set<Integer> fetchPartitionIds(long timeoutMillis) {
@Override
public StreamPartitionMsgOffset fetchStreamPartitionOffset(OffsetCriteria
offsetCriteria, long timeoutMillis) {
Preconditions.checkNotNull(offsetCriteria);
- long offset;
try {
+ // Build the offset spec request for this partition
+ Map<TopicPartition, OffsetSpec> request = new HashMap<>();
if (offsetCriteria.isLargest()) {
- offset =
_consumer.endOffsets(Collections.singletonList(_topicPartition),
Duration.ofMillis(timeoutMillis))
- .get(_topicPartition);
+ request.put(_topicPartition, OffsetSpec.latest());
} else if (offsetCriteria.isSmallest()) {
- offset =
-
_consumer.beginningOffsets(Collections.singletonList(_topicPartition),
Duration.ofMillis(timeoutMillis))
- .get(_topicPartition);
+ request.put(_topicPartition, OffsetSpec.earliest());
} else if (offsetCriteria.isPeriod()) {
- OffsetAndTimestamp offsetAndTimestamp =
_consumer.offsetsForTimes(Collections.singletonMap(_topicPartition,
- Clock.systemUTC().millis() -
TimeUtils.convertPeriodToMillis(offsetCriteria.getOffsetString())))
- .get(_topicPartition);
- if (offsetAndTimestamp == null) {
- offset =
_consumer.endOffsets(Collections.singletonList(_topicPartition),
Duration.ofMillis(timeoutMillis))
- .get(_topicPartition);
- LOGGER.warn(
- "initial offset type is period and its value evaluates to null
hence proceeding with offset {} for "
- + "topic {} partition {}", offset, _topicPartition.topic(),
_topicPartition.partition());
- } else {
- offset = offsetAndTimestamp.offset();
- }
+ long ts = Clock.systemUTC().millis() -
TimeUtils.convertPeriodToMillis(offsetCriteria.getOffsetString());
+ request.put(_topicPartition, OffsetSpec.forTimestamp(ts));
} else if (offsetCriteria.isTimestamp()) {
- OffsetAndTimestamp offsetAndTimestamp =
_consumer.offsetsForTimes(Collections.singletonMap(_topicPartition,
-
TimeUtils.convertTimestampToMillis(offsetCriteria.getOffsetString()))).get(_topicPartition);
- if (offsetAndTimestamp == null) {
- offset =
_consumer.endOffsets(Collections.singletonList(_topicPartition),
Duration.ofMillis(timeoutMillis))
- .get(_topicPartition);
- LOGGER.warn(
- "initial offset type is timestamp and its value evaluates to
null hence proceeding with offset {} for "
- + "topic {} partition {}", offset, _topicPartition.topic(),
_topicPartition.partition());
- } else {
- offset = offsetAndTimestamp.offset();
- }
+ long ts =
TimeUtils.convertTimestampToMillis(offsetCriteria.getOffsetString());
+ request.put(_topicPartition, OffsetSpec.forTimestamp(ts));
} else {
- throw new IllegalArgumentException("Unknown initial offset value " +
offsetCriteria);
+ throw new IllegalArgumentException("Unknown offset criteria: " +
offsetCriteria);
}
- return new LongMsgOffset(offset);
- } catch (TimeoutException e) {
+ // Query via AdminClient (thread-safe)
+ ListOffsetsResult result = _adminClient.listOffsets(request);
+ Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> offsets =
+ result.all().get(timeoutMillis, TimeUnit.MILLISECONDS);
+ if (offsets == null || offsets.isEmpty() ||
!offsets.containsKey(_topicPartition)
+ || offsets.get(_topicPartition).offset() < 0) {
+ // fetch endOffsets as fallback
Review Comment:
This might be a redundant call when `offsetCriteria.isLargest()`, since the initial request already used
##########
pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java:
##########
@@ -97,45 +100,40 @@ public Set<Integer> fetchPartitionIds(long timeoutMillis) {
@Override
public StreamPartitionMsgOffset fetchStreamPartitionOffset(OffsetCriteria
offsetCriteria, long timeoutMillis) {
Preconditions.checkNotNull(offsetCriteria);
- long offset;
try {
+ // Build the offset spec request for this partition
+ Map<TopicPartition, OffsetSpec> request = new HashMap<>();
if (offsetCriteria.isLargest()) {
- offset =
_consumer.endOffsets(Collections.singletonList(_topicPartition),
Duration.ofMillis(timeoutMillis))
- .get(_topicPartition);
+ request.put(_topicPartition, OffsetSpec.latest());
} else if (offsetCriteria.isSmallest()) {
- offset =
-
_consumer.beginningOffsets(Collections.singletonList(_topicPartition),
Duration.ofMillis(timeoutMillis))
- .get(_topicPartition);
+ request.put(_topicPartition, OffsetSpec.earliest());
} else if (offsetCriteria.isPeriod()) {
- OffsetAndTimestamp offsetAndTimestamp =
_consumer.offsetsForTimes(Collections.singletonMap(_topicPartition,
- Clock.systemUTC().millis() -
TimeUtils.convertPeriodToMillis(offsetCriteria.getOffsetString())))
- .get(_topicPartition);
- if (offsetAndTimestamp == null) {
- offset =
_consumer.endOffsets(Collections.singletonList(_topicPartition),
Duration.ofMillis(timeoutMillis))
- .get(_topicPartition);
- LOGGER.warn(
- "initial offset type is period and its value evaluates to null
hence proceeding with offset {} for "
- + "topic {} partition {}", offset, _topicPartition.topic(),
_topicPartition.partition());
- } else {
- offset = offsetAndTimestamp.offset();
- }
+ long ts = Clock.systemUTC().millis() -
TimeUtils.convertPeriodToMillis(offsetCriteria.getOffsetString());
+ request.put(_topicPartition, OffsetSpec.forTimestamp(ts));
} else if (offsetCriteria.isTimestamp()) {
- OffsetAndTimestamp offsetAndTimestamp =
_consumer.offsetsForTimes(Collections.singletonMap(_topicPartition,
-
TimeUtils.convertTimestampToMillis(offsetCriteria.getOffsetString()))).get(_topicPartition);
- if (offsetAndTimestamp == null) {
- offset =
_consumer.endOffsets(Collections.singletonList(_topicPartition),
Duration.ofMillis(timeoutMillis))
- .get(_topicPartition);
- LOGGER.warn(
- "initial offset type is timestamp and its value evaluates to
null hence proceeding with offset {} for "
- + "topic {} partition {}", offset, _topicPartition.topic(),
_topicPartition.partition());
- } else {
- offset = offsetAndTimestamp.offset();
- }
+ long ts =
TimeUtils.convertTimestampToMillis(offsetCriteria.getOffsetString());
+ request.put(_topicPartition, OffsetSpec.forTimestamp(ts));
} else {
- throw new IllegalArgumentException("Unknown initial offset value " +
offsetCriteria);
+ throw new IllegalArgumentException("Unknown offset criteria: " +
offsetCriteria);
}
- return new LongMsgOffset(offset);
- } catch (TimeoutException e) {
+ // Query via AdminClient (thread-safe)
+ ListOffsetsResult result = _adminClient.listOffsets(request);
+ Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> offsets =
+ result.all().get(timeoutMillis, TimeUnit.MILLISECONDS);
+ if (offsets == null || offsets.isEmpty() ||
!offsets.containsKey(_topicPartition)
+ || offsets.get(_topicPartition).offset() < 0) {
+ // fetch endOffsets as fallback
+ request.put(_topicPartition, OffsetSpec.latest());
+ result = _adminClient.listOffsets(request);
+ offsets = result.all().get(timeoutMillis, TimeUnit.MILLISECONDS);
+ LOGGER.warn(
+ "initial offset type is {} and its value evaluates to null hence
proceeding with offset {} " + "for "
+ + "topic {} partition {}", offsetCriteria,
offsets.get(_topicPartition).offset(),
+ _topicPartition.topic(), _topicPartition.partition());
+ }
+ ListOffsetsResult.ListOffsetsResultInfo info =
offsets.get(_topicPartition);
+ return new LongMsgOffset(info.offset());
Review Comment:
Is this NPE safe?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]