AmatyaAvadhanula commented on code in PR #12792:
URL: https://github.com/apache/druid/pull/12792#discussion_r930636700


##########
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisSequenceNumber.java:
##########
@@ -93,21 +101,57 @@ public static boolean isValidAWSKinesisSequence(String sequenceNumber)
     return !(END_OF_SHARD_MARKER.equals(sequenceNumber)
              || NO_END_SEQUENCE_NUMBER.equals(sequenceNumber)
              || EXPIRED_MARKER.equals(sequenceNumber)
+             || UNREAD_TRIM_HORIZON.equals(sequenceNumber)
+             || UNREAD_LATEST.equals(sequenceNumber)
       );
   }
 
   @Override
   public int compareTo(OrderedSequenceNumber<String> o)
   {
     KinesisSequenceNumber num = (KinesisSequenceNumber) o;
+    if (isUnread() && num.isUnread()) {
+      return 0;
+    } else if (isUnread()) {
+      return -1;
+    } else if (num.isUnread()) {
+      return 1;
+    }
     if (isMaxSequenceNumber && num.isMaxSequenceNumber) {
       return 0;
     } else if (isMaxSequenceNumber) {
       return 1;
     } else if (num.isMaxSequenceNumber) {
       return -1;
-    } else {
-      return this.intSequence.compareTo(new BigInteger(o.get()));
     }
+    return this.intSequence.compareTo(new BigInteger(o.get()));
+  }
+
+  @Override
+  public boolean isAvailableWithEarliest(OrderedSequenceNumber<String> earliest)
+  {
+    if (isUnread()) {

Review Comment:
   > The sequence availability check ensures that the current offset is not 
before the earliest available sequence in the shard. However, the current token 
being an UNREAD token indicates that any sequence number in the shard is valid 
(despite the ordering)
   
   To elaborate on the description:
   Say we're reading from position 5 and the earliest available position in the 
shard is 3. We haven't fallen behind, so the current offset is still available.
   However, if the earliest position were 10, ingestion would have fallen 
behind and the current offset would no longer be available.
   UNREAD_LATEST and UNREAD_TRIM_HORIZON are "naturally" ordered to be less 
than any sequence number, so the availability check would always fail for them 
under this ordering. That is why UNREAD tokens are handled as a special case 
before the comparison.
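
As a rough illustration of the reasoning above, here is a minimal, self-contained sketch. `SequenceSketch` is a hypothetical stand-in and not the actual `KinesisSequenceNumber` class; the marker string values are placeholders, and returning `true` from the unread branch is an assumption based on the comment above rather than something the diff shows.

```java
import java.math.BigInteger;

// Hypothetical, simplified stand-in for KinesisSequenceNumber (not the Druid class).
final class SequenceSketch implements Comparable<SequenceSketch>
{
  // Placeholder marker values chosen for this sketch only.
  static final String UNREAD_TRIM_HORIZON = "UNREAD_TRIM_HORIZON";
  static final String UNREAD_LATEST = "UNREAD_LATEST";

  private final String value;

  SequenceSketch(String value)
  {
    this.value = value;
  }

  boolean isUnread()
  {
    return UNREAD_TRIM_HORIZON.equals(value) || UNREAD_LATEST.equals(value);
  }

  // Unread markers sort below every numeric sequence number.
  @Override
  public int compareTo(SequenceSketch other)
  {
    if (isUnread() && other.isUnread()) {
      return 0;
    } else if (isUnread()) {
      return -1;
    } else if (other.isUnread()) {
      return 1;
    }
    return new BigInteger(value).compareTo(new BigInteger(other.value));
  }

  // What the check would do if it relied on ordering alone.
  boolean isAvailableByOrderingOnly(SequenceSketch earliest)
  {
    return compareTo(earliest) >= 0;
  }

  // Assumption: an unread marker is treated as always available,
  // because nothing has been read from the shard yet.
  boolean isAvailableWithEarliest(SequenceSketch earliest)
  {
    if (isUnread()) {
      return true;
    }
    return isAvailableByOrderingOnly(earliest);
  }

  public static void main(String[] args)
  {
    SequenceSketch five = new SequenceSketch("5");
    SequenceSketch unread = new SequenceSketch(UNREAD_LATEST);

    // Reading at 5, earliest is 3: still available.
    System.out.println(five.isAvailableWithEarliest(new SequenceSketch("3")));
    // Reading at 5, earliest is 10: ingestion has fallen behind.
    System.out.println(five.isAvailableWithEarliest(new SequenceSketch("10")));
    // Ordering alone would wrongly report an unread marker as unavailable.
    System.out.println(unread.isAvailableByOrderingOnly(new SequenceSketch("10")));
    // With the special case, the unread marker is available.
    System.out.println(unread.isAvailableWithEarliest(new SequenceSketch("10")));
  }
}
```

The two methods are kept separate only to make the contrast visible: the ordering-only check is what fails for unread markers, and the special case is what avoids that.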



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

