abhishekrb19 commented on code in PR #19372: URL: https://github.com/apache/druid/pull/19372#discussion_r3223501358
########## indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/BoundedStreamConfigTest.java: ########## @@ -0,0 +1,329 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.seekablestream.supervisor; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.druid.error.DruidException; +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; + +public class BoundedStreamConfigTest +{ + private final ObjectMapper mapper = new ObjectMapper(); + + @Test + public void testConstructorWithValidMaps() + { + Map<String, Long> startOffsets = new HashMap<>(); + startOffsets.put("0", 100L); + startOffsets.put("1", 200L); + + Map<String, Long> endOffsets = new HashMap<>(); + endOffsets.put("0", 500L); + endOffsets.put("1", 600L); + + BoundedStreamConfig config = new BoundedStreamConfig(startOffsets, endOffsets); + + Assert.assertEquals(startOffsets, config.getStartSequenceNumbers()); + Assert.assertEquals(endOffsets, config.getEndSequenceNumbers()); + } + + @Test + public void testConstructorWithNullStartSequenceNumbers() + { + Map<String, Long> endOffsets = new HashMap<>(); + endOffsets.put("0", 
500L); + + DruidException ex = Assert.assertThrows( + DruidException.class, + () -> new BoundedStreamConfig(null, endOffsets) + ); + + Assert.assertTrue(ex.getMessage().contains("cannot be null or empty")); + } + + @Test + public void testConstructorWithNullEndSequenceNumbers() + { + Map<String, Long> startOffsets = new HashMap<>(); + startOffsets.put("0", 100L); + + DruidException ex = Assert.assertThrows( + DruidException.class, + () -> new BoundedStreamConfig(startOffsets, null) + ); + + Assert.assertTrue(ex.getMessage().contains("cannot be null or empty")); + } + + @Test + public void testConstructorWithEmptyStartSequenceNumbers() + { + Map<String, Long> startOffsets = new HashMap<>(); + Map<String, Long> endOffsets = new HashMap<>(); + endOffsets.put("0", 500L); + + DruidException ex = Assert.assertThrows( + DruidException.class, + () -> new BoundedStreamConfig(startOffsets, endOffsets) + ); + + Assert.assertTrue(ex.getMessage().contains("cannot be null or empty")); + } + + @Test + public void testConstructorWithEmptyEndSequenceNumbers() + { + Map<String, Long> startOffsets = new HashMap<>(); + startOffsets.put("0", 100L); + Map<String, Long> endOffsets = new HashMap<>(); + + DruidException ex = Assert.assertThrows( + DruidException.class, + () -> new BoundedStreamConfig(startOffsets, endOffsets) + ); + + Assert.assertTrue(ex.getMessage().contains("cannot be null or empty")); + } + + @Test + public void testConstructorWithMismatchedPartitions() + { + Map<String, Long> startOffsets = new HashMap<>(); + startOffsets.put("0", 100L); + Map<String, Long> endOffsets = new HashMap<>(); + endOffsets.put("1", 500L); + + DruidException ex = Assert.assertThrows( + DruidException.class, + () -> new BoundedStreamConfig(startOffsets, endOffsets) + ); + + Assert.assertTrue(ex.getMessage().contains("must have matching partition sets")); + } + + @Test + public void testSerializationDeserialization() throws Exception + { + Map<String, Integer> startOffsets = new HashMap<>(); + 
startOffsets.put("0", 100); + startOffsets.put("1", 200); + + Map<String, Integer> endOffsets = new HashMap<>(); + endOffsets.put("0", 500); + endOffsets.put("1", 600); + + BoundedStreamConfig config = new BoundedStreamConfig(startOffsets, endOffsets); + + String json = mapper.writeValueAsString(config); + BoundedStreamConfig deserialized = mapper.readValue(json, BoundedStreamConfig.class); + + // Check sizes + Assert.assertEquals(2, deserialized.getStartSequenceNumbers().size()); + Assert.assertEquals(2, deserialized.getEndSequenceNumbers().size()); + + // Check that deserialized maps contain expected values (keys will be Strings after deserialization) + Assert.assertEquals(100, deserialized.getStartSequenceNumbers().get("0")); + Assert.assertEquals(200, deserialized.getStartSequenceNumbers().get("1")); + Assert.assertEquals(500, deserialized.getEndSequenceNumbers().get("0")); + Assert.assertEquals(600, deserialized.getEndSequenceNumbers().get("1")); + } + + @Test + public void testDeserializationWithIntegerValues() throws Exception + { + String json = "{" + + "\"startSequenceNumbers\": {\"0\": 100, \"1\": 200}," + + "\"endSequenceNumbers\": {\"0\": 500, \"1\": 600}" + + "}"; + + BoundedStreamConfig config = mapper.readValue(json, BoundedStreamConfig.class); + + Assert.assertNotNull(config.getStartSequenceNumbers()); + Assert.assertNotNull(config.getEndSequenceNumbers()); + Assert.assertEquals(2, config.getStartSequenceNumbers().size()); + Assert.assertEquals(2, config.getEndSequenceNumbers().size()); + } + + @Test + public void testDeserializationWithStringValues() throws Exception + { + String json = "{" + + "\"startSequenceNumbers\": {\"0\": \"100\", \"1\": \"200\"}," + + "\"endSequenceNumbers\": {\"0\": \"500\", \"1\": \"600\"}" + + "}"; + + BoundedStreamConfig config = mapper.readValue(json, BoundedStreamConfig.class); + + Assert.assertNotNull(config.getStartSequenceNumbers()); + Assert.assertNotNull(config.getEndSequenceNumbers()); + Assert.assertEquals(2, 
config.getStartSequenceNumbers().size()); + Assert.assertEquals(2, config.getEndSequenceNumbers().size()); + } + + @Test + public void testDeserializationWithMixedTypes() throws Exception + { + String json = "{" + + "\"startSequenceNumbers\": {\"0\": 100, \"1\": \"200\"}," + + "\"endSequenceNumbers\": {\"0\": 500, \"1\": \"600\"}" + + "}"; + + BoundedStreamConfig config = mapper.readValue(json, BoundedStreamConfig.class); + + Assert.assertNotNull(config.getStartSequenceNumbers()); + Assert.assertNotNull(config.getEndSequenceNumbers()); + Assert.assertEquals(2, config.getStartSequenceNumbers().size()); + Assert.assertEquals(2, config.getEndSequenceNumbers().size()); + } + + @Test + public void testEquals_equalObjects() + { + Map<String, Long> start1 = new HashMap<>(); + start1.put("0", 0L); + Map<String, Long> end1 = new HashMap<>(); + end1.put("0", 100L); + + Map<String, Long> start2 = new HashMap<>(); + start2.put("0", 0L); + Map<String, Long> end2 = new HashMap<>(); + end2.put("0", 100L); + + BoundedStreamConfig config1 = new BoundedStreamConfig(start1, end1); + BoundedStreamConfig config2 = new BoundedStreamConfig(start2, end2); + + Assert.assertEquals(config1, config2); + Assert.assertEquals(config1.hashCode(), config2.hashCode()); + } + + @Test + public void testEquals_nullObject() + { + Map<String, Long> start = new HashMap<>(); + start.put("0", 0L); + Map<String, Long> end = new HashMap<>(); + end.put("0", 100L); + + BoundedStreamConfig config = new BoundedStreamConfig(start, end); + + Assert.assertNotEquals(config, null); + } + + @Test + public void testEquals_differentClass() + { + Map<String, Long> start = new HashMap<>(); + start.put("0", 0L); + Map<String, Long> end = new HashMap<>(); + end.put("0", 100L); + + BoundedStreamConfig config = new BoundedStreamConfig(start, end); + + Assert.assertNotEquals(config, "not a BoundedStreamConfig"); + } + + @Test + public void testEquals_differentStartOffsets() + { + Map<String, Long> start1 = new HashMap<>(); + 
start1.put("0", 0L); + Map<String, Long> start2 = new HashMap<>(); + start2.put("0", 10L); + Map<String, Long> end = new HashMap<>(); + end.put("0", 100L); + + BoundedStreamConfig config1 = new BoundedStreamConfig(start1, end); + BoundedStreamConfig config2 = new BoundedStreamConfig(start2, end); + + Assert.assertNotEquals(config1, config2); + } + + @Test + public void testEquals_differentEndOffsets() + { + Map<String, Long> start = new HashMap<>(); + start.put("0", 0L); + Map<String, Long> end1 = new HashMap<>(); + end1.put("0", 100L); + Map<String, Long> end2 = new HashMap<>(); + end2.put("0", 200L); + + BoundedStreamConfig config1 = new BoundedStreamConfig(start, end1); + BoundedStreamConfig config2 = new BoundedStreamConfig(start, end2); + + Assert.assertNotEquals(config1, config2); + } + + @Test + public void testHashCode_consistency() + { + Map<String, Long> start = new HashMap<>(); + start.put("0", 0L); + Map<String, Long> end = new HashMap<>(); + end.put("0", 100L); + + BoundedStreamConfig config = new BoundedStreamConfig(start, end); + + int hashCode1 = config.hashCode(); + int hashCode2 = config.hashCode(); + + Assert.assertEquals(hashCode1, hashCode2); + } + + @Test + public void testHashCode_equalObjectsSameHashCode() + { + Map<String, Long> start1 = new HashMap<>(); + start1.put("0", 0L); + Map<String, Long> end1 = new HashMap<>(); + end1.put("0", 100L); + + Map<String, Long> start2 = new HashMap<>(); + start2.put("0", 0L); + Map<String, Long> end2 = new HashMap<>(); + end2.put("0", 100L); + + BoundedStreamConfig config1 = new BoundedStreamConfig(start1, end1); + BoundedStreamConfig config2 = new BoundedStreamConfig(start2, end2); + + Assert.assertEquals(config1.hashCode(), config2.hashCode()); Review Comment: Could replace it with: ``` EqualsVerifier.forClass(BoundedStreamConfig.class).withNonnullFields("startSequenceNumbers", "endSequenceNumbers").usingGetClass().verify(); ``` (Perhaps `EqualsVerifier` tests would replace some of the test 
permutations for equals/hashcode here) ########## embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaBoundedSupervisorTest.java: ########## @@ -0,0 +1,342 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.testing.embedded.indexing; + +import org.apache.druid.common.utils.IdUtils; +import org.apache.druid.data.input.impl.JsonInputFormat; +import org.apache.druid.indexing.kafka.simulate.KafkaResource; +import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpec; +import org.apache.druid.indexing.overlord.supervisor.SupervisorStatus; +import org.apache.druid.indexing.seekablestream.supervisor.BoundedStreamConfig; +import org.apache.druid.java.util.emitter.EmittingLogger; +import org.apache.druid.testing.embedded.StreamIngestResource; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.Map; + +/** + * Tests for bounded Kafka supervisors (one-time ingestion with explicit start/end offsets). 
+ */ +public class KafkaBoundedSupervisorTest extends StreamIndexTestBase +{ + private static final EmittingLogger log = new EmittingLogger(KafkaBoundedSupervisorTest.class); + private final KafkaResource kafkaServer = new KafkaResource(); + + @Override + protected StreamIngestResource<?> getStreamIngestResource() + { + return kafkaServer; + } + + @Test + public void test_boundedSupervisor_ingestsDataAndCompletes() + { + final String topic = IdUtils.getRandomId(); + kafkaServer.createTopicWithPartitions(topic, 2); + + // Publish records before creating supervisor + final int totalRecords = publish1kRecords(topic, false); + + // Get the current end offsets for all partitions + Map<String, Long> endOffsets = kafkaServer.getPartitionOffsets(topic); + Assertions.assertEquals(2, endOffsets.size(), "Should have 2 partitions"); + + // Create bounded config with start offset 0 and current end offsets + Map<String, Long> startOffsets = new HashMap<>(); + startOffsets.put("0", 0L); + startOffsets.put("1", 0L); + + BoundedStreamConfig boundedConfig = new BoundedStreamConfig(startOffsets, endOffsets); + + // Create bounded supervisor + final KafkaSupervisorSpec supervisor = createBoundedKafkaSupervisor( + kafkaServer, + topic, + boundedConfig + ); + + cluster.callApi().postSupervisor(supervisor); + + // Wait for records to be ingested + waitUntilPublishedRecordsAreIngested(totalRecords); + + // Wait for supervisor to transition to COMPLETED state + waitForSupervisorToComplete(supervisor.getId()); + + // Verify row count + verifyRowCount(totalRecords); + + // Verify supervisor is in COMPLETED state + final SupervisorStatus status = cluster.callApi().getSupervisorStatus(supervisor.getId()); + Assertions.assertEquals("COMPLETED", status.getState()); + Assertions.assertTrue(status.isHealthy()); + } + + @Test + public void test_boundedSupervisor_withEmptyRange_completesImmediately() + { + final String topic = IdUtils.getRandomId(); + kafkaServer.createTopicWithPartitions(topic, 1); 
+ + // Publish some records + publish1kRecords(topic, false); + + // Get current offset + Map<String, Long> currentOffsets = kafkaServer.getPartitionOffsets(topic); + Long currentOffset = currentOffsets.get("0"); + + // Create bounded config with start == end (empty range) + Map<String, Long> offsets = new HashMap<>(); + offsets.put("0", currentOffset); + + BoundedStreamConfig boundedConfig = new BoundedStreamConfig(offsets, offsets); + + // Create bounded supervisor + final KafkaSupervisorSpec supervisor = createBoundedKafkaSupervisor( + kafkaServer, + topic, + boundedConfig + ); + + cluster.callApi().postSupervisor(supervisor); + + // Wait for supervisor to transition to COMPLETED state + waitForSupervisorToComplete(supervisor.getId()); + + // Verify supervisor is in COMPLETED state + final SupervisorStatus status = cluster.callApi().getSupervisorStatus(supervisor.getId()); + Assertions.assertEquals("COMPLETED", status.getState()); + } + + private KafkaSupervisorSpec createBoundedKafkaSupervisor( + KafkaResource kafkaServer, + String topic, + BoundedStreamConfig boundedConfig + ) + { + return createKafkaSupervisor(kafkaServer) + .withIoConfig(io -> io + .withKafkaInputFormat(new JsonInputFormat(null, null, null, null, null)) + .withBoundedStreamConfig(boundedConfig) + ) + .build(dataSource, topic); + } + + @Test + public void test_boundedSupervisor_withMismatchedMetadata_is_unhealthy() + { + final String topic = IdUtils.getRandomId(); + kafkaServer.createTopicWithPartitions(topic, 2); + publish1kRecords(topic, false); + + // Get the current end offsets for all partitions + Map<String, Long> currentOffsets = kafkaServer.getPartitionOffsets(topic); + Assertions.assertEquals(2, currentOffsets.size(), "Should have 2 partitions"); + + // Create first bounded config - ingest only the first 100 records from each partition + Map<String, Long> startOffsets1 = new HashMap<>(); + startOffsets1.put("0", 0L); + startOffsets1.put("1", 0L); + + Map<String, Long> endOffsets1 = 
new HashMap<>(); + endOffsets1.put("0", 100L); + endOffsets1.put("1", 100L); + + BoundedStreamConfig boundedConfig1 = new BoundedStreamConfig(startOffsets1, endOffsets1); + + // Create first bounded supervisor and run it to completion + final KafkaSupervisorSpec supervisor1 = createBoundedKafkaSupervisor( + kafkaServer, + topic, + boundedConfig1 + ); + + cluster.callApi().postSupervisor(supervisor1); + + // Wait for records to be ingested (approximately 200 records total from both partitions) + waitUntilPublishedRecordsAreIngested(200); + + // Wait for supervisor to transition to COMPLETED state + waitForSupervisorToComplete(supervisor1.getId()); + + // Verify supervisor is in COMPLETED state + final SupervisorStatus status1 = cluster.callApi().getSupervisorStatus(supervisor1.getId()); + Assertions.assertEquals("COMPLETED", status1.getState()); + + // Now try to create a second bounded supervisor with different bounded config on the same datasource + Map<String, Long> startOffsets2 = new HashMap<>(); + startOffsets2.put("0", 50L); // Different start offset + startOffsets2.put("1", 50L); + + Map<String, Long> endOffsets2 = new HashMap<>(); + endOffsets2.put("0", 200L); // Different end offset + endOffsets2.put("1", 200L); + + BoundedStreamConfig boundedConfig2 = new BoundedStreamConfig(startOffsets2, endOffsets2); + + final KafkaSupervisorSpec supervisor2 = createBoundedKafkaSupervisor( + kafkaServer, + topic, + boundedConfig2 + ); + + // Post the second supervisor (it should use the same supervisor ID/datasource) + cluster.callApi().postSupervisor(supervisor2); + + // Wait for the supervisor to process and detect the metadata mismatch + // The exception we're testing for is thrown and logged, and causes the supervisor to become unhealthy + waitForSupervisorToBeUnhealthy(supervisor2.getId()); + + // Verify the supervisor is unhealthy + final SupervisorStatus status2 = cluster.callApi().getSupervisorStatus(supervisor2.getId()); + 
+ Assertions.assertFalse(status2.isHealthy(), "Supervisor should be unhealthy after detecting metadata mismatch"); + Assertions.assertEquals("UNHEALTHY_SUPERVISOR", status2.getState(), "Supervisor state should be UNHEALTHY_SUPERVISOR"); + } + + /** + * Regression test: a new bounded run whose endOffset is less than the offset committed by a prior + * run must not silently reach COMPLETED. Before the fix, hasTaskGroupReachedBoundedEnd() compared + * the stale committed offset against the new endOffset (e.g. committed=100 >= newEnd=50) and + * returned true, bypassing task creation and the documented mismatch error entirely. + */ Review Comment: The "Regression test" label (or the entire javadoc) can be removed, since these tests are new in this PR and are self-explanatory: ```suggestion ``` ########## extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java: ########## @@ -381,12 +387,54 @@ protected boolean isShardExpirationMarker(String seqNum) return KinesisSequenceNumber.EXPIRED_MARKER.equals(seqNum); } + @Override + protected boolean isOffsetAtOrBeyond(String current, String target) + { + // Kinesis sequence numbers are decimal numeric strings that must be compared numerically. + // Use BigInteger because Kinesis sequence numbers can be very large (128-bit). + try { + BigInteger currentNum = new BigInteger(current); + BigInteger targetNum = new BigInteger(target); + return currentNum.compareTo(targetNum) >= 0; + } + catch (NumberFormatException e) { + throw new IAE( + StringUtils.format( + "Invalid Kinesis sequence number. Expected numeric string but got current=[%s], target=[%s]", + current, + target + ), + e + ); + } Review Comment: Kinesis already has utilities for this, which also account for the special handling of some sequence numbers.
Could probably be simplified to: ``` return KinesisSequenceNumber.of(current).compareTo(KinesisSequenceNumber.of(target)) >= 0; ``` ########## indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java: ########## @@ -4347,6 +4546,182 @@ && checkSourceMetadataMatch(dataSourceMetadata)) { return Collections.emptyMap(); } + /** + * Check if all partitions in a task group have reached their bounded end offsets. + * Used to determine if the task group completed successfully vs failed midway. + * + * @param groupId The task group ID to check + * @return true if all partitions in the group have reached their end offsets, false otherwise + */ + private boolean hasTaskGroupReachedBoundedEnd(int groupId) + { + BoundedStreamConfig boundedConfig = ioConfig.getBoundedStreamConfig(); + Map<PartitionIdType, SequenceOffsetType> startOffsets = + convertBoundedConfigMap(boundedConfig.getStartSequenceNumbers()); + Map<PartitionIdType, SequenceOffsetType> endOffsets = + convertBoundedConfigMap(boundedConfig.getEndSequenceNumbers()); + + Set<PartitionIdType> partitionsInGroup = partitionGroups.get(groupId); + if (partitionsInGroup == null || partitionsInGroup.isEmpty()) { + return false; + } + + // Check if all partitions have empty ranges + // For exclusive end offsets (Kafka): start >= end means empty + // For inclusive end offsets (Kinesis): only start > end means empty (start == end is one record) + boolean allPartitionsEmptyRange = true; + for (PartitionIdType partition : partitionsInGroup) { + SequenceOffsetType start = startOffsets.get(partition); + SequenceOffsetType end = endOffsets.get(partition); + + boolean isEmpty; + if (isEndOffsetExclusive()) { + // Exclusive: empty if start >= end + isEmpty = isOffsetAtOrBeyond(start, end); + } else { + // Inclusive: empty only if start > end + isEmpty = isOffsetAtOrBeyond(start, end) && !start.equals(end); + } + + if (!isEmpty) { + allPartitionsEmptyRange = false; + break; + } + } 
+ + if (allPartitionsEmptyRange) { + log.warn( + "TaskGroup[%d] has empty range for all partitions (start %s end). " + + "No work to do, marking as complete. Start: %s, End: %s", + groupId, + isEndOffsetExclusive() ? ">=" : ">", + startOffsets, + endOffsets + ); + return true; + } + + // Offsets from a prior bounded run with a different config must not count as completion evidence. + // Returning false lets createNewTasks() run, which calls getOffsetFromStorageForPartition() + // and throws the documented mismatch error. + BoundedStreamConfig metadataBoundedConfig = getBoundedConfigFromMetadata(retrieveDataSourceMetadata()); + if (!boundedConfig.equals(metadataBoundedConfig)) { + return false; + } + Map<PartitionIdType, SequenceOffsetType> currentOffsets = getOffsetsFromMetadataStorage(); + + log.info( + "Bounded mode: checking completion for taskGroup[%d]. Current offsets from metadata: %s, End offsets: %s", + groupId, + currentOffsets, + endOffsets + ); + + if (currentOffsets == null || currentOffsets.isEmpty()) { + log.debug("No checkpointed offsets found, taskGroup[%d] has not completed", groupId); + return false; // No progress yet, task hasn't completed + } + + // Check if ALL partitions in this group have reached their end offsets + for (PartitionIdType partition : partitionsInGroup) { + SequenceOffsetType endOffset = endOffsets.get(partition); + SequenceOffsetType currentOffset = currentOffsets.get(partition); + + if (currentOffset == null) { + log.debug( + "Partition[%s] in taskGroup[%d] has no checkpointed offset, not complete", + partition, + groupId + ); + return false; // Partition hasn't started processing + } + + if (!isOffsetAtOrBeyond(currentOffset, endOffset)) { + log.debug( + "Partition[%s] in taskGroup[%d] at offset[%s], has not reached end[%s]", + partition, + groupId, + currentOffset, + endOffset + ); + return false; // This partition hasn't reached its end + } + } + + log.info( + "All partitions in taskGroup[%d] have reached their end 
offsets", + groupId + ); + return true; // All partitions have reached their end offsets + } + + /** + * Get end offsets for all partitions in a task group from bounded config. + */ + private Map<PartitionIdType, SequenceOffsetType> getEndOffsetsForGroup(int groupId) + { + BoundedStreamConfig boundedConfig = ioConfig.getBoundedStreamConfig(); + Map<PartitionIdType, SequenceOffsetType> endOffsets = + convertBoundedConfigMap(boundedConfig.getEndSequenceNumbers()); + Set<PartitionIdType> partitionsInGroup = partitionGroups.get(groupId); + + if (partitionsInGroup == null) { + return Collections.emptyMap(); + } + + return partitionsInGroup.stream() + .filter(endOffsets::containsKey) + .collect(Collectors.toMap( + p -> p, + endOffsets::get + )); + } + + /** + * For bounded supervisors, we determine completion by checking if new tasks would be created. + * In createNewTasks(), bounded mode checks hasTaskGroupReachedBoundedEnd() before creating tasks. + * If that returns true (offsets reached), no new tasks are created. + * So completion is: no active tasks, no pending tasks, and createNewTasks() chose not to create any. + * + * @return true if all bounded work is complete, false otherwise Review Comment: Thanks for clarifying the docs. Nit: use `{@link}` references for easier navigation between the methods mentioned in this javadoc: ```suggestion /** * For bounded supervisors, we determine completion by checking if new tasks would be created. * In {@link #createNewTasks()}, bounded mode checks {@link #hasTaskGroupReachedBoundedEnd(int)} before creating tasks. * If that returns true (offsets reached), no new tasks are created. * So completion is: no active tasks, no pending tasks, and {@link #createNewTasks()} chose not to create any.
* * @return true if all bounded work is complete, false otherwise ``` ########## embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaBoundedSupervisorTest.java: ########## @@ -0,0 +1,342 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.testing.embedded.indexing; + +import org.apache.druid.common.utils.IdUtils; +import org.apache.druid.data.input.impl.JsonInputFormat; +import org.apache.druid.indexing.kafka.simulate.KafkaResource; +import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpec; +import org.apache.druid.indexing.overlord.supervisor.SupervisorStatus; +import org.apache.druid.indexing.seekablestream.supervisor.BoundedStreamConfig; +import org.apache.druid.java.util.emitter.EmittingLogger; +import org.apache.druid.testing.embedded.StreamIngestResource; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.Map; + +/** + * Tests for bounded Kafka supervisors (one-time ingestion with explicit start/end offsets). 
+ */ +public class KafkaBoundedSupervisorTest extends StreamIndexTestBase +{ + private static final EmittingLogger log = new EmittingLogger(KafkaBoundedSupervisorTest.class); + private final KafkaResource kafkaServer = new KafkaResource(); + + @Override + protected StreamIngestResource<?> getStreamIngestResource() + { + return kafkaServer; + } + + @Test + public void test_boundedSupervisor_ingestsDataAndCompletes() Review Comment: Nice, thanks for these tests! ########## indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java: ########## @@ -4347,6 +4546,182 @@ && checkSourceMetadataMatch(dataSourceMetadata)) { return Collections.emptyMap(); } + /** + * Check if all partitions in a task group have reached their bounded end offsets. + * Used to determine if the task group completed successfully vs failed midway. + * + * @param groupId The task group ID to check + * @return true if all partitions in the group have reached their end offsets, false otherwise + */ + private boolean hasTaskGroupReachedBoundedEnd(int groupId) + { + BoundedStreamConfig boundedConfig = ioConfig.getBoundedStreamConfig(); + Map<PartitionIdType, SequenceOffsetType> startOffsets = + convertBoundedConfigMap(boundedConfig.getStartSequenceNumbers()); + Map<PartitionIdType, SequenceOffsetType> endOffsets = + convertBoundedConfigMap(boundedConfig.getEndSequenceNumbers()); Review Comment: Actually, do you think it's worth resolving these converted start/end offset maps once, instead of recomputing them every time this method is invoked from the internal loop (given that the bounded config doesn't change)? ########## embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaBoundedSupervisorTest.java: ########## @@ -0,0 +1,342 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.testing.embedded.indexing; + +import org.apache.druid.common.utils.IdUtils; +import org.apache.druid.data.input.impl.JsonInputFormat; +import org.apache.druid.indexing.kafka.simulate.KafkaResource; +import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpec; +import org.apache.druid.indexing.overlord.supervisor.SupervisorStatus; +import org.apache.druid.indexing.seekablestream.supervisor.BoundedStreamConfig; +import org.apache.druid.java.util.emitter.EmittingLogger; +import org.apache.druid.testing.embedded.StreamIngestResource; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.Map; + +/** + * Tests for bounded Kafka supervisors (one-time ingestion with explicit start/end offsets). 
+ */ +public class KafkaBoundedSupervisorTest extends StreamIndexTestBase +{ + private static final EmittingLogger log = new EmittingLogger(KafkaBoundedSupervisorTest.class); + private final KafkaResource kafkaServer = new KafkaResource(); + + @Override + protected StreamIngestResource<?> getStreamIngestResource() + { + return kafkaServer; + } + + @Test + public void test_boundedSupervisor_ingestsDataAndCompletes() + { + final String topic = IdUtils.getRandomId(); + kafkaServer.createTopicWithPartitions(topic, 2); + + // Publish records before creating supervisor + final int totalRecords = publish1kRecords(topic, false); + + // Get the current end offsets for all partitions + Map<String, Long> endOffsets = kafkaServer.getPartitionOffsets(topic); + Assertions.assertEquals(2, endOffsets.size(), "Should have 2 partitions"); + + // Create bounded config with start offset 0 and current end offsets + Map<String, Long> startOffsets = new HashMap<>(); + startOffsets.put("0", 0L); + startOffsets.put("1", 0L); + + BoundedStreamConfig boundedConfig = new BoundedStreamConfig(startOffsets, endOffsets); + + // Create bounded supervisor + final KafkaSupervisorSpec supervisor = createBoundedKafkaSupervisor( + kafkaServer, + topic, + boundedConfig + ); + + cluster.callApi().postSupervisor(supervisor); + + // Wait for records to be ingested + waitUntilPublishedRecordsAreIngested(totalRecords); + + // Wait for supervisor to transition to COMPLETED state + waitForSupervisorToComplete(supervisor.getId()); + + // Verify row count + verifyRowCount(totalRecords); + + // Verify supervisor is in COMPLETED state + final SupervisorStatus status = cluster.callApi().getSupervisorStatus(supervisor.getId()); + Assertions.assertEquals("COMPLETED", status.getState()); + Assertions.assertTrue(status.isHealthy()); + } + + @Test + public void test_boundedSupervisor_withEmptyRange_completesImmediately() + { + final String topic = IdUtils.getRandomId(); + kafkaServer.createTopicWithPartitions(topic, 1); 
+ + // Publish some records + publish1kRecords(topic, false); + + // Get current offset + Map<String, Long> currentOffsets = kafkaServer.getPartitionOffsets(topic); + Long currentOffset = currentOffsets.get("0"); + + // Create bounded config with start == end (empty range) + Map<String, Long> offsets = new HashMap<>(); + offsets.put("0", currentOffset); + + BoundedStreamConfig boundedConfig = new BoundedStreamConfig(offsets, offsets); + + // Create bounded supervisor + final KafkaSupervisorSpec supervisor = createBoundedKafkaSupervisor( + kafkaServer, + topic, + boundedConfig + ); + + cluster.callApi().postSupervisor(supervisor); + + // Wait for supervisor to transition to COMPLETED state + waitForSupervisorToComplete(supervisor.getId()); + + // Verify supervisor is in COMPLETED state + final SupervisorStatus status = cluster.callApi().getSupervisorStatus(supervisor.getId()); + Assertions.assertEquals("COMPLETED", status.getState()); + } + + private KafkaSupervisorSpec createBoundedKafkaSupervisor( + KafkaResource kafkaServer, + String topic, + BoundedStreamConfig boundedConfig + ) + { + return createKafkaSupervisor(kafkaServer) + .withIoConfig(io -> io + .withKafkaInputFormat(new JsonInputFormat(null, null, null, null, null)) + .withBoundedStreamConfig(boundedConfig) + ) + .build(dataSource, topic); + } + + @Test + public void test_boundedSupervisor_withMismatchedMetadata_is_unhealthy() + { + final String topic = IdUtils.getRandomId(); + kafkaServer.createTopicWithPartitions(topic, 2); + publish1kRecords(topic, false); + + // Get the current end offsets for all partitions + Map<String, Long> currentOffsets = kafkaServer.getPartitionOffsets(topic); + Assertions.assertEquals(2, currentOffsets.size(), "Should have 2 partitions"); + + // Create first bounded config - ingest only the first 100 records from each partition + Map<String, Long> startOffsets1 = new HashMap<>(); + startOffsets1.put("0", 0L); + startOffsets1.put("1", 0L); + + Map<String, Long> endOffsets1 = 
new HashMap<>(); + endOffsets1.put("0", 100L); + endOffsets1.put("1", 100L); + + BoundedStreamConfig boundedConfig1 = new BoundedStreamConfig(startOffsets1, endOffsets1); + + // Create first bounded supervisor and run it to completion + final KafkaSupervisorSpec supervisor1 = createBoundedKafkaSupervisor( + kafkaServer, + topic, + boundedConfig1 + ); + + cluster.callApi().postSupervisor(supervisor1); + + // Wait for records to be ingested (approximately 200 records total from both partitions) + waitUntilPublishedRecordsAreIngested(200); + + // Wait for supervisor to transition to COMPLETED state + waitForSupervisorToComplete(supervisor1.getId()); + + // Verify supervisor is in COMPLETED state + final SupervisorStatus status1 = cluster.callApi().getSupervisorStatus(supervisor1.getId()); + Assertions.assertEquals("COMPLETED", status1.getState()); + + // Now try to create a second bounded supervisor with different bounded config on the same datasource + Map<String, Long> startOffsets2 = new HashMap<>(); + startOffsets2.put("0", 50L); // Different start offset + startOffsets2.put("1", 50L); + + Map<String, Long> endOffsets2 = new HashMap<>(); + endOffsets2.put("0", 200L); // Different end offset + endOffsets2.put("1", 200L); + + BoundedStreamConfig boundedConfig2 = new BoundedStreamConfig(startOffsets2, endOffsets2); + + final KafkaSupervisorSpec supervisor2 = createBoundedKafkaSupervisor( + kafkaServer, + topic, + boundedConfig2 + ); + + // Post the second supervisor (it should use the same supervisor ID/datasource) + cluster.callApi().postSupervisor(supervisor2); + + // Wait for the supervisor to process and detect the metadata mismatch + // The exception we're testing for is thrown and logged, and causes the supervisor to become unhealthy + waitForSupervisorToBeUnhealthy(supervisor2.getId()); + + // Verify the supervisor is unhealthy + final SupervisorStatus status2 = cluster.callApi().getSupervisorStatus(supervisor2.getId()); + 
Assertions.assertFalse(status2.isHealthy(), "Supervisor should be unhealthy after detecting metadata mismatch"); + Assertions.assertEquals("UNHEALTHY_SUPERVISOR", status2.getState(), "Supervisor state should be UNHEALTHY_SUPERVISOR"); Review Comment: Perhaps also validate that the number of records ingested by this bounded supervisor is 0 and/or that the tasks spun up by this supervisor actually failed? ########## embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaBoundedSupervisorTest.java: ########## @@ -0,0 +1,342 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.druid.testing.embedded.indexing; + +import org.apache.druid.common.utils.IdUtils; +import org.apache.druid.data.input.impl.JsonInputFormat; +import org.apache.druid.indexing.kafka.simulate.KafkaResource; +import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpec; +import org.apache.druid.indexing.overlord.supervisor.SupervisorStatus; +import org.apache.druid.indexing.seekablestream.supervisor.BoundedStreamConfig; +import org.apache.druid.java.util.emitter.EmittingLogger; +import org.apache.druid.testing.embedded.StreamIngestResource; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.Map; + +/** + * Tests for bounded Kafka supervisors (one-time ingestion with explicit start/end offsets). + */ +public class KafkaBoundedSupervisorTest extends StreamIndexTestBase +{ + private static final EmittingLogger log = new EmittingLogger(KafkaBoundedSupervisorTest.class); + private final KafkaResource kafkaServer = new KafkaResource(); + + @Override + protected StreamIngestResource<?> getStreamIngestResource() + { + return kafkaServer; + } + + @Test + public void test_boundedSupervisor_ingestsDataAndCompletes() + { + final String topic = IdUtils.getRandomId(); + kafkaServer.createTopicWithPartitions(topic, 2); + + // Publish records before creating supervisor + final int totalRecords = publish1kRecords(topic, false); + + // Get the current end offsets for all partitions + Map<String, Long> endOffsets = kafkaServer.getPartitionOffsets(topic); + Assertions.assertEquals(2, endOffsets.size(), "Should have 2 partitions"); + + // Create bounded config with start offset 0 and current end offsets + Map<String, Long> startOffsets = new HashMap<>(); + startOffsets.put("0", 0L); + startOffsets.put("1", 0L); + + BoundedStreamConfig boundedConfig = new BoundedStreamConfig(startOffsets, endOffsets); + + // Create bounded supervisor + final KafkaSupervisorSpec supervisor = 
createBoundedKafkaSupervisor( + kafkaServer, + topic, + boundedConfig + ); + + cluster.callApi().postSupervisor(supervisor); + + // Wait for records to be ingested + waitUntilPublishedRecordsAreIngested(totalRecords); + + // Wait for supervisor to transition to COMPLETED state + waitForSupervisorToComplete(supervisor.getId()); + + // Verify row count + verifyRowCount(totalRecords); + + // Verify supervisor is in COMPLETED state + final SupervisorStatus status = cluster.callApi().getSupervisorStatus(supervisor.getId()); + Assertions.assertEquals("COMPLETED", status.getState()); + Assertions.assertTrue(status.isHealthy()); + } + + @Test + public void test_boundedSupervisor_withEmptyRange_completesImmediately() + { + final String topic = IdUtils.getRandomId(); + kafkaServer.createTopicWithPartitions(topic, 1); + + // Publish some records + publish1kRecords(topic, false); + + // Get current offset + Map<String, Long> currentOffsets = kafkaServer.getPartitionOffsets(topic); + Long currentOffset = currentOffsets.get("0"); + + // Create bounded config with start == end (empty range) + Map<String, Long> offsets = new HashMap<>(); + offsets.put("0", currentOffset); + + BoundedStreamConfig boundedConfig = new BoundedStreamConfig(offsets, offsets); + + // Create bounded supervisor + final KafkaSupervisorSpec supervisor = createBoundedKafkaSupervisor( + kafkaServer, + topic, + boundedConfig + ); + + cluster.callApi().postSupervisor(supervisor); + + // Wait for supervisor to transition to COMPLETED state + waitForSupervisorToComplete(supervisor.getId()); + + // Verify supervisor is in COMPLETED state + final SupervisorStatus status = cluster.callApi().getSupervisorStatus(supervisor.getId()); + Assertions.assertEquals("COMPLETED", status.getState()); + } + + private KafkaSupervisorSpec createBoundedKafkaSupervisor( + KafkaResource kafkaServer, + String topic, + BoundedStreamConfig boundedConfig + ) + { + return createKafkaSupervisor(kafkaServer) + .withIoConfig(io -> io + 
.withKafkaInputFormat(new JsonInputFormat(null, null, null, null, null)) + .withBoundedStreamConfig(boundedConfig) + ) + .build(dataSource, topic); + } + + @Test + public void test_boundedSupervisor_withMismatchedMetadata_is_unhealthy() + { + final String topic = IdUtils.getRandomId(); + kafkaServer.createTopicWithPartitions(topic, 2); + publish1kRecords(topic, false); + + // Get the current end offsets for all partitions + Map<String, Long> currentOffsets = kafkaServer.getPartitionOffsets(topic); + Assertions.assertEquals(2, currentOffsets.size(), "Should have 2 partitions"); + + // Create first bounded config - ingest only the first 100 records from each partition + Map<String, Long> startOffsets1 = new HashMap<>(); + startOffsets1.put("0", 0L); + startOffsets1.put("1", 0L); + + Map<String, Long> endOffsets1 = new HashMap<>(); + endOffsets1.put("0", 100L); + endOffsets1.put("1", 100L); + + BoundedStreamConfig boundedConfig1 = new BoundedStreamConfig(startOffsets1, endOffsets1); + + // Create first bounded supervisor and run it to completion + final KafkaSupervisorSpec supervisor1 = createBoundedKafkaSupervisor( + kafkaServer, + topic, + boundedConfig1 + ); + + cluster.callApi().postSupervisor(supervisor1); + + // Wait for records to be ingested (approximately 200 records total from both partitions) + waitUntilPublishedRecordsAreIngested(200); + + // Wait for supervisor to transition to COMPLETED state + waitForSupervisorToComplete(supervisor1.getId()); + + // Verify supervisor is in COMPLETED state + final SupervisorStatus status1 = cluster.callApi().getSupervisorStatus(supervisor1.getId()); + Assertions.assertEquals("COMPLETED", status1.getState()); + + // Now try to create a second bounded supervisor with different bounded config on the same datasource + Map<String, Long> startOffsets2 = new HashMap<>(); + startOffsets2.put("0", 50L); // Different start offset + startOffsets2.put("1", 50L); + + Map<String, Long> endOffsets2 = new HashMap<>(); + 
endOffsets2.put("0", 200L); // Different end offset + endOffsets2.put("1", 200L); + + BoundedStreamConfig boundedConfig2 = new BoundedStreamConfig(startOffsets2, endOffsets2); + + final KafkaSupervisorSpec supervisor2 = createBoundedKafkaSupervisor( + kafkaServer, + topic, + boundedConfig2 + ); + + // Post the second supervisor (it should use the same supervisor ID/datasource) + cluster.callApi().postSupervisor(supervisor2); + + // Wait for the supervisor to process and detect the metadata mismatch + // The exception we're testing for is thrown and logged, and causes the supervisor to become unhealthy + waitForSupervisorToBeUnhealthy(supervisor2.getId()); + + // Verify the supervisor is unhealthy + final SupervisorStatus status2 = cluster.callApi().getSupervisorStatus(supervisor2.getId()); + Assertions.assertFalse(status2.isHealthy(), "Supervisor should be unhealthy after detecting metadata mismatch"); + Assertions.assertEquals("UNHEALTHY_SUPERVISOR", status2.getState(), "Supervisor state should be UNHEALTHY_SUPERVISOR"); + } + + /** + * Regression test: a new bounded run whose endOffset is less than the offset committed by a prior + * run must not silently reach COMPLETED. Before the fix, hasTaskGroupReachedBoundedEnd() compared + * the stale committed offset against the new endOffset (e.g. committed=100 >= newEnd=50) and + * returned true, bypassing task creation and the documented mismatch error entirely. + */ + @Test + public void test_boundedSupervisor_doesNotSilentlyCompleteWhenStaleOffsetExceedsNewEnd() + { + final String topic = IdUtils.getRandomId(); + kafkaServer.createTopicWithPartitions(topic, 2); + publish1kRecords(topic, false); + + // Run 1: ingest up to offset 100 on each partition and complete. 
+ Map<String, Long> startOffsets1 = new HashMap<>(); + startOffsets1.put("0", 0L); + startOffsets1.put("1", 0L); + + Map<String, Long> endOffsets1 = new HashMap<>(); + endOffsets1.put("0", 100L); + endOffsets1.put("1", 100L); Review Comment: Just for completeness, maybe use different start/end offsets for partitions 0 and 1 (in this test or the previous ones)? ########## docs/ingestion/supervisor.md: ########## @@ -251,6 +252,57 @@ Before you set `stopTaskCount`, note the following: - The [task autoscaler](#task-autoscaler) ignores `stopTaskCount` when shutting down tasks in response to a task count change. The task autoscaler needs to redistribute partitions across tasks, which requires all tasks to be shut down. - If you set `stopTaskCount` to a value less than `taskCount`, Druid cycles the longest running tasks first, then other tasks up to the value set. +#### Bounded stream configuration + +Use `boundedStreamConfig` to configure one-time ingestion from a specific range of offsets. This is useful for backfilling historical data or reprocessing data with different configurations. + +The `boundedStreamConfig` object contains the following properties: + +|Property|Type|Description|Required| +|--------|----|-----------|--------| +|`startSequenceNumbers`|Object|Map of partition IDs to start offsets (inclusive for Kafka, inclusive for Kinesis).|Yes| +|`endSequenceNumbers`|Object|Map of partition IDs to end offsets (exclusive for Kafka, inclusive for Kinesis).|Yes| + +When configured, the supervisor: +1. Creates tasks that start reading from `startSequenceNumbers` +2. Tasks automatically stop when they reach `endSequenceNumbers` +3. Supervisor does not create replacement tasks after tasks complete +4. Supervisor transitions to `COMPLETED` state and terminates when all tasks finish + +**Metadata consistency:** The bounded configuration is stored in datasource metadata along with checkpointed offsets.
If you restart the supervisor or create a new supervisor with a different `boundedStreamConfig` for the same datasource, the supervisor will fail with an error. To start a new bounded ingestion with different offsets, either: +- Use the [supervisor reset API](../api-reference/supervisor-api.md#reset-a-supervisor) to clear existing metadata +- Use a different datasource name Review Comment: Ditto: I think using a different datasource name would already be supported by the multi-supervisor feature (I think we meant different supervisor ID here?) ########## docs/ingestion/supervisor.md: ########## @@ -65,6 +65,7 @@ For configuration properties specific to Kafka and Kinesis, see [Kafka I/O confi |`lateMessageRejectionPeriod`|ISO 8601 period|Configures tasks to reject messages with timestamps earlier than this period before the task was created. For example, if this property is set to `PT1H` and the supervisor creates a task at `2016-01-01T12:00Z`, Druid drops messages with timestamps earlier than `2016-01-01T11:00Z`. This may help prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments, such as a streaming and a nightly batch ingestion pipeline. You can specify only one of the late message rejection properties.|No|| |`earlyMessageRejectionPeriod`|ISO 8601 period|Configures tasks to reject messages with timestamps later than this period after the task reached its task duration. For example, if this property is set to `PT1H`, the task duration is set to `PT1H` and the supervisor creates a task at `2016-01-01T12:00Z`, Druid drops messages with timestamps later than `2016-01-01T14:00Z`. Tasks sometimes run past their task duration, such as in cases of supervisor failover.|No|| |`stopTaskCount`|Integer|Limits the number of ingestion tasks Druid can cycle at any given time. If not set, Druid can cycle all tasks at the same time. 
If set to a value less than `taskCount`, your cluster needs fewer available slots to run the supervisor. You can save costs by scaling down your ingestion tier, but this can lead to slower cycle times and lag. See [`stopTaskCount`](#stoptaskcount) for more information.|No|`taskCount` value| +|`boundedStreamConfig`|Object|Configures the supervisor for bounded (one-time) ingestion with explicit start and end offsets. When set, the supervisor creates tasks that read from `startSequenceNumbers` to `endSequenceNumbers`, then automatically terminates when all data is ingested. The bounded configuration is stored with datasource metadata; if a supervisor is restarted or a new supervisor is created with different offsets for the same datasource, it will fail. To retry with different offsets, use the supervisor reset API to clear metadata or use a different datasource. Useful for backfills and historical reprocessing. See [Bounded stream configuration](#bounded-stream-configuration) for details.|No|null| Review Comment: ```suggestion |`boundedStreamConfig`|Object|Configures the supervisor for bounded (one-time) ingestion with explicit start and end offsets. When set, the supervisor creates tasks that read from `startSequenceNumbers` to `endSequenceNumbers`, then automatically terminates when all data is ingested. The bounded configuration is stored with datasource metadata; if a supervisor is restarted or a new supervisor is created with different offsets for the same datasource, it will fail. To retry with different offsets, use the supervisor reset API to clear metadata or use a different supervisor ID. Useful for backfills and historical reprocessing. 
See [Bounded stream configuration](#bounded-stream-configuration) for details.|No|null| ``` ########## indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java: ########## @@ -4263,18 +4412,51 @@ private Map<PartitionIdType, OrderedSequenceNumber<SequenceOffsetType>> generate return builder.build(); } + /** + * Extracts BoundedStreamConfig from DataSourceMetadata if available. + */ + @Nullable + private BoundedStreamConfig getBoundedConfigFromMetadata(@Nullable DataSourceMetadata metadata) + { + if (metadata instanceof SeekableStreamDataSourceMetadata) { + return ((SeekableStreamDataSourceMetadata<?, ?>) metadata).getBoundedStreamConfig(); + } + return null; + } + /** * Queries the dataSource metadata table to see if there is a previous ending sequence for this partition. If it * doesn't find any data, it will retrieve the latest or earliest Kafka/Kinesis sequence depending on the * {@link SeekableStreamSupervisorIOConfig#useEarliestSequenceNumber}. */ private OrderedSequenceNumber<SequenceOffsetType> getOffsetFromStorageForPartition( PartitionIdType partition, - final Map<PartitionIdType, SequenceOffsetType> metadataOffsets + final Map<PartitionIdType, SequenceOffsetType> metadataOffsets, + @Nullable final BoundedStreamConfig metadataBoundedConfig ) { SequenceOffsetType sequence = metadataOffsets.get(partition); if (sequence != null) { + // In bounded mode, check if the metadata's bounded config matches the current supervisor's config + if (ioConfig.isBounded()) { + BoundedStreamConfig currentBoundedConfig = ioConfig.getBoundedStreamConfig(); + + // If configs don't match (or metadata has no config), throw exception + if (!currentBoundedConfig.equals(metadataBoundedConfig)) { + throw DruidException.forPersona(DruidException.Persona.ADMIN) + .ofCategory(DruidException.Category.INVALID_INPUT) + .build( + "Bounded supervisor detected existing metadata from a different run. 
" + + "Metadata bounded config [%s] does not match current config [%s]. " + + "To start a new bounded ingestion, either: " + + "(1) use the supervisor reset API to clear existing metadata, or " + + "(2) use a different supervisor ID / datasource.", Review Comment: I think a different supervisor ID alone would suffice for this backfill requirement? A different datasource would already be supported. ########## indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java: ########## @@ -4347,6 +4546,182 @@ && checkSourceMetadataMatch(dataSourceMetadata)) { return Collections.emptyMap(); } + /** + * Check if all partitions in a task group have reached their bounded end offsets.
+ * Used to determine if the task group completed successfully vs failed midway. + * + * @param groupId The task group ID to check + * @return true if all partitions in the group have reached their end offsets, false otherwise + */ + private boolean hasTaskGroupReachedBoundedEnd(int groupId) + { + BoundedStreamConfig boundedConfig = ioConfig.getBoundedStreamConfig(); + Map<PartitionIdType, SequenceOffsetType> startOffsets = + convertBoundedConfigMap(boundedConfig.getStartSequenceNumbers()); + Map<PartitionIdType, SequenceOffsetType> endOffsets = + convertBoundedConfigMap(boundedConfig.getEndSequenceNumbers()); + + Set<PartitionIdType> partitionsInGroup = partitionGroups.get(groupId); + if (partitionsInGroup == null || partitionsInGroup.isEmpty()) { + return false; + } + + // Check if all partitions have empty ranges + // For exclusive end offsets (Kafka): start >= end means empty + // For inclusive end offsets (Kinesis): only start > end means empty (start == end is one record) + boolean allPartitionsEmptyRange = true; + for (PartitionIdType partition : partitionsInGroup) { + SequenceOffsetType start = startOffsets.get(partition); + SequenceOffsetType end = endOffsets.get(partition); + + boolean isEmpty; + if (isEndOffsetExclusive()) { + // Exclusive: empty if start >= end + isEmpty = isOffsetAtOrBeyond(start, end); + } else { + // Inclusive: empty only if start > end + isEmpty = isOffsetAtOrBeyond(start, end) && !start.equals(end); + } + + if (!isEmpty) { + allPartitionsEmptyRange = false; + break; + } + } + + if (allPartitionsEmptyRange) { + log.warn( + "TaskGroup[%d] has empty range for all partitions (start %s end). " + + "No work to do, marking as complete. Start: %s, End: %s", + groupId, + isEndOffsetExclusive() ? ">=" : ">", + startOffsets, + endOffsets + ); + return true; + } + + // Offsets from a prior bounded run with a different config must not count as completion evidence. 
+ // Returning false lets createNewTasks() run, which calls getOffsetFromStorageForPartition() + // and throws the documented mismatch error. Review Comment: Not sure what this means without enough context: `// and throws the documented mismatch error.` ########## embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaBoundedSupervisorTest.java: ########## @@ -0,0 +1,342 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.testing.embedded.indexing; + +import org.apache.druid.common.utils.IdUtils; +import org.apache.druid.data.input.impl.JsonInputFormat; +import org.apache.druid.indexing.kafka.simulate.KafkaResource; +import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpec; +import org.apache.druid.indexing.overlord.supervisor.SupervisorStatus; +import org.apache.druid.indexing.seekablestream.supervisor.BoundedStreamConfig; +import org.apache.druid.java.util.emitter.EmittingLogger; +import org.apache.druid.testing.embedded.StreamIngestResource; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.Map; + +/** + * Tests for bounded Kafka supervisors (one-time ingestion with explicit start/end offsets). + */ +public class KafkaBoundedSupervisorTest extends StreamIndexTestBase +{ + private static final EmittingLogger log = new EmittingLogger(KafkaBoundedSupervisorTest.class); + private final KafkaResource kafkaServer = new KafkaResource(); + + @Override + protected StreamIngestResource<?> getStreamIngestResource() + { + return kafkaServer; + } + + @Test + public void test_boundedSupervisor_ingestsDataAndCompletes() + { + final String topic = IdUtils.getRandomId(); + kafkaServer.createTopicWithPartitions(topic, 2); + + // Publish records before creating supervisor + final int totalRecords = publish1kRecords(topic, false); + + // Get the current end offsets for all partitions + Map<String, Long> endOffsets = kafkaServer.getPartitionOffsets(topic); + Assertions.assertEquals(2, endOffsets.size(), "Should have 2 partitions"); + + // Create bounded config with start offset 0 and current end offsets + Map<String, Long> startOffsets = new HashMap<>(); + startOffsets.put("0", 0L); + startOffsets.put("1", 0L); + + BoundedStreamConfig boundedConfig = new BoundedStreamConfig(startOffsets, endOffsets); + + // Create bounded supervisor + final KafkaSupervisorSpec supervisor = 
createBoundedKafkaSupervisor( + kafkaServer, + topic, + boundedConfig + ); + + cluster.callApi().postSupervisor(supervisor); + + // Wait for records to be ingested + waitUntilPublishedRecordsAreIngested(totalRecords); + + // Wait for supervisor to transition to COMPLETED state + waitForSupervisorToComplete(supervisor.getId()); + + // Verify row count + verifyRowCount(totalRecords); + + // Verify supervisor is in COMPLETED state + final SupervisorStatus status = cluster.callApi().getSupervisorStatus(supervisor.getId()); + Assertions.assertEquals("COMPLETED", status.getState()); + Assertions.assertTrue(status.isHealthy()); + } + + @Test + public void test_boundedSupervisor_withEmptyRange_completesImmediately() + { + final String topic = IdUtils.getRandomId(); + kafkaServer.createTopicWithPartitions(topic, 1); + + // Publish some records + publish1kRecords(topic, false); + + // Get current offset + Map<String, Long> currentOffsets = kafkaServer.getPartitionOffsets(topic); + Long currentOffset = currentOffsets.get("0"); + + // Create bounded config with start == end (empty range) + Map<String, Long> offsets = new HashMap<>(); + offsets.put("0", currentOffset); + + BoundedStreamConfig boundedConfig = new BoundedStreamConfig(offsets, offsets); + + // Create bounded supervisor + final KafkaSupervisorSpec supervisor = createBoundedKafkaSupervisor( + kafkaServer, + topic, + boundedConfig + ); + + cluster.callApi().postSupervisor(supervisor); + + // Wait for supervisor to transition to COMPLETED state + waitForSupervisorToComplete(supervisor.getId()); + + // Verify supervisor is in COMPLETED state + final SupervisorStatus status = cluster.callApi().getSupervisorStatus(supervisor.getId()); + Assertions.assertEquals("COMPLETED", status.getState()); + } + + private KafkaSupervisorSpec createBoundedKafkaSupervisor( + KafkaResource kafkaServer, + String topic, + BoundedStreamConfig boundedConfig + ) + { + return createKafkaSupervisor(kafkaServer) + .withIoConfig(io -> io + 
.withKafkaInputFormat(new JsonInputFormat(null, null, null, null, null)) + .withBoundedStreamConfig(boundedConfig) + ) + .build(dataSource, topic); + } + + @Test + public void test_boundedSupervisor_withMismatchedMetadata_is_unhealthy() + { + final String topic = IdUtils.getRandomId(); + kafkaServer.createTopicWithPartitions(topic, 2); + publish1kRecords(topic, false); + + // Get the current end offsets for all partitions + Map<String, Long> currentOffsets = kafkaServer.getPartitionOffsets(topic); + Assertions.assertEquals(2, currentOffsets.size(), "Should have 2 partitions"); + + // Create first bounded config - ingest only the first 100 records from each partition + Map<String, Long> startOffsets1 = new HashMap<>(); + startOffsets1.put("0", 0L); + startOffsets1.put("1", 0L); + + Map<String, Long> endOffsets1 = new HashMap<>(); + endOffsets1.put("0", 100L); + endOffsets1.put("1", 100L); + + BoundedStreamConfig boundedConfig1 = new BoundedStreamConfig(startOffsets1, endOffsets1); + + // Create first bounded supervisor and run it to completion + final KafkaSupervisorSpec supervisor1 = createBoundedKafkaSupervisor( + kafkaServer, + topic, + boundedConfig1 + ); + + cluster.callApi().postSupervisor(supervisor1); + + // Wait for records to be ingested (approximately 200 records total from both partitions) + waitUntilPublishedRecordsAreIngested(200); + + // Wait for supervisor to transition to COMPLETED state + waitForSupervisorToComplete(supervisor1.getId()); + + // Verify supervisor is in COMPLETED state + final SupervisorStatus status1 = cluster.callApi().getSupervisorStatus(supervisor1.getId()); + Assertions.assertEquals("COMPLETED", status1.getState()); + + // Now try to create a second bounded supervisor with different bounded config on the same datasource + Map<String, Long> startOffsets2 = new HashMap<>(); + startOffsets2.put("0", 50L); // Different start offset + startOffsets2.put("1", 50L); + + Map<String, Long> endOffsets2 = new HashMap<>(); + 
endOffsets2.put("0", 200L); // Different end offset + endOffsets2.put("1", 200L); + + BoundedStreamConfig boundedConfig2 = new BoundedStreamConfig(startOffsets2, endOffsets2); + + final KafkaSupervisorSpec supervisor2 = createBoundedKafkaSupervisor( + kafkaServer, + topic, + boundedConfig2 + ); + + // Post the second supervisor (it should use the same supervisor ID/datasource) + cluster.callApi().postSupervisor(supervisor2); + + // Wait for the supervisor to process and detect the metadata mismatch + // The exception we're testing for is thrown and logged, and causes the supervisor to become unhealthy + waitForSupervisorToBeUnhealthy(supervisor2.getId()); + + // Verify the supervisor is unhealthy + final SupervisorStatus status2 = cluster.callApi().getSupervisorStatus(supervisor2.getId()); + Assertions.assertFalse(status2.isHealthy(), "Supervisor should be unhealthy after detecting metadata mismatch"); + Assertions.assertEquals("UNHEALTHY_SUPERVISOR", status2.getState(), "Supervisor state should be UNHEALTHY_SUPERVISOR"); + } + + /** + * Regression test: a new bounded run whose endOffset is less than the offset committed by a prior + * run must not silently reach COMPLETED. Before the fix, hasTaskGroupReachedBoundedEnd() compared + * the stale committed offset against the new endOffset (e.g. committed=100 >= newEnd=50) and + * returned true, bypassing task creation and the documented mismatch error entirely. + */ + @Test + public void test_boundedSupervisor_doesNotSilentlyCompleteWhenStaleOffsetExceedsNewEnd() + { + final String topic = IdUtils.getRandomId(); + kafkaServer.createTopicWithPartitions(topic, 2); + publish1kRecords(topic, false); + + // Run 1: ingest up to offset 100 on each partition and complete. 
+ Map<String, Long> startOffsets1 = new HashMap<>(); + startOffsets1.put("0", 0L); + startOffsets1.put("1", 0L); + + Map<String, Long> endOffsets1 = new HashMap<>(); + endOffsets1.put("0", 100L); + endOffsets1.put("1", 100L); + + BoundedStreamConfig boundedConfig1 = new BoundedStreamConfig(startOffsets1, endOffsets1); + final KafkaSupervisorSpec supervisor1 = createBoundedKafkaSupervisor(kafkaServer, topic, boundedConfig1); + + cluster.callApi().postSupervisor(supervisor1); + waitUntilPublishedRecordsAreIngested(200); + waitForSupervisorToComplete(supervisor1.getId()); + + final SupervisorStatus status1 = cluster.callApi().getSupervisorStatus(supervisor1.getId()); + Assertions.assertEquals("COMPLETED", status1.getState()); + + // Run 2: same datasource, endOffset (50) < stale committed offset (100). + // Without the fix the supervisor reaches COMPLETED immediately without running tasks. + // With the fix it detects the config mismatch and becomes UNHEALTHY_SUPERVISOR. + Map<String, Long> startOffsets2 = new HashMap<>(); + startOffsets2.put("0", 0L); + startOffsets2.put("1", 0L); + + Map<String, Long> endOffsets2 = new HashMap<>(); + endOffsets2.put("0", 50L); + endOffsets2.put("1", 50L); + + BoundedStreamConfig boundedConfig2 = new BoundedStreamConfig(startOffsets2, endOffsets2); + final KafkaSupervisorSpec supervisor2 = createBoundedKafkaSupervisor(kafkaServer, topic, boundedConfig2); + + cluster.callApi().postSupervisor(supervisor2); + waitForSupervisorToBeUnhealthy(supervisor2.getId()); + + final SupervisorStatus status2 = cluster.callApi().getSupervisorStatus(supervisor2.getId()); + Assertions.assertFalse(status2.isHealthy(), "Supervisor should be unhealthy after detecting metadata mismatch"); Review Comment: Do you think validating a third supervisor with a different ID but the same bounds would be worth adding? 
(either this test or the following one) ########## indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java: ########## @@ -4347,6 +4546,182 @@ && checkSourceMetadataMatch(dataSourceMetadata)) { return Collections.emptyMap(); } + /** + * Check if all partitions in a task group have reached their bounded end offsets. + * Used to determine if the task group completed successfully vs failed midway. + * + * @param groupId The task group ID to check + * @return true if all partitions in the group have reached their end offsets, false otherwise + */ + private boolean hasTaskGroupReachedBoundedEnd(int groupId) + { + BoundedStreamConfig boundedConfig = ioConfig.getBoundedStreamConfig(); + Map<PartitionIdType, SequenceOffsetType> startOffsets = + convertBoundedConfigMap(boundedConfig.getStartSequenceNumbers()); + Map<PartitionIdType, SequenceOffsetType> endOffsets = + convertBoundedConfigMap(boundedConfig.getEndSequenceNumbers()); + + Set<PartitionIdType> partitionsInGroup = partitionGroups.get(groupId); + if (partitionsInGroup == null || partitionsInGroup.isEmpty()) { + return false; + } + + // Check if all partitions have empty ranges + // For exclusive end offsets (Kafka): start >= end means empty + // For inclusive end offsets (Kinesis): only start > end means empty (start == end is one record) + boolean allPartitionsEmptyRange = true; + for (PartitionIdType partition : partitionsInGroup) { + SequenceOffsetType start = startOffsets.get(partition); + SequenceOffsetType end = endOffsets.get(partition); Review Comment: nit: could we make variables final here & in surrounding code where applicable so the intent is clear and nothing accidentally mutates these offsets (except `allPartitionsEmptyRange`)? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
