FrankChen021 commented on code in PR #19372:
URL: https://github.com/apache/druid/pull/19372#discussion_r3189385688


##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -4298,6 +4497,163 @@ && checkSourceMetadataMatch(dataSourceMetadata)) {
     return Collections.emptyMap();
   }
 
+  /**
+   * Check if all partitions in a task group have reached their bounded end offsets.
+   * Used to determine if the task group completed successfully vs failed midway.
+   *
+   * @param groupId The task group ID to check
+   * @return true if all partitions in the group have reached their end offsets, false otherwise
+   */
+  private boolean hasTaskGroupReachedBoundedEnd(int groupId)
+  {
+    BoundedStreamConfig boundedConfig = ioConfig.getBoundedStreamConfig();
+    Map<PartitionIdType, SequenceOffsetType> startOffsets =
+        convertBoundedConfigMap(boundedConfig.getStartSequenceNumbers());
+    Map<PartitionIdType, SequenceOffsetType> endOffsets =
+        convertBoundedConfigMap(boundedConfig.getEndSequenceNumbers());
+
+    Set<PartitionIdType> partitionsInGroup = partitionGroups.get(groupId);
+    if (partitionsInGroup == null || partitionsInGroup.isEmpty()) {
+      return false;
+    }
+
+    // Check if start >= end for all partitions (empty range)
+    // If so, there's no work to do - treat as already complete
+    boolean allPartitionsEmptyRange = true;
+    for (PartitionIdType partition : partitionsInGroup) {
+      SequenceOffsetType start = startOffsets.get(partition);
+      SequenceOffsetType end = endOffsets.get(partition);
+      if (!isOffsetAtOrBeyond(start, end)) {

Review Comment:
   [P1] Kinesis bounded ranges with start == end are skipped
   
   The new empty-range check treats start >= end as completed for all bounded 
supervisors before creating any task. That is valid for Kafka's exclusive end 
offsets, but Kinesis declares bounded end offsets as inclusive and its task 
runner returns isEndOffsetExclusive() == false, so a Kinesis bounded ingestion 
for a single record where startSequenceNumbers equals endSequenceNumbers is 
marked COMPLETED without reading that record. This check should be provider-aware:
for example, treat equality as an empty range only when the end offset is
exclusive, while still rejecting or otherwise handling start > end.
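A minimal sketch of the provider-aware check suggested above, assuming a boolean `endExclusive` flag that stands in for the task runner's `isEndOffsetExclusive()`. The class `BoundedRangeCheck` and the methods `isEmptyRange` and `allPartitionsEmpty` are hypothetical names, not existing Druid APIs. Throwing on start > end is one assumed way of "rejecting" that case, and modelling Kinesis sequence numbers as `BigInteger` is also an assumption.

```java
import java.util.Map;
import java.util.Set;

// Hypothetical helper, not part of Druid: provider-aware empty-range check.
final class BoundedRangeCheck
{
  private BoundedRangeCheck() {}

  /**
   * Returns true only if the bounded range for one partition contains no records.
   * With an exclusive end (Kafka-style), start == end means [start, end) is empty.
   * With an inclusive end (Kinesis-style), start == end still covers one record.
   */
  static <T extends Comparable<T>> boolean isEmptyRange(T start, T end, boolean endExclusive)
  {
    int cmp = start.compareTo(end);
    if (cmp > 0) {
      // Assumption: start beyond end is treated as a misconfiguration and rejected.
      throw new IllegalArgumentException(
          "start offset [" + start + "] is beyond end offset [" + end + "]");
    }
    // Equality counts as empty only when the end offset is exclusive.
    return cmp == 0 && endExclusive;
  }

  /**
   * Returns true if every partition in the group has an empty range.
   * An empty group, or a partition missing a start or end offset, is not treated as complete.
   */
  static <P, T extends Comparable<T>> boolean allPartitionsEmpty(
      Set<P> partitions, Map<P, T> starts, Map<P, T> ends, boolean endExclusive)
  {
    if (partitions.isEmpty()) {
      return false;
    }
    for (P partition : partitions) {
      T start = starts.get(partition);
      T end = ends.get(partition);
      if (start == null || end == null || !isEmptyRange(start, end, endExclusive)) {
        return false;
      }
    }
    return true;
  }
}
```

With this shape, a Kafka group whose start and end offsets are equal is skipped as already complete. A Kinesis group with the same equal values (`endExclusive == false`) still gets a task that reads the single record.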


