AmatyaAvadhanula commented on code in PR #12792:
URL: https://github.com/apache/druid/pull/12792#discussion_r930636700
##########
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisSequenceNumber.java:
##########
@@ -93,21 +101,57 @@ public static boolean isValidAWSKinesisSequence(String
sequenceNumber)
return !(END_OF_SHARD_MARKER.equals(sequenceNumber)
|| NO_END_SEQUENCE_NUMBER.equals(sequenceNumber)
|| EXPIRED_MARKER.equals(sequenceNumber)
+ || UNREAD_TRIM_HORIZON.equals(sequenceNumber)
+ || UNREAD_LATEST.equals(sequenceNumber)
);
}
@Override
public int compareTo(OrderedSequenceNumber<String> o)
{
KinesisSequenceNumber num = (KinesisSequenceNumber) o;
+ if (isUnread() && num.isUnread()) {
+ return 0;
+ } else if (isUnread()) {
+ return -1;
+ } else if (num.isUnread()) {
+ return 1;
+ }
if (isMaxSequenceNumber && num.isMaxSequenceNumber) {
return 0;
} else if (isMaxSequenceNumber) {
return 1;
} else if (num.isMaxSequenceNumber) {
return -1;
- } else {
- return this.intSequence.compareTo(new BigInteger(o.get()));
}
+ return this.intSequence.compareTo(new BigInteger(o.get()));
+ }
+
+ @Override
+ public boolean isAvailableWithEarliest(OrderedSequenceNumber<String>
earliest)
+ {
+ if (isUnread()) {
Review Comment:
> The sequence availability check ensures that the current offset is before
the earliest available sequence in the shard. However, current token being an
UNREAD token indicates that any sequence number in the shard is valid (despite
the ordering)
To elaborate on the description,
Say we're reading from position 5 and the earliest available position in the
shard is 3, we haven't fallen back.
However if the earliest position were 10, it means that ingestion has
fallend behind and the current offset is unavailable.
UNREAD_LATEST and UNREAD_TRIM_HORIZON are "naturally" ordered to be less
than any sequence number, and the availability check would fail with this
ordering.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]