abhishekrb19 commented on code in PR #19372:
URL: https://github.com/apache/druid/pull/19372#discussion_r3223501358


##########
indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/BoundedStreamConfigTest.java:
##########
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream.supervisor;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.error.DruidException;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class BoundedStreamConfigTest
+{
+  private final ObjectMapper mapper = new ObjectMapper();
+
+  @Test
+  public void testConstructorWithValidMaps()
+  {
+    Map<String, Long> startOffsets = new HashMap<>();
+    startOffsets.put("0", 100L);
+    startOffsets.put("1", 200L);
+
+    Map<String, Long> endOffsets = new HashMap<>();
+    endOffsets.put("0", 500L);
+    endOffsets.put("1", 600L);
+
+    BoundedStreamConfig config = new BoundedStreamConfig(startOffsets, 
endOffsets);
+
+    Assert.assertEquals(startOffsets, config.getStartSequenceNumbers());
+    Assert.assertEquals(endOffsets, config.getEndSequenceNumbers());
+  }
+
+  @Test
+  public void testConstructorWithNullStartSequenceNumbers()
+  {
+    Map<String, Long> endOffsets = new HashMap<>();
+    endOffsets.put("0", 500L);
+
+    DruidException ex = Assert.assertThrows(
+        DruidException.class,
+        () -> new BoundedStreamConfig(null, endOffsets)
+    );
+
+    Assert.assertTrue(ex.getMessage().contains("cannot be null or empty"));
+  }
+
+  @Test
+  public void testConstructorWithNullEndSequenceNumbers()
+  {
+    Map<String, Long> startOffsets = new HashMap<>();
+    startOffsets.put("0", 100L);
+
+    DruidException ex = Assert.assertThrows(
+        DruidException.class,
+        () -> new BoundedStreamConfig(startOffsets, null)
+    );
+
+    Assert.assertTrue(ex.getMessage().contains("cannot be null or empty"));
+  }
+
+  @Test
+  public void testConstructorWithEmptyStartSequenceNumbers()
+  {
+    Map<String, Long> startOffsets = new HashMap<>();
+    Map<String, Long> endOffsets = new HashMap<>();
+    endOffsets.put("0", 500L);
+
+    DruidException ex = Assert.assertThrows(
+        DruidException.class,
+        () -> new BoundedStreamConfig(startOffsets, endOffsets)
+    );
+
+    Assert.assertTrue(ex.getMessage().contains("cannot be null or empty"));
+  }
+
+  @Test
+  public void testConstructorWithEmptyEndSequenceNumbers()
+  {
+    Map<String, Long> startOffsets = new HashMap<>();
+    startOffsets.put("0", 100L);
+    Map<String, Long> endOffsets = new HashMap<>();
+
+    DruidException ex = Assert.assertThrows(
+        DruidException.class,
+        () -> new BoundedStreamConfig(startOffsets, endOffsets)
+    );
+
+    Assert.assertTrue(ex.getMessage().contains("cannot be null or empty"));
+  }
+
+  @Test
+  public void testConstructorWithMismatchedPartitions()
+  {
+    Map<String, Long> startOffsets = new HashMap<>();
+    startOffsets.put("0", 100L);
+    Map<String, Long> endOffsets = new HashMap<>();
+    endOffsets.put("1", 500L);
+
+    DruidException ex = Assert.assertThrows(
+        DruidException.class,
+        () -> new BoundedStreamConfig(startOffsets, endOffsets)
+    );
+
+    Assert.assertTrue(ex.getMessage().contains("must have matching partition 
sets"));
+  }
+
+  @Test
+  public void testSerializationDeserialization() throws Exception
+  {
+    Map<String, Integer> startOffsets = new HashMap<>();
+    startOffsets.put("0", 100);
+    startOffsets.put("1", 200);
+
+    Map<String, Integer> endOffsets = new HashMap<>();
+    endOffsets.put("0", 500);
+    endOffsets.put("1", 600);
+
+    BoundedStreamConfig config = new BoundedStreamConfig(startOffsets, 
endOffsets);
+
+    String json = mapper.writeValueAsString(config);
+    BoundedStreamConfig deserialized = mapper.readValue(json, 
BoundedStreamConfig.class);
+
+    // Check sizes
+    Assert.assertEquals(2, deserialized.getStartSequenceNumbers().size());
+    Assert.assertEquals(2, deserialized.getEndSequenceNumbers().size());
+
+    // Check that deserialized maps contain expected values (keys will be 
Strings after deserialization)
+    Assert.assertEquals(100, deserialized.getStartSequenceNumbers().get("0"));
+    Assert.assertEquals(200, deserialized.getStartSequenceNumbers().get("1"));
+    Assert.assertEquals(500, deserialized.getEndSequenceNumbers().get("0"));
+    Assert.assertEquals(600, deserialized.getEndSequenceNumbers().get("1"));
+  }
+
+  @Test
+  public void testDeserializationWithIntegerValues() throws Exception
+  {
+    String json = "{"
+                  + "\"startSequenceNumbers\": {\"0\": 100, \"1\": 200},"
+                  + "\"endSequenceNumbers\": {\"0\": 500, \"1\": 600}"
+                  + "}";
+
+    BoundedStreamConfig config = mapper.readValue(json, 
BoundedStreamConfig.class);
+
+    Assert.assertNotNull(config.getStartSequenceNumbers());
+    Assert.assertNotNull(config.getEndSequenceNumbers());
+    Assert.assertEquals(2, config.getStartSequenceNumbers().size());
+    Assert.assertEquals(2, config.getEndSequenceNumbers().size());
+  }
+
+  @Test
+  public void testDeserializationWithStringValues() throws Exception
+  {
+    String json = "{"
+                  + "\"startSequenceNumbers\": {\"0\": \"100\", \"1\": 
\"200\"},"
+                  + "\"endSequenceNumbers\": {\"0\": \"500\", \"1\": \"600\"}"
+                  + "}";
+
+    BoundedStreamConfig config = mapper.readValue(json, 
BoundedStreamConfig.class);
+
+    Assert.assertNotNull(config.getStartSequenceNumbers());
+    Assert.assertNotNull(config.getEndSequenceNumbers());
+    Assert.assertEquals(2, config.getStartSequenceNumbers().size());
+    Assert.assertEquals(2, config.getEndSequenceNumbers().size());
+  }
+
+  @Test
+  public void testDeserializationWithMixedTypes() throws Exception
+  {
+    String json = "{"
+                  + "\"startSequenceNumbers\": {\"0\": 100, \"1\": \"200\"},"
+                  + "\"endSequenceNumbers\": {\"0\": 500, \"1\": \"600\"}"
+                  + "}";
+
+    BoundedStreamConfig config = mapper.readValue(json, 
BoundedStreamConfig.class);
+
+    Assert.assertNotNull(config.getStartSequenceNumbers());
+    Assert.assertNotNull(config.getEndSequenceNumbers());
+    Assert.assertEquals(2, config.getStartSequenceNumbers().size());
+    Assert.assertEquals(2, config.getEndSequenceNumbers().size());
+  }
+
+  @Test
+  public void testEquals_equalObjects()
+  {
+    Map<String, Long> start1 = new HashMap<>();
+    start1.put("0", 0L);
+    Map<String, Long> end1 = new HashMap<>();
+    end1.put("0", 100L);
+
+    Map<String, Long> start2 = new HashMap<>();
+    start2.put("0", 0L);
+    Map<String, Long> end2 = new HashMap<>();
+    end2.put("0", 100L);
+
+    BoundedStreamConfig config1 = new BoundedStreamConfig(start1, end1);
+    BoundedStreamConfig config2 = new BoundedStreamConfig(start2, end2);
+
+    Assert.assertEquals(config1, config2);
+    Assert.assertEquals(config1.hashCode(), config2.hashCode());
+  }
+
+  @Test
+  public void testEquals_nullObject()
+  {
+    Map<String, Long> start = new HashMap<>();
+    start.put("0", 0L);
+    Map<String, Long> end = new HashMap<>();
+    end.put("0", 100L);
+
+    BoundedStreamConfig config = new BoundedStreamConfig(start, end);
+
+    Assert.assertNotEquals(config, null);
+  }
+
+  @Test
+  public void testEquals_differentClass()
+  {
+    Map<String, Long> start = new HashMap<>();
+    start.put("0", 0L);
+    Map<String, Long> end = new HashMap<>();
+    end.put("0", 100L);
+
+    BoundedStreamConfig config = new BoundedStreamConfig(start, end);
+
+    Assert.assertNotEquals(config, "not a BoundedStreamConfig");
+  }
+
+  @Test
+  public void testEquals_differentStartOffsets()
+  {
+    Map<String, Long> start1 = new HashMap<>();
+    start1.put("0", 0L);
+    Map<String, Long> start2 = new HashMap<>();
+    start2.put("0", 10L);
+    Map<String, Long> end = new HashMap<>();
+    end.put("0", 100L);
+
+    BoundedStreamConfig config1 = new BoundedStreamConfig(start1, end);
+    BoundedStreamConfig config2 = new BoundedStreamConfig(start2, end);
+
+    Assert.assertNotEquals(config1, config2);
+  }
+
+  @Test
+  public void testEquals_differentEndOffsets()
+  {
+    Map<String, Long> start = new HashMap<>();
+    start.put("0", 0L);
+    Map<String, Long> end1 = new HashMap<>();
+    end1.put("0", 100L);
+    Map<String, Long> end2 = new HashMap<>();
+    end2.put("0", 200L);
+
+    BoundedStreamConfig config1 = new BoundedStreamConfig(start, end1);
+    BoundedStreamConfig config2 = new BoundedStreamConfig(start, end2);
+
+    Assert.assertNotEquals(config1, config2);
+  }
+
+  @Test
+  public void testHashCode_consistency()
+  {
+    Map<String, Long> start = new HashMap<>();
+    start.put("0", 0L);
+    Map<String, Long> end = new HashMap<>();
+    end.put("0", 100L);
+
+    BoundedStreamConfig config = new BoundedStreamConfig(start, end);
+
+    int hashCode1 = config.hashCode();
+    int hashCode2 = config.hashCode();
+
+    Assert.assertEquals(hashCode1, hashCode2);
+  }
+
+  @Test
+  public void testHashCode_equalObjectsSameHashCode()
+  {
+    Map<String, Long> start1 = new HashMap<>();
+    start1.put("0", 0L);
+    Map<String, Long> end1 = new HashMap<>();
+    end1.put("0", 100L);
+
+    Map<String, Long> start2 = new HashMap<>();
+    start2.put("0", 0L);
+    Map<String, Long> end2 = new HashMap<>();
+    end2.put("0", 100L);
+
+    BoundedStreamConfig config1 = new BoundedStreamConfig(start1, end1);
+    BoundedStreamConfig config2 = new BoundedStreamConfig(start2, end2);
+
+    Assert.assertEquals(config1.hashCode(), config2.hashCode());

Review Comment:
   Could replace it with:
   ```
   
EqualsVerifier.forClass(BoundedStreamConfig.class).withNonnullFields("startSequenceNumbers",
 "endSequenceNumbers").usingGetClass().verify();
   ```
   
   (Perhaps `EqualsVerifier` tests would replace some of the test permutations 
for equals/hashcode here)



##########
embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaBoundedSupervisorTest.java:
##########
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.embedded.indexing;
+
+import org.apache.druid.common.utils.IdUtils;
+import org.apache.druid.data.input.impl.JsonInputFormat;
+import org.apache.druid.indexing.kafka.simulate.KafkaResource;
+import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpec;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorStatus;
+import org.apache.druid.indexing.seekablestream.supervisor.BoundedStreamConfig;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.testing.embedded.StreamIngestResource;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Tests for bounded Kafka supervisors (one-time ingestion with explicit 
start/end offsets).
+ */
+public class KafkaBoundedSupervisorTest extends StreamIndexTestBase
+{
+  private static final EmittingLogger log = new 
EmittingLogger(KafkaBoundedSupervisorTest.class);
+  private final KafkaResource kafkaServer = new KafkaResource();
+
+  @Override
+  protected StreamIngestResource<?> getStreamIngestResource()
+  {
+    return kafkaServer;
+  }
+
+  @Test
+  public void test_boundedSupervisor_ingestsDataAndCompletes()
+  {
+    final String topic = IdUtils.getRandomId();
+    kafkaServer.createTopicWithPartitions(topic, 2);
+
+    // Publish records before creating supervisor
+    final int totalRecords = publish1kRecords(topic, false);
+
+    // Get the current end offsets for all partitions
+    Map<String, Long> endOffsets = kafkaServer.getPartitionOffsets(topic);
+    Assertions.assertEquals(2, endOffsets.size(), "Should have 2 partitions");
+
+    // Create bounded config with start offset 0 and current end offsets
+    Map<String, Long> startOffsets = new HashMap<>();
+    startOffsets.put("0", 0L);
+    startOffsets.put("1", 0L);
+
+    BoundedStreamConfig boundedConfig = new BoundedStreamConfig(startOffsets, 
endOffsets);
+
+    // Create bounded supervisor
+    final KafkaSupervisorSpec supervisor = createBoundedKafkaSupervisor(
+        kafkaServer,
+        topic,
+        boundedConfig
+    );
+
+    cluster.callApi().postSupervisor(supervisor);
+
+    // Wait for records to be ingested
+    waitUntilPublishedRecordsAreIngested(totalRecords);
+
+    // Wait for supervisor to transition to COMPLETED state
+    waitForSupervisorToComplete(supervisor.getId());
+
+    // Verify row count
+    verifyRowCount(totalRecords);
+
+    // Verify supervisor is in COMPLETED state
+    final SupervisorStatus status = 
cluster.callApi().getSupervisorStatus(supervisor.getId());
+    Assertions.assertEquals("COMPLETED", status.getState());
+    Assertions.assertTrue(status.isHealthy());
+  }
+
+  @Test
+  public void test_boundedSupervisor_withEmptyRange_completesImmediately()
+  {
+    final String topic = IdUtils.getRandomId();
+    kafkaServer.createTopicWithPartitions(topic, 1);
+
+    // Publish some records
+    publish1kRecords(topic, false);
+
+    // Get current offset
+    Map<String, Long> currentOffsets = kafkaServer.getPartitionOffsets(topic);
+    Long currentOffset = currentOffsets.get("0");
+
+    // Create bounded config with start == end (empty range)
+    Map<String, Long> offsets = new HashMap<>();
+    offsets.put("0", currentOffset);
+
+    BoundedStreamConfig boundedConfig = new BoundedStreamConfig(offsets, 
offsets);
+
+    // Create bounded supervisor
+    final KafkaSupervisorSpec supervisor = createBoundedKafkaSupervisor(
+        kafkaServer,
+        topic,
+        boundedConfig
+    );
+
+    cluster.callApi().postSupervisor(supervisor);
+
+    // Wait for supervisor to transition to COMPLETED state
+    waitForSupervisorToComplete(supervisor.getId());
+
+    // Verify supervisor is in COMPLETED state
+    final SupervisorStatus status = 
cluster.callApi().getSupervisorStatus(supervisor.getId());
+    Assertions.assertEquals("COMPLETED", status.getState());
+  }
+
+  private KafkaSupervisorSpec createBoundedKafkaSupervisor(
+      KafkaResource kafkaServer,
+      String topic,
+      BoundedStreamConfig boundedConfig
+  )
+  {
+    return createKafkaSupervisor(kafkaServer)
+        .withIoConfig(io -> io
+            .withKafkaInputFormat(new JsonInputFormat(null, null, null, null, 
null))
+            .withBoundedStreamConfig(boundedConfig)
+        )
+        .build(dataSource, topic);
+  }
+
+  @Test
+  public void test_boundedSupervisor_withMismatchedMetadata_is_unhealthy()
+  {
+    final String topic = IdUtils.getRandomId();
+    kafkaServer.createTopicWithPartitions(topic, 2);
+    publish1kRecords(topic, false);
+
+    // Get the current end offsets for all partitions
+    Map<String, Long> currentOffsets = kafkaServer.getPartitionOffsets(topic);
+    Assertions.assertEquals(2, currentOffsets.size(), "Should have 2 
partitions");
+
+    // Create first bounded config - ingest only the first 100 records from 
each partition
+    Map<String, Long> startOffsets1 = new HashMap<>();
+    startOffsets1.put("0", 0L);
+    startOffsets1.put("1", 0L);
+
+    Map<String, Long> endOffsets1 = new HashMap<>();
+    endOffsets1.put("0", 100L);
+    endOffsets1.put("1", 100L);
+
+    BoundedStreamConfig boundedConfig1 = new 
BoundedStreamConfig(startOffsets1, endOffsets1);
+
+    // Create first bounded supervisor and run it to completion
+    final KafkaSupervisorSpec supervisor1 = createBoundedKafkaSupervisor(
+        kafkaServer,
+        topic,
+        boundedConfig1
+    );
+
+    cluster.callApi().postSupervisor(supervisor1);
+
+    // Wait for records to be ingested (approximately 200 records total from 
both partitions)
+    waitUntilPublishedRecordsAreIngested(200);
+
+    // Wait for supervisor to transition to COMPLETED state
+    waitForSupervisorToComplete(supervisor1.getId());
+
+    // Verify supervisor is in COMPLETED state
+    final SupervisorStatus status1 = 
cluster.callApi().getSupervisorStatus(supervisor1.getId());
+    Assertions.assertEquals("COMPLETED", status1.getState());
+
+    // Now try to create a second bounded supervisor with different bounded 
config on the same datasource
+    Map<String, Long> startOffsets2 = new HashMap<>();
+    startOffsets2.put("0", 50L);  // Different start offset
+    startOffsets2.put("1", 50L);
+
+    Map<String, Long> endOffsets2 = new HashMap<>();
+    endOffsets2.put("0", 200L);  // Different end offset
+    endOffsets2.put("1", 200L);
+
+    BoundedStreamConfig boundedConfig2 = new 
BoundedStreamConfig(startOffsets2, endOffsets2);
+
+    final KafkaSupervisorSpec supervisor2 = createBoundedKafkaSupervisor(
+        kafkaServer,
+        topic,
+        boundedConfig2
+    );
+
+    // Post the second supervisor (it should use the same supervisor 
ID/datasource)
+    cluster.callApi().postSupervisor(supervisor2);
+
+    // Wait for the supervisor to process and detect the metadata mismatch
+    // The exception we're testing for is thrown and logged, and causes the 
supervisor to become unhealthy
+    waitForSupervisorToBeUnhealthy(supervisor2.getId());
+
+    // Verify the supervisor is unhealthy
+    final SupervisorStatus status2 = 
cluster.callApi().getSupervisorStatus(supervisor2.getId());
+    Assertions.assertFalse(status2.isHealthy(), "Supervisor should be 
unhealthy after detecting metadata mismatch");
+    Assertions.assertEquals("UNHEALTHY_SUPERVISOR", status2.getState(), 
"Supervisor state should be UNHEALTHY_SUPERVISOR");
+  }
+
+  /**
+   * Regression test: a new bounded run whose endOffset is less than the 
offset committed by a prior
+   * run must not silently reach COMPLETED. Before the fix, 
hasTaskGroupReachedBoundedEnd() compared
+   * the stale committed offset against the new endOffset (e.g. committed=100 
>= newEnd=50) and
+   * returned true, bypassing task creation and the documented mismatch error 
entirely.
+   */

Review Comment:
   Regression test or the javadoc can be entirely removed as these are new 
tests from this PR/self reviews itself:
   ```suggestion
   ```



##########
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java:
##########
@@ -381,12 +387,54 @@ protected boolean isShardExpirationMarker(String seqNum)
     return KinesisSequenceNumber.EXPIRED_MARKER.equals(seqNum);
   }
 
+  @Override
+  protected boolean isOffsetAtOrBeyond(String current, String target)
+  {
+    // Kinesis sequence numbers are decimal numeric strings that must be 
compared numerically.
+    // Use BigInteger because Kinesis sequence numbers can be very large 
(128-bit).
+    try {
+      BigInteger currentNum = new BigInteger(current);
+      BigInteger targetNum = new BigInteger(target);
+      return currentNum.compareTo(targetNum) >= 0;
+    }
+    catch (NumberFormatException e) {
+      throw new IAE(
+          StringUtils.format(
+              "Invalid Kinesis sequence number. Expected numeric string but 
got current=[%s], target=[%s]",
+              current,
+              target
+          ),
+          e
+      );
+    }

Review Comment:
   Kinesis has some existing utilities for this which also accounts for some 
special handling of sequence numbers. Could probably be simplified to:
   ```
   return 
KinesisSequenceNumber.of(current).compareTo(KinesisSequenceNumber.of(target)) 
>= 0;
   ```



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -4347,6 +4546,182 @@ && checkSourceMetadataMatch(dataSourceMetadata)) {
     return Collections.emptyMap();
   }
 
+  /**
+   * Check if all partitions in a task group have reached their bounded end 
offsets.
+   * Used to determine if the task group completed successfully vs failed 
midway.
+   *
+   * @param groupId The task group ID to check
+   * @return true if all partitions in the group have reached their end 
offsets, false otherwise
+   */
+  private boolean hasTaskGroupReachedBoundedEnd(int groupId)
+  {
+    BoundedStreamConfig boundedConfig = ioConfig.getBoundedStreamConfig();
+    Map<PartitionIdType, SequenceOffsetType> startOffsets =
+        convertBoundedConfigMap(boundedConfig.getStartSequenceNumbers());
+    Map<PartitionIdType, SequenceOffsetType> endOffsets =
+        convertBoundedConfigMap(boundedConfig.getEndSequenceNumbers());
+
+    Set<PartitionIdType> partitionsInGroup = partitionGroups.get(groupId);
+    if (partitionsInGroup == null || partitionsInGroup.isEmpty()) {
+      return false;
+    }
+
+    // Check if all partitions have empty ranges
+    // For exclusive end offsets (Kafka): start >= end means empty
+    // For inclusive end offsets (Kinesis): only start > end means empty 
(start == end is one record)
+    boolean allPartitionsEmptyRange = true;
+    for (PartitionIdType partition : partitionsInGroup) {
+      SequenceOffsetType start = startOffsets.get(partition);
+      SequenceOffsetType end = endOffsets.get(partition);
+
+      boolean isEmpty;
+      if (isEndOffsetExclusive()) {
+        // Exclusive: empty if start >= end
+        isEmpty = isOffsetAtOrBeyond(start, end);
+      } else {
+        // Inclusive: empty only if start > end
+        isEmpty = isOffsetAtOrBeyond(start, end) && !start.equals(end);
+      }
+
+      if (!isEmpty) {
+        allPartitionsEmptyRange = false;
+        break;
+      }
+    }
+
+    if (allPartitionsEmptyRange) {
+      log.warn(
+          "TaskGroup[%d] has empty range for all partitions (start %s end). "
+          + "No work to do, marking as complete. Start: %s, End: %s",
+          groupId,
+          isEndOffsetExclusive() ? ">=" : ">",
+          startOffsets,
+          endOffsets
+      );
+      return true;
+    }
+
+    // Offsets from a prior bounded run with a different config must not count 
as completion evidence.
+    // Returning false lets createNewTasks() run, which calls 
getOffsetFromStorageForPartition()
+    // and throws the documented mismatch error.
+    BoundedStreamConfig metadataBoundedConfig = 
getBoundedConfigFromMetadata(retrieveDataSourceMetadata());
+    if (!boundedConfig.equals(metadataBoundedConfig)) {
+      return false;
+    }
+    Map<PartitionIdType, SequenceOffsetType> currentOffsets = 
getOffsetsFromMetadataStorage();
+
+    log.info(
+        "Bounded mode: checking completion for taskGroup[%d]. Current offsets 
from metadata: %s, End offsets: %s",
+        groupId,
+        currentOffsets,
+        endOffsets
+    );
+
+    if (currentOffsets == null || currentOffsets.isEmpty()) {
+      log.debug("No checkpointed offsets found, taskGroup[%d] has not 
completed", groupId);
+      return false; // No progress yet, task hasn't completed
+    }
+
+    // Check if ALL partitions in this group have reached their end offsets
+    for (PartitionIdType partition : partitionsInGroup) {
+      SequenceOffsetType endOffset = endOffsets.get(partition);
+      SequenceOffsetType currentOffset = currentOffsets.get(partition);
+
+      if (currentOffset == null) {
+        log.debug(
+            "Partition[%s] in taskGroup[%d] has no checkpointed offset, not 
complete",
+            partition,
+            groupId
+        );
+        return false; // Partition hasn't started processing
+      }
+
+      if (!isOffsetAtOrBeyond(currentOffset, endOffset)) {
+        log.debug(
+            "Partition[%s] in taskGroup[%d] at offset[%s], has not reached 
end[%s]",
+            partition,
+            groupId,
+            currentOffset,
+            endOffset
+        );
+        return false; // This partition hasn't reached its end
+      }
+    }
+
+    log.info(
+        "All partitions in taskGroup[%d] have reached their end offsets",
+        groupId
+    );
+    return true; // All partitions have reached their end offsets
+  }
+
+  /**
+   * Get end offsets for all partitions in a task group from bounded config.
+   */
+  private Map<PartitionIdType, SequenceOffsetType> getEndOffsetsForGroup(int 
groupId)
+  {
+    BoundedStreamConfig boundedConfig = ioConfig.getBoundedStreamConfig();
+    Map<PartitionIdType, SequenceOffsetType> endOffsets =
+        convertBoundedConfigMap(boundedConfig.getEndSequenceNumbers());
+    Set<PartitionIdType> partitionsInGroup = partitionGroups.get(groupId);
+
+    if (partitionsInGroup == null) {
+      return Collections.emptyMap();
+    }
+
+    return partitionsInGroup.stream()
+        .filter(endOffsets::containsKey)
+        .collect(Collectors.toMap(
+            p -> p,
+            endOffsets::get
+        ));
+  }
+
+  /**
+   * For bounded supervisors, we determine completion by checking if new tasks 
would be created.
+   * In createNewTasks(), bounded mode checks hasTaskGroupReachedBoundedEnd() 
before creating tasks.
+   * If that returns true (offsets reached), no new tasks are created.
+   * So completion is: no active tasks, no pending tasks, and createNewTasks() 
chose not to create any.
+   *
+   * @return true if all bounded work is complete, false otherwise

Review Comment:
   Thanks for clarifying the docs
   nit: for easier navigation between these javadoc functions:
   ```suggestion
     /**
      * For bounded supervisors, we determine completion by checking if new 
tasks would be created.
      * In {@link #createNewTasks()}, bounded mode checks {@link 
#hasTaskGroupReachedBoundedEnd(int)} before creating tasks.
      * If that returns true (offsets reached), no new tasks are created.
      * So completion is: no active tasks, no pending tasks, and {@link 
#createNewTasks()} chose not to create any.
      *
      * @return true if all bounded work is complete, false otherwise
   ```



##########
embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaBoundedSupervisorTest.java:
##########
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.embedded.indexing;
+
+import org.apache.druid.common.utils.IdUtils;
+import org.apache.druid.data.input.impl.JsonInputFormat;
+import org.apache.druid.indexing.kafka.simulate.KafkaResource;
+import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpec;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorStatus;
+import org.apache.druid.indexing.seekablestream.supervisor.BoundedStreamConfig;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.testing.embedded.StreamIngestResource;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Tests for bounded Kafka supervisors (one-time ingestion with explicit 
start/end offsets).
+ */
+public class KafkaBoundedSupervisorTest extends StreamIndexTestBase
+{
+  private static final EmittingLogger log = new 
EmittingLogger(KafkaBoundedSupervisorTest.class);
+  private final KafkaResource kafkaServer = new KafkaResource();
+
+  @Override
+  protected StreamIngestResource<?> getStreamIngestResource()
+  {
+    return kafkaServer;
+  }
+
+  @Test
+  public void test_boundedSupervisor_ingestsDataAndCompletes()

Review Comment:
   Nice, thanks for these tests!



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -4347,6 +4546,182 @@ && checkSourceMetadataMatch(dataSourceMetadata)) {
     return Collections.emptyMap();
   }
 
+  /**
+   * Check if all partitions in a task group have reached their bounded end 
offsets.
+   * Used to determine if the task group completed successfully vs failed 
midway.
+   *
+   * @param groupId The task group ID to check
+   * @return true if all partitions in the group have reached their end 
offsets, false otherwise
+   */
+  private boolean hasTaskGroupReachedBoundedEnd(int groupId)
+  {
+    BoundedStreamConfig boundedConfig = ioConfig.getBoundedStreamConfig();
+    Map<PartitionIdType, SequenceOffsetType> startOffsets =
+        convertBoundedConfigMap(boundedConfig.getStartSequenceNumbers());
+    Map<PartitionIdType, SequenceOffsetType> endOffsets =
+        convertBoundedConfigMap(boundedConfig.getEndSequenceNumbers());

Review Comment:
   Actually, do you think it's worth resolving this once instead of computing 
it every time its invoked from the internal loop (given that it doesn't change)?



##########
embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaBoundedSupervisorTest.java:
##########
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.embedded.indexing;
+
+import org.apache.druid.common.utils.IdUtils;
+import org.apache.druid.data.input.impl.JsonInputFormat;
+import org.apache.druid.indexing.kafka.simulate.KafkaResource;
+import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpec;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorStatus;
+import org.apache.druid.indexing.seekablestream.supervisor.BoundedStreamConfig;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.testing.embedded.StreamIngestResource;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Tests for bounded Kafka supervisors (one-time ingestion with explicit 
start/end offsets).
+ */
+public class KafkaBoundedSupervisorTest extends StreamIndexTestBase
+{
+  private static final EmittingLogger log = new 
EmittingLogger(KafkaBoundedSupervisorTest.class);
+  private final KafkaResource kafkaServer = new KafkaResource();
+
+  @Override
+  protected StreamIngestResource<?> getStreamIngestResource()
+  {
+    return kafkaServer;
+  }
+
+  @Test
+  public void test_boundedSupervisor_ingestsDataAndCompletes()
+  {
+    final String topic = IdUtils.getRandomId();
+    kafkaServer.createTopicWithPartitions(topic, 2);
+
+    // Publish records before creating supervisor
+    final int totalRecords = publish1kRecords(topic, false);
+
+    // Get the current end offsets for all partitions
+    Map<String, Long> endOffsets = kafkaServer.getPartitionOffsets(topic);
+    Assertions.assertEquals(2, endOffsets.size(), "Should have 2 partitions");
+
+    // Create bounded config with start offset 0 and current end offsets
+    Map<String, Long> startOffsets = new HashMap<>();
+    startOffsets.put("0", 0L);
+    startOffsets.put("1", 0L);
+
+    BoundedStreamConfig boundedConfig = new BoundedStreamConfig(startOffsets, 
endOffsets);
+
+    // Create bounded supervisor
+    final KafkaSupervisorSpec supervisor = createBoundedKafkaSupervisor(
+        kafkaServer,
+        topic,
+        boundedConfig
+    );
+
+    cluster.callApi().postSupervisor(supervisor);
+
+    // Wait for records to be ingested
+    waitUntilPublishedRecordsAreIngested(totalRecords);
+
+    // Wait for supervisor to transition to COMPLETED state
+    waitForSupervisorToComplete(supervisor.getId());
+
+    // Verify row count
+    verifyRowCount(totalRecords);
+
+    // Verify supervisor is in COMPLETED state
+    final SupervisorStatus status = 
cluster.callApi().getSupervisorStatus(supervisor.getId());
+    Assertions.assertEquals("COMPLETED", status.getState());
+    Assertions.assertTrue(status.isHealthy());
+  }
+
+  @Test
+  public void test_boundedSupervisor_withEmptyRange_completesImmediately()
+  {
+    final String topic = IdUtils.getRandomId();
+    kafkaServer.createTopicWithPartitions(topic, 1);
+
+    // Publish some records
+    publish1kRecords(topic, false);
+
+    // Get current offset
+    Map<String, Long> currentOffsets = kafkaServer.getPartitionOffsets(topic);
+    Long currentOffset = currentOffsets.get("0");
+
+    // Create bounded config with start == end (empty range)
+    Map<String, Long> offsets = new HashMap<>();
+    offsets.put("0", currentOffset);
+
+    BoundedStreamConfig boundedConfig = new BoundedStreamConfig(offsets, 
offsets);
+
+    // Create bounded supervisor
+    final KafkaSupervisorSpec supervisor = createBoundedKafkaSupervisor(
+        kafkaServer,
+        topic,
+        boundedConfig
+    );
+
+    cluster.callApi().postSupervisor(supervisor);
+
+    // Wait for supervisor to transition to COMPLETED state
+    waitForSupervisorToComplete(supervisor.getId());
+
+    // Verify supervisor is in COMPLETED state
+    final SupervisorStatus status = 
cluster.callApi().getSupervisorStatus(supervisor.getId());
+    Assertions.assertEquals("COMPLETED", status.getState());
+  }
+
+  private KafkaSupervisorSpec createBoundedKafkaSupervisor(
+      KafkaResource kafkaServer,
+      String topic,
+      BoundedStreamConfig boundedConfig
+  )
+  {
+    return createKafkaSupervisor(kafkaServer)
+        .withIoConfig(io -> io
+            .withKafkaInputFormat(new JsonInputFormat(null, null, null, null, 
null))
+            .withBoundedStreamConfig(boundedConfig)
+        )
+        .build(dataSource, topic);
+  }
+
+  @Test
+  public void test_boundedSupervisor_withMismatchedMetadata_is_unhealthy()
+  {
+    final String topic = IdUtils.getRandomId();
+    kafkaServer.createTopicWithPartitions(topic, 2);
+    publish1kRecords(topic, false);
+
+    // Get the current end offsets for all partitions
+    Map<String, Long> currentOffsets = kafkaServer.getPartitionOffsets(topic);
+    Assertions.assertEquals(2, currentOffsets.size(), "Should have 2 
partitions");
+
+    // Create first bounded config - ingest only the first 100 records from 
each partition
+    Map<String, Long> startOffsets1 = new HashMap<>();
+    startOffsets1.put("0", 0L);
+    startOffsets1.put("1", 0L);
+
+    Map<String, Long> endOffsets1 = new HashMap<>();
+    endOffsets1.put("0", 100L);
+    endOffsets1.put("1", 100L);
+
+    BoundedStreamConfig boundedConfig1 = new 
BoundedStreamConfig(startOffsets1, endOffsets1);
+
+    // Create first bounded supervisor and run it to completion
+    final KafkaSupervisorSpec supervisor1 = createBoundedKafkaSupervisor(
+        kafkaServer,
+        topic,
+        boundedConfig1
+    );
+
+    cluster.callApi().postSupervisor(supervisor1);
+
+    // Wait for records to be ingested (approximately 200 records total from 
both partitions)
+    waitUntilPublishedRecordsAreIngested(200);
+
+    // Wait for supervisor to transition to COMPLETED state
+    waitForSupervisorToComplete(supervisor1.getId());
+
+    // Verify supervisor is in COMPLETED state
+    final SupervisorStatus status1 = 
cluster.callApi().getSupervisorStatus(supervisor1.getId());
+    Assertions.assertEquals("COMPLETED", status1.getState());
+
+    // Now try to create a second bounded supervisor with different bounded 
config on the same datasource
+    Map<String, Long> startOffsets2 = new HashMap<>();
+    startOffsets2.put("0", 50L);  // Different start offset
+    startOffsets2.put("1", 50L);
+
+    Map<String, Long> endOffsets2 = new HashMap<>();
+    endOffsets2.put("0", 200L);  // Different end offset
+    endOffsets2.put("1", 200L);
+
+    BoundedStreamConfig boundedConfig2 = new 
BoundedStreamConfig(startOffsets2, endOffsets2);
+
+    final KafkaSupervisorSpec supervisor2 = createBoundedKafkaSupervisor(
+        kafkaServer,
+        topic,
+        boundedConfig2
+    );
+
+    // Post the second supervisor (it should use the same supervisor 
ID/datasource)
+    cluster.callApi().postSupervisor(supervisor2);
+
+    // Wait for the supervisor to process and detect the metadata mismatch
+    // The exception we're testing for is thrown and logged, and causes the 
supervisor to become unhealthy
+    waitForSupervisorToBeUnhealthy(supervisor2.getId());
+
+    // Verify the supervisor is unhealthy
+    final SupervisorStatus status2 = 
cluster.callApi().getSupervisorStatus(supervisor2.getId());
+    Assertions.assertFalse(status2.isHealthy(), "Supervisor should be 
unhealthy after detecting metadata mismatch");
+    Assertions.assertEquals("UNHEALTHY_SUPERVISOR", status2.getState(), 
"Supervisor state should be UNHEALTHY_SUPERVISOR");

Review Comment:
   Perhaps also validate that the records ingested by this bounded supervisor 
is 0 and/or the tasks spun up by this supervisor actually failed?



##########
embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaBoundedSupervisorTest.java:
##########
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.embedded.indexing;
+
+import org.apache.druid.common.utils.IdUtils;
+import org.apache.druid.data.input.impl.JsonInputFormat;
+import org.apache.druid.indexing.kafka.simulate.KafkaResource;
+import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpec;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorStatus;
+import org.apache.druid.indexing.seekablestream.supervisor.BoundedStreamConfig;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.testing.embedded.StreamIngestResource;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Tests for bounded Kafka supervisors (one-time ingestion with explicit 
start/end offsets).
+ */
+public class KafkaBoundedSupervisorTest extends StreamIndexTestBase
+{
+  private static final EmittingLogger log = new 
EmittingLogger(KafkaBoundedSupervisorTest.class);
+  private final KafkaResource kafkaServer = new KafkaResource();
+
+  @Override
+  protected StreamIngestResource<?> getStreamIngestResource()
+  {
+    return kafkaServer;
+  }
+
+  @Test
+  public void test_boundedSupervisor_ingestsDataAndCompletes()
+  {
+    final String topic = IdUtils.getRandomId();
+    kafkaServer.createTopicWithPartitions(topic, 2);
+
+    // Publish records before creating supervisor
+    final int totalRecords = publish1kRecords(topic, false);
+
+    // Get the current end offsets for all partitions
+    Map<String, Long> endOffsets = kafkaServer.getPartitionOffsets(topic);
+    Assertions.assertEquals(2, endOffsets.size(), "Should have 2 partitions");
+
+    // Create bounded config with start offset 0 and current end offsets
+    Map<String, Long> startOffsets = new HashMap<>();
+    startOffsets.put("0", 0L);
+    startOffsets.put("1", 0L);
+
+    BoundedStreamConfig boundedConfig = new BoundedStreamConfig(startOffsets, 
endOffsets);
+
+    // Create bounded supervisor
+    final KafkaSupervisorSpec supervisor = createBoundedKafkaSupervisor(
+        kafkaServer,
+        topic,
+        boundedConfig
+    );
+
+    cluster.callApi().postSupervisor(supervisor);
+
+    // Wait for records to be ingested
+    waitUntilPublishedRecordsAreIngested(totalRecords);
+
+    // Wait for supervisor to transition to COMPLETED state
+    waitForSupervisorToComplete(supervisor.getId());
+
+    // Verify row count
+    verifyRowCount(totalRecords);
+
+    // Verify supervisor is in COMPLETED state
+    final SupervisorStatus status = 
cluster.callApi().getSupervisorStatus(supervisor.getId());
+    Assertions.assertEquals("COMPLETED", status.getState());
+    Assertions.assertTrue(status.isHealthy());
+  }
+
+  @Test
+  public void test_boundedSupervisor_withEmptyRange_completesImmediately()
+  {
+    final String topic = IdUtils.getRandomId();
+    kafkaServer.createTopicWithPartitions(topic, 1);
+
+    // Publish some records
+    publish1kRecords(topic, false);
+
+    // Get current offset
+    Map<String, Long> currentOffsets = kafkaServer.getPartitionOffsets(topic);
+    Long currentOffset = currentOffsets.get("0");
+
+    // Create bounded config with start == end (empty range)
+    Map<String, Long> offsets = new HashMap<>();
+    offsets.put("0", currentOffset);
+
+    BoundedStreamConfig boundedConfig = new BoundedStreamConfig(offsets, 
offsets);
+
+    // Create bounded supervisor
+    final KafkaSupervisorSpec supervisor = createBoundedKafkaSupervisor(
+        kafkaServer,
+        topic,
+        boundedConfig
+    );
+
+    cluster.callApi().postSupervisor(supervisor);
+
+    // Wait for supervisor to transition to COMPLETED state
+    waitForSupervisorToComplete(supervisor.getId());
+
+    // Verify supervisor is in COMPLETED state
+    final SupervisorStatus status = 
cluster.callApi().getSupervisorStatus(supervisor.getId());
+    Assertions.assertEquals("COMPLETED", status.getState());
+  }
+
+  private KafkaSupervisorSpec createBoundedKafkaSupervisor(
+      KafkaResource kafkaServer,
+      String topic,
+      BoundedStreamConfig boundedConfig
+  )
+  {
+    return createKafkaSupervisor(kafkaServer)
+        .withIoConfig(io -> io
+            .withKafkaInputFormat(new JsonInputFormat(null, null, null, null, 
null))
+            .withBoundedStreamConfig(boundedConfig)
+        )
+        .build(dataSource, topic);
+  }
+
+  @Test
+  public void test_boundedSupervisor_withMismatchedMetadata_is_unhealthy()
+  {
+    final String topic = IdUtils.getRandomId();
+    kafkaServer.createTopicWithPartitions(topic, 2);
+    publish1kRecords(topic, false);
+
+    // Get the current end offsets for all partitions
+    Map<String, Long> currentOffsets = kafkaServer.getPartitionOffsets(topic);
+    Assertions.assertEquals(2, currentOffsets.size(), "Should have 2 
partitions");
+
+    // Create first bounded config - ingest only the first 100 records from 
each partition
+    Map<String, Long> startOffsets1 = new HashMap<>();
+    startOffsets1.put("0", 0L);
+    startOffsets1.put("1", 0L);
+
+    Map<String, Long> endOffsets1 = new HashMap<>();
+    endOffsets1.put("0", 100L);
+    endOffsets1.put("1", 100L);
+
+    BoundedStreamConfig boundedConfig1 = new 
BoundedStreamConfig(startOffsets1, endOffsets1);
+
+    // Create first bounded supervisor and run it to completion
+    final KafkaSupervisorSpec supervisor1 = createBoundedKafkaSupervisor(
+        kafkaServer,
+        topic,
+        boundedConfig1
+    );
+
+    cluster.callApi().postSupervisor(supervisor1);
+
+    // Wait for records to be ingested (approximately 200 records total from 
both partitions)
+    waitUntilPublishedRecordsAreIngested(200);
+
+    // Wait for supervisor to transition to COMPLETED state
+    waitForSupervisorToComplete(supervisor1.getId());
+
+    // Verify supervisor is in COMPLETED state
+    final SupervisorStatus status1 = 
cluster.callApi().getSupervisorStatus(supervisor1.getId());
+    Assertions.assertEquals("COMPLETED", status1.getState());
+
+    // Now try to create a second bounded supervisor with different bounded 
config on the same datasource
+    Map<String, Long> startOffsets2 = new HashMap<>();
+    startOffsets2.put("0", 50L);  // Different start offset
+    startOffsets2.put("1", 50L);
+
+    Map<String, Long> endOffsets2 = new HashMap<>();
+    endOffsets2.put("0", 200L);  // Different end offset
+    endOffsets2.put("1", 200L);
+
+    BoundedStreamConfig boundedConfig2 = new 
BoundedStreamConfig(startOffsets2, endOffsets2);
+
+    final KafkaSupervisorSpec supervisor2 = createBoundedKafkaSupervisor(
+        kafkaServer,
+        topic,
+        boundedConfig2
+    );
+
+    // Post the second supervisor (it should use the same supervisor 
ID/datasource)
+    cluster.callApi().postSupervisor(supervisor2);
+
+    // Wait for the supervisor to process and detect the metadata mismatch
+    // The exception we're testing for is thrown and logged, and causes the 
supervisor to become unhealthy
+    waitForSupervisorToBeUnhealthy(supervisor2.getId());
+
+    // Verify the supervisor is unhealthy
+    final SupervisorStatus status2 = 
cluster.callApi().getSupervisorStatus(supervisor2.getId());
+    Assertions.assertFalse(status2.isHealthy(), "Supervisor should be 
unhealthy after detecting metadata mismatch");
+    Assertions.assertEquals("UNHEALTHY_SUPERVISOR", status2.getState(), 
"Supervisor state should be UNHEALTHY_SUPERVISOR");
+  }
+
+  /**
+   * Regression test: a new bounded run whose endOffset is less than the 
offset committed by a prior
+   * run must not silently reach COMPLETED. Before the fix, 
hasTaskGroupReachedBoundedEnd() compared
+   * the stale committed offset against the new endOffset (e.g. committed=100 
>= newEnd=50) and
+   * returned true, bypassing task creation and the documented mismatch error 
entirely.
+   */
+  @Test
+  public void 
test_boundedSupervisor_doesNotSilentlyCompleteWhenStaleOffsetExceedsNewEnd()
+  {
+    final String topic = IdUtils.getRandomId();
+    kafkaServer.createTopicWithPartitions(topic, 2);
+    publish1kRecords(topic, false);
+
+    // Run 1: ingest up to offset 100 on each partition and complete.
+    Map<String, Long> startOffsets1 = new HashMap<>();
+    startOffsets1.put("0", 0L);
+    startOffsets1.put("1", 0L);
+
+    Map<String, Long> endOffsets1 = new HashMap<>();
+    endOffsets1.put("0", 100L);
+    endOffsets1.put("1", 100L);

Review Comment:
   Just for completeness, maybe have unequal start / end offset marker for 
partitions 0 & 1 (in this test or the previous ones)?



##########
docs/ingestion/supervisor.md:
##########
@@ -251,6 +252,57 @@ Before you set `stopTaskCount`, note the following:
 - The [task autoscaler](#task-autoscaler) ignores `stopTaskCount` when 
shutting down tasks in response to a task count change. The task autoscaler 
needs to redistribute partitions across tasks, which requires all tasks to be 
shut down.
 - If you set `stopTaskCount` to a value less than `taskCount`, Druid cycles 
the longest running tasks first, then other tasks up to the value set.
 
+#### Bounded stream configuration
+
+Use `boundedStreamConfig` to configure one-time ingestion from a specific 
range of offsets. This is useful for backfilling historical data or 
reprocessing data with different configurations.
+
+The `boundedStreamConfig` object contains the following properties:
+
+|Property|Type|Description|Required|
+|--------|----|-----------|--------|
+|`startSequenceNumbers`|Object|Map of partition IDs to start offsets 
(inclusive for Kafka, inclusive for Kinesis).|Yes|
+|`endSequenceNumbers`|Object|Map of partition IDs to end offsets (exclusive 
for Kafka, inclusive for Kinesis).|Yes|
+
+When configured, the supervisor:
+1. Creates tasks that start reading from `startSequenceNumbers`
+2. Tasks automatically stop when they reach `endSequenceNumbers`
+3. Supervisor does not create replacement tasks after tasks complete
+4. Supervisor transitions to `COMPLETED` state and terminates when all tasks 
finish
+
+**Metadata consistency:** The bounded configuration is stored in datasource 
metadata along with checkpointed offsets. If you restart the supervisor or 
create a new supervisor with a different `boundedStreamConfig` for the same 
datasource, the supervisor will fail with an error. To start a new bounded 
ingestion with different offsets, either:
+- Use the [supervisor reset 
API](../api-reference/supervisor-api.md#reset-a-supervisor) to clear existing 
metadata
+- Use a different datasource name

Review Comment:
   Ditto: I think using a different datasource name would already be supported 
by the multi-supervisor feature (I think we meant different supervisor ID here?)



##########
docs/ingestion/supervisor.md:
##########
@@ -65,6 +65,7 @@ For configuration properties specific to Kafka and Kinesis, 
see [Kafka I/O confi
 |`lateMessageRejectionPeriod`|ISO 8601 period|Configures tasks to reject 
messages with timestamps earlier than this period before the task was created. 
For example, if this property is set to `PT1H` and the supervisor creates a 
task at `2016-01-01T12:00Z`, Druid drops messages with timestamps earlier than 
`2016-01-01T11:00Z`. This may help prevent concurrency issues if your data 
stream has late messages and you have multiple pipelines that need to operate 
on the same segments, such as a streaming and a nightly batch ingestion 
pipeline. You can specify only one of the late message rejection 
properties.|No||
 |`earlyMessageRejectionPeriod`|ISO 8601 period|Configures tasks to reject 
messages with timestamps later than this period after the task reached its task 
duration. For example, if this property is set to `PT1H`, the task duration is 
set to `PT1H` and the supervisor creates a task at `2016-01-01T12:00Z`, Druid 
drops messages with timestamps later than `2016-01-01T14:00Z`. Tasks sometimes 
run past their task duration, such as in cases of supervisor failover.|No||
 |`stopTaskCount`|Integer|Limits the number of ingestion tasks Druid can cycle 
at any given time. If not set, Druid can cycle all tasks at the same time. If 
set to a value less than `taskCount`, your cluster needs fewer available slots 
to run the supervisor. You can save costs by scaling down your ingestion tier, 
but this can lead to slower cycle times and lag. See 
[`stopTaskCount`](#stoptaskcount) for more information.|No|`taskCount` value|
+|`boundedStreamConfig`|Object|Configures the supervisor for bounded (one-time) 
ingestion with explicit start and end offsets. When set, the supervisor creates 
tasks that read from `startSequenceNumbers` to `endSequenceNumbers`, then 
automatically terminates when all data is ingested. The bounded configuration 
is stored with datasource metadata; if a supervisor is restarted or a new 
supervisor is created with different offsets for the same datasource, it will 
fail. To retry with different offsets, use the supervisor reset API to clear 
metadata or use a different datasource. Useful for backfills and historical 
reprocessing. See [Bounded stream configuration](#bounded-stream-configuration) 
for details.|No|null|

Review Comment:
   ```suggestion
   |`boundedStreamConfig`|Object|Configures the supervisor for bounded 
(one-time) ingestion with explicit start and end offsets. When set, the 
supervisor creates tasks that read from `startSequenceNumbers` to 
`endSequenceNumbers`, then automatically terminates when all data is ingested. 
The bounded configuration is stored with datasource metadata; if a supervisor 
is restarted or a new supervisor is created with different offsets for the same 
datasource, it will fail. To retry with different offsets, use the supervisor 
reset API to clear metadata or use a different supervisor ID. Useful for 
backfills and historical reprocessing. See [Bounded stream 
configuration](#bounded-stream-configuration) for details.|No|null|
   ```



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -4263,18 +4412,51 @@ private Map<PartitionIdType, 
OrderedSequenceNumber<SequenceOffsetType>> generate
     return builder.build();
   }
 
+  /**
+   * Extracts BoundedStreamConfig from DataSourceMetadata if available.
+   */
+  @Nullable
+  private BoundedStreamConfig getBoundedConfigFromMetadata(@Nullable 
DataSourceMetadata metadata)
+  {
+    if (metadata instanceof SeekableStreamDataSourceMetadata) {
+      return ((SeekableStreamDataSourceMetadata<?, ?>) 
metadata).getBoundedStreamConfig();
+    }
+    return null;
+  }
+
   /**
    * Queries the dataSource metadata table to see if there is a previous 
ending sequence for this partition. If it
    * doesn't find any data, it will retrieve the latest or earliest 
Kafka/Kinesis sequence depending on the
    * {@link SeekableStreamSupervisorIOConfig#useEarliestSequenceNumber}.
    */
   private OrderedSequenceNumber<SequenceOffsetType> 
getOffsetFromStorageForPartition(
       PartitionIdType partition,
-      final Map<PartitionIdType, SequenceOffsetType> metadataOffsets
+      final Map<PartitionIdType, SequenceOffsetType> metadataOffsets,
+      @Nullable final BoundedStreamConfig metadataBoundedConfig
   )
   {
     SequenceOffsetType sequence = metadataOffsets.get(partition);
     if (sequence != null) {
+      // In bounded mode, check if the metadata's bounded config matches the 
current supervisor's config
+      if (ioConfig.isBounded()) {
+        BoundedStreamConfig currentBoundedConfig = 
ioConfig.getBoundedStreamConfig();
+
+        // If configs don't match (or metadata has no config), throw exception
+        if (!currentBoundedConfig.equals(metadataBoundedConfig)) {
+          throw DruidException.forPersona(DruidException.Persona.ADMIN)
+                              
.ofCategory(DruidException.Category.INVALID_INPUT)
+                              .build(
+                                  "Bounded supervisor detected existing 
metadata from a different run. "
+                                  + "Metadata bounded config [%s] does not 
match current config [%s]. "
+                                  + "To start a new bounded ingestion, either: 
"
+                                  + "(1) use the supervisor reset API to clear 
existing metadata, or "
+                                  + "(2) use a different supervisor ID / 
datasource.",

Review Comment:
   I think just a different supervisor ID would suffice for this backfill 
requirement? A different datasource would already be supported



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -4347,6 +4546,182 @@ && checkSourceMetadataMatch(dataSourceMetadata)) {
     return Collections.emptyMap();
   }
 
+  /**
+   * Check if all partitions in a task group have reached their bounded end 
offsets.
+   * Used to determine if the task group completed successfully vs failed 
midway.
+   *
+   * @param groupId The task group ID to check
+   * @return true if all partitions in the group have reached their end 
offsets, false otherwise
+   */
+  private boolean hasTaskGroupReachedBoundedEnd(int groupId)
+  {
+    BoundedStreamConfig boundedConfig = ioConfig.getBoundedStreamConfig();
+    Map<PartitionIdType, SequenceOffsetType> startOffsets =
+        convertBoundedConfigMap(boundedConfig.getStartSequenceNumbers());
+    Map<PartitionIdType, SequenceOffsetType> endOffsets =
+        convertBoundedConfigMap(boundedConfig.getEndSequenceNumbers());
+
+    Set<PartitionIdType> partitionsInGroup = partitionGroups.get(groupId);
+    if (partitionsInGroup == null || partitionsInGroup.isEmpty()) {
+      return false;
+    }

Review Comment:
   Have an early return before the convert* methods are called?



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -4347,6 +4546,182 @@ && checkSourceMetadataMatch(dataSourceMetadata)) {
     return Collections.emptyMap();
   }
 
+  /**
+   * Check if all partitions in a task group have reached their bounded end 
offsets.
+   * Used to determine if the task group completed successfully vs failed 
midway.
+   *
+   * @param groupId The task group ID to check
+   * @return true if all partitions in the group have reached their end 
offsets, false otherwise
+   */
+  private boolean hasTaskGroupReachedBoundedEnd(int groupId)
+  {
+    BoundedStreamConfig boundedConfig = ioConfig.getBoundedStreamConfig();
+    Map<PartitionIdType, SequenceOffsetType> startOffsets =
+        convertBoundedConfigMap(boundedConfig.getStartSequenceNumbers());
+    Map<PartitionIdType, SequenceOffsetType> endOffsets =
+        convertBoundedConfigMap(boundedConfig.getEndSequenceNumbers());
+
+    Set<PartitionIdType> partitionsInGroup = partitionGroups.get(groupId);
+    if (partitionsInGroup == null || partitionsInGroup.isEmpty()) {
+      return false;
+    }
+
+    // Check if all partitions have empty ranges
+    // For exclusive end offsets (Kafka): start >= end means empty
+    // For inclusive end offsets (Kinesis): only start > end means empty 
(start == end is one record)
+    boolean allPartitionsEmptyRange = true;
+    for (PartitionIdType partition : partitionsInGroup) {
+      SequenceOffsetType start = startOffsets.get(partition);
+      SequenceOffsetType end = endOffsets.get(partition);
+
+      boolean isEmpty;
+      if (isEndOffsetExclusive()) {
+        // Exclusive: empty if start >= end
+        isEmpty = isOffsetAtOrBeyond(start, end);
+      } else {
+        // Inclusive: empty only if start > end
+        isEmpty = isOffsetAtOrBeyond(start, end) && !start.equals(end);
+      }
+
+      if (!isEmpty) {
+        allPartitionsEmptyRange = false;
+        break;
+      }
+    }
+
+    if (allPartitionsEmptyRange) {
+      log.warn(
+          "TaskGroup[%d] has empty range for all partitions (start %s end). "
+          + "No work to do, marking as complete. Start: %s, End: %s",
+          groupId,
+          isEndOffsetExclusive() ? ">=" : ">",
+          startOffsets,
+          endOffsets
+      );
+      return true;
+    }
+
+    // Offsets from a prior bounded run with a different config must not count 
as completion evidence.
+    // Returning false lets createNewTasks() run, which calls 
getOffsetFromStorageForPartition()
+    // and throws the documented mismatch error.

Review Comment:
   Not sure what this means without enough context: `// and throws the 
documented mismatch error.`



##########
embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaBoundedSupervisorTest.java:
##########
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.embedded.indexing;
+
+import org.apache.druid.common.utils.IdUtils;
+import org.apache.druid.data.input.impl.JsonInputFormat;
+import org.apache.druid.indexing.kafka.simulate.KafkaResource;
+import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpec;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorStatus;
+import org.apache.druid.indexing.seekablestream.supervisor.BoundedStreamConfig;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.testing.embedded.StreamIngestResource;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Tests for bounded Kafka supervisors (one-time ingestion with explicit 
start/end offsets).
+ */
+public class KafkaBoundedSupervisorTest extends StreamIndexTestBase
+{
+  private static final EmittingLogger log = new 
EmittingLogger(KafkaBoundedSupervisorTest.class);
+  private final KafkaResource kafkaServer = new KafkaResource();
+
+  @Override
+  protected StreamIngestResource<?> getStreamIngestResource()
+  {
+    return kafkaServer;
+  }
+
+  @Test
+  public void test_boundedSupervisor_ingestsDataAndCompletes()
+  {
+    final String topic = IdUtils.getRandomId();
+    kafkaServer.createTopicWithPartitions(topic, 2);
+
+    // Publish records before creating supervisor
+    final int totalRecords = publish1kRecords(topic, false);
+
+    // Get the current end offsets for all partitions
+    Map<String, Long> endOffsets = kafkaServer.getPartitionOffsets(topic);
+    Assertions.assertEquals(2, endOffsets.size(), "Should have 2 partitions");
+
+    // Create bounded config with start offset 0 and current end offsets
+    Map<String, Long> startOffsets = new HashMap<>();
+    startOffsets.put("0", 0L);
+    startOffsets.put("1", 0L);
+
+    BoundedStreamConfig boundedConfig = new BoundedStreamConfig(startOffsets, 
endOffsets);
+
+    // Create bounded supervisor
+    final KafkaSupervisorSpec supervisor = createBoundedKafkaSupervisor(
+        kafkaServer,
+        topic,
+        boundedConfig
+    );
+
+    cluster.callApi().postSupervisor(supervisor);
+
+    // Wait for records to be ingested
+    waitUntilPublishedRecordsAreIngested(totalRecords);
+
+    // Wait for supervisor to transition to COMPLETED state
+    waitForSupervisorToComplete(supervisor.getId());
+
+    // Verify row count
+    verifyRowCount(totalRecords);
+
+    // Verify supervisor is in COMPLETED state
+    final SupervisorStatus status = 
cluster.callApi().getSupervisorStatus(supervisor.getId());
+    Assertions.assertEquals("COMPLETED", status.getState());
+    Assertions.assertTrue(status.isHealthy());
+  }
+
+  @Test
+  public void test_boundedSupervisor_withEmptyRange_completesImmediately()
+  {
+    final String topic = IdUtils.getRandomId();
+    kafkaServer.createTopicWithPartitions(topic, 1);
+
+    // Publish some records
+    publish1kRecords(topic, false);
+
+    // Get current offset
+    Map<String, Long> currentOffsets = kafkaServer.getPartitionOffsets(topic);
+    Long currentOffset = currentOffsets.get("0");
+
+    // Create bounded config with start == end (empty range)
+    Map<String, Long> offsets = new HashMap<>();
+    offsets.put("0", currentOffset);
+
+    BoundedStreamConfig boundedConfig = new BoundedStreamConfig(offsets, 
offsets);
+
+    // Create bounded supervisor
+    final KafkaSupervisorSpec supervisor = createBoundedKafkaSupervisor(
+        kafkaServer,
+        topic,
+        boundedConfig
+    );
+
+    cluster.callApi().postSupervisor(supervisor);
+
+    // Wait for supervisor to transition to COMPLETED state
+    waitForSupervisorToComplete(supervisor.getId());
+
+    // Verify supervisor is in COMPLETED state
+    final SupervisorStatus status = 
cluster.callApi().getSupervisorStatus(supervisor.getId());
+    Assertions.assertEquals("COMPLETED", status.getState());
+  }
+
+  private KafkaSupervisorSpec createBoundedKafkaSupervisor(
+      KafkaResource kafkaServer,
+      String topic,
+      BoundedStreamConfig boundedConfig
+  )
+  {
+    return createKafkaSupervisor(kafkaServer)
+        .withIoConfig(io -> io
+            .withKafkaInputFormat(new JsonInputFormat(null, null, null, null, 
null))
+            .withBoundedStreamConfig(boundedConfig)
+        )
+        .build(dataSource, topic);
+  }
+
+  @Test
+  public void test_boundedSupervisor_withMismatchedMetadata_is_unhealthy()
+  {
+    final String topic = IdUtils.getRandomId();
+    kafkaServer.createTopicWithPartitions(topic, 2);
+    publish1kRecords(topic, false);
+
+    // Get the current end offsets for all partitions
+    Map<String, Long> currentOffsets = kafkaServer.getPartitionOffsets(topic);
+    Assertions.assertEquals(2, currentOffsets.size(), "Should have 2 
partitions");
+
+    // Create first bounded config - ingest only the first 100 records from 
each partition
+    Map<String, Long> startOffsets1 = new HashMap<>();
+    startOffsets1.put("0", 0L);
+    startOffsets1.put("1", 0L);
+
+    Map<String, Long> endOffsets1 = new HashMap<>();
+    endOffsets1.put("0", 100L);
+    endOffsets1.put("1", 100L);
+
+    BoundedStreamConfig boundedConfig1 = new 
BoundedStreamConfig(startOffsets1, endOffsets1);
+
+    // Create first bounded supervisor and run it to completion
+    final KafkaSupervisorSpec supervisor1 = createBoundedKafkaSupervisor(
+        kafkaServer,
+        topic,
+        boundedConfig1
+    );
+
+    cluster.callApi().postSupervisor(supervisor1);
+
+    // Wait for records to be ingested (approximately 200 records total from 
both partitions)
+    waitUntilPublishedRecordsAreIngested(200);
+
+    // Wait for supervisor to transition to COMPLETED state
+    waitForSupervisorToComplete(supervisor1.getId());
+
+    // Verify supervisor is in COMPLETED state
+    final SupervisorStatus status1 = 
cluster.callApi().getSupervisorStatus(supervisor1.getId());
+    Assertions.assertEquals("COMPLETED", status1.getState());
+
+    // Now try to create a second bounded supervisor with different bounded 
config on the same datasource
+    Map<String, Long> startOffsets2 = new HashMap<>();
+    startOffsets2.put("0", 50L);  // Different start offset
+    startOffsets2.put("1", 50L);
+
+    Map<String, Long> endOffsets2 = new HashMap<>();
+    endOffsets2.put("0", 200L);  // Different end offset
+    endOffsets2.put("1", 200L);
+
+    BoundedStreamConfig boundedConfig2 = new 
BoundedStreamConfig(startOffsets2, endOffsets2);
+
+    final KafkaSupervisorSpec supervisor2 = createBoundedKafkaSupervisor(
+        kafkaServer,
+        topic,
+        boundedConfig2
+    );
+
+    // Post the second supervisor (it should use the same supervisor 
ID/datasource)
+    cluster.callApi().postSupervisor(supervisor2);
+
+    // Wait for the supervisor to process and detect the metadata mismatch
+    // The exception we're testing for is thrown and logged, and causes the 
supervisor to become unhealthy
+    waitForSupervisorToBeUnhealthy(supervisor2.getId());
+
+    // Verify the supervisor is unhealthy
+    final SupervisorStatus status2 = 
cluster.callApi().getSupervisorStatus(supervisor2.getId());
+    Assertions.assertFalse(status2.isHealthy(), "Supervisor should be 
unhealthy after detecting metadata mismatch");
+    Assertions.assertEquals("UNHEALTHY_SUPERVISOR", status2.getState(), 
"Supervisor state should be UNHEALTHY_SUPERVISOR");
+  }
+
+  /**
+   * Regression test: a new bounded run whose endOffset is less than the 
offset committed by a prior
+   * run must not silently reach COMPLETED. Before the fix, 
hasTaskGroupReachedBoundedEnd() compared
+   * the stale committed offset against the new endOffset (e.g. committed=100 
>= newEnd=50) and
+   * returned true, bypassing task creation and the documented mismatch error 
entirely.
+   */
+  @Test
+  public void 
test_boundedSupervisor_doesNotSilentlyCompleteWhenStaleOffsetExceedsNewEnd()
+  {
+    final String topic = IdUtils.getRandomId();
+    kafkaServer.createTopicWithPartitions(topic, 2);
+    publish1kRecords(topic, false);
+
+    // Run 1: ingest up to offset 100 on each partition and complete.
+    Map<String, Long> startOffsets1 = new HashMap<>();
+    startOffsets1.put("0", 0L);
+    startOffsets1.put("1", 0L);
+
+    Map<String, Long> endOffsets1 = new HashMap<>();
+    endOffsets1.put("0", 100L);
+    endOffsets1.put("1", 100L);
+
+    BoundedStreamConfig boundedConfig1 = new 
BoundedStreamConfig(startOffsets1, endOffsets1);
+    final KafkaSupervisorSpec supervisor1 = 
createBoundedKafkaSupervisor(kafkaServer, topic, boundedConfig1);
+
+    cluster.callApi().postSupervisor(supervisor1);
+    waitUntilPublishedRecordsAreIngested(200);
+    waitForSupervisorToComplete(supervisor1.getId());
+
+    final SupervisorStatus status1 = 
cluster.callApi().getSupervisorStatus(supervisor1.getId());
+    Assertions.assertEquals("COMPLETED", status1.getState());
+
+    // Run 2: same datasource, endOffset (50) < stale committed offset (100).
+    // Without the fix the supervisor reaches COMPLETED immediately without 
running tasks.
+    // With the fix it detects the config mismatch and becomes 
UNHEALTHY_SUPERVISOR.
+    Map<String, Long> startOffsets2 = new HashMap<>();
+    startOffsets2.put("0", 0L);
+    startOffsets2.put("1", 0L);
+
+    Map<String, Long> endOffsets2 = new HashMap<>();
+    endOffsets2.put("0", 50L);
+    endOffsets2.put("1", 50L);
+
+    BoundedStreamConfig boundedConfig2 = new 
BoundedStreamConfig(startOffsets2, endOffsets2);
+    final KafkaSupervisorSpec supervisor2 = 
createBoundedKafkaSupervisor(kafkaServer, topic, boundedConfig2);
+
+    cluster.callApi().postSupervisor(supervisor2);
+    waitForSupervisorToBeUnhealthy(supervisor2.getId());
+
+    final SupervisorStatus status2 = 
cluster.callApi().getSupervisorStatus(supervisor2.getId());
+    Assertions.assertFalse(status2.isHealthy(), "Supervisor should be 
unhealthy after detecting metadata mismatch");

Review Comment:
   Do you think validating a third supervisor with a different ID but the same 
bounds would be worth adding? (either this test or the following one)



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -4347,6 +4546,182 @@ && checkSourceMetadataMatch(dataSourceMetadata)) {
     return Collections.emptyMap();
   }
 
+  /**
+   * Check if all partitions in a task group have reached their bounded end 
offsets.
+   * Used to determine if the task group completed successfully vs failed 
midway.
+   *
+   * @param groupId The task group ID to check
+   * @return true if all partitions in the group have reached their end 
offsets, false otherwise
+   */
+  private boolean hasTaskGroupReachedBoundedEnd(int groupId)
+  {
+    BoundedStreamConfig boundedConfig = ioConfig.getBoundedStreamConfig();
+    Map<PartitionIdType, SequenceOffsetType> startOffsets =
+        convertBoundedConfigMap(boundedConfig.getStartSequenceNumbers());
+    Map<PartitionIdType, SequenceOffsetType> endOffsets =
+        convertBoundedConfigMap(boundedConfig.getEndSequenceNumbers());
+
+    Set<PartitionIdType> partitionsInGroup = partitionGroups.get(groupId);
+    if (partitionsInGroup == null || partitionsInGroup.isEmpty()) {
+      return false;
+    }
+
+    // Check if all partitions have empty ranges
+    // For exclusive end offsets (Kafka): start >= end means empty
+    // For inclusive end offsets (Kinesis): only start > end means empty 
(start == end is one record)
+    boolean allPartitionsEmptyRange = true;
+    for (PartitionIdType partition : partitionsInGroup) {
+      SequenceOffsetType start = startOffsets.get(partition);
+      SequenceOffsetType end = endOffsets.get(partition);

Review Comment:
   nit: could we make variables final here & in surrounding code where 
applicable so the intent is clear and nothing accidentally mutates these 
offsets (except `allPartitionsEmptyRange`)?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to