FrankChen021 commented on code in PR #19372:
URL: https://github.com/apache/druid/pull/19372#discussion_r3234376992


##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -4377,6 +4577,179 @@ && checkSourceMetadataMatch(dataSourceMetadata)) {
     return Collections.emptyMap();
   }
 
+  /**
+   * Check if all partitions in a task group have reached their bounded end 
offsets.
+   * Used to determine if the task group completed successfully vs failed 
midway.
+   *
+   * @param groupId The task group ID to check
+   * @return true if all partitions in the group have reached their end 
offsets, false otherwise
+   */
+  private boolean hasTaskGroupReachedBoundedEnd(int groupId)
+  {
+    Set<PartitionIdType> partitionsInGroup = partitionGroups.get(groupId);
+    if (partitionsInGroup == null || partitionsInGroup.isEmpty()) {
+      return false;
+    }
+    BoundedStreamConfig boundedConfig = ioConfig.getBoundedStreamConfig();
+    Map<PartitionIdType, SequenceOffsetType> startOffsets =
+        convertBoundedConfigMap(boundedConfig.getStartSequenceNumbers());
+    Map<PartitionIdType, SequenceOffsetType> endOffsets =
+        convertBoundedConfigMap(boundedConfig.getEndSequenceNumbers());
+
+    // Check if all partitions have empty ranges
+    // For exclusive end offsets (Kafka): start >= end means empty
+    // For inclusive end offsets (Kinesis): only start > end means empty 
(start == end is one record)
+    boolean allPartitionsEmptyRange = true;
+    for (PartitionIdType partition : partitionsInGroup) {
+      SequenceOffsetType start = startOffsets.get(partition);
+      SequenceOffsetType end = endOffsets.get(partition);
+
+      boolean isEmpty;
+      if (isEndOffsetExclusive()) {
+        // Exclusive: empty if start >= end
+        isEmpty = isOffsetAtOrBeyond(start, end);

Review Comment:
   [P2] Reject reversed bounded ranges instead of completing
   
   For exclusive-end streams (Kafka), this check treats `start > end` the same 
as the valid empty case `start == end`. A config like `start=500/end=100` 
therefore makes every partition look complete, and the supervisor can move to 
COMPLETED without ever creating a task. `KafkaIndexTaskIOConfig` would reject 
`end < start` if a task were created, but this completion check bypasses that 
validation. Reversed ranges should fail validation instead of being reported as 
a successful bounded ingestion.



##########
extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java:
##########
@@ -4725,6 +4730,201 @@ public void test_doesTaskMatchSupervisor()
     Assert.assertFalse(supervisor.doesTaskMatchSupervisor(differentTaskType));
   }
 
+  @Test
+  public void testBoundedModeCreateTasksWithCorrectOffsets()
+  {
+    Map<String, Object> startOffsets = ImmutableMap.of(
+        "shardId-000000000000", 
"49590338271490256608559692538361571095921575989136588898",
+        "shardId-000000000001", 
"49590338271512257353759162668991891722121171891717232706"
+    );
+    Map<String, Object> endOffsets = ImmutableMap.of(
+        "shardId-000000000000", 
"49590338271534258098958632799622211348320767794297876514",
+        "shardId-000000000001", 
"49590338271556258844158102930252531974520363696878520322"
+    );
+    final KinesisSupervisorIOConfig kinesisSupervisorIOConfig = new 
KinesisSupervisorIOConfig(
+        STREAM,
+        INPUT_FORMAT,
+        "awsEndpoint",
+        null,
+        1,
+        1,
+        new Period("PT30S"),
+        null,
+        new Period("PT30M"),
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        0,
+        null,
+        null,
+        null,
+        true,
+        null,
+        new BoundedStreamConfig(startOffsets, endOffsets)
+    );
+
+    Assert.assertTrue(kinesisSupervisorIOConfig.isBounded());
+
+    final KinesisIndexTaskClientFactory taskClientFactory = new 
KinesisIndexTaskClientFactory(null, null);
+    final KinesisSupervisorSpec spec = new KinesisSupervisorSpec(
+        null,
+        null,
+        dataSchema,
+        KinesisSupervisorTuningConfig.defaultConfig(),
+        kinesisSupervisorIOConfig,
+        null,
+        false,
+        taskStorage,
+        taskMaster,
+        indexerMetadataStorageCoordinator,
+        taskClientFactory,
+        OBJECT_MAPPER,
+        new NoopServiceEmitter(),
+        new DruidMonitorSchedulerConfig(),
+        rowIngestionMetersFactory,
+        null,
+        new SupervisorStateManagerConfig()
+    );
+
+    supervisor = new TestableKinesisSupervisor(
+        taskStorage,
+        taskMaster,
+        indexerMetadataStorageCoordinator,
+        taskClientFactory,
+        OBJECT_MAPPER,
+        spec,
+        rowIngestionMetersFactory
+    );
+
+    // Test type conversion methods
+    String shardId = 
supervisor.createPartitionIdFromString("shardId-000000000000");
+    Assert.assertEquals("shardId-000000000000", shardId);
+
+    String offset = 
supervisor.createSequenceOffsetFromObject("49590338271490256608559692538361571095921575989136588898");
+    
Assert.assertEquals("49590338271490256608559692538361571095921575989136588898", 
offset);
+
+    offset = supervisor.createSequenceOffsetFromObject(100);
+    Assert.assertEquals("100", offset);
+
+    Assert.assertTrue(supervisor.isOffsetAtOrBeyond(
+        "49590338271512257353759162668991891722121171891717232706",
+        "49590338271490256608559692538361571095921575989136588898"
+    ));
+    Assert.assertTrue(supervisor.isOffsetAtOrBeyond(
+        "49590338271490256608559692538361571095921575989136588898",
+        "49590338271490256608559692538361571095921575989136588898"
+    ));
+    Assert.assertFalse(supervisor.isOffsetAtOrBeyond(
+        "49590338271490256608559692538361571095921575989136588898",
+        "49590338271512257353759162668991891722121171891717232706"
+    ));
+  }
+
+  @Test
+  public void testIsOffsetAtOrBeyond_invalidSequenceNumber()
+  {
+    supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null);
+
+    Exception e = Assert.assertThrows(
+        IllegalArgumentException.class,
+        () -> supervisor.isOffsetAtOrBeyond("not-a-number", "12345")
+    );
+    Assert.assertTrue(e.getMessage().contains("Invalid Kinesis sequence 
number"));

Review Comment:
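   [P1] Fix invalid sequence assertion
   
   This new assertion expects the exception message to contain `Invalid Kinesis 
sequence number`. However, `KinesisSupervisor.isOffsetAtOrBeyond` delegates to 
`KinesisSequenceNumber.of`, which constructs a `BigInteger` and throws a 
`NumberFormatException` with a different message for `not-a-number`. 
`NumberFormatException` is a subclass of `IllegalArgumentException`, so the 
`assertThrows` call itself passes; it is the message check that fails. As 
written, this test should fail. There are two options. One is to catch the 
`NumberFormatException` in the production method and rethrow an 
`IllegalArgumentException` carrying the expected message, keeping the original 
exception as the cause. The other is to relax the message assertion.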



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to