FrankChen021 commented on code in PR #19372:
URL: https://github.com/apache/druid/pull/19372#discussion_r3234376992
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -4377,6 +4577,179 @@ && checkSourceMetadataMatch(dataSourceMetadata)) {
return Collections.emptyMap();
}
+ /**
+ * Check if all partitions in a task group have reached their bounded end
offsets.
+ * Used to determine if the task group completed successfully vs failed
midway.
+ *
+ * @param groupId The task group ID to check
+ * @return true if all partitions in the group have reached their end
offsets, false otherwise
+ */
+ private boolean hasTaskGroupReachedBoundedEnd(int groupId)
+ {
+ Set<PartitionIdType> partitionsInGroup = partitionGroups.get(groupId);
+ if (partitionsInGroup == null || partitionsInGroup.isEmpty()) {
+ return false;
+ }
+ BoundedStreamConfig boundedConfig = ioConfig.getBoundedStreamConfig();
+ Map<PartitionIdType, SequenceOffsetType> startOffsets =
+ convertBoundedConfigMap(boundedConfig.getStartSequenceNumbers());
+ Map<PartitionIdType, SequenceOffsetType> endOffsets =
+ convertBoundedConfigMap(boundedConfig.getEndSequenceNumbers());
+
+ // Check if all partitions have empty ranges
+ // For exclusive end offsets (Kafka): start >= end means empty
+ // For inclusive end offsets (Kinesis): only start > end means empty
(start == end is one record)
+ boolean allPartitionsEmptyRange = true;
+ for (PartitionIdType partition : partitionsInGroup) {
+ SequenceOffsetType start = startOffsets.get(partition);
+ SequenceOffsetType end = endOffsets.get(partition);
+
+ boolean isEmpty;
+ if (isEndOffsetExclusive()) {
+ // Exclusive: empty if start >= end
+ isEmpty = isOffsetAtOrBeyond(start, end);
Review Comment:
[P2] Reject reversed bounded ranges instead of treating them as complete
For streams with exclusive end offsets (Kafka), this treats `start > end` the
same as the valid empty case `start == end`. A config like `start=500/end=100`
therefore makes every partition look complete, and the supervisor can move to
COMPLETED without ever creating a task. `KafkaIndexTaskIOConfig` would reject
`end < start` if a task were created, but this completion check bypasses that
validation. The inclusive (Kinesis) branch appears to have the same gap: per
the comment above, it also classifies `start > end` as empty. Reversed ranges
should fail validation rather than be reported as a successful bounded
ingestion.
##########
extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java:
##########
@@ -4725,6 +4730,201 @@ public void test_doesTaskMatchSupervisor()
Assert.assertFalse(supervisor.doesTaskMatchSupervisor(differentTaskType));
}
+ @Test
+ public void testBoundedModeCreateTasksWithCorrectOffsets()
+ {
+ Map<String, Object> startOffsets = ImmutableMap.of(
+ "shardId-000000000000",
"49590338271490256608559692538361571095921575989136588898",
+ "shardId-000000000001",
"49590338271512257353759162668991891722121171891717232706"
+ );
+ Map<String, Object> endOffsets = ImmutableMap.of(
+ "shardId-000000000000",
"49590338271534258098958632799622211348320767794297876514",
+ "shardId-000000000001",
"49590338271556258844158102930252531974520363696878520322"
+ );
+ final KinesisSupervisorIOConfig kinesisSupervisorIOConfig = new
KinesisSupervisorIOConfig(
+ STREAM,
+ INPUT_FORMAT,
+ "awsEndpoint",
+ null,
+ 1,
+ 1,
+ new Period("PT30S"),
+ null,
+ new Period("PT30M"),
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ 0,
+ null,
+ null,
+ null,
+ true,
+ null,
+ new BoundedStreamConfig(startOffsets, endOffsets)
+ );
+
+ Assert.assertTrue(kinesisSupervisorIOConfig.isBounded());
+
+ final KinesisIndexTaskClientFactory taskClientFactory = new
KinesisIndexTaskClientFactory(null, null);
+ final KinesisSupervisorSpec spec = new KinesisSupervisorSpec(
+ null,
+ null,
+ dataSchema,
+ KinesisSupervisorTuningConfig.defaultConfig(),
+ kinesisSupervisorIOConfig,
+ null,
+ false,
+ taskStorage,
+ taskMaster,
+ indexerMetadataStorageCoordinator,
+ taskClientFactory,
+ OBJECT_MAPPER,
+ new NoopServiceEmitter(),
+ new DruidMonitorSchedulerConfig(),
+ rowIngestionMetersFactory,
+ null,
+ new SupervisorStateManagerConfig()
+ );
+
+ supervisor = new TestableKinesisSupervisor(
+ taskStorage,
+ taskMaster,
+ indexerMetadataStorageCoordinator,
+ taskClientFactory,
+ OBJECT_MAPPER,
+ spec,
+ rowIngestionMetersFactory
+ );
+
+ // Test type conversion methods
+ String shardId =
supervisor.createPartitionIdFromString("shardId-000000000000");
+ Assert.assertEquals("shardId-000000000000", shardId);
+
+ String offset =
supervisor.createSequenceOffsetFromObject("49590338271490256608559692538361571095921575989136588898");
+
Assert.assertEquals("49590338271490256608559692538361571095921575989136588898",
offset);
+
+ offset = supervisor.createSequenceOffsetFromObject(100);
+ Assert.assertEquals("100", offset);
+
+ Assert.assertTrue(supervisor.isOffsetAtOrBeyond(
+ "49590338271512257353759162668991891722121171891717232706",
+ "49590338271490256608559692538361571095921575989136588898"
+ ));
+ Assert.assertTrue(supervisor.isOffsetAtOrBeyond(
+ "49590338271490256608559692538361571095921575989136588898",
+ "49590338271490256608559692538361571095921575989136588898"
+ ));
+ Assert.assertFalse(supervisor.isOffsetAtOrBeyond(
+ "49590338271490256608559692538361571095921575989136588898",
+ "49590338271512257353759162668991891722121171891717232706"
+ ));
+ }
+
+ @Test
+ public void testIsOffsetAtOrBeyond_invalidSequenceNumber()
+ {
+ supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null);
+
+ Exception e = Assert.assertThrows(
+ IllegalArgumentException.class,
+ () -> supervisor.isOffsetAtOrBeyond("not-a-number", "12345")
+ );
+ Assert.assertTrue(e.getMessage().contains("Invalid Kinesis sequence
number"));
Review Comment:
[P1] Fix the invalid-sequence-number assertion
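This new assertion expects the exception message to contain `Invalid Kinesis
sequence number`, but `KinesisSupervisor.isOffsetAtOrBeyond` delegates to
`KinesisSequenceNumber.of`. That method constructs a `BigInteger`, which throws
a `NumberFormatException` with a different message for `not-a-number`.
`NumberFormatException` is a subclass of `IllegalArgumentException`, so the
`assertThrows` call itself passes; the message check is what fails. As
written, this test will fail. There are two ways to fix it:
- catch the `NumberFormatException` in the production method and rethrow an
  `IllegalArgumentException` carrying the expected message, or
- relax the assertion.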
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]