This is an automated email from the ASF dual-hosted git repository.

gianm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 1001b6f5d4b fix: ensure sequence metadata list accesses are threadsafe in task runner (#19467)
1001b6f5d4b is described below

commit 1001b6f5d4bff72876ba730a00f34e5b1bf32631
Author: jtuglu1 <[email protected]>
AuthorDate: Mon May 18 21:34:49 2026 -0700

    fix: ensure sequence metadata list accesses are threadsafe in task runner (#19467)
---
 .../indexing/kinesis/KinesisIndexTaskTest.java     |   3 +-
 .../SeekableStreamIndexTaskRunner.java             | 356 +++++++-----
 .../SeekableStreamIndexTaskRunnerTest.java         | 609 +++++++++++++++------
 3 files changed, 687 insertions(+), 281 deletions(-)

diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
index 99fa37734ac..f4ccfd4fc70 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
@@ -113,7 +113,6 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.TreeMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -2086,7 +2085,7 @@ public class KinesisIndexTaskTest extends 
SeekableStreamIndexTaskTestBase
 
     task.getRunner().setToolbox(toolboxFactory.build(task));
     task.getRunner().initializeSequences();
-    final CopyOnWriteArrayList<SequenceMetadata<String, String>> sequences = 
task.getRunner().getSequences();
+    final List<SequenceMetadata<String, String>> sequences = 
task.getRunner().getSequences();
 
     Assert.assertEquals(3, sequences.size());
 
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
index 38de3dc49fe..4bcf539d6ae 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
@@ -28,6 +28,7 @@ import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Supplier;
 import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
@@ -37,6 +38,7 @@ import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.SettableFuture;
+import com.google.errorprone.annotations.concurrent.GuardedBy;
 import org.apache.druid.common.config.Configs;
 import org.apache.druid.data.input.Committer;
 import org.apache.druid.data.input.InputFormat;
@@ -122,7 +124,6 @@ import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.RejectedExecutionException;
@@ -243,7 +244,9 @@ public abstract class 
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
   protected volatile boolean pauseRequested = false;
   private volatile long nextCheckpointTime;
 
-  private volatile CopyOnWriteArrayList<SequenceMetadata<PartitionIdType, 
SequenceOffsetType>> sequences;
+  private final Lock sequencesLock = new ReentrantLock();
+  @GuardedBy("sequencesLock")
+  private List<SequenceMetadata<PartitionIdType, SequenceOffsetType>> 
sequences = new ArrayList<>();
   private volatile Throwable backgroundThreadException;
 
   private final Map<PartitionIdType, Long> partitionsThroughput = new 
HashMap<>();
@@ -265,7 +268,6 @@ public abstract class 
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
     this.inputFormat = ioConfig.getInputFormat();
     this.stream = ioConfig.getStartSequenceNumbers().getStream();
     this.endOffsets = new 
ConcurrentHashMap<>(ioConfig.getEndSequenceNumbers().getPartitionSequenceNumberMap());
-    this.sequences = new CopyOnWriteArrayList<>();
     this.ingestionState = IngestionState.NOT_STARTED;
     this.lockGranularityToUse = lockGranularityToUse;
 
@@ -388,7 +390,7 @@ public abstract class 
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
       }
     }
 
-    log.info("Starting with sequences: %s", sequences);
+    log.info("Starting with sequences: %s", getSequencesSnapshot());
   }
 
   private TaskStatus runInternal(TaskToolbox toolbox) throws Exception
@@ -496,7 +498,8 @@ public abstract class 
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
       if (restoredMetadata == null) {
         // no persist has happened so far
         // so either this is a brand new task or replacement of a failed task
-        
Preconditions.checkState(sequences.get(0).startOffsets.entrySet().stream().allMatch(
+        final SequenceMetadata<PartitionIdType, SequenceOffsetType> 
firstSequence = getFirstSequenceMetadata();
+        
Preconditions.checkState(firstSequence.startOffsets.entrySet().stream().allMatch(
             partitionOffsetEntry ->
                 
createSequenceNumber(partitionOffsetEntry.getValue()).compareTo(
                     createSequenceNumber(ioConfig.getStartSequenceNumbers()
@@ -504,7 +507,7 @@ public abstract class 
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
                                                  
.get(partitionOffsetEntry.getKey())
                     )) >= 0
         ), "Sequence sequences are not compatible with start sequences of 
task");
-        currOffsets.putAll(sequences.get(0).startOffsets);
+        currOffsets.putAll(firstSequence.startOffsets);
       } else {
         @SuppressWarnings("unchecked")
         final Map<String, Object> restoredMetadataMap = (Map) restoredMetadata;
@@ -534,16 +537,15 @@ public abstract class 
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
         }
         // sequences size can be 0 only when all sequences got published and 
task stopped before it could finish
         // which is super rare
-        if (sequences.size() == 0 || 
getLastSequenceMetadata().isCheckpointed()) {
-          this.endOffsets.putAll(sequences.size() == 0
-                                 ? currOffsets
-                                 : getLastSequenceMetadata().getEndOffsets());
+        final SequenceMetadata<PartitionIdType, SequenceOffsetType> 
latestSequence = getLastSequenceMetadataOrNull();
+        if (latestSequence == null || latestSequence.isCheckpointed()) {
+          this.endOffsets.putAll(latestSequence == null ? currOffsets : 
latestSequence.getEndOffsets());
         }
       }
 
       log.info(
           "Initialized sequences: %s",
-          
sequences.stream().map(SequenceMetadata::toString).collect(Collectors.joining(",
 "))
+          
getSequencesSnapshot().stream().map(SequenceMetadata::toString).collect(Collectors.joining(",
 "))
       );
 
       // Filter out partitions with END_OF_SHARD markers since these 
partitions have already been fully read. This
@@ -623,7 +625,8 @@ public abstract class 
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
           }
 
           // if stop is requested or task's end sequence is set by call to 
setEndOffsets method with finish set to true
-          if (stopRequested.get() || sequences.size() == 0 || 
getLastSequenceMetadata().isCheckpointed()) {
+          final SequenceMetadata<PartitionIdType, SequenceOffsetType> 
latestSequence = getLastSequenceMetadataOrNull();
+          if (stopRequested.get() || latestSequence == null || 
latestSequence.isCheckpointed()) {
             status = Status.PUBLISHING;
           }
 
@@ -666,18 +669,15 @@ public abstract class 
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
               final List<InputRow> rows = reader.parse(record.getData(), 
isEndOfShard(record.getSequenceNumber()));
               boolean isPersistRequired = false;
 
-              final SequenceMetadata<PartitionIdType, SequenceOffsetType> 
sequenceToUse = sequences
-                  .stream()
-                  .filter(sequenceMetadata -> sequenceMetadata.canHandle(this, 
record))
-                  .findFirst()
-                  .orElse(null);
+              final SequenceMetadata<PartitionIdType, SequenceOffsetType> 
sequenceToUse =
+                  findSequenceMetadataForRecord(record);
 
               if (sequenceToUse == null) {
                 throw new ISE(
                     "Cannot find any valid sequence for record with partition 
[%s] and sequenceNumber [%s]. Current sequences: %s",
                     record.getPartitionId(),
                     record.getSequenceNumber(),
-                    sequences
+                    getSequencesSnapshot()
                 );
               }
 
@@ -779,13 +779,13 @@ public abstract class 
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
           }
 
           if (sequenceToCheckpoint != null && stillReading) {
+            final SequenceMetadata<PartitionIdType, SequenceOffsetType> 
latestSequenceToCheckpoint =
+                getLastSequenceMetadata();
             Preconditions.checkArgument(
-                getLastSequenceMetadata()
-                    .getSequenceName()
-                    .equals(sequenceToCheckpoint.getSequenceName()),
+                
latestSequenceToCheckpoint.getSequenceName().equals(sequenceToCheckpoint.getSequenceName()),
                 "Cannot checkpoint a sequence [%s] which is not the latest 
one, sequences %s",
                 sequenceToCheckpoint,
-                sequences
+                getSequencesSnapshot()
             );
             requestPause();
             final CheckPointDataSourceMetadataAction checkpointAction = new 
CheckPointDataSourceMetadataAction(
@@ -833,7 +833,7 @@ public abstract class 
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
       // We need to copy sequences here, because the success callback in 
publishAndRegisterHandoff removes items from
       // the sequence list. If a publish finishes before we finish iterating 
through the sequence list, we can
       // end up skipping some sequences.
-      List<SequenceMetadata<PartitionIdType, SequenceOffsetType>> 
sequencesSnapshot = new ArrayList<>(sequences);
+      final List<SequenceMetadata<PartitionIdType, SequenceOffsetType>> 
sequencesSnapshot = getSequencesSnapshot();
       for (int i = 0; i < sequencesSnapshot.size(); i++) {
         final SequenceMetadata<PartitionIdType, SequenceOffsetType> 
sequenceMetadata = sequencesSnapshot.get(i);
         if (!publishingSequences.contains(sequenceMetadata.getSequenceName())
@@ -976,7 +976,7 @@ public abstract class 
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
     final List<ListenableFuture<SegmentsAndCommitMetadata>> publishFinished = 
publishWaitList
         .stream()
         .filter(Future::isDone)
-        .collect(Collectors.toList());
+        .toList();
 
     for (ListenableFuture<SegmentsAndCommitMetadata> publishFuture : 
publishFinished) {
       // If publishFuture failed, the below line will throw an exception and 
catched by (1), and then (2) or (3).
@@ -989,7 +989,7 @@ public abstract class 
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
     final List<ListenableFuture<SegmentsAndCommitMetadata>> handoffFinished = 
handOffWaitList
         .stream()
         .filter(Future::isDone)
-        .collect(Collectors.toList());
+        .toList();
 
     for (ListenableFuture<SegmentsAndCommitMetadata> handoffFuture : 
handoffFinished) {
       // If handoffFuture failed, the below line will throw an exception and 
catched by (1), and then (2) or (3).
@@ -1043,7 +1043,7 @@ public abstract class 
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
             log.infoSegments(publishedSegmentsAndCommitMetadata.getSegments(), 
"Published segments");
 
             publishedSequences.add(sequenceMetadata.getSequenceName());
-            sequences.remove(sequenceMetadata);
+            removeSequence(sequenceMetadata);
             publishingSequences.remove(sequenceMetadata.getSequenceName());
 
             try {
@@ -1110,12 +1110,18 @@ public abstract class 
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
   {
     final File sequencesPersistFile = getSequencesPersistFile(toolbox);
     if (sequencesPersistFile.exists()) {
-      sequences = new CopyOnWriteArrayList<>(
+      final List<SequenceMetadata<PartitionIdType, SequenceOffsetType>> 
restoredSequences =
           toolbox.getJsonMapper().readValue(
               sequencesPersistFile,
               getSequenceMetadataTypeReference()
-          )
-      );
+          );
+      sequencesLock.lock();
+      try {
+        sequences = new ArrayList<>(restoredSequences);
+      }
+      finally {
+        sequencesLock.unlock();
+      }
       return true;
     } else {
       return false;
@@ -1124,11 +1130,12 @@ public abstract class 
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
 
   private synchronized void persistSequences() throws IOException
   {
+    final List<SequenceMetadata<PartitionIdType, SequenceOffsetType>> 
sequencesSnapshot = getSequencesSnapshot();
     toolbox.getJsonMapper().writerFor(
         getSequenceMetadataTypeReference()
-    ).writeValue(getSequencesPersistFile(toolbox), sequences);
+    ).writeValue(getSequencesPersistFile(toolbox), sequencesSnapshot);
 
-    log.info("Saved sequence metadata to disk: %s", sequences);
+    log.info("Saved sequence metadata to disk: %s", sequencesSnapshot);
   }
 
   /**
@@ -1194,7 +1201,7 @@ public abstract class 
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
   private void maybePersistAndPublishSequences(Supplier<Committer> 
committerSupplier)
       throws InterruptedException
   {
-    for (SequenceMetadata<PartitionIdType, SequenceOffsetType> 
sequenceMetadata : sequences) {
+    for (SequenceMetadata<PartitionIdType, SequenceOffsetType> 
sequenceMetadata : getSequencesSnapshot()) {
       sequenceMetadata.updateAssignments(currOffsets, 
this::isMoreToReadBeforeReadingRecord);
       if (!sequenceMetadata.isOpen()
           && !publishingSequences.contains(sequenceMetadata.getSequenceName())
@@ -1248,44 +1255,124 @@ public abstract class 
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
 
   private void addSequence(final SequenceMetadata<PartitionIdType, 
SequenceOffsetType> sequenceMetadata)
   {
-    // Sanity check that the start of the new sequence matches up with the end 
of the prior sequence.
-    for (Map.Entry<PartitionIdType, SequenceOffsetType> entry : 
sequenceMetadata.getStartOffsets().entrySet()) {
-      final PartitionIdType partition = entry.getKey();
-      final SequenceOffsetType startOffset = entry.getValue();
+    sequencesLock.lock();
+    try {
+      final SequenceMetadata<PartitionIdType, SequenceOffsetType> 
latestSequence =
+          getLastSequenceMetadataOrNull(sequences);
+
+      // Sanity check that the start of the new sequence matches up with the 
end of the prior sequence.
+      for (Map.Entry<PartitionIdType, SequenceOffsetType> entry : 
sequenceMetadata.getStartOffsets().entrySet()) {
+        final PartitionIdType partition = entry.getKey();
+        final SequenceOffsetType startOffset = entry.getValue();
 
-      if (!sequences.isEmpty()) {
-        final SequenceOffsetType priorOffset = 
getLastSequenceMetadata().endOffsets.get(partition);
+        if (latestSequence != null) {
+          final SequenceOffsetType priorOffset = 
latestSequence.getEndOffsets().get(partition);
+
+          if (!startOffset.equals(priorOffset)) {
+            throw new ISE(
+                "New sequence startOffset[%s] does not equal expected prior 
offset[%s]",
+                startOffset,
+                priorOffset
+            );
+          }
+        }
+      }
 
-        if (!startOffset.equals(priorOffset)) {
+      if (!isEndOffsetExclusive() && latestSequence != null) {
+        if 
(!latestSequence.getEndOffsets().keySet().equals(sequenceMetadata.getExclusiveStartPartitions()))
 {
           throw new ISE(
-              "New sequence startOffset[%s] does not equal expected prior 
offset[%s]",
-              startOffset,
-              priorOffset
+              "Exclusive start partitions[%s] for new sequence don't match to 
the prior offset[%s]",
+              sequenceMetadata.getExclusiveStartPartitions(),
+              latestSequence
           );
         }
       }
-    }
 
-    if (!isEndOffsetExclusive() && !sequences.isEmpty()) {
-      final SequenceMetadata<PartitionIdType, SequenceOffsetType> lastMetadata 
= getLastSequenceMetadata();
-      if 
(!lastMetadata.endOffsets.keySet().equals(sequenceMetadata.getExclusiveStartPartitions()))
 {
-        throw new ISE(
-            "Exclusive start partitions[%s] for new sequence don't match to 
the prior offset[%s]",
-            sequenceMetadata.getExclusiveStartPartitions(),
-            lastMetadata
-        );
-      }
+      // Actually do the add.
+      sequences.add(sequenceMetadata);
+    }
+    finally {
+      sequencesLock.unlock();
     }
+  }
 
-    // Actually do the add.
-    sequences.add(sequenceMetadata);
+  private void removeSequence(final SequenceMetadata<PartitionIdType, 
SequenceOffsetType> sequenceMetadata)
+  {
+    sequencesLock.lock();
+    try {
+      sequences.remove(sequenceMetadata);
+    }
+    finally {
+      sequencesLock.unlock();
+    }
   }
 
   @VisibleForTesting
   public SequenceMetadata<PartitionIdType, SequenceOffsetType> 
getLastSequenceMetadata()
   {
-    Preconditions.checkState(!sequences.isEmpty(), "Empty sequences");
-    return sequences.get(sequences.size() - 1);
+    return getLastSequenceMetadata(getSequencesSnapshot());
+  }
+
+  private SequenceMetadata<PartitionIdType, SequenceOffsetType> 
getFirstSequenceMetadata()
+  {
+    final List<SequenceMetadata<PartitionIdType, SequenceOffsetType>> 
sequenceMetadataList = getSequencesSnapshot();
+    Preconditions.checkState(!sequenceMetadataList.isEmpty(), "Empty 
sequences");
+    return sequenceMetadataList.get(0);
+  }
+
+  @Nullable
+  private SequenceMetadata<PartitionIdType, SequenceOffsetType> 
getLastSequenceMetadataOrNull()
+  {
+    return getLastSequenceMetadataOrNull(getSequencesSnapshot());
+  }
+
+  @Nullable
+  private SequenceMetadata<PartitionIdType, SequenceOffsetType> 
getLastSequenceMetadataOrNull(
+      final List<SequenceMetadata<PartitionIdType, SequenceOffsetType>> 
sequenceMetadataList
+  )
+  {
+    if (sequenceMetadataList.isEmpty()) {
+      return null;
+    }
+    return sequenceMetadataList.get(sequenceMetadataList.size() - 1);
+  }
+
+  private SequenceMetadata<PartitionIdType, SequenceOffsetType> 
getLastSequenceMetadata(
+      final List<SequenceMetadata<PartitionIdType, SequenceOffsetType>> 
sequenceMetadataList
+  )
+  {
+    final SequenceMetadata<PartitionIdType, SequenceOffsetType> latestSequence 
=
+        getLastSequenceMetadataOrNull(sequenceMetadataList);
+    Preconditions.checkState(latestSequence != null, "Empty sequences");
+    return latestSequence;
+  }
+
+  private List<SequenceMetadata<PartitionIdType, SequenceOffsetType>> 
getSequencesSnapshot()
+  {
+    sequencesLock.lock();
+    try {
+      return ImmutableList.copyOf(sequences);
+    }
+    finally {
+      sequencesLock.unlock();
+    }
+  }
+
+  @Nullable
+  private SequenceMetadata<PartitionIdType, SequenceOffsetType> 
findSequenceMetadataForRecord(
+      final OrderedPartitionableRecord<PartitionIdType, SequenceOffsetType, 
RecordType> record
+  )
+  {
+    sequencesLock.lock();
+    try {
+      return sequences.stream()
+                      .filter(sequenceMetadata -> 
sequenceMetadata.canHandle(this, record))
+                      .findFirst()
+                      .orElse(null);
+    }
+    finally {
+      sequencesLock.unlock();
+    }
   }
 
   /**
@@ -1727,9 +1814,7 @@ public abstract class 
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
         // and after acquiring pauseLock to correctly guard against duplicate 
requests
         Preconditions.checkState(sequenceNumbers.size() > 0, "No sequences 
found to set end sequences");
 
-        final SequenceMetadata<PartitionIdType, SequenceOffsetType> 
latestSequence = getLastSequenceMetadata();
         final Set<PartitionIdType> exclusiveStartPartitions;
-
         if (isEndOffsetExclusive()) {
           // When end offsets are exclusive, there's no need for marking the 
next sequence as having any
           // exclusive-start partitions. It should always start from the end 
offsets of the prior sequence.
@@ -1740,75 +1825,98 @@ public abstract class 
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
           exclusiveStartPartitions = sequenceNumbers.keySet();
         }
 
-        if ((latestSequence.getStartOffsets().equals(sequenceNumbers)
-             && 
latestSequence.getExclusiveStartPartitions().equals(exclusiveStartPartitions)
-             && !finish)
-            || (latestSequence.getEndOffsets().equals(sequenceNumbers) && 
finish)) {
-          log.warn("Ignoring duplicate request, end offsets already set for 
sequences [%s]", sequenceNumbers);
-          resetNextCheckpointTime();
-          resume();
-          return Response.ok(sequenceNumbers).build();
-        } else if (latestSequence.isCheckpointed()) {
-          return Response.status(Response.Status.BAD_REQUEST)
-                         .type(MediaType.TEXT_PLAIN)
-                         .entity(StringUtils.format(
-                             "Sequence [%s] has already endOffsets set, cannot 
set to [%s]",
-                             latestSequence,
-                             sequenceNumbers
-                         )).build();
-        } else if (!isPaused()) {
-          return Response.status(Response.Status.BAD_REQUEST)
-                         .entity("Task must be paused before changing the end 
offsets")
-                         .build();
-        }
+        Response earlyResponse = null;
+        boolean resumeBeforeReturning = false;
 
-        for (Map.Entry<PartitionIdType, SequenceOffsetType> entry : 
sequenceNumbers.entrySet()) {
-          if 
(createSequenceNumber(entry.getValue()).compareTo(createSequenceNumber(currOffsets.get(entry.getKey())))
-              < 0) {
-            return Response.status(Response.Status.BAD_REQUEST)
-                           .entity(
-                               StringUtils.format(
-                                   "End sequence must be >= current sequence 
for partition [%s] (current: %s)",
-                                   entry.getKey(),
-                                   currOffsets.get(entry.getKey())
-                               )
-                           )
-                           .build();
-          }
-        }
+        sequencesLock.lock();
+        try {
+          final SequenceMetadata<PartitionIdType, SequenceOffsetType> 
latestSequence =
+              getLastSequenceMetadata(sequences);
+
+          if ((latestSequence.getStartOffsets().equals(sequenceNumbers)
+               && 
latestSequence.getExclusiveStartPartitions().equals(exclusiveStartPartitions)
+               && !finish)
+              || (latestSequence.getEndOffsets().equals(sequenceNumbers) && 
finish)) {
+            log.warn("Ignoring duplicate request, end offsets already set for 
sequences [%s]", sequenceNumbers);
+            resetNextCheckpointTime();
+            resumeBeforeReturning = true;
+            earlyResponse = Response.ok(sequenceNumbers).build();
+          } else if (latestSequence.isCheckpointed()) {
+            earlyResponse = Response.status(Response.Status.BAD_REQUEST)
+                                    .type(MediaType.TEXT_PLAIN)
+                                    .entity(StringUtils.format(
+                                        "Sequence [%s] has already endOffsets 
set, cannot set to [%s]",
+                                        latestSequence,
+                                        sequenceNumbers
+                                    )).build();
+          } else if (!isPaused()) {
+            earlyResponse = Response.status(Response.Status.BAD_REQUEST)
+                                    .entity("Task must be paused before 
changing the end offsets")
+                                    .build();
+          } else {
+            for (Map.Entry<PartitionIdType, SequenceOffsetType> entry : 
sequenceNumbers.entrySet()) {
+              if 
(createSequenceNumber(entry.getValue()).compareTo(createSequenceNumber(currOffsets.get(entry.getKey())))
+                  < 0) {
+                earlyResponse = Response.status(Response.Status.BAD_REQUEST)
+                                        .entity(
+                                            StringUtils.format(
+                                                "End sequence must be >= 
current sequence for partition [%s] (current: %s)",
+                                                entry.getKey(),
+                                                currOffsets.get(entry.getKey())
+                                            )
+                                        )
+                                        .build();
+                break;
+              }
+            }
 
-        resetNextCheckpointTime();
-        latestSequence.setEndOffsets(sequenceNumbers);
+            if (earlyResponse == null) {
+              resetNextCheckpointTime();
+              latestSequence.setEndOffsets(sequenceNumbers);
 
-        if (finish) {
-          log.info(
-              "Sequence[%s] end offsets updated from [%s] to [%s].",
-              latestSequence.getSequenceName(),
-              endOffsets,
-              sequenceNumbers
-          );
-          endOffsets.putAll(sequenceNumbers);
-        } else {
-          // create new sequence
-          final SequenceMetadata<PartitionIdType, SequenceOffsetType> 
newSequence = new SequenceMetadata<>(
-              latestSequence.getSequenceId() + 1,
-              StringUtils.format("%s_%d", ioConfig.getBaseSequenceName(), 
latestSequence.getSequenceId() + 1),
-              sequenceNumbers,
-              endOffsets,
-              false,
-              exclusiveStartPartitions,
-              getTaskLockType()
-          );
+              if (finish) {
+                log.info(
+                    "Sequence[%s] end offsets updated from [%s] to [%s].",
+                    latestSequence.getSequenceName(),
+                    endOffsets,
+                    sequenceNumbers
+                );
+                endOffsets.putAll(sequenceNumbers);
+              } else {
+                // create new sequence
+                final SequenceMetadata<PartitionIdType, SequenceOffsetType> 
newSequence = new SequenceMetadata<>(
+                    latestSequence.getSequenceId() + 1,
+                    StringUtils.format("%s_%d", 
ioConfig.getBaseSequenceName(), latestSequence.getSequenceId() + 1),
+                    sequenceNumbers,
+                    endOffsets,
+                    false,
+                    exclusiveStartPartitions,
+                    getTaskLockType()
+                );
 
-          log.info(
-              "Sequence[%s] created with start offsets [%s] and end offsets 
[%s].",
-              newSequence.getSequenceName(),
-              sequenceNumbers,
-              endOffsets
-          );
+                log.info(
+                    "Sequence[%s] created with start offsets [%s] and end 
offsets [%s].",
+                    newSequence.getSequenceName(),
+                    sequenceNumbers,
+                    endOffsets
+                );
+
+                addSequence(newSequence);
+              }
+            }
+          }
+        }
+        finally {
+          sequencesLock.unlock();
+        }
 
-          addSequence(newSequence);
+        if (earlyResponse != null) {
+          if (resumeBeforeReturning) {
+            resume();
+          }
+          return earlyResponse;
         }
+
         persistSequences();
       }
       catch (Exception e) {
@@ -1836,9 +1944,9 @@ public abstract class 
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
   }
 
   @VisibleForTesting
-  public CopyOnWriteArrayList<SequenceMetadata<PartitionIdType, 
SequenceOffsetType>> getSequences()
+  public List<SequenceMetadata<PartitionIdType, SequenceOffsetType>> 
getSequences()
   {
-    return sequences;
+    return getSequencesSnapshot();
   }
 
   @GET
@@ -1854,7 +1962,7 @@ public abstract class 
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
 
   private Map<Integer, Map<PartitionIdType, SequenceOffsetType>> 
getCheckpoints()
   {
-    return new TreeMap<>(sequences.stream()
+    return new TreeMap<>(getSequencesSnapshot().stream()
                                   .collect(Collectors.toMap(
                                       SequenceMetadata::getSequenceId,
                                       SequenceMetadata::getStartOffsets
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java
index f08ee84d89d..7f0e731d1d3 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java
@@ -22,6 +22,7 @@ package org.apache.druid.indexing.seekablestream;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.util.concurrent.Futures;
 import org.apache.druid.client.DruidServer;
 import org.apache.druid.data.input.InputRow;
@@ -60,6 +61,7 @@ import org.apache.druid.server.coordinator.CreateDataSegments;
 import org.apache.druid.server.security.AuthTestUtils;
 import org.apache.druid.timeline.DataSegment;
 import org.joda.time.DateTime;
+import org.joda.time.Period;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
@@ -71,19 +73,31 @@ import org.mockito.Mockito;
 import org.mockito.junit.MockitoJUnitRunner;
 
 import javax.annotation.Nullable;
+import javax.ws.rs.core.Response;
 import java.io.File;
 import java.io.IOException;
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.mockito.ArgumentMatchers.any;
 
 @RunWith(MockitoJUnitRunner.class)
 public class SeekableStreamIndexTaskRunnerTest
 {
+  private static final String DATA_SOURCE = "datasource";
+
   @Rule
   public final TemporaryFolder temporaryFolder = new TemporaryFolder();
 
@@ -102,48 +116,161 @@ public class SeekableStreamIndexTaskRunnerTest
   }
 
   @Test
-  public void testWithinMinMaxTime()
+  public void 
testGetLastSequenceMetadataUsesStableSnapshotIfSequenceCompletesConcurrently() 
throws Exception
+  {
+    final TestSeekableStreamIndexTaskRunner runner = createRunner();
+    final SequenceMetadata<String, String> firstSequence = new 
SequenceMetadata<>(
+        0,
+        "test_0",
+        ImmutableMap.of("partition", "0"),
+        ImmutableMap.of("partition", "5"),
+        true,
+        ImmutableSet.of(),
+        null
+    );
+    final SequenceMetadata<String, String> secondSequence = new 
SequenceMetadata<>(
+        1,
+        "test_1",
+        ImmutableMap.of("partition", "5"),
+        ImmutableMap.of("partition", "10"),
+        false,
+        ImmutableSet.of(),
+        null
+    );
+    final ShrinkingCopyOnWriteArrayList<SequenceMetadata<String, String>> 
sequences =
+        new ShrinkingCopyOnWriteArrayList<>();
+    sequences.add(firstSequence);
+    sequences.add(secondSequence);
+    setSequences(runner, sequences);
+
+    sequences.removeFirstElementDuringNextSnapshotOrSize();
+
+    Assert.assertSame(secondSequence, runner.getLastSequenceMetadata());
+    Assert.assertEquals(1, sequences.size());
+    Assert.assertSame(secondSequence, sequences.get(0));
+  }
+
+  @Test
+  public void testSetEndOffsetsReturnsBadRequestWhenTaskIsNotPaused() throws 
Exception
+  {
+    final TestSeekableStreamIndexTaskRunner runner = createInitializedRunner(
+        ImmutableMap.of("partition", "0"),
+        ImmutableMap.of("partition", "10")
+    );
+    setStatus(runner, SeekableStreamIndexTaskRunner.Status.READING);
+
+    final Response response = 
runner.setEndOffsets(ImmutableMap.of("partition", "5"), false);
+
+    Assert.assertEquals(Response.Status.BAD_REQUEST.getStatusCode(), 
response.getStatus());
+    Assert.assertEquals("Task must be paused before changing the end offsets", 
response.getEntity());
+  }
+
+  @Test
+  public void 
testSetEndOffsetsReturnsBadRequestWhenLatestSequenceIsCheckpointed() throws 
Exception
   {
-    DimensionsSpec dimensionsSpec = new DimensionsSpec(
+    final TestSeekableStreamIndexTaskRunner runner = createInitializedRunner(
+        ImmutableMap.of("partition", "0"),
+        ImmutableMap.of("partition", "10")
+    );
+    setSequences(
+        runner,
         Arrays.asList(
-            new StringDimensionSchema("d1"),
-            new StringDimensionSchema("d2")
+            new SequenceMetadata<>(
+                0,
+                "test_0",
+                ImmutableMap.of("partition", "0"),
+                ImmutableMap.of("partition", "5"),
+                true,
+                ImmutableSet.of(),
+                null
+            )
         )
     );
-    DataSchema schema =
-        DataSchema.builder()
-                  .withDataSource("datasource")
-                  .withTimestamp(TimestampSpec.DEFAULT)
-                  .withDimensions(dimensionsSpec)
-                  .withGranularity(
-                      new UniformGranularitySpec(Granularities.MINUTE, 
Granularities.NONE, null)
-                  )
-                  .build();
-
-    SeekableStreamIndexTaskTuningConfig tuningConfig = 
Mockito.mock(SeekableStreamIndexTaskTuningConfig.class);
-    SeekableStreamIndexTaskIOConfig<String, String> ioConfig = 
Mockito.mock(SeekableStreamIndexTaskIOConfig.class);
-    SeekableStreamStartSequenceNumbers<String, String> sequenceNumbers = 
Mockito.mock(SeekableStreamStartSequenceNumbers.class);
-    SeekableStreamEndSequenceNumbers<String, String> endSequenceNumbers = 
Mockito.mock(SeekableStreamEndSequenceNumbers.class);
-
-    DateTime now = DateTimes.nowUtc();
-
-    
Mockito.when(ioConfig.getRefreshRejectionPeriodsInMinutes()).thenReturn(120L);
-    
Mockito.when(ioConfig.getMaximumMessageTime()).thenReturn(DateTimes.nowUtc().plusHours(2));
-    
Mockito.when(ioConfig.getMinimumMessageTime()).thenReturn(DateTimes.nowUtc().minusHours(2));
-    Mockito.when(ioConfig.getInputFormat()).thenReturn(new 
JsonInputFormat(null, null, null, null, null));
-    
Mockito.when(ioConfig.getStartSequenceNumbers()).thenReturn(sequenceNumbers);
-    
Mockito.when(ioConfig.getEndSequenceNumbers()).thenReturn(endSequenceNumbers);
 
-    
Mockito.when(endSequenceNumbers.getPartitionSequenceNumberMap()).thenReturn(ImmutableMap.of());
-    Mockito.when(sequenceNumbers.getStream()).thenReturn("test");
+    final Response response = 
runner.setEndOffsets(ImmutableMap.of("partition", "6"), false);
 
-    Mockito.when(task.getDataSchema()).thenReturn(schema);
-    Mockito.when(task.getIOConfig()).thenReturn(ioConfig);
-    Mockito.when(task.getTuningConfig()).thenReturn(tuningConfig);
+    Assert.assertEquals(Response.Status.BAD_REQUEST.getStatusCode(), 
response.getStatus());
+    Assert.assertTrue(response.getEntity().toString().contains("has already 
endOffsets set"));
+  }
 
-    TestSeekableStreamIndexTaskRunner runner = new 
TestSeekableStreamIndexTaskRunner(
-        task,
-        LockGranularity.TIME_CHUNK
+  @Test
+  public void 
testSetEndOffsetsReturnsBadRequestWhenEndOffsetPrecedesCurrentOffset() throws 
Exception
+  {
+    final TestSeekableStreamIndexTaskRunner runner = createInitializedRunner(
+        ImmutableMap.of("partition", "0"),
+        ImmutableMap.of("partition", "10")
+    );
+    setCurrentOffsets(runner, ImmutableMap.of("partition", "5"));
+
+    try (final PausedRunner ignored = pauseRunner(runner)) {
+      final Response response = 
runner.setEndOffsets(ImmutableMap.of("partition", "4"), false);
+
+      Assert.assertEquals(Response.Status.BAD_REQUEST.getStatusCode(), 
response.getStatus());
+      Assert.assertEquals(
+          "End sequence must be >= current sequence for partition [partition] 
(current: 5)",
+          response.getEntity()
+      );
+      Assert.assertFalse(runner.getLastSequenceMetadata().isCheckpointed());
+      Assert.assertEquals(1, runner.getSequences().size());
+    }
+  }
+
+  @Test
+  public void testSetEndOffsetsCreatesNewSequenceWhenPaused() throws Exception
+  {
+    final TestSeekableStreamIndexTaskRunner runner = createInitializedRunner(
+        ImmutableMap.of("partition", "0"),
+        ImmutableMap.of("partition", "10")
+    );
+    setCurrentOffsets(runner, ImmutableMap.of("partition", "4"));
+
+    try (final PausedRunner pausedRunner = pauseRunner(runner)) {
+      final Response response = 
runner.setEndOffsets(ImmutableMap.of("partition", "5"), false);
+
+      Assert.assertEquals(Response.Status.OK.getStatusCode(), 
response.getStatus());
+      pausedRunner.awaitResumed();
+    }
+
+    final List<SequenceMetadata<String, String>> sequences = 
runner.getSequences();
+    Assert.assertEquals(2, sequences.size());
+    Assert.assertTrue(sequences.get(0).isCheckpointed());
+    Assert.assertEquals(ImmutableMap.of("partition", "5"), 
sequences.get(0).getEndOffsets());
+    Assert.assertEquals("test_1", sequences.get(1).getSequenceName());
+    Assert.assertEquals(ImmutableMap.of("partition", "5"), 
sequences.get(1).getStartOffsets());
+    Assert.assertEquals(ImmutableMap.of("partition", "10"), 
sequences.get(1).getEndOffsets());
+    Assert.assertEquals(ImmutableSet.of("partition"), 
sequences.get(1).getExclusiveStartPartitions());
+  }
+
+  @Test
+  public void testSetEndOffsetsFinishUpdatesLatestSequenceWhenPaused() throws 
Exception
+  {
+    final TestSeekableStreamIndexTaskRunner runner = createInitializedRunner(
+        ImmutableMap.of("partition", "0"),
+        ImmutableMap.of("partition", "10")
+    );
+    setCurrentOffsets(runner, ImmutableMap.of("partition", "4"));
+
+    try (final PausedRunner pausedRunner = pauseRunner(runner)) {
+      final Response response = 
runner.setEndOffsets(ImmutableMap.of("partition", "6"), true);
+
+      Assert.assertEquals(Response.Status.OK.getStatusCode(), 
response.getStatus());
+      pausedRunner.awaitResumed();
+    }
+
+    final List<SequenceMetadata<String, String>> sequences = 
runner.getSequences();
+    Assert.assertEquals(1, sequences.size());
+    Assert.assertTrue(sequences.get(0).isCheckpointed());
+    Assert.assertEquals(ImmutableMap.of("partition", "6"), 
sequences.get(0).getEndOffsets());
+  }
+
+  @Test
+  public void testWithinMinMaxTime()
+  {
+    final DateTime now = DateTimes.nowUtc();
+    final TestSeekableStreamIndexTaskRunner runner = 
createRunnerWithMessageTimeBounds(
+        120L,
+        now.minusHours(2),
+        now.plusHours(2)
     );
 
     Mockito.when(row.getTimestamp()).thenReturn(now);
@@ -159,47 +286,8 @@ public class SeekableStreamIndexTaskRunnerTest
   @Test
   public void testWithinMinMaxTimeNotPopulated()
   {
-    DimensionsSpec dimensionsSpec = new DimensionsSpec(
-        Arrays.asList(
-            new StringDimensionSchema("d1"),
-            new StringDimensionSchema("d2")
-        )
-    );
-    DataSchema schema =
-        DataSchema.builder()
-                  .withDataSource("datasource")
-                  .withTimestamp(TimestampSpec.DEFAULT)
-                  .withDimensions(dimensionsSpec)
-                  .withGranularity(
-                      new UniformGranularitySpec(Granularities.MINUTE, 
Granularities.NONE, null)
-                  )
-                  .build();
-
-    SeekableStreamIndexTaskTuningConfig tuningConfig = 
Mockito.mock(SeekableStreamIndexTaskTuningConfig.class);
-    SeekableStreamIndexTaskIOConfig<String, String> ioConfig = 
Mockito.mock(SeekableStreamIndexTaskIOConfig.class);
-    SeekableStreamStartSequenceNumbers<String, String> sequenceNumbers = 
Mockito.mock(SeekableStreamStartSequenceNumbers.class);
-    SeekableStreamEndSequenceNumbers<String, String> endSequenceNumbers = 
Mockito.mock(SeekableStreamEndSequenceNumbers.class);
-
-    DateTime now = DateTimes.nowUtc();
-
-    
Mockito.when(ioConfig.getRefreshRejectionPeriodsInMinutes()).thenReturn(null);
-    // min max time not populated.
-    Mockito.when(ioConfig.getMaximumMessageTime()).thenReturn(null);
-    Mockito.when(ioConfig.getMinimumMessageTime()).thenReturn(null);
-    Mockito.when(ioConfig.getInputFormat()).thenReturn(new 
JsonInputFormat(null, null, null, null, null));
-    
Mockito.when(ioConfig.getStartSequenceNumbers()).thenReturn(sequenceNumbers);
-    
Mockito.when(ioConfig.getEndSequenceNumbers()).thenReturn(endSequenceNumbers);
-
-    
Mockito.when(endSequenceNumbers.getPartitionSequenceNumberMap()).thenReturn(ImmutableMap.of());
-    Mockito.when(sequenceNumbers.getStream()).thenReturn("test");
-
-    Mockito.when(task.getDataSchema()).thenReturn(schema);
-    Mockito.when(task.getIOConfig()).thenReturn(ioConfig);
-    Mockito.when(task.getTuningConfig()).thenReturn(tuningConfig);
-    TestSeekableStreamIndexTaskRunner runner = new 
TestSeekableStreamIndexTaskRunner(
-        task,
-        LockGranularity.TIME_CHUNK
-    );
+    final DateTime now = DateTimes.nowUtc();
+    final TestSeekableStreamIndexTaskRunner runner = createRunner();
 
     Mockito.when(row.getTimestamp()).thenReturn(now);
     Assert.assertEquals(InputRowFilterResult.ACCEPTED, 
runner.ensureRowIsNonNullAndWithinMessageTimeBounds(row));
@@ -214,45 +302,7 @@ public class SeekableStreamIndexTaskRunnerTest
   @Test
   public void testEnsureRowRejectionReasonForNullRow()
   {
-    DimensionsSpec dimensionsSpec = new DimensionsSpec(
-        Arrays.asList(
-            new StringDimensionSchema("d1"),
-            new StringDimensionSchema("d2")
-        )
-    );
-    DataSchema schema =
-        DataSchema.builder()
-                  .withDataSource("datasource")
-                  .withTimestamp(TimestampSpec.DEFAULT)
-                  .withDimensions(dimensionsSpec)
-                  .withGranularity(
-                      new UniformGranularitySpec(Granularities.MINUTE, 
Granularities.NONE, null)
-                  )
-                  .build();
-
-    SeekableStreamIndexTaskTuningConfig tuningConfig = 
Mockito.mock(SeekableStreamIndexTaskTuningConfig.class);
-    SeekableStreamIndexTaskIOConfig<String, String> ioConfig = 
Mockito.mock(SeekableStreamIndexTaskIOConfig.class);
-    SeekableStreamStartSequenceNumbers<String, String> sequenceNumbers = 
Mockito.mock(SeekableStreamStartSequenceNumbers.class);
-    SeekableStreamEndSequenceNumbers<String, String> endSequenceNumbers = 
Mockito.mock(SeekableStreamEndSequenceNumbers.class);
-
-    
Mockito.when(ioConfig.getRefreshRejectionPeriodsInMinutes()).thenReturn(null);
-    Mockito.when(ioConfig.getMaximumMessageTime()).thenReturn(null);
-    Mockito.when(ioConfig.getMinimumMessageTime()).thenReturn(null);
-    Mockito.when(ioConfig.getInputFormat()).thenReturn(new 
JsonInputFormat(null, null, null, null, null));
-    
Mockito.when(ioConfig.getStartSequenceNumbers()).thenReturn(sequenceNumbers);
-    
Mockito.when(ioConfig.getEndSequenceNumbers()).thenReturn(endSequenceNumbers);
-
-    
Mockito.when(endSequenceNumbers.getPartitionSequenceNumberMap()).thenReturn(ImmutableMap.of());
-    Mockito.when(sequenceNumbers.getStream()).thenReturn("test");
-
-    Mockito.when(task.getDataSchema()).thenReturn(schema);
-    Mockito.when(task.getIOConfig()).thenReturn(ioConfig);
-    Mockito.when(task.getTuningConfig()).thenReturn(tuningConfig);
-
-    TestSeekableStreamIndexTaskRunner runner = new 
TestSeekableStreamIndexTaskRunner(
-        task,
-        LockGranularity.TIME_CHUNK
-    );
+    final TestSeekableStreamIndexTaskRunner runner = createRunner();
 
     Assert.assertEquals(InputRowFilterResult.NULL_OR_EMPTY_RECORD, 
runner.ensureRowIsNonNullAndWithinMessageTimeBounds(null));
   }
@@ -260,44 +310,9 @@ public class SeekableStreamIndexTaskRunnerTest
   @Test
   public void test_run_emitsRowCountAndSegmentCount_onSuccessfulPublish()
   {
-    DimensionsSpec dimensionsSpec = new DimensionsSpec(
-        Arrays.asList(
-            new StringDimensionSchema("d1"),
-            new StringDimensionSchema("d2")
-        )
-    );
-    DataSchema schema =
-        DataSchema.builder()
-                  .withDataSource("datasource")
-                  .withTimestamp(TimestampSpec.DEFAULT)
-                  .withDimensions(dimensionsSpec)
-                  .withGranularity(
-                      new UniformGranularitySpec(Granularities.MINUTE, 
Granularities.NONE, null)
-                  )
-                  .build();
-
-    SeekableStreamIndexTaskTuningConfig tuningConfig = 
Mockito.mock(SeekableStreamIndexTaskTuningConfig.class);
-    SeekableStreamIndexTaskIOConfig<String, String> ioConfig = 
Mockito.mock(SeekableStreamIndexTaskIOConfig.class);
-    SeekableStreamStartSequenceNumbers<String, String> sequenceNumbers = 
Mockito.mock(SeekableStreamStartSequenceNumbers.class);
-    SeekableStreamEndSequenceNumbers<String, String> endSequenceNumbers = 
Mockito.mock(SeekableStreamEndSequenceNumbers.class);
-
-    
Mockito.when(ioConfig.getRefreshRejectionPeriodsInMinutes()).thenReturn(null);
-    Mockito.when(ioConfig.getInputFormat()).thenReturn(new 
JsonInputFormat(null, null, null, null, null));
-    
Mockito.when(ioConfig.getStartSequenceNumbers()).thenReturn(sequenceNumbers);
-    
Mockito.when(ioConfig.getEndSequenceNumbers()).thenReturn(endSequenceNumbers);
-
-    
Mockito.when(endSequenceNumbers.getPartitionSequenceNumberMap()).thenReturn(ImmutableMap.of());
-    Mockito.when(sequenceNumbers.getStream()).thenReturn("test");
-
-    Mockito.when(task.getDataSchema()).thenReturn(schema);
-    Mockito.when(task.getIOConfig()).thenReturn(ioConfig);
-    Mockito.when(task.getTuningConfig()).thenReturn(tuningConfig);
+    final TestSeekableStreamIndexTaskRunner runner = createRunner();
     Mockito.when(task.getId()).thenReturn("task1");
     Mockito.when(task.getSupervisorId()).thenReturn("supervisorId");
-    TestSeekableStreamIndexTaskRunner runner = new 
TestSeekableStreamIndexTaskRunner(
-        task,
-        LockGranularity.TIME_CHUNK
-    );
     Assert.assertEquals("supervisorId", runner.getSupervisorId());
 
     // Setup the task to return a RecordSupplier, StreamAppenderatorDriver, 
Appenderator
@@ -310,7 +325,7 @@ public class SeekableStreamIndexTaskRunnerTest
            .thenReturn(appenderator);
 
     final List<DataSegment> segment = CreateDataSegments
-        .ofDatasource(schema.getDataSource())
+        .ofDatasource(DATA_SOURCE)
         .withNumPartitions(10)
         .withNumRows(1_000)
         .eachOfSizeInMb(500);
@@ -372,6 +387,275 @@ public class SeekableStreamIndexTaskRunnerTest
     }
   }
 
+  private TestSeekableStreamIndexTaskRunner createRunner()
+  {
+    return createRunner(ImmutableMap.of(), ImmutableMap.of());
+  }
+
+  private TestSeekableStreamIndexTaskRunner createRunner(
+      Map<String, String> startOffsets,
+      Map<String, String> endOffsets
+  )
+  {
+    return createRunner(createDataSchema(), null, null, null, startOffsets, 
endOffsets);
+  }
+
+  private TestSeekableStreamIndexTaskRunner createRunnerWithMessageTimeBounds(
+      Long refreshRejectionPeriodsInMinutes,
+      DateTime minMessageTime,
+      DateTime maxMessageTime
+  )
+  {
+    return createRunner(
+        createDataSchema(),
+        refreshRejectionPeriodsInMinutes,
+        minMessageTime,
+        maxMessageTime,
+        ImmutableMap.of(),
+        ImmutableMap.of()
+    );
+  }
+
+  private TestSeekableStreamIndexTaskRunner createRunner(
+      DataSchema schema,
+      @Nullable Long refreshRejectionPeriodsInMinutes,
+      @Nullable DateTime minMessageTime,
+      @Nullable DateTime maxMessageTime,
+      Map<String, String> startOffsets,
+      Map<String, String> endOffsets
+  )
+  {
+    final SeekableStreamIndexTaskTuningConfig tuningConfig = 
Mockito.mock(SeekableStreamIndexTaskTuningConfig.class);
+    final SeekableStreamIndexTaskIOConfig<String, String> ioConfig = 
Mockito.mock(SeekableStreamIndexTaskIOConfig.class);
+    final SeekableStreamStartSequenceNumbers<String, String> sequenceNumbers = 
new SeekableStreamStartSequenceNumbers<>(
+        "test",
+        startOffsets,
+        ImmutableSet.of()
+    );
+    final SeekableStreamEndSequenceNumbers<String, String> endSequenceNumbers 
= new SeekableStreamEndSequenceNumbers<>(
+        "test",
+        endOffsets
+    );
+
+    
Mockito.when(tuningConfig.getIntermediateHandoffPeriod()).thenReturn(Period.minutes(1));
+    
Mockito.when(ioConfig.getRefreshRejectionPeriodsInMinutes()).thenReturn(refreshRejectionPeriodsInMinutes);
+    Mockito.when(ioConfig.getMaximumMessageTime()).thenReturn(maxMessageTime);
+    Mockito.when(ioConfig.getMinimumMessageTime()).thenReturn(minMessageTime);
+    Mockito.when(ioConfig.getInputFormat()).thenReturn(new 
JsonInputFormat(null, null, null, null, null));
+    
Mockito.when(ioConfig.getStartSequenceNumbers()).thenReturn(sequenceNumbers);
+    
Mockito.when(ioConfig.getEndSequenceNumbers()).thenReturn(endSequenceNumbers);
+    Mockito.when(ioConfig.getBaseSequenceName()).thenReturn("test");
+
+    Mockito.when(task.getDataSchema()).thenReturn(schema);
+    Mockito.when(task.getIOConfig()).thenReturn(ioConfig);
+    Mockito.when(task.getTuningConfig()).thenReturn(tuningConfig);
+    Mockito.when(task.getContext()).thenReturn(ImmutableMap.of());
+
+    return new TestSeekableStreamIndexTaskRunner(
+        task,
+        LockGranularity.TIME_CHUNK
+    );
+  }
+
+  private static DataSchema createDataSchema()
+  {
+    final DimensionsSpec dimensionsSpec = new DimensionsSpec(
+        Arrays.asList(
+            new StringDimensionSchema("d1"),
+            new StringDimensionSchema("d2")
+        )
+    );
+    return DataSchema.builder()
+                     .withDataSource(DATA_SOURCE)
+                     .withTimestamp(TimestampSpec.DEFAULT)
+                     .withDimensions(dimensionsSpec)
+                     .withGranularity(
+                         new UniformGranularitySpec(Granularities.MINUTE, 
Granularities.NONE, null)
+                     )
+                     .build();
+  }
+
+  private TestSeekableStreamIndexTaskRunner createInitializedRunner(
+      Map<String, String> startOffsets,
+      Map<String, String> endOffsets
+  ) throws Exception
+  {
+    final TestSeekableStreamIndexTaskRunner runner = 
createRunner(startOffsets, endOffsets);
+    runner.setToolbox(createTaskToolbox());
+    runner.initializeSequences();
+    setCurrentOffsets(runner, startOffsets);
+    return runner;
+  }
+
+  private static void setSequences(
+      SeekableStreamIndexTaskRunner runner,
+      List<? extends SequenceMetadata> sequences
+  ) throws NoSuchFieldException, IllegalAccessException
+  {
+    final Field sequencesField = 
SeekableStreamIndexTaskRunner.class.getDeclaredField("sequences");
+    sequencesField.setAccessible(true);
+    sequencesField.set(runner, sequences);
+  }
+
+  private static void setCurrentOffsets(
+      SeekableStreamIndexTaskRunner runner,
+      Map<?, ?> currentOffsets
+  ) throws NoSuchFieldException, IllegalAccessException
+  {
+    final Field currOffsetsField = 
SeekableStreamIndexTaskRunner.class.getDeclaredField("currOffsets");
+    currOffsetsField.setAccessible(true);
+    final Map currOffsets = (Map) currOffsetsField.get(runner);
+    currOffsets.clear();
+    currOffsets.putAll(currentOffsets);
+  }
+
+  private static void setStatus(
+      SeekableStreamIndexTaskRunner runner,
+      SeekableStreamIndexTaskRunner.Status status
+  ) throws NoSuchFieldException, IllegalAccessException
+  {
+    final Field statusField = 
SeekableStreamIndexTaskRunner.class.getDeclaredField("status");
+    statusField.setAccessible(true);
+    statusField.set(runner, status);
+  }
+
+  private static void setPauseRequested(
+      SeekableStreamIndexTaskRunner runner,
+      boolean pauseRequested
+  ) throws NoSuchFieldException, IllegalAccessException
+  {
+    final Field pauseRequestedField = 
SeekableStreamIndexTaskRunner.class.getDeclaredField("pauseRequested");
+    pauseRequestedField.setAccessible(true);
+    pauseRequestedField.set(runner, pauseRequested);
+  }
+
+  private static SeekableStreamIndexTaskRunner.Status getStatus(
+      SeekableStreamIndexTaskRunner runner
+  ) throws NoSuchFieldException, IllegalAccessException
+  {
+    final Field statusField = 
SeekableStreamIndexTaskRunner.class.getDeclaredField("status");
+    statusField.setAccessible(true);
+    return (SeekableStreamIndexTaskRunner.Status) statusField.get(runner);
+  }
+
+  private static PausedRunner pauseRunner(TestSeekableStreamIndexTaskRunner 
runner) throws Exception
+  {
+    setStatus(runner, SeekableStreamIndexTaskRunner.Status.READING);
+    setPauseRequested(runner, true);
+
+    final ExecutorService executor = Executors.newSingleThreadExecutor();
+    final Future<Boolean> possiblyPauseFuture = executor.submit(() -> 
invokePossiblyPause(runner));
+    waitForStatus(runner, SeekableStreamIndexTaskRunner.Status.PAUSED);
+    return new PausedRunner(runner, executor, possiblyPauseFuture);
+  }
+
+  private static void waitForStatus(
+      SeekableStreamIndexTaskRunner runner,
+      SeekableStreamIndexTaskRunner.Status status
+  ) throws Exception
+  {
+    final long deadline = System.currentTimeMillis() + 
TimeUnit.SECONDS.toMillis(2);
+    while (System.currentTimeMillis() < deadline) {
+      if (getStatus(runner) == status) {
+        return;
+      }
+      Thread.sleep(10);
+    }
+    Assert.fail("Timed out waiting for status [" + status + "]");
+  }
+
+  private static boolean invokePossiblyPause(SeekableStreamIndexTaskRunner 
runner) throws Exception
+  {
+    final Method possiblyPauseMethod = 
SeekableStreamIndexTaskRunner.class.getDeclaredMethod("possiblyPause");
+    possiblyPauseMethod.setAccessible(true);
+    try {
+      return (boolean) possiblyPauseMethod.invoke(runner);
+    }
+    catch (InvocationTargetException e) {
+      final Throwable cause = e.getCause();
+      if (cause instanceof Exception) {
+        throw (Exception) cause;
+      } else if (cause instanceof Error) {
+        throw (Error) cause;
+      } else {
+        throw new RuntimeException(cause);
+      }
+    }
+  }
+
+  private static class PausedRunner implements AutoCloseable
+  {
+    private final TestSeekableStreamIndexTaskRunner runner;
+    private final ExecutorService executor;
+    private final Future<Boolean> possiblyPauseFuture;
+
+    private PausedRunner(
+        TestSeekableStreamIndexTaskRunner runner,
+        ExecutorService executor,
+        Future<Boolean> possiblyPauseFuture
+    )
+    {
+      this.runner = runner;
+      this.executor = executor;
+      this.possiblyPauseFuture = possiblyPauseFuture;
+    }
+
+    void awaitResumed() throws Exception
+    {
+      Assert.assertTrue(possiblyPauseFuture.get(2, TimeUnit.SECONDS));
+    }
+
+    @Override
+    public void close() throws Exception
+    {
+      try {
+        if (!possiblyPauseFuture.isDone()) {
+          runner.resume();
+          awaitResumed();
+        }
+      }
+      finally {
+        executor.shutdownNow();
+      }
+    }
+  }
+
+  private static class ShrinkingCopyOnWriteArrayList<E> extends 
CopyOnWriteArrayList<E>
+  {
+    private final AtomicBoolean removeFirstElement = new AtomicBoolean(false);
+
+    void removeFirstElementDuringNextSnapshotOrSize()
+    {
+      removeFirstElement.set(true);
+    }
+
+    @Override
+    public boolean isEmpty()
+    {
+      return super.size() == 0;
+    }
+
+    @Override
+    public int size()
+    {
+      final int size = super.size();
+      if (removeFirstElement.compareAndSet(true, false)) {
+        remove(0);
+      }
+      return size;
+    }
+
+    @Override
+    public Object[] toArray()
+    {
+      final Object[] snapshot = super.toArray();
+      if (removeFirstElement.compareAndSet(true, false)) {
+        remove(0);
+      }
+      return snapshot;
+    }
+  }
+
   private static class NoopDruidNodeAnnouncer implements DruidNodeAnnouncer
   {
 
@@ -414,7 +698,10 @@ public class SeekableStreamIndexTaskRunnerTest
     @Override
     protected Object getNextStartOffset(Object sequenceNumber)
     {
-      return null;
+      if (sequenceNumber == null) {
+        return null;
+      }
+      return String.valueOf(Long.parseLong(sequenceNumber.toString()) + 1);
     }
 
     @Override
@@ -438,7 +725,17 @@ public class SeekableStreamIndexTaskRunnerTest
     @Override
     protected OrderedSequenceNumber createSequenceNumber(Object sequenceNumber)
     {
-      return null;
+      if (sequenceNumber == null) {
+        return null;
+      }
+      return new OrderedSequenceNumber<>(sequenceNumber.toString(), false)
+      {
+        @Override
+        public int compareTo(OrderedSequenceNumber<String> other)
+        {
+          return Long.compare(Long.parseLong(get()), 
Long.parseLong(other.get()));
+        }
+      };
     }
 
     @Override
@@ -450,7 +747,9 @@ public class SeekableStreamIndexTaskRunnerTest
     @Override
     protected TypeReference<List<SequenceMetadata>> 
getSequenceMetadataTypeReference()
     {
-      return null;
+      return new TypeReference<>()
+      {
+      };
     }
 
     @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to