This is an automated email from the ASF dual-hosted git repository.
gianm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 1001b6f5d4b fix: ensure sequence metadata list accesses are threadsafe in task runner (#19467)
1001b6f5d4b is described below
commit 1001b6f5d4bff72876ba730a00f34e5b1bf32631
Author: jtuglu1 <[email protected]>
AuthorDate: Mon May 18 21:34:49 2026 -0700
fix: ensure sequence metadata list accesses are threadsafe in task runner (#19467)
---
.../indexing/kinesis/KinesisIndexTaskTest.java | 3 +-
.../SeekableStreamIndexTaskRunner.java | 356 +++++++-----
.../SeekableStreamIndexTaskRunnerTest.java | 609 +++++++++++++++------
3 files changed, 687 insertions(+), 281 deletions(-)
diff --git
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
index 99fa37734ac..f4ccfd4fc70 100644
---
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
+++
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
@@ -113,7 +113,6 @@ import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -2086,7 +2085,7 @@ public class KinesisIndexTaskTest extends
SeekableStreamIndexTaskTestBase
task.getRunner().setToolbox(toolboxFactory.build(task));
task.getRunner().initializeSequences();
- final CopyOnWriteArrayList<SequenceMetadata<String, String>> sequences =
task.getRunner().getSequences();
+ final List<SequenceMetadata<String, String>> sequences =
task.getRunner().getSequences();
Assert.assertEquals(3, sequences.size());
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
index 38de3dc49fe..4bcf539d6ae 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
@@ -28,6 +28,7 @@ import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
@@ -37,6 +38,7 @@ import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
+import com.google.errorprone.annotations.concurrent.GuardedBy;
import org.apache.druid.common.config.Configs;
import org.apache.druid.data.input.Committer;
import org.apache.druid.data.input.InputFormat;
@@ -122,7 +124,6 @@ import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
@@ -243,7 +244,9 @@ public abstract class
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
protected volatile boolean pauseRequested = false;
private volatile long nextCheckpointTime;
- private volatile CopyOnWriteArrayList<SequenceMetadata<PartitionIdType,
SequenceOffsetType>> sequences;
+ private final Lock sequencesLock = new ReentrantLock();
+ @GuardedBy("sequencesLock")
+ private List<SequenceMetadata<PartitionIdType, SequenceOffsetType>>
sequences = new ArrayList<>();
private volatile Throwable backgroundThreadException;
private final Map<PartitionIdType, Long> partitionsThroughput = new
HashMap<>();
@@ -265,7 +268,6 @@ public abstract class
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
this.inputFormat = ioConfig.getInputFormat();
this.stream = ioConfig.getStartSequenceNumbers().getStream();
this.endOffsets = new
ConcurrentHashMap<>(ioConfig.getEndSequenceNumbers().getPartitionSequenceNumberMap());
- this.sequences = new CopyOnWriteArrayList<>();
this.ingestionState = IngestionState.NOT_STARTED;
this.lockGranularityToUse = lockGranularityToUse;
@@ -388,7 +390,7 @@ public abstract class
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
}
}
- log.info("Starting with sequences: %s", sequences);
+ log.info("Starting with sequences: %s", getSequencesSnapshot());
}
private TaskStatus runInternal(TaskToolbox toolbox) throws Exception
@@ -496,7 +498,8 @@ public abstract class
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
if (restoredMetadata == null) {
// no persist has happened so far
// so either this is a brand new task or replacement of a failed task
-
Preconditions.checkState(sequences.get(0).startOffsets.entrySet().stream().allMatch(
+ final SequenceMetadata<PartitionIdType, SequenceOffsetType>
firstSequence = getFirstSequenceMetadata();
+
Preconditions.checkState(firstSequence.startOffsets.entrySet().stream().allMatch(
partitionOffsetEntry ->
createSequenceNumber(partitionOffsetEntry.getValue()).compareTo(
createSequenceNumber(ioConfig.getStartSequenceNumbers()
@@ -504,7 +507,7 @@ public abstract class
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
.get(partitionOffsetEntry.getKey())
)) >= 0
), "Sequence sequences are not compatible with start sequences of
task");
- currOffsets.putAll(sequences.get(0).startOffsets);
+ currOffsets.putAll(firstSequence.startOffsets);
} else {
@SuppressWarnings("unchecked")
final Map<String, Object> restoredMetadataMap = (Map) restoredMetadata;
@@ -534,16 +537,15 @@ public abstract class
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
}
// sequences size can be 0 only when all sequences got published and
task stopped before it could finish
// which is super rare
- if (sequences.size() == 0 ||
getLastSequenceMetadata().isCheckpointed()) {
- this.endOffsets.putAll(sequences.size() == 0
- ? currOffsets
- : getLastSequenceMetadata().getEndOffsets());
+ final SequenceMetadata<PartitionIdType, SequenceOffsetType>
latestSequence = getLastSequenceMetadataOrNull();
+ if (latestSequence == null || latestSequence.isCheckpointed()) {
+ this.endOffsets.putAll(latestSequence == null ? currOffsets :
latestSequence.getEndOffsets());
}
}
log.info(
"Initialized sequences: %s",
-
sequences.stream().map(SequenceMetadata::toString).collect(Collectors.joining(",
"))
+
getSequencesSnapshot().stream().map(SequenceMetadata::toString).collect(Collectors.joining(",
"))
);
// Filter out partitions with END_OF_SHARD markers since these
partitions have already been fully read. This
@@ -623,7 +625,8 @@ public abstract class
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
}
// if stop is requested or task's end sequence is set by call to
setEndOffsets method with finish set to true
- if (stopRequested.get() || sequences.size() == 0 ||
getLastSequenceMetadata().isCheckpointed()) {
+ final SequenceMetadata<PartitionIdType, SequenceOffsetType>
latestSequence = getLastSequenceMetadataOrNull();
+ if (stopRequested.get() || latestSequence == null ||
latestSequence.isCheckpointed()) {
status = Status.PUBLISHING;
}
@@ -666,18 +669,15 @@ public abstract class
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
final List<InputRow> rows = reader.parse(record.getData(),
isEndOfShard(record.getSequenceNumber()));
boolean isPersistRequired = false;
- final SequenceMetadata<PartitionIdType, SequenceOffsetType>
sequenceToUse = sequences
- .stream()
- .filter(sequenceMetadata -> sequenceMetadata.canHandle(this,
record))
- .findFirst()
- .orElse(null);
+ final SequenceMetadata<PartitionIdType, SequenceOffsetType>
sequenceToUse =
+ findSequenceMetadataForRecord(record);
if (sequenceToUse == null) {
throw new ISE(
"Cannot find any valid sequence for record with partition
[%s] and sequenceNumber [%s]. Current sequences: %s",
record.getPartitionId(),
record.getSequenceNumber(),
- sequences
+ getSequencesSnapshot()
);
}
@@ -779,13 +779,13 @@ public abstract class
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
}
if (sequenceToCheckpoint != null && stillReading) {
+ final SequenceMetadata<PartitionIdType, SequenceOffsetType>
latestSequenceToCheckpoint =
+ getLastSequenceMetadata();
Preconditions.checkArgument(
- getLastSequenceMetadata()
- .getSequenceName()
- .equals(sequenceToCheckpoint.getSequenceName()),
+
latestSequenceToCheckpoint.getSequenceName().equals(sequenceToCheckpoint.getSequenceName()),
"Cannot checkpoint a sequence [%s] which is not the latest
one, sequences %s",
sequenceToCheckpoint,
- sequences
+ getSequencesSnapshot()
);
requestPause();
final CheckPointDataSourceMetadataAction checkpointAction = new
CheckPointDataSourceMetadataAction(
@@ -833,7 +833,7 @@ public abstract class
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
// We need to copy sequences here, because the success callback in
publishAndRegisterHandoff removes items from
// the sequence list. If a publish finishes before we finish iterating
through the sequence list, we can
// end up skipping some sequences.
- List<SequenceMetadata<PartitionIdType, SequenceOffsetType>>
sequencesSnapshot = new ArrayList<>(sequences);
+ final List<SequenceMetadata<PartitionIdType, SequenceOffsetType>>
sequencesSnapshot = getSequencesSnapshot();
for (int i = 0; i < sequencesSnapshot.size(); i++) {
final SequenceMetadata<PartitionIdType, SequenceOffsetType>
sequenceMetadata = sequencesSnapshot.get(i);
if (!publishingSequences.contains(sequenceMetadata.getSequenceName())
@@ -976,7 +976,7 @@ public abstract class
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
final List<ListenableFuture<SegmentsAndCommitMetadata>> publishFinished =
publishWaitList
.stream()
.filter(Future::isDone)
- .collect(Collectors.toList());
+ .toList();
for (ListenableFuture<SegmentsAndCommitMetadata> publishFuture :
publishFinished) {
// If publishFuture failed, the below line will throw an exception and
catched by (1), and then (2) or (3).
@@ -989,7 +989,7 @@ public abstract class
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
final List<ListenableFuture<SegmentsAndCommitMetadata>> handoffFinished =
handOffWaitList
.stream()
.filter(Future::isDone)
- .collect(Collectors.toList());
+ .toList();
for (ListenableFuture<SegmentsAndCommitMetadata> handoffFuture :
handoffFinished) {
// If handoffFuture failed, the below line will throw an exception and
catched by (1), and then (2) or (3).
@@ -1043,7 +1043,7 @@ public abstract class
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
log.infoSegments(publishedSegmentsAndCommitMetadata.getSegments(),
"Published segments");
publishedSequences.add(sequenceMetadata.getSequenceName());
- sequences.remove(sequenceMetadata);
+ removeSequence(sequenceMetadata);
publishingSequences.remove(sequenceMetadata.getSequenceName());
try {
@@ -1110,12 +1110,18 @@ public abstract class
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
{
final File sequencesPersistFile = getSequencesPersistFile(toolbox);
if (sequencesPersistFile.exists()) {
- sequences = new CopyOnWriteArrayList<>(
+ final List<SequenceMetadata<PartitionIdType, SequenceOffsetType>>
restoredSequences =
toolbox.getJsonMapper().readValue(
sequencesPersistFile,
getSequenceMetadataTypeReference()
- )
- );
+ );
+ sequencesLock.lock();
+ try {
+ sequences = new ArrayList<>(restoredSequences);
+ }
+ finally {
+ sequencesLock.unlock();
+ }
return true;
} else {
return false;
@@ -1124,11 +1130,12 @@ public abstract class
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
private synchronized void persistSequences() throws IOException
{
+ final List<SequenceMetadata<PartitionIdType, SequenceOffsetType>>
sequencesSnapshot = getSequencesSnapshot();
toolbox.getJsonMapper().writerFor(
getSequenceMetadataTypeReference()
- ).writeValue(getSequencesPersistFile(toolbox), sequences);
+ ).writeValue(getSequencesPersistFile(toolbox), sequencesSnapshot);
- log.info("Saved sequence metadata to disk: %s", sequences);
+ log.info("Saved sequence metadata to disk: %s", sequencesSnapshot);
}
/**
@@ -1194,7 +1201,7 @@ public abstract class
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
private void maybePersistAndPublishSequences(Supplier<Committer>
committerSupplier)
throws InterruptedException
{
- for (SequenceMetadata<PartitionIdType, SequenceOffsetType>
sequenceMetadata : sequences) {
+ for (SequenceMetadata<PartitionIdType, SequenceOffsetType>
sequenceMetadata : getSequencesSnapshot()) {
sequenceMetadata.updateAssignments(currOffsets,
this::isMoreToReadBeforeReadingRecord);
if (!sequenceMetadata.isOpen()
&& !publishingSequences.contains(sequenceMetadata.getSequenceName())
@@ -1248,44 +1255,124 @@ public abstract class
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
private void addSequence(final SequenceMetadata<PartitionIdType,
SequenceOffsetType> sequenceMetadata)
{
- // Sanity check that the start of the new sequence matches up with the end
of the prior sequence.
- for (Map.Entry<PartitionIdType, SequenceOffsetType> entry :
sequenceMetadata.getStartOffsets().entrySet()) {
- final PartitionIdType partition = entry.getKey();
- final SequenceOffsetType startOffset = entry.getValue();
+ sequencesLock.lock();
+ try {
+ final SequenceMetadata<PartitionIdType, SequenceOffsetType>
latestSequence =
+ getLastSequenceMetadataOrNull(sequences);
+
+ // Sanity check that the start of the new sequence matches up with the
end of the prior sequence.
+ for (Map.Entry<PartitionIdType, SequenceOffsetType> entry :
sequenceMetadata.getStartOffsets().entrySet()) {
+ final PartitionIdType partition = entry.getKey();
+ final SequenceOffsetType startOffset = entry.getValue();
- if (!sequences.isEmpty()) {
- final SequenceOffsetType priorOffset =
getLastSequenceMetadata().endOffsets.get(partition);
+ if (latestSequence != null) {
+ final SequenceOffsetType priorOffset =
latestSequence.getEndOffsets().get(partition);
+
+ if (!startOffset.equals(priorOffset)) {
+ throw new ISE(
+ "New sequence startOffset[%s] does not equal expected prior
offset[%s]",
+ startOffset,
+ priorOffset
+ );
+ }
+ }
+ }
- if (!startOffset.equals(priorOffset)) {
+ if (!isEndOffsetExclusive() && latestSequence != null) {
+ if
(!latestSequence.getEndOffsets().keySet().equals(sequenceMetadata.getExclusiveStartPartitions()))
{
throw new ISE(
- "New sequence startOffset[%s] does not equal expected prior
offset[%s]",
- startOffset,
- priorOffset
+ "Exclusive start partitions[%s] for new sequence don't match to
the prior offset[%s]",
+ sequenceMetadata.getExclusiveStartPartitions(),
+ latestSequence
);
}
}
- }
- if (!isEndOffsetExclusive() && !sequences.isEmpty()) {
- final SequenceMetadata<PartitionIdType, SequenceOffsetType> lastMetadata
= getLastSequenceMetadata();
- if
(!lastMetadata.endOffsets.keySet().equals(sequenceMetadata.getExclusiveStartPartitions()))
{
- throw new ISE(
- "Exclusive start partitions[%s] for new sequence don't match to
the prior offset[%s]",
- sequenceMetadata.getExclusiveStartPartitions(),
- lastMetadata
- );
- }
+ // Actually do the add.
+ sequences.add(sequenceMetadata);
+ }
+ finally {
+ sequencesLock.unlock();
}
+ }
- // Actually do the add.
- sequences.add(sequenceMetadata);
+ private void removeSequence(final SequenceMetadata<PartitionIdType,
SequenceOffsetType> sequenceMetadata)
+ {
+ sequencesLock.lock();
+ try {
+ sequences.remove(sequenceMetadata);
+ }
+ finally {
+ sequencesLock.unlock();
+ }
}
@VisibleForTesting
public SequenceMetadata<PartitionIdType, SequenceOffsetType>
getLastSequenceMetadata()
{
- Preconditions.checkState(!sequences.isEmpty(), "Empty sequences");
- return sequences.get(sequences.size() - 1);
+ return getLastSequenceMetadata(getSequencesSnapshot());
+ }
+
+ private SequenceMetadata<PartitionIdType, SequenceOffsetType>
getFirstSequenceMetadata()
+ {
+ final List<SequenceMetadata<PartitionIdType, SequenceOffsetType>>
sequenceMetadataList = getSequencesSnapshot();
+ Preconditions.checkState(!sequenceMetadataList.isEmpty(), "Empty
sequences");
+ return sequenceMetadataList.get(0);
+ }
+
+ @Nullable
+ private SequenceMetadata<PartitionIdType, SequenceOffsetType>
getLastSequenceMetadataOrNull()
+ {
+ return getLastSequenceMetadataOrNull(getSequencesSnapshot());
+ }
+
+ @Nullable
+ private SequenceMetadata<PartitionIdType, SequenceOffsetType>
getLastSequenceMetadataOrNull(
+ final List<SequenceMetadata<PartitionIdType, SequenceOffsetType>>
sequenceMetadataList
+ )
+ {
+ if (sequenceMetadataList.isEmpty()) {
+ return null;
+ }
+ return sequenceMetadataList.get(sequenceMetadataList.size() - 1);
+ }
+
+ private SequenceMetadata<PartitionIdType, SequenceOffsetType>
getLastSequenceMetadata(
+ final List<SequenceMetadata<PartitionIdType, SequenceOffsetType>>
sequenceMetadataList
+ )
+ {
+ final SequenceMetadata<PartitionIdType, SequenceOffsetType> latestSequence
=
+ getLastSequenceMetadataOrNull(sequenceMetadataList);
+ Preconditions.checkState(latestSequence != null, "Empty sequences");
+ return latestSequence;
+ }
+
+ private List<SequenceMetadata<PartitionIdType, SequenceOffsetType>>
getSequencesSnapshot()
+ {
+ sequencesLock.lock();
+ try {
+ return ImmutableList.copyOf(sequences);
+ }
+ finally {
+ sequencesLock.unlock();
+ }
+ }
+
+ @Nullable
+ private SequenceMetadata<PartitionIdType, SequenceOffsetType>
findSequenceMetadataForRecord(
+ final OrderedPartitionableRecord<PartitionIdType, SequenceOffsetType,
RecordType> record
+ )
+ {
+ sequencesLock.lock();
+ try {
+ return sequences.stream()
+ .filter(sequenceMetadata ->
sequenceMetadata.canHandle(this, record))
+ .findFirst()
+ .orElse(null);
+ }
+ finally {
+ sequencesLock.unlock();
+ }
}
/**
@@ -1727,9 +1814,7 @@ public abstract class
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
// and after acquiring pauseLock to correctly guard against duplicate
requests
Preconditions.checkState(sequenceNumbers.size() > 0, "No sequences
found to set end sequences");
- final SequenceMetadata<PartitionIdType, SequenceOffsetType>
latestSequence = getLastSequenceMetadata();
final Set<PartitionIdType> exclusiveStartPartitions;
-
if (isEndOffsetExclusive()) {
// When end offsets are exclusive, there's no need for marking the
next sequence as having any
// exclusive-start partitions. It should always start from the end
offsets of the prior sequence.
@@ -1740,75 +1825,98 @@ public abstract class
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
exclusiveStartPartitions = sequenceNumbers.keySet();
}
- if ((latestSequence.getStartOffsets().equals(sequenceNumbers)
- &&
latestSequence.getExclusiveStartPartitions().equals(exclusiveStartPartitions)
- && !finish)
- || (latestSequence.getEndOffsets().equals(sequenceNumbers) &&
finish)) {
- log.warn("Ignoring duplicate request, end offsets already set for
sequences [%s]", sequenceNumbers);
- resetNextCheckpointTime();
- resume();
- return Response.ok(sequenceNumbers).build();
- } else if (latestSequence.isCheckpointed()) {
- return Response.status(Response.Status.BAD_REQUEST)
- .type(MediaType.TEXT_PLAIN)
- .entity(StringUtils.format(
- "Sequence [%s] has already endOffsets set, cannot
set to [%s]",
- latestSequence,
- sequenceNumbers
- )).build();
- } else if (!isPaused()) {
- return Response.status(Response.Status.BAD_REQUEST)
- .entity("Task must be paused before changing the end
offsets")
- .build();
- }
+ Response earlyResponse = null;
+ boolean resumeBeforeReturning = false;
- for (Map.Entry<PartitionIdType, SequenceOffsetType> entry :
sequenceNumbers.entrySet()) {
- if
(createSequenceNumber(entry.getValue()).compareTo(createSequenceNumber(currOffsets.get(entry.getKey())))
- < 0) {
- return Response.status(Response.Status.BAD_REQUEST)
- .entity(
- StringUtils.format(
- "End sequence must be >= current sequence
for partition [%s] (current: %s)",
- entry.getKey(),
- currOffsets.get(entry.getKey())
- )
- )
- .build();
- }
- }
+ sequencesLock.lock();
+ try {
+ final SequenceMetadata<PartitionIdType, SequenceOffsetType>
latestSequence =
+ getLastSequenceMetadata(sequences);
+
+ if ((latestSequence.getStartOffsets().equals(sequenceNumbers)
+ &&
latestSequence.getExclusiveStartPartitions().equals(exclusiveStartPartitions)
+ && !finish)
+ || (latestSequence.getEndOffsets().equals(sequenceNumbers) &&
finish)) {
+ log.warn("Ignoring duplicate request, end offsets already set for
sequences [%s]", sequenceNumbers);
+ resetNextCheckpointTime();
+ resumeBeforeReturning = true;
+ earlyResponse = Response.ok(sequenceNumbers).build();
+ } else if (latestSequence.isCheckpointed()) {
+ earlyResponse = Response.status(Response.Status.BAD_REQUEST)
+ .type(MediaType.TEXT_PLAIN)
+ .entity(StringUtils.format(
+ "Sequence [%s] has already endOffsets
set, cannot set to [%s]",
+ latestSequence,
+ sequenceNumbers
+ )).build();
+ } else if (!isPaused()) {
+ earlyResponse = Response.status(Response.Status.BAD_REQUEST)
+ .entity("Task must be paused before
changing the end offsets")
+ .build();
+ } else {
+ for (Map.Entry<PartitionIdType, SequenceOffsetType> entry :
sequenceNumbers.entrySet()) {
+ if
(createSequenceNumber(entry.getValue()).compareTo(createSequenceNumber(currOffsets.get(entry.getKey())))
+ < 0) {
+ earlyResponse = Response.status(Response.Status.BAD_REQUEST)
+ .entity(
+ StringUtils.format(
+ "End sequence must be >=
current sequence for partition [%s] (current: %s)",
+ entry.getKey(),
+ currOffsets.get(entry.getKey())
+ )
+ )
+ .build();
+ break;
+ }
+ }
- resetNextCheckpointTime();
- latestSequence.setEndOffsets(sequenceNumbers);
+ if (earlyResponse == null) {
+ resetNextCheckpointTime();
+ latestSequence.setEndOffsets(sequenceNumbers);
- if (finish) {
- log.info(
- "Sequence[%s] end offsets updated from [%s] to [%s].",
- latestSequence.getSequenceName(),
- endOffsets,
- sequenceNumbers
- );
- endOffsets.putAll(sequenceNumbers);
- } else {
- // create new sequence
- final SequenceMetadata<PartitionIdType, SequenceOffsetType>
newSequence = new SequenceMetadata<>(
- latestSequence.getSequenceId() + 1,
- StringUtils.format("%s_%d", ioConfig.getBaseSequenceName(),
latestSequence.getSequenceId() + 1),
- sequenceNumbers,
- endOffsets,
- false,
- exclusiveStartPartitions,
- getTaskLockType()
- );
+ if (finish) {
+ log.info(
+ "Sequence[%s] end offsets updated from [%s] to [%s].",
+ latestSequence.getSequenceName(),
+ endOffsets,
+ sequenceNumbers
+ );
+ endOffsets.putAll(sequenceNumbers);
+ } else {
+ // create new sequence
+ final SequenceMetadata<PartitionIdType, SequenceOffsetType>
newSequence = new SequenceMetadata<>(
+ latestSequence.getSequenceId() + 1,
+ StringUtils.format("%s_%d",
ioConfig.getBaseSequenceName(), latestSequence.getSequenceId() + 1),
+ sequenceNumbers,
+ endOffsets,
+ false,
+ exclusiveStartPartitions,
+ getTaskLockType()
+ );
- log.info(
- "Sequence[%s] created with start offsets [%s] and end offsets
[%s].",
- newSequence.getSequenceName(),
- sequenceNumbers,
- endOffsets
- );
+ log.info(
+ "Sequence[%s] created with start offsets [%s] and end
offsets [%s].",
+ newSequence.getSequenceName(),
+ sequenceNumbers,
+ endOffsets
+ );
+
+ addSequence(newSequence);
+ }
+ }
+ }
+ }
+ finally {
+ sequencesLock.unlock();
+ }
- addSequence(newSequence);
+ if (earlyResponse != null) {
+ if (resumeBeforeReturning) {
+ resume();
+ }
+ return earlyResponse;
}
+
persistSequences();
}
catch (Exception e) {
@@ -1836,9 +1944,9 @@ public abstract class
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
}
@VisibleForTesting
- public CopyOnWriteArrayList<SequenceMetadata<PartitionIdType,
SequenceOffsetType>> getSequences()
+ public List<SequenceMetadata<PartitionIdType, SequenceOffsetType>>
getSequences()
{
- return sequences;
+ return getSequencesSnapshot();
}
@GET
@@ -1854,7 +1962,7 @@ public abstract class
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
private Map<Integer, Map<PartitionIdType, SequenceOffsetType>>
getCheckpoints()
{
- return new TreeMap<>(sequences.stream()
+ return new TreeMap<>(getSequencesSnapshot().stream()
.collect(Collectors.toMap(
SequenceMetadata::getSequenceId,
SequenceMetadata::getStartOffsets
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java
index f08ee84d89d..7f0e731d1d3 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java
@@ -22,6 +22,7 @@ package org.apache.druid.indexing.seekablestream;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.Futures;
import org.apache.druid.client.DruidServer;
import org.apache.druid.data.input.InputRow;
@@ -60,6 +61,7 @@ import org.apache.druid.server.coordinator.CreateDataSegments;
import org.apache.druid.server.security.AuthTestUtils;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.DateTime;
+import org.joda.time.Period;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
@@ -71,19 +73,31 @@ import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import javax.annotation.Nullable;
+import javax.ws.rs.core.Response;
import java.io.File;
import java.io.IOException;
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import static org.mockito.ArgumentMatchers.any;
@RunWith(MockitoJUnitRunner.class)
public class SeekableStreamIndexTaskRunnerTest
{
+ private static final String DATA_SOURCE = "datasource";
+
@Rule
public final TemporaryFolder temporaryFolder = new TemporaryFolder();
@@ -102,48 +116,161 @@ public class SeekableStreamIndexTaskRunnerTest
}
@Test
- public void testWithinMinMaxTime()
+ public void
testGetLastSequenceMetadataUsesStableSnapshotIfSequenceCompletesConcurrently()
throws Exception
+ {
+ final TestSeekableStreamIndexTaskRunner runner = createRunner();
+ final SequenceMetadata<String, String> firstSequence = new
SequenceMetadata<>(
+ 0,
+ "test_0",
+ ImmutableMap.of("partition", "0"),
+ ImmutableMap.of("partition", "5"),
+ true,
+ ImmutableSet.of(),
+ null
+ );
+ final SequenceMetadata<String, String> secondSequence = new
SequenceMetadata<>(
+ 1,
+ "test_1",
+ ImmutableMap.of("partition", "5"),
+ ImmutableMap.of("partition", "10"),
+ false,
+ ImmutableSet.of(),
+ null
+ );
+ final ShrinkingCopyOnWriteArrayList<SequenceMetadata<String, String>>
sequences =
+ new ShrinkingCopyOnWriteArrayList<>();
+ sequences.add(firstSequence);
+ sequences.add(secondSequence);
+ setSequences(runner, sequences);
+
+ sequences.removeFirstElementDuringNextSnapshotOrSize();
+
+ Assert.assertSame(secondSequence, runner.getLastSequenceMetadata());
+ Assert.assertEquals(1, sequences.size());
+ Assert.assertSame(secondSequence, sequences.get(0));
+ }
+
+ @Test
+ public void testSetEndOffsetsReturnsBadRequestWhenTaskIsNotPaused() throws
Exception
+ {
+ final TestSeekableStreamIndexTaskRunner runner = createInitializedRunner(
+ ImmutableMap.of("partition", "0"),
+ ImmutableMap.of("partition", "10")
+ );
+ setStatus(runner, SeekableStreamIndexTaskRunner.Status.READING);
+
+ final Response response =
runner.setEndOffsets(ImmutableMap.of("partition", "5"), false);
+
+ Assert.assertEquals(Response.Status.BAD_REQUEST.getStatusCode(),
response.getStatus());
+ Assert.assertEquals("Task must be paused before changing the end offsets",
response.getEntity());
+ }
+
+ @Test
+ public void
testSetEndOffsetsReturnsBadRequestWhenLatestSequenceIsCheckpointed() throws
Exception
{
- DimensionsSpec dimensionsSpec = new DimensionsSpec(
+ final TestSeekableStreamIndexTaskRunner runner = createInitializedRunner(
+ ImmutableMap.of("partition", "0"),
+ ImmutableMap.of("partition", "10")
+ );
+ setSequences(
+ runner,
Arrays.asList(
- new StringDimensionSchema("d1"),
- new StringDimensionSchema("d2")
+ new SequenceMetadata<>(
+ 0,
+ "test_0",
+ ImmutableMap.of("partition", "0"),
+ ImmutableMap.of("partition", "5"),
+ true,
+ ImmutableSet.of(),
+ null
+ )
)
);
- DataSchema schema =
- DataSchema.builder()
- .withDataSource("datasource")
- .withTimestamp(TimestampSpec.DEFAULT)
- .withDimensions(dimensionsSpec)
- .withGranularity(
- new UniformGranularitySpec(Granularities.MINUTE,
Granularities.NONE, null)
- )
- .build();
-
- SeekableStreamIndexTaskTuningConfig tuningConfig =
Mockito.mock(SeekableStreamIndexTaskTuningConfig.class);
- SeekableStreamIndexTaskIOConfig<String, String> ioConfig =
Mockito.mock(SeekableStreamIndexTaskIOConfig.class);
- SeekableStreamStartSequenceNumbers<String, String> sequenceNumbers =
Mockito.mock(SeekableStreamStartSequenceNumbers.class);
- SeekableStreamEndSequenceNumbers<String, String> endSequenceNumbers =
Mockito.mock(SeekableStreamEndSequenceNumbers.class);
-
- DateTime now = DateTimes.nowUtc();
-
-
Mockito.when(ioConfig.getRefreshRejectionPeriodsInMinutes()).thenReturn(120L);
-
Mockito.when(ioConfig.getMaximumMessageTime()).thenReturn(DateTimes.nowUtc().plusHours(2));
-
Mockito.when(ioConfig.getMinimumMessageTime()).thenReturn(DateTimes.nowUtc().minusHours(2));
- Mockito.when(ioConfig.getInputFormat()).thenReturn(new
JsonInputFormat(null, null, null, null, null));
-
Mockito.when(ioConfig.getStartSequenceNumbers()).thenReturn(sequenceNumbers);
-
Mockito.when(ioConfig.getEndSequenceNumbers()).thenReturn(endSequenceNumbers);
-
Mockito.when(endSequenceNumbers.getPartitionSequenceNumberMap()).thenReturn(ImmutableMap.of());
- Mockito.when(sequenceNumbers.getStream()).thenReturn("test");
+ final Response response =
runner.setEndOffsets(ImmutableMap.of("partition", "6"), false);
- Mockito.when(task.getDataSchema()).thenReturn(schema);
- Mockito.when(task.getIOConfig()).thenReturn(ioConfig);
- Mockito.when(task.getTuningConfig()).thenReturn(tuningConfig);
+ Assert.assertEquals(Response.Status.BAD_REQUEST.getStatusCode(),
response.getStatus());
+ Assert.assertTrue(response.getEntity().toString().contains("has already
endOffsets set"));
+ }
- TestSeekableStreamIndexTaskRunner runner = new
TestSeekableStreamIndexTaskRunner(
- task,
- LockGranularity.TIME_CHUNK
+ @Test
+ public void
testSetEndOffsetsReturnsBadRequestWhenEndOffsetPrecedesCurrentOffset() throws
Exception
+ {
+ final TestSeekableStreamIndexTaskRunner runner = createInitializedRunner(
+ ImmutableMap.of("partition", "0"),
+ ImmutableMap.of("partition", "10")
+ );
+ setCurrentOffsets(runner, ImmutableMap.of("partition", "5"));
+
+ try (final PausedRunner ignored = pauseRunner(runner)) {
+ final Response response =
runner.setEndOffsets(ImmutableMap.of("partition", "4"), false);
+
+ Assert.assertEquals(Response.Status.BAD_REQUEST.getStatusCode(),
response.getStatus());
+ Assert.assertEquals(
+ "End sequence must be >= current sequence for partition [partition]
(current: 5)",
+ response.getEntity()
+ );
+ Assert.assertFalse(runner.getLastSequenceMetadata().isCheckpointed());
+ Assert.assertEquals(1, runner.getSequences().size());
+ }
+ }
+
+ @Test
+ public void testSetEndOffsetsCreatesNewSequenceWhenPaused() throws Exception
+ {
+ final TestSeekableStreamIndexTaskRunner runner = createInitializedRunner(
+ ImmutableMap.of("partition", "0"),
+ ImmutableMap.of("partition", "10")
+ );
+ setCurrentOffsets(runner, ImmutableMap.of("partition", "4"));
+
+ try (final PausedRunner pausedRunner = pauseRunner(runner)) {
+ final Response response =
runner.setEndOffsets(ImmutableMap.of("partition", "5"), false);
+
+ Assert.assertEquals(Response.Status.OK.getStatusCode(),
response.getStatus());
+ pausedRunner.awaitResumed();
+ }
+
+ final List<SequenceMetadata<String, String>> sequences =
runner.getSequences();
+ Assert.assertEquals(2, sequences.size());
+ Assert.assertTrue(sequences.get(0).isCheckpointed());
+ Assert.assertEquals(ImmutableMap.of("partition", "5"),
sequences.get(0).getEndOffsets());
+ Assert.assertEquals("test_1", sequences.get(1).getSequenceName());
+ Assert.assertEquals(ImmutableMap.of("partition", "5"),
sequences.get(1).getStartOffsets());
+ Assert.assertEquals(ImmutableMap.of("partition", "10"),
sequences.get(1).getEndOffsets());
+ Assert.assertEquals(ImmutableSet.of("partition"),
sequences.get(1).getExclusiveStartPartitions());
+ }
+
+ @Test
+ public void testSetEndOffsetsFinishUpdatesLatestSequenceWhenPaused() throws
Exception
+ {
+ final TestSeekableStreamIndexTaskRunner runner = createInitializedRunner(
+ ImmutableMap.of("partition", "0"),
+ ImmutableMap.of("partition", "10")
+ );
+ setCurrentOffsets(runner, ImmutableMap.of("partition", "4"));
+
+ try (final PausedRunner pausedRunner = pauseRunner(runner)) {
+ final Response response =
runner.setEndOffsets(ImmutableMap.of("partition", "6"), true);
+
+ Assert.assertEquals(Response.Status.OK.getStatusCode(),
response.getStatus());
+ pausedRunner.awaitResumed();
+ }
+
+ final List<SequenceMetadata<String, String>> sequences =
runner.getSequences();
+ Assert.assertEquals(1, sequences.size());
+ Assert.assertTrue(sequences.get(0).isCheckpointed());
+ Assert.assertEquals(ImmutableMap.of("partition", "6"),
sequences.get(0).getEndOffsets());
+ }
+
+ @Test
+ public void testWithinMinMaxTime()
+ {
+ final DateTime now = DateTimes.nowUtc();
+ final TestSeekableStreamIndexTaskRunner runner =
createRunnerWithMessageTimeBounds(
+ 120L,
+ now.minusHours(2),
+ now.plusHours(2)
);
Mockito.when(row.getTimestamp()).thenReturn(now);
@@ -159,47 +286,8 @@ public class SeekableStreamIndexTaskRunnerTest
@Test
public void testWithinMinMaxTimeNotPopulated()
{
- DimensionsSpec dimensionsSpec = new DimensionsSpec(
- Arrays.asList(
- new StringDimensionSchema("d1"),
- new StringDimensionSchema("d2")
- )
- );
- DataSchema schema =
- DataSchema.builder()
- .withDataSource("datasource")
- .withTimestamp(TimestampSpec.DEFAULT)
- .withDimensions(dimensionsSpec)
- .withGranularity(
- new UniformGranularitySpec(Granularities.MINUTE,
Granularities.NONE, null)
- )
- .build();
-
- SeekableStreamIndexTaskTuningConfig tuningConfig =
Mockito.mock(SeekableStreamIndexTaskTuningConfig.class);
- SeekableStreamIndexTaskIOConfig<String, String> ioConfig =
Mockito.mock(SeekableStreamIndexTaskIOConfig.class);
- SeekableStreamStartSequenceNumbers<String, String> sequenceNumbers =
Mockito.mock(SeekableStreamStartSequenceNumbers.class);
- SeekableStreamEndSequenceNumbers<String, String> endSequenceNumbers =
Mockito.mock(SeekableStreamEndSequenceNumbers.class);
-
- DateTime now = DateTimes.nowUtc();
-
-
Mockito.when(ioConfig.getRefreshRejectionPeriodsInMinutes()).thenReturn(null);
- // min max time not populated.
- Mockito.when(ioConfig.getMaximumMessageTime()).thenReturn(null);
- Mockito.when(ioConfig.getMinimumMessageTime()).thenReturn(null);
- Mockito.when(ioConfig.getInputFormat()).thenReturn(new
JsonInputFormat(null, null, null, null, null));
-
Mockito.when(ioConfig.getStartSequenceNumbers()).thenReturn(sequenceNumbers);
-
Mockito.when(ioConfig.getEndSequenceNumbers()).thenReturn(endSequenceNumbers);
-
-
Mockito.when(endSequenceNumbers.getPartitionSequenceNumberMap()).thenReturn(ImmutableMap.of());
- Mockito.when(sequenceNumbers.getStream()).thenReturn("test");
-
- Mockito.when(task.getDataSchema()).thenReturn(schema);
- Mockito.when(task.getIOConfig()).thenReturn(ioConfig);
- Mockito.when(task.getTuningConfig()).thenReturn(tuningConfig);
- TestSeekableStreamIndexTaskRunner runner = new
TestSeekableStreamIndexTaskRunner(
- task,
- LockGranularity.TIME_CHUNK
- );
+ final DateTime now = DateTimes.nowUtc();
+ final TestSeekableStreamIndexTaskRunner runner = createRunner();
Mockito.when(row.getTimestamp()).thenReturn(now);
Assert.assertEquals(InputRowFilterResult.ACCEPTED,
runner.ensureRowIsNonNullAndWithinMessageTimeBounds(row));
@@ -214,45 +302,7 @@ public class SeekableStreamIndexTaskRunnerTest
@Test
public void testEnsureRowRejectionReasonForNullRow()
{
- DimensionsSpec dimensionsSpec = new DimensionsSpec(
- Arrays.asList(
- new StringDimensionSchema("d1"),
- new StringDimensionSchema("d2")
- )
- );
- DataSchema schema =
- DataSchema.builder()
- .withDataSource("datasource")
- .withTimestamp(TimestampSpec.DEFAULT)
- .withDimensions(dimensionsSpec)
- .withGranularity(
- new UniformGranularitySpec(Granularities.MINUTE,
Granularities.NONE, null)
- )
- .build();
-
- SeekableStreamIndexTaskTuningConfig tuningConfig =
Mockito.mock(SeekableStreamIndexTaskTuningConfig.class);
- SeekableStreamIndexTaskIOConfig<String, String> ioConfig =
Mockito.mock(SeekableStreamIndexTaskIOConfig.class);
- SeekableStreamStartSequenceNumbers<String, String> sequenceNumbers =
Mockito.mock(SeekableStreamStartSequenceNumbers.class);
- SeekableStreamEndSequenceNumbers<String, String> endSequenceNumbers =
Mockito.mock(SeekableStreamEndSequenceNumbers.class);
-
-
Mockito.when(ioConfig.getRefreshRejectionPeriodsInMinutes()).thenReturn(null);
- Mockito.when(ioConfig.getMaximumMessageTime()).thenReturn(null);
- Mockito.when(ioConfig.getMinimumMessageTime()).thenReturn(null);
- Mockito.when(ioConfig.getInputFormat()).thenReturn(new
JsonInputFormat(null, null, null, null, null));
-
Mockito.when(ioConfig.getStartSequenceNumbers()).thenReturn(sequenceNumbers);
-
Mockito.when(ioConfig.getEndSequenceNumbers()).thenReturn(endSequenceNumbers);
-
-
Mockito.when(endSequenceNumbers.getPartitionSequenceNumberMap()).thenReturn(ImmutableMap.of());
- Mockito.when(sequenceNumbers.getStream()).thenReturn("test");
-
- Mockito.when(task.getDataSchema()).thenReturn(schema);
- Mockito.when(task.getIOConfig()).thenReturn(ioConfig);
- Mockito.when(task.getTuningConfig()).thenReturn(tuningConfig);
-
- TestSeekableStreamIndexTaskRunner runner = new
TestSeekableStreamIndexTaskRunner(
- task,
- LockGranularity.TIME_CHUNK
- );
+ final TestSeekableStreamIndexTaskRunner runner = createRunner();
Assert.assertEquals(InputRowFilterResult.NULL_OR_EMPTY_RECORD,
runner.ensureRowIsNonNullAndWithinMessageTimeBounds(null));
}
@@ -260,44 +310,9 @@ public class SeekableStreamIndexTaskRunnerTest
@Test
public void test_run_emitsRowCountAndSegmentCount_onSuccessfulPublish()
{
- DimensionsSpec dimensionsSpec = new DimensionsSpec(
- Arrays.asList(
- new StringDimensionSchema("d1"),
- new StringDimensionSchema("d2")
- )
- );
- DataSchema schema =
- DataSchema.builder()
- .withDataSource("datasource")
- .withTimestamp(TimestampSpec.DEFAULT)
- .withDimensions(dimensionsSpec)
- .withGranularity(
- new UniformGranularitySpec(Granularities.MINUTE,
Granularities.NONE, null)
- )
- .build();
-
- SeekableStreamIndexTaskTuningConfig tuningConfig =
Mockito.mock(SeekableStreamIndexTaskTuningConfig.class);
- SeekableStreamIndexTaskIOConfig<String, String> ioConfig =
Mockito.mock(SeekableStreamIndexTaskIOConfig.class);
- SeekableStreamStartSequenceNumbers<String, String> sequenceNumbers =
Mockito.mock(SeekableStreamStartSequenceNumbers.class);
- SeekableStreamEndSequenceNumbers<String, String> endSequenceNumbers =
Mockito.mock(SeekableStreamEndSequenceNumbers.class);
-
-
Mockito.when(ioConfig.getRefreshRejectionPeriodsInMinutes()).thenReturn(null);
- Mockito.when(ioConfig.getInputFormat()).thenReturn(new
JsonInputFormat(null, null, null, null, null));
-
Mockito.when(ioConfig.getStartSequenceNumbers()).thenReturn(sequenceNumbers);
-
Mockito.when(ioConfig.getEndSequenceNumbers()).thenReturn(endSequenceNumbers);
-
-
Mockito.when(endSequenceNumbers.getPartitionSequenceNumberMap()).thenReturn(ImmutableMap.of());
- Mockito.when(sequenceNumbers.getStream()).thenReturn("test");
-
- Mockito.when(task.getDataSchema()).thenReturn(schema);
- Mockito.when(task.getIOConfig()).thenReturn(ioConfig);
- Mockito.when(task.getTuningConfig()).thenReturn(tuningConfig);
+ final TestSeekableStreamIndexTaskRunner runner = createRunner();
Mockito.when(task.getId()).thenReturn("task1");
Mockito.when(task.getSupervisorId()).thenReturn("supervisorId");
- TestSeekableStreamIndexTaskRunner runner = new
TestSeekableStreamIndexTaskRunner(
- task,
- LockGranularity.TIME_CHUNK
- );
Assert.assertEquals("supervisorId", runner.getSupervisorId());
// Setup the task to return a RecordSupplier, StreamAppenderatorDriver,
Appenderator
@@ -310,7 +325,7 @@ public class SeekableStreamIndexTaskRunnerTest
.thenReturn(appenderator);
final List<DataSegment> segment = CreateDataSegments
- .ofDatasource(schema.getDataSource())
+ .ofDatasource(DATA_SOURCE)
.withNumPartitions(10)
.withNumRows(1_000)
.eachOfSizeInMb(500);
@@ -372,6 +387,275 @@ public class SeekableStreamIndexTaskRunnerTest
}
}
+  /**
+   * Creates a runner whose start and end offset maps are both empty.
+   */
+  private TestSeekableStreamIndexTaskRunner createRunner()
+  {
+    final Map<String, String> noStartOffsets = ImmutableMap.of();
+    final Map<String, String> noEndOffsets = ImmutableMap.of();
+    return createRunner(noStartOffsets, noEndOffsets);
+  }
+
+  /**
+   * Creates a runner over the default test schema with the given start and end offsets, passing
+   * {@code null} for the refresh-rejection period and for both message-time bounds.
+   *
+   * @param startOffsets partition -> start offset
+   * @param endOffsets   partition -> end offset
+   */
+  private TestSeekableStreamIndexTaskRunner createRunner(
+      Map<String, String> startOffsets,
+      Map<String, String> endOffsets
+  )
+  {
+    final Long noRefreshRejectionPeriod = null;
+    final DateTime noMinMessageTime = null;
+    final DateTime noMaxMessageTime = null;
+    return createRunner(
+        createDataSchema(),
+        noRefreshRejectionPeriod,
+        noMinMessageTime,
+        noMaxMessageTime,
+        startOffsets,
+        endOffsets
+    );
+  }
+
+  /**
+   * Creates a runner over the default test schema with empty start/end offsets and the given
+   * refresh-rejection period and message-time bounds.
+   *
+   * @param refreshRejectionPeriodsInMinutes value stubbed for the IO config's refresh-rejection period
+   * @param minMessageTime                   value stubbed for the IO config's minimum message time
+   * @param maxMessageTime                   value stubbed for the IO config's maximum message time
+   */
+  private TestSeekableStreamIndexTaskRunner createRunnerWithMessageTimeBounds(
+      Long refreshRejectionPeriodsInMinutes,
+      DateTime minMessageTime,
+      DateTime maxMessageTime
+  )
+  {
+    final DataSchema defaultSchema = createDataSchema();
+    final Map<String, String> emptyStartOffsets = ImmutableMap.of();
+    final Map<String, String> emptyEndOffsets = ImmutableMap.of();
+    return createRunner(
+        defaultSchema,
+        refreshRejectionPeriodsInMinutes,
+        minMessageTime,
+        maxMessageTime,
+        emptyStartOffsets,
+        emptyEndOffsets
+    );
+  }
+
+  /**
+   * Stubs the test's {@code task} object to return the given schema, a mocked tuning config, an empty
+   * context and a mocked IO config, then wraps it in a {@link TestSeekableStreamIndexTaskRunner} using
+   * {@link LockGranularity#TIME_CHUNK}.
+   *
+   * The IO config mock returns real start/end sequence-number objects for stream "test" built from the
+   * supplied offset maps, a JSON input format, base sequence name "test", and the supplied (possibly
+   * null) refresh-rejection period and message-time bounds. The tuning config mock returns a one-minute
+   * intermediate handoff period.
+   */
+  private TestSeekableStreamIndexTaskRunner createRunner(
+      DataSchema schema,
+      @Nullable Long refreshRejectionPeriodsInMinutes,
+      @Nullable DateTime minMessageTime,
+      @Nullable DateTime maxMessageTime,
+      Map<String, String> startOffsets,
+      Map<String, String> endOffsets
+  )
+  {
+    // Real (non-mocked) sequence numbers.
+    final SeekableStreamStartSequenceNumbers<String, String> startNumbers =
+        new SeekableStreamStartSequenceNumbers<>("test", startOffsets, ImmutableSet.of());
+    final SeekableStreamEndSequenceNumbers<String, String> endNumbers =
+        new SeekableStreamEndSequenceNumbers<>("test", endOffsets);
+
+    // Tuning config: only the intermediate handoff period is stubbed.
+    final SeekableStreamIndexTaskTuningConfig tuning = Mockito.mock(SeekableStreamIndexTaskTuningConfig.class);
+    Mockito.when(tuning.getIntermediateHandoffPeriod()).thenReturn(Period.minutes(1));
+
+    // IO config: message-time filtering, input format and sequence numbers.
+    final SeekableStreamIndexTaskIOConfig<String, String> io = Mockito.mock(SeekableStreamIndexTaskIOConfig.class);
+    Mockito.when(io.getRefreshRejectionPeriodsInMinutes()).thenReturn(refreshRejectionPeriodsInMinutes);
+    Mockito.when(io.getMinimumMessageTime()).thenReturn(minMessageTime);
+    Mockito.when(io.getMaximumMessageTime()).thenReturn(maxMessageTime);
+    Mockito.when(io.getInputFormat()).thenReturn(new JsonInputFormat(null, null, null, null, null));
+    Mockito.when(io.getStartSequenceNumbers()).thenReturn(startNumbers);
+    Mockito.when(io.getEndSequenceNumbers()).thenReturn(endNumbers);
+    Mockito.when(io.getBaseSequenceName()).thenReturn("test");
+
+    // Wire everything into the task.
+    Mockito.when(task.getDataSchema()).thenReturn(schema);
+    Mockito.when(task.getIOConfig()).thenReturn(io);
+    Mockito.when(task.getTuningConfig()).thenReturn(tuning);
+    Mockito.when(task.getContext()).thenReturn(ImmutableMap.of());
+
+    return new TestSeekableStreamIndexTaskRunner(task, LockGranularity.TIME_CHUNK);
+  }
+
+  /**
+   * Builds the schema shared by the tests: data source {@code DATA_SOURCE}, the default timestamp spec,
+   * string dimensions "d1" and "d2", and a uniform granularity spec constructed with
+   * {@code Granularities.MINUTE}, {@code Granularities.NONE} and {@code null}.
+   */
+  private static DataSchema createDataSchema()
+  {
+    final DimensionsSpec dimensions = new DimensionsSpec(
+        Arrays.asList(new StringDimensionSchema("d1"), new StringDimensionSchema("d2"))
+    );
+    final UniformGranularitySpec granularity =
+        new UniformGranularitySpec(Granularities.MINUTE, Granularities.NONE, null);
+
+    return DataSchema.builder()
+                     .withDataSource(DATA_SOURCE)
+                     .withTimestamp(TimestampSpec.DEFAULT)
+                     .withDimensions(dimensions)
+                     .withGranularity(granularity)
+                     .build();
+  }
+
+  /**
+   * Creates a runner for the given offsets, gives it a toolbox, initializes its sequences, and finally
+   * sets its current offsets to {@code startOffsets}.
+   *
+   * @param startOffsets partition -> start offset; also used as the initial current offsets
+   * @param endOffsets   partition -> end offset
+   */
+  private TestSeekableStreamIndexTaskRunner createInitializedRunner(
+      Map<String, String> startOffsets,
+      Map<String, String> endOffsets
+  ) throws Exception
+  {
+    final TestSeekableStreamIndexTaskRunner initialized = createRunner(startOffsets, endOffsets);
+
+    // The toolbox is installed before the sequences are initialized, as in the original ordering.
+    initialized.setToolbox(createTaskToolbox());
+    initialized.initializeSequences();
+
+    // Start reading position == configured start offsets.
+    setCurrentOffsets(initialized, startOffsets);
+    return initialized;
+  }
+
+  /**
+   * Reflectively replaces the runner's private {@code sequences} field with the given list.
+   *
+   * @throws NoSuchFieldException   if {@code SeekableStreamIndexTaskRunner} declares no "sequences" field
+   * @throws IllegalAccessException if the field cannot be written
+   */
+  private static void setSequences(
+      SeekableStreamIndexTaskRunner runner,
+      List<? extends SequenceMetadata> sequences
+  ) throws NoSuchFieldException, IllegalAccessException
+  {
+    final Class<?> owner = SeekableStreamIndexTaskRunner.class;
+    final Field target = owner.getDeclaredField("sequences");
+    target.setAccessible(true);
+    target.set(runner, sequences);
+  }
+
+  /**
+   * Reflectively reads the runner's {@code currOffsets} map and replaces its contents, in place, with
+   * the entries of {@code currentOffsets}. The map instance held by the runner is kept; only its
+   * entries change.
+   *
+   * @throws NoSuchFieldException   if {@code SeekableStreamIndexTaskRunner} declares no "currOffsets" field
+   * @throws IllegalAccessException if the field cannot be read
+   */
+  private static void setCurrentOffsets(
+      SeekableStreamIndexTaskRunner runner,
+      Map<?, ?> currentOffsets
+  ) throws NoSuchFieldException, IllegalAccessException
+  {
+    final Field target = SeekableStreamIndexTaskRunner.class.getDeclaredField("currOffsets");
+    target.setAccessible(true);
+
+    @SuppressWarnings("unchecked")
+    final Map<Object, Object> liveOffsets = (Map<Object, Object>) target.get(runner);
+    liveOffsets.clear();
+    liveOffsets.putAll(currentOffsets);
+  }
+
+  /**
+   * Reflectively overwrites the runner's private {@code status} field.
+   *
+   * @throws NoSuchFieldException   if {@code SeekableStreamIndexTaskRunner} declares no "status" field
+   * @throws IllegalAccessException if the field cannot be written
+   */
+  private static void setStatus(
+      SeekableStreamIndexTaskRunner runner,
+      SeekableStreamIndexTaskRunner.Status status
+  ) throws NoSuchFieldException, IllegalAccessException
+  {
+    final Class<?> owner = SeekableStreamIndexTaskRunner.class;
+    final Field target = owner.getDeclaredField("status");
+    target.setAccessible(true);
+    target.set(runner, status);
+  }
+
+  /**
+   * Reflectively overwrites the runner's private {@code pauseRequested} field.
+   *
+   * @throws NoSuchFieldException   if {@code SeekableStreamIndexTaskRunner} declares no "pauseRequested" field
+   * @throws IllegalAccessException if the field cannot be written
+   */
+  private static void setPauseRequested(
+      SeekableStreamIndexTaskRunner runner,
+      boolean pauseRequested
+  ) throws NoSuchFieldException, IllegalAccessException
+  {
+    final Class<?> owner = SeekableStreamIndexTaskRunner.class;
+    final Field target = owner.getDeclaredField("pauseRequested");
+    target.setAccessible(true);
+    // Field.set takes an Object, so the flag is boxed explicitly here.
+    target.set(runner, Boolean.valueOf(pauseRequested));
+  }
+
+  /**
+   * Reflectively reads the runner's private {@code status} field.
+   *
+   * @return the current value of the field
+   * @throws NoSuchFieldException   if {@code SeekableStreamIndexTaskRunner} declares no "status" field
+   * @throws IllegalAccessException if the field cannot be read
+   */
+  private static SeekableStreamIndexTaskRunner.Status getStatus(
+      SeekableStreamIndexTaskRunner runner
+  ) throws NoSuchFieldException, IllegalAccessException
+  {
+    final Field target = SeekableStreamIndexTaskRunner.class.getDeclaredField("status");
+    target.setAccessible(true);
+    final Object current = target.get(runner);
+    return (SeekableStreamIndexTaskRunner.Status) current;
+  }
+
+  /**
+   * Puts the runner into the paused state and returns a handle for it.
+   *
+   * The runner's status is forced to READING and its pause-requested flag is set; then
+   * {@code possiblyPause()} is invoked on a dedicated single-thread executor (it is expected to block
+   * while the runner is paused) and this method waits until the runner reports status PAUSED.
+   *
+   * If anything fails before the {@link PausedRunner} is handed to the caller (for example the wait for
+   * PAUSED times out), the executor is shut down here so that its worker thread is not leaked; once the
+   * handle is returned, {@link PausedRunner#close()} owns the executor.
+   *
+   * @return a handle that must be closed, typically via try-with-resources
+   */
+  private static PausedRunner pauseRunner(TestSeekableStreamIndexTaskRunner runner) throws Exception
+  {
+    setStatus(runner, SeekableStreamIndexTaskRunner.Status.READING);
+    setPauseRequested(runner, true);
+
+    final ExecutorService executor = Executors.newSingleThreadExecutor();
+    boolean handedOff = false;
+    try {
+      final Future<Boolean> possiblyPauseFuture = executor.submit(() -> invokePossiblyPause(runner));
+      waitForStatus(runner, SeekableStreamIndexTaskRunner.Status.PAUSED);
+      final PausedRunner pausedRunner = new PausedRunner(runner, executor, possiblyPauseFuture);
+      handedOff = true;
+      return pausedRunner;
+    }
+    finally {
+      if (!handedOff) {
+        // Nobody else will ever close this executor; shutdownNow() also attempts to stop the worker.
+        executor.shutdownNow();
+      }
+    }
+  }
+
+  /**
+   * Polls the runner's status every 10 ms until it equals {@code status}, failing the test if that does
+   * not happen within about two seconds.
+   *
+   * Elapsed time is measured with {@link System#nanoTime()} so that wall-clock adjustments cannot shorten
+   * or lengthen the timeout, and the status is always checked once more after the last sleep before the
+   * timeout is declared.
+   *
+   * @param runner runner whose status field is read reflectively
+   * @param status status to wait for
+   * @throws AssertionError if the status is not reached in time
+   */
+  private static void waitForStatus(
+      SeekableStreamIndexTaskRunner runner,
+      SeekableStreamIndexTaskRunner.Status status
+  ) throws Exception
+  {
+    final long timeoutNanos = TimeUnit.SECONDS.toNanos(2);
+    final long startNanos = System.nanoTime();
+    while (true) {
+      if (getStatus(runner) == status) {
+        return;
+      }
+      // Compare elapsed time, not absolute values: nanoTime() may wrap around.
+      if (System.nanoTime() - startNanos >= timeoutNanos) {
+        break;
+      }
+      Thread.sleep(10);
+    }
+    Assert.fail("Timed out waiting for status [" + status + "]");
+  }
+
+  /**
+   * Reflectively calls the runner's private, no-argument {@code possiblyPause()} method and returns its
+   * boolean result.
+   *
+   * An exception thrown by the invoked method is unwrapped from the {@link InvocationTargetException}
+   * and rethrown as-is when it is an {@link Exception} or {@link Error}; any other cause is wrapped in
+   * a {@link RuntimeException}.
+   */
+  private static boolean invokePossiblyPause(SeekableStreamIndexTaskRunner runner) throws Exception
+  {
+    final Method target = SeekableStreamIndexTaskRunner.class.getDeclaredMethod("possiblyPause");
+    target.setAccessible(true);
+
+    final Object result;
+    try {
+      result = target.invoke(runner);
+    }
+    catch (InvocationTargetException wrapped) {
+      final Throwable thrown = wrapped.getCause();
+      if (thrown instanceof Error) {
+        throw (Error) thrown;
+      }
+      if (thrown instanceof Exception) {
+        throw (Exception) thrown;
+      }
+      throw new RuntimeException(thrown);
+    }
+    return (boolean) result;
+  }
+
+  /**
+   * Handle for a runner whose {@code possiblyPause()} call is running on a separate executor thread.
+   * Closing the handle resumes the runner if that call has not finished yet and always shuts the
+   * executor down.
+   */
+  private static class PausedRunner implements AutoCloseable
+  {
+    private final TestSeekableStreamIndexTaskRunner runner;
+    private final ExecutorService executor;
+    private final Future<Boolean> possiblyPauseFuture;
+
+    private PausedRunner(
+        TestSeekableStreamIndexTaskRunner runner,
+        ExecutorService executor,
+        Future<Boolean> possiblyPauseFuture
+    )
+    {
+      this.runner = runner;
+      this.executor = executor;
+      this.possiblyPauseFuture = possiblyPauseFuture;
+    }
+
+    /**
+     * Waits up to two seconds for the {@code possiblyPause()} call to finish and asserts that it
+     * returned {@code true}.
+     */
+    void awaitResumed() throws Exception
+    {
+      final boolean possiblyPauseResult = possiblyPauseFuture.get(2, TimeUnit.SECONDS);
+      Assert.assertTrue(possiblyPauseResult);
+    }
+
+    /**
+     * Resumes the runner and waits for the pending call if it is still running, then shuts the
+     * executor down regardless of the outcome.
+     */
+    @Override
+    public void close() throws Exception
+    {
+      try {
+        final boolean stillPending = !possiblyPauseFuture.isDone();
+        if (stillPending) {
+          runner.resume();
+          awaitResumed();
+        }
+      }
+      finally {
+        executor.shutdownNow();
+      }
+    }
+  }
+
+  /**
+   * List that can be armed to drop its first element exactly once, immediately after the next
+   * {@link #size()} or {@link #toArray()} call has captured its result. The caller of that call therefore
+   * receives a size or snapshot that is already stale with respect to the list's contents.
+   * {@link #isEmpty()} never triggers the removal.
+   *
+   * NOTE(review): presumably used to simulate an element being removed concurrently with a reader;
+   * confirm against the tests that use it.
+   */
+  private static class ShrinkingCopyOnWriteArrayList<E> extends CopyOnWriteArrayList<E>
+  {
+    private final AtomicBoolean removeFirstElement = new AtomicBoolean(false);
+
+    /** Arms the one-shot removal performed by the next {@code size()} or {@code toArray()} call. */
+    void removeFirstElementDuringNextSnapshotOrSize()
+    {
+      removeFirstElement.set(true);
+    }
+
+    /** Reads the real size directly so that the armed removal is not consumed. */
+    @Override
+    public boolean isEmpty()
+    {
+      return 0 == super.size();
+    }
+
+    /** Returns the size as it was before any armed removal takes effect. */
+    @Override
+    public int size()
+    {
+      final int sizeBeforeRemoval = super.size();
+      removeFirstIfArmed();
+      return sizeBeforeRemoval;
+    }
+
+    /** Returns a snapshot taken before any armed removal takes effect. */
+    @Override
+    public Object[] toArray()
+    {
+      final Object[] snapshotBeforeRemoval = super.toArray();
+      removeFirstIfArmed();
+      return snapshotBeforeRemoval;
+    }
+
+    /** Removes element 0 if, and only if, the flag was armed; disarms it atomically. */
+    private void removeFirstIfArmed()
+    {
+      if (removeFirstElement.compareAndSet(true, false)) {
+        remove(0);
+      }
+    }
+  }
+
private static class NoopDruidNodeAnnouncer implements DruidNodeAnnouncer
{
@@ -414,7 +698,10 @@ public class SeekableStreamIndexTaskRunnerTest
@Override
protected Object getNextStartOffset(Object sequenceNumber)
{
- return null;
+ if (sequenceNumber == null) {
+ return null;
+ }
+ return String.valueOf(Long.parseLong(sequenceNumber.toString()) + 1);
}
@Override
@@ -438,7 +725,17 @@ public class SeekableStreamIndexTaskRunnerTest
@Override
protected OrderedSequenceNumber createSequenceNumber(Object sequenceNumber)
{
- return null;
+ if (sequenceNumber == null) {
+ return null;
+ }
+ return new OrderedSequenceNumber<>(sequenceNumber.toString(), false)
+ {
+ @Override
+ public int compareTo(OrderedSequenceNumber<String> other)
+ {
+ return Long.compare(Long.parseLong(get()),
Long.parseLong(other.get()));
+ }
+ };
}
@Override
@@ -450,7 +747,9 @@ public class SeekableStreamIndexTaskRunnerTest
@Override
protected TypeReference<List<SequenceMetadata>>
getSequenceMetadataTypeReference()
{
- return null;
+ return new TypeReference<>()
+ {
+ };
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]