This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 67c7b6248cf Fix log typos, clean up some kill messages in
SeekableStreamSupervisor (#15424)
67c7b6248cf is described below
commit 67c7b6248cfead7b9ab6950e13cc33583dbf8b88
Author: Kashif Faraz <[email protected]>
AuthorDate: Fri Nov 24 16:09:10 2023 +0530
Fix log typos, clean up some kill messages in SeekableStreamSupervisor
(#15424)
Changes:
- Fix log `Got end of partition marker for partition [%s] from task [%s] in discoverTasks` by fixing the order of args
- Simplify in-line classes by using lambda
- Update kill task message from `Task [%s] failed to respond to [set end offsets] in a timely manner, killing task` to `Failed to set end offsets, killing task`
- Clean up tests
---
.../kafka/supervisor/KafkaSupervisorTest.java | 4 +-
.../kinesis/supervisor/KinesisSupervisorTest.java | 6 +-
.../supervisor/SeekableStreamSupervisor.java | 100 ++++------
.../SeekableStreamSupervisorStateTest.java | 206 ++++++++++-----------
4 files changed, 130 insertions(+), 186 deletions(-)
diff --git
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index 3087bdcbb1a..b221cdf418c 100644
---
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -145,7 +145,6 @@ public class KafkaSupervisorTest extends EasyMockSupport
private static final String TOPIC_PREFIX = "testTopic";
private static final String DATASOURCE = "testDS";
private static final int NUM_PARTITIONS = 3;
- private static final int TEST_CHAT_THREADS = 3;
private static final long TEST_CHAT_RETRIES = 9L;
private static final Period TEST_HTTP_TIMEOUT = new Period("PT10S");
private static final Period TEST_SHUTDOWN_TIMEOUT = new Period("PT80S");
@@ -2708,8 +2707,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
).andReturn(Futures.immediateFailedFuture(new
RuntimeException())).times(2);
taskQueue.shutdown(
EasyMock.contains("sequenceName-0"),
- EasyMock.eq("Task [%s] failed to respond to [set end offsets] in a
timely manner, killing task"),
- EasyMock.contains("sequenceName-0")
+ EasyMock.eq("Failed to set end offsets, killing task")
);
EasyMock.expectLastCall().times(2);
EasyMock.expect(taskQueue.add(EasyMock.capture(captured))).andReturn(true).times(2);
diff --git
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
index 303eb5eff38..d907a372d7e 100644
---
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
+++
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
@@ -130,7 +130,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
false
);
private static final String DATASOURCE = "testDS";
- private static final int TEST_CHAT_THREADS = 3;
private static final long TEST_CHAT_RETRIES = 9L;
private static final Period TEST_HTTP_TIMEOUT = new Period("PT10S");
private static final Period TEST_SHUTDOWN_TIMEOUT = new Period("PT80S");
@@ -1094,7 +1093,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class),
EasyMock.anyObject(Executor.class));
- taskQueue.shutdown("id4", "Task [%s] failed to return status, killing
task", "id4");
+ taskQueue.shutdown("id4", "Task[%s] failed to return status, killing
task", "id4");
replayAll();
supervisor.start();
@@ -2386,8 +2385,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
).andReturn(Futures.immediateFailedFuture(new
RuntimeException())).times(2);
taskQueue.shutdown(
EasyMock.contains("sequenceName-0"),
- EasyMock.eq("Task [%s] failed to respond to [set end offsets] in a
timely manner, killing task"),
- EasyMock.contains("sequenceName-0")
+ EasyMock.eq("Failed to set end offsets, killing task")
);
EasyMock.expectLastCall().times(2);
EasyMock.expect(taskQueue.add(EasyMock.capture(captured))).andReturn(true).times(2);
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index 906f7155665..37596bffde9 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -163,24 +163,10 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
private static final EmittingLogger log = new
EmittingLogger(SeekableStreamSupervisor.class);
- private static final Comparator<ParseExceptionReport>
PARSE_EXCEPTION_REPORT_COMPARATOR =
- new Comparator<ParseExceptionReport>()
- {
- @Override
- public int compare(ParseExceptionReport o1, ParseExceptionReport o2)
- {
- int timeCompare = Long.compare(o1.getTimeOfExceptionMillis(),
o2.getTimeOfExceptionMillis());
- if (timeCompare != 0) {
- return timeCompare;
- }
- int errorTypeCompare =
StringComparators.LEXICOGRAPHIC.compare(o1.getErrorType(), o2.getErrorType());
- if (errorTypeCompare != 0) {
- return errorTypeCompare;
- }
-
- return StringComparators.LEXICOGRAPHIC.compare(o1.getInput(),
o2.getInput());
- }
- };
+ private static final Comparator<ParseExceptionReport>
PARSE_EXCEPTION_REPORT_COMPARATOR
+ =
Comparator.comparingLong(ParseExceptionReport::getTimeOfExceptionMillis)
+ .thenComparing(ParseExceptionReport::getErrorType,
StringComparators.LEXICOGRAPHIC)
+ .thenComparing(ParseExceptionReport::getInput,
StringComparators.LEXICOGRAPHIC);
// Internal data structures
// --------------------------------------------------------
@@ -1890,7 +1876,6 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
}
-
private void killTask(final String id, String reasonFormat, Object... args)
{
Optional<TaskQueue> taskQueue = taskMaster.getTaskQueue();
@@ -1978,7 +1963,7 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
if (!inactivePartitionsInTask.isEmpty()) {
killTaskWithSuccess(
taskId,
- "Task [%s] with partition set [%s] has inactive partitions [%s],
stopping task.",
+ "Task[%s] with partition set[%s] has inactive partitions[%s],
stopping task.",
taskId,
taskPartitions,
inactivePartitionsInTask
@@ -2046,9 +2031,8 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
SequenceOffsetType sequence = entry.getValue();
if (sequence.equals(getEndOfPartitionMarker())) {
log.info(
- "Got end of partition marker for partition
[%s] from task [%s] in discoverTasks, clearing partition offset to refetch from
metadata..",
- taskId,
- partition
+ "Got end-of-partition(EOS) marker for
partition[%s] from task[%s] in discoverTasks, clearing partition offset to
refetch from metadata.",
+ partition, taskId
);
endOffsetsAreInvalid = true;
partitionOffsets.put(partition,
getNotSetMarker());
@@ -2080,7 +2064,7 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
.keySet()) {
if
(!taskGroupId.equals(getTaskGroupIdForPartition(partition))) {
log.warn(
- "Stopping task [%s] which does not match the
expected partition allocation",
+ "Stopping task[%s] as it does not match the
current partition allocation.",
taskId
);
@@ -2091,10 +2075,7 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
// make sure the task's io and tuning configs match
with the supervisor config
// if it is current then only create corresponding
taskGroup if it does not exist
if (!isTaskCurrent(taskGroupId, taskId,
activeTaskMap)) {
- log.info(
- "Stopping task [%s] which does not match the
expected parameters and ingestion spec",
- taskId
- );
+ log.info("Stopping task[%s] as it does not match
the current supervisor spec.", taskId);
// Returning false triggers a call to stopTask.
return false;
@@ -2127,8 +2108,7 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
if (prevTaskData != null) {
throw new ISE(
"taskGroup[%s] already exists for new
task[%s]",
- prevTaskData,
- taskId
+ prevTaskData, taskId
);
}
verifySameSequenceNameForAllTasksInGroup(taskGroupId);
@@ -2138,7 +2118,7 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
}
catch (Throwable t) {
stateManager.recordThrowableEvent(t);
- log.error(t, "Something bad while discovering task
[%s]", taskId);
+ log.error(t, "An error occurred while discovering
task[%s]", taskId);
return null;
}
}
@@ -2155,13 +2135,13 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
for (int i = 0; i < results.size(); i++) {
String taskId = futureTaskIds.get(i);
if (results.get(i).isError() || results.get(i).valueOrThrow() == null) {
- killTask(taskId, "Task [%s] failed to return status, killing task",
taskId);
+ killTask(taskId, "Task[%s] failed to return status, killing task",
taskId);
} else if (Boolean.valueOf(false).equals(results.get(i).valueOrThrow()))
{
// "return false" above means that we want to stop the task.
stopFutures.add(stopTask(taskId, false));
}
}
- log.debug("Found [%d] seekablestream indexing tasks for dataSource [%s]",
taskCount, dataSource);
+ log.debug("Found [%d] seekablestream indexing tasks for datasource[%s]",
taskCount, dataSource);
if (!stopFutures.isEmpty()) {
coalesceAndAwait(stopFutures);
@@ -2237,34 +2217,25 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
}
}
+ final String killMsg =
+ "Killing forcefully as task could not be resumed in the first
supervisor run after Overlord change.";
for (Map.Entry<String, ListenableFuture<Boolean>> entry :
tasksToResume.entrySet()) {
String taskId = entry.getKey();
ListenableFuture<Boolean> future = entry.getValue();
future.addListener(
- new Runnable()
- {
- @Override
- public void run()
- {
- try {
- if (entry.getValue().get()) {
- log.info("Resumed task [%s] in first supervisor run.",
taskId);
- } else {
- log.warn("Failed to resume task [%s] in first supervisor
run.", taskId);
- killTask(
- taskId,
- "Killing forcefully as task could not be resumed in the
first supervisor run after Overlord change."
- );
- }
- }
- catch (Exception e) {
- log.warn(e, "Failed to resume task [%s] in first supervisor
run.", taskId);
- killTask(
- taskId,
- "Killing forcefully as task could not be resumed in the
first supervisor run after Overlord change."
- );
+ () -> {
+ try {
+ if (entry.getValue().get()) {
+ log.info("Resumed task[%s] in first supervisor run.", taskId);
+ } else {
+ log.warn("Failed to resume task[%s] in first supervisor run.",
taskId);
+ killTask(taskId, killMsg);
}
}
+ catch (Exception e) {
+ log.warn(e, "Failed to resume task[%s] in first supervisor
run.", taskId);
+ killTask(taskId, killMsg);
+ }
},
workerExec
);
@@ -3190,7 +3161,7 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
for (Entry<PartitionIdType, SequenceOffsetType> entry :
endOffsets.entrySet()) {
if (entry.getValue().equals(getEndOfPartitionMarker())) {
log.info(
- "Got end of partition marker for partition [%s] in
checkTaskDuration, not updating partition offset.",
+ "Got end-of-partition(EOS) marker for partition[%s] in
checkTaskDuration, not updating partition offset.",
entry.getKey()
);
endOffsetsAreInvalid = true;
@@ -3213,7 +3184,7 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
for (String id : group.taskIds()) {
killTask(
id,
- "All tasks in group [%s] failed to transition to publishing
state",
+ "All tasks in group[%s] failed to transition to publishing
state",
groupId
);
}
@@ -3343,9 +3314,8 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
}
log.info(
- "Setting endOffsets for tasks in taskGroup [%d] to %s",
- taskGroup.groupId,
- endOffsets
+ "Setting endOffsets for tasks in taskGroup[%d] to [%s]",
+ taskGroup.groupId, endOffsets
);
for (final String taskId : setEndOffsetTaskIds) {
setEndOffsetFutures.add(taskClient.setEndOffsetsAsync(taskId,
endOffsets, finalize));
@@ -3357,22 +3327,18 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
log.info("Successfully set endOffsets for task[%s] and
resumed it", setEndOffsetTaskIds.get(i));
} else {
String taskId = setEndOffsetTaskIds.get(i);
- killTask(
- taskId,
- "Task [%s] failed to respond to [set end offsets] in a
timely manner, killing task",
- taskId
- );
+ killTask(taskId, "Failed to set end offsets, killing task");
taskGroup.tasks.remove(taskId);
}
}
}
catch (Exception e) {
- log.error("Something bad happened [%s]", e.getMessage());
+ log.error("An exception occurred while setting end offsets:
[%s]", e.getMessage());
throw new RuntimeException(e);
}
if (taskGroup.tasks.isEmpty()) {
- log.info("All tasks in taskGroup [%d] have failed, tasks will be
re-created", taskGroup.groupId);
+ log.info("All tasks in taskGroup[%d] have failed, tasks will be
re-created", taskGroup.groupId);
return null;
}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
index 23ff4038633..cb653c6aa84 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
@@ -29,7 +29,6 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
-import com.google.errorprone.annotations.concurrent.GuardedBy;
import org.apache.druid.data.input.impl.ByteEntity;
import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.data.input.impl.DimensionsSpec;
@@ -78,8 +77,8 @@ import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
-import org.apache.druid.java.util.emitter.core.Event;
import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig;
+import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.apache.druid.metadata.EntryExistsException;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.query.aggregation.AggregatorFactory;
@@ -88,7 +87,6 @@ import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
-import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.easymock.EasyMock;
import org.easymock.EasyMockSupport;
import org.hamcrest.MatcherAssert;
@@ -104,7 +102,6 @@ import java.io.File;
import java.io.IOException;
import java.math.BigInteger;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -117,7 +114,6 @@ import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
public class SeekableStreamSupervisorStateTest extends EasyMockSupport
{
@@ -142,7 +138,7 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
private RowIngestionMetersFactory rowIngestionMetersFactory;
private SupervisorStateManagerConfig supervisorConfig;
- private TestEmitter emitter;
+ private StubServiceEmitter emitter;
@Before
public void setupTest()
@@ -161,7 +157,7 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
supervisorConfig = new SupervisorStateManagerConfig();
- emitter = new TestEmitter();
+ emitter = new StubServiceEmitter("test-supervisor-state", "localhost");
EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
@@ -839,7 +835,7 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
Map<String, Object> context = new HashMap<>();
context.put("checkpoints", new
ObjectMapper().writeValueAsString(sequenceOffsets));
- SeekableStreamIndexTask id1 = new TestSeekableStreamIndexTask(
+ TestSeekableStreamIndexTask id1 = new TestSeekableStreamIndexTask(
"id1",
null,
getDataSchema(),
@@ -849,7 +845,7 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
"0"
);
- SeekableStreamIndexTask id2 = new TestSeekableStreamIndexTask(
+ TestSeekableStreamIndexTask id2 = new TestSeekableStreamIndexTask(
"id2",
null,
getDataSchema(),
@@ -859,31 +855,52 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
"0"
);
+ TestSeekableStreamIndexTask id3 = new TestSeekableStreamIndexTask(
+ "id3",
+ null,
+ getDataSchema(),
+ taskTuningConfig,
+ taskIoConfig,
+ context,
+ "0"
+ );
+
final TaskLocation location1 = TaskLocation.create("testHost", 1234, -1);
final TaskLocation location2 = TaskLocation.create("testHost2", 145, -1);
+ final TaskLocation location3 = TaskLocation.create("testHost3", 145, -1);
Collection workItems = new ArrayList<>();
workItems.add(new TestTaskRunnerWorkItem(id1, null, location1));
workItems.add(new TestTaskRunnerWorkItem(id2, null, location2));
+ workItems.add(new TestTaskRunnerWorkItem(id3, null, location3));
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes();
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE))
- .andReturn(ImmutableList.of(id1, id2))
+ .andReturn(ImmutableList.of(id1, id2, id3))
.anyTimes();
EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes();
EasyMock.expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes();
+
EasyMock.expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes();
EasyMock.expect(taskStorage.getTask("id1")).andReturn(Optional.of(id1)).anyTimes();
EasyMock.expect(taskStorage.getTask("id2")).andReturn(Optional.of(id2)).anyTimes();
+
EasyMock.expect(taskStorage.getTask("id3")).andReturn(Optional.of(id2)).anyTimes();
EasyMock.reset(indexerMetadataStorageCoordinator);
- EasyMock.expect(
-
indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(new
TestSeekableStreamDataSourceMetadata(null)
- ).anyTimes();
-
EasyMock.expect(indexTaskClient.getStatusAsync("id1")).andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.READING)).anyTimes();
-
EasyMock.expect(indexTaskClient.getStatusAsync("id2")).andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.READING)).anyTimes();
+
EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE))
+ .andReturn(new
TestSeekableStreamDataSourceMetadata(null)).anyTimes();
+ EasyMock.expect(indexTaskClient.getStatusAsync("id1"))
+
.andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.READING))
+ .anyTimes();
+ EasyMock.expect(indexTaskClient.getStatusAsync("id2"))
+
.andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.READING))
+ .anyTimes();
+ EasyMock.expect(indexTaskClient.getStatusAsync("id3"))
+
.andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.PAUSED))
+ .anyTimes();
EasyMock.expect(indexTaskClient.getStartTimeAsync("id1")).andReturn(Futures.immediateFuture(startTime)).anyTimes();
EasyMock.expect(indexTaskClient.getStartTimeAsync("id2")).andReturn(Futures.immediateFuture(startTime)).anyTimes();
+
EasyMock.expect(indexTaskClient.getStartTimeAsync("id3")).andReturn(Futures.immediateFuture(startTime)).anyTimes();
ImmutableMap<String, String> partitionOffset = ImmutableMap.of("0", "10");
final TreeMap<Integer, Map<String, String>> checkpoints = new TreeMap<>();
@@ -895,24 +912,36 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
EasyMock.expect(indexTaskClient.getCheckpointsAsync(EasyMock.contains("id2"),
EasyMock.anyBoolean()))
.andReturn(Futures.immediateFuture(checkpoints))
.anyTimes();
+
EasyMock.expect(indexTaskClient.getCheckpointsAsync(EasyMock.contains("id3"),
EasyMock.anyBoolean()))
+ .andReturn(Futures.immediateFuture(checkpoints))
+ .anyTimes();
EasyMock.expect(indexTaskClient.pauseAsync("id1"))
.andReturn(Futures.immediateFuture(partitionOffset))
.anyTimes();
EasyMock.expect(indexTaskClient.pauseAsync("id2"))
.andReturn(Futures.immediateFuture(partitionOffset))
.anyTimes();
+ EasyMock.expect(indexTaskClient.pauseAsync("id3"))
+ .andReturn(Futures.immediateFuture(partitionOffset))
+ .anyTimes();
EasyMock.expect(indexTaskClient.setEndOffsetsAsync("id1", partitionOffset,
false))
.andReturn(Futures.immediateFuture(true))
.anyTimes();
EasyMock.expect(indexTaskClient.setEndOffsetsAsync("id2", partitionOffset,
false))
.andReturn(Futures.immediateFuture(true))
.anyTimes();
+ EasyMock.expect(indexTaskClient.setEndOffsetsAsync("id3", partitionOffset,
false))
+ .andReturn(Futures.immediateFuture(true))
+ .anyTimes();
EasyMock.expect(indexTaskClient.resumeAsync("id1"))
.andReturn(Futures.immediateFuture(true))
.anyTimes();
EasyMock.expect(indexTaskClient.resumeAsync("id2"))
.andReturn(Futures.immediateFuture(true))
.anyTimes();
+ EasyMock.expect(indexTaskClient.resumeAsync("id3"))
+ .andReturn(Futures.immediateFuture(false))
+ .anyTimes();
EasyMock.expect(indexTaskClient.stopAsync("id1", false))
.andReturn(Futures.immediateFuture(true))
.anyTimes();
@@ -920,6 +949,13 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
.andReturn(Futures.immediateFuture(true))
.anyTimes();
+ taskQueue.shutdown(
+ "id3",
+ "Killing forcefully as task could not be resumed in the"
+ + " first supervisor run after Overlord change."
+ );
+ EasyMock.expectLastCall().atLeastOnce();
+
replayAll();
SeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();
@@ -967,29 +1003,14 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
latch.await();
- List<Event> events = emitter.getEvents();
- List<String> whitelist = Arrays.asList("ingest/test/lag",
"ingest/test/maxLag",
- "ingest/test/avgLag", "ingest/test/lag/time",
"ingest/test/maxLag/time", "ingest/test/avgLag/time");
- events = filterMetrics(events, whitelist);
- Assert.assertEquals(6, events.size());
- Assert.assertEquals("ingest/test/lag",
events.get(0).toMap().get("metric"));
- Assert.assertEquals(850L, events.get(0).toMap().get("value"));
- Assert.assertEquals(METRIC_TAGS,
events.get(0).toMap().get(DruidMetrics.TAGS));
- Assert.assertEquals("ingest/test/maxLag",
events.get(1).toMap().get("metric"));
- Assert.assertEquals(500L, events.get(1).toMap().get("value"));
- Assert.assertEquals(METRIC_TAGS,
events.get(1).toMap().get(DruidMetrics.TAGS));
- Assert.assertEquals("ingest/test/avgLag",
events.get(2).toMap().get("metric"));
- Assert.assertEquals(283L, events.get(2).toMap().get("value"));
- Assert.assertEquals(METRIC_TAGS,
events.get(2).toMap().get(DruidMetrics.TAGS));
- Assert.assertEquals("ingest/test/lag/time",
events.get(3).toMap().get("metric"));
- Assert.assertEquals(45000L, events.get(3).toMap().get("value"));
- Assert.assertEquals(METRIC_TAGS,
events.get(3).toMap().get(DruidMetrics.TAGS));
- Assert.assertEquals("ingest/test/maxLag/time",
events.get(4).toMap().get("metric"));
- Assert.assertEquals(20000L, events.get(4).toMap().get("value"));
- Assert.assertEquals(METRIC_TAGS,
events.get(4).toMap().get(DruidMetrics.TAGS));
- Assert.assertEquals("ingest/test/avgLag/time",
events.get(5).toMap().get("metric"));
- Assert.assertEquals(15000L, events.get(5).toMap().get("value"));
- Assert.assertEquals(METRIC_TAGS,
events.get(5).toMap().get(DruidMetrics.TAGS));
+
+ final Map<String, Object> dimFilters = ImmutableMap.of(DruidMetrics.TAGS,
METRIC_TAGS);
+ emitter.verifyValue("ingest/test/lag", dimFilters, 850L);
+ emitter.verifyValue("ingest/test/maxLag", dimFilters, 500L);
+ emitter.verifyValue("ingest/test/avgLag", dimFilters, 283L);
+ emitter.verifyValue("ingest/test/lag/time", dimFilters, 45000L);
+ emitter.verifyValue("ingest/test/maxLag/time", dimFilters, 20000L);
+ emitter.verifyValue("ingest/test/avgLag/time", dimFilters, 15000L);
verifyAll();
}
@@ -1016,16 +1037,11 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
latch.await();
- List<Event> events = emitter.getEvents();
- List<String> whitelist = Arrays.asList("ingest/test/lag",
"ingest/test/maxLag", "ingest/test/avgLag");
- events = filterMetrics(events, whitelist);
- Assert.assertEquals(3, events.size());
- Assert.assertEquals("ingest/test/lag",
events.get(0).toMap().get("metric"));
- Assert.assertEquals(850L, events.get(0).toMap().get("value"));
- Assert.assertEquals("ingest/test/maxLag",
events.get(1).toMap().get("metric"));
- Assert.assertEquals(500L, events.get(1).toMap().get("value"));
- Assert.assertEquals("ingest/test/avgLag",
events.get(2).toMap().get("metric"));
- Assert.assertEquals(283L, events.get(2).toMap().get("value"));
+
+ final Map<String, Object> dimFilters = ImmutableMap.of(DruidMetrics.TAGS,
METRIC_TAGS);
+ emitter.verifyValue("ingest/test/lag", dimFilters, 850L);
+ emitter.verifyValue("ingest/test/maxLag", dimFilters, 500L);
+ emitter.verifyValue("ingest/test/avgLag", dimFilters, 283L);
verifyAll();
}
@@ -1053,19 +1069,12 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
latch.await();
- List<Event> events = emitter.getEvents();
- List<String> whitelist = Arrays.asList("ingest/test/lag/time",
"ingest/test/maxLag/time", "ingest/test/avgLag/time");
- events = filterMetrics(events, whitelist);
- Assert.assertEquals(3, events.size());
- Assert.assertEquals("ingest/test/lag/time",
events.get(0).toMap().get("metric"));
- Assert.assertEquals(45000L, events.get(0).toMap().get("value"));
- Assert.assertEquals(METRIC_TAGS,
events.get(0).toMap().get(DruidMetrics.TAGS));
- Assert.assertEquals("ingest/test/maxLag/time",
events.get(1).toMap().get("metric"));
- Assert.assertEquals(20000L, events.get(1).toMap().get("value"));
- Assert.assertEquals(METRIC_TAGS,
events.get(1).toMap().get(DruidMetrics.TAGS));
- Assert.assertEquals("ingest/test/avgLag/time",
events.get(2).toMap().get("metric"));
- Assert.assertEquals(15000L, events.get(2).toMap().get("value"));
- Assert.assertEquals(METRIC_TAGS,
events.get(2).toMap().get(DruidMetrics.TAGS));
+
+ final Map<String, Object> dimFilters = ImmutableMap.of(DruidMetrics.TAGS,
METRIC_TAGS);
+ emitter.verifyValue("ingest/test/lag/time", dimFilters, 45000L);
+ emitter.verifyValue("ingest/test/maxLag/time", dimFilters, 20000L);
+ emitter.verifyValue("ingest/test/avgLag/time", dimFilters, 15000L);
+
verifyAll();
}
@@ -1093,14 +1102,13 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
latch.await();
- List<Event> events = emitter.getEvents();
- List<String> whitelist =
Collections.singletonList("ingest/notices/queueSize");
- events = filterMetrics(events, whitelist);
- Assert.assertEquals(1, events.size());
- Assert.assertEquals("ingest/notices/queueSize",
events.get(0).toMap().get("metric"));
- Assert.assertEquals(0, events.get(0).toMap().get("value"));
- Assert.assertEquals(METRIC_TAGS,
events.get(0).toMap().get(DruidMetrics.TAGS));
- Assert.assertEquals("testDS", events.get(0).toMap().get("dataSource"));
+
+ final Map<String, Object> dimFilters = ImmutableMap.of(
+ DruidMetrics.TAGS, METRIC_TAGS,
+ DruidMetrics.DATASOURCE, "testDS"
+ );
+ emitter.verifyValue("ingest/notices/queueSize", dimFilters, 0);
+
verifyAll();
}
@@ -1122,15 +1130,15 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
Assert.assertTrue(supervisor.stateManager.getExceptionEvents().isEmpty());
Assert.assertFalse(supervisor.stateManager.isAtLeastOneSuccessfulRun());
latch.await();
- List<Event> events = emitter.getEvents();
- List<String> whitelist = Collections.singletonList("ingest/notices/time");
- events = filterMetrics(events, whitelist);
- Assert.assertEquals(1, events.size());
- Assert.assertEquals("ingest/notices/time",
events.get(0).toMap().get("metric"));
- Assert.assertEquals(METRIC_TAGS,
events.get(0).toMap().get(DruidMetrics.TAGS));
- Assert.assertTrue(String.valueOf(events.get(0).toMap().get("value")),
(long) events.get(0).toMap().get("value") > 0);
- Assert.assertEquals("testDS", events.get(0).toMap().get("dataSource"));
- Assert.assertEquals("run_notice", events.get(0).toMap().get("noticeType"));
+
+ final Map<String, Object> dimFilters = ImmutableMap.of(
+ DruidMetrics.TAGS, METRIC_TAGS,
+ DruidMetrics.DATASOURCE, "testDS",
+ "noticeType", "run_notice"
+ );
+ long observedNoticeTime = emitter.getValue("ingest/notices/time",
dimFilters).longValue();
+ Assert.assertTrue(observedNoticeTime > 0);
+
verifyAll();
}
@@ -1158,11 +1166,14 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
latch.await();
- List<Event> events = emitter.getEvents();
- List<String> whitelist = Arrays.asList("ingest/test/lag",
"ingest/test/maxLag",
- "ingest/test/avgLag", "ingest/test/lag/time",
"ingest/test/maxLag/time", "ingest/test/avgLag/time");
- events = filterMetrics(events, whitelist);
- Assert.assertEquals(0, events.size());
+
+ emitter.verifyNotEmitted("ingest/test/lag");
+ emitter.verifyNotEmitted("ingest/test/maxLag");
+ emitter.verifyNotEmitted("ingest/test/avgLag");
+ emitter.verifyNotEmitted("ingest/test/lag/time");
+ emitter.verifyNotEmitted("ingest/test/maxLag/time");
+ emitter.verifyNotEmitted("ingest/test/avgLag/time");
+
verifyAll();
}
@@ -1838,14 +1849,6 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
EasyMock.verify(executorService, spec);
}
- private List<Event> filterMetrics(List<Event> events, List<String> whitelist)
- {
- List<Event> result = events.stream()
- .filter(e -> whitelist.contains(e.toMap().get("metric").toString()))
- .collect(Collectors.toList());
- return result;
- }
-
private void expectEmitterSupervisor(boolean suspended) throws
EntryExistsException
{
spec = createMock(SeekableStreamSupervisorSpec.class);
@@ -2401,27 +2404,6 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
}
}
- private static class TestEmitter extends NoopServiceEmitter
- {
- @GuardedBy("events")
- private final List<Event> events = new ArrayList<>();
-
- @Override
- public void emit(Event event)
- {
- synchronized (events) {
- events.add(event);
- }
- }
-
- public List<Event> getEvents()
- {
- synchronized (events) {
- return ImmutableList.copyOf(events);
- }
- }
- }
-
private static class TestTaskRunnerWorkItem extends TaskRunnerWorkItem
{
private final String taskType;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]