This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 67c7b6248cf Fix log typos, clean up some kill messages in 
SeekableStreamSupervisor (#15424)
67c7b6248cf is described below

commit 67c7b6248cfead7b9ab6950e13cc33583dbf8b88
Author: Kashif Faraz <[email protected]>
AuthorDate: Fri Nov 24 16:09:10 2023 +0530

    Fix log typos, clean up some kill messages in SeekableStreamSupervisor 
(#15424)
    
    Changes:
    - Fix log `Got end of partition marker for partition [%s] from task [%s] in 
discoverTasks`
    by fixing order of args
    - Simplify in-line classes by using lambda
    - Update kill task message from `Task [%s] failed to respond to [set end 
offsets]
     in a timely manner, killing task` to `Failed to set end offsets, killing 
task`
    - Clean up tests
---
 .../kafka/supervisor/KafkaSupervisorTest.java      |   4 +-
 .../kinesis/supervisor/KinesisSupervisorTest.java  |   6 +-
 .../supervisor/SeekableStreamSupervisor.java       | 100 ++++------
 .../SeekableStreamSupervisorStateTest.java         | 206 ++++++++++-----------
 4 files changed, 130 insertions(+), 186 deletions(-)

diff --git 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index 3087bdcbb1a..b221cdf418c 100644
--- 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++ 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -145,7 +145,6 @@ public class KafkaSupervisorTest extends EasyMockSupport
   private static final String TOPIC_PREFIX = "testTopic";
   private static final String DATASOURCE = "testDS";
   private static final int NUM_PARTITIONS = 3;
-  private static final int TEST_CHAT_THREADS = 3;
   private static final long TEST_CHAT_RETRIES = 9L;
   private static final Period TEST_HTTP_TIMEOUT = new Period("PT10S");
   private static final Period TEST_SHUTDOWN_TIMEOUT = new Period("PT80S");
@@ -2708,8 +2707,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
     ).andReturn(Futures.immediateFailedFuture(new 
RuntimeException())).times(2);
     taskQueue.shutdown(
         EasyMock.contains("sequenceName-0"),
-        EasyMock.eq("Task [%s] failed to respond to [set end offsets] in a 
timely manner, killing task"),
-        EasyMock.contains("sequenceName-0")
+        EasyMock.eq("Failed to set end offsets, killing task")
     );
     EasyMock.expectLastCall().times(2);
     
EasyMock.expect(taskQueue.add(EasyMock.capture(captured))).andReturn(true).times(2);
diff --git 
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
 
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
index 303eb5eff38..d907a372d7e 100644
--- 
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
+++ 
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
@@ -130,7 +130,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
       false
   );
   private static final String DATASOURCE = "testDS";
-  private static final int TEST_CHAT_THREADS = 3;
   private static final long TEST_CHAT_RETRIES = 9L;
   private static final Period TEST_HTTP_TIMEOUT = new Period("PT10S");
   private static final Period TEST_SHUTDOWN_TIMEOUT = new Period("PT80S");
@@ -1094,7 +1093,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
 
 
     taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), 
EasyMock.anyObject(Executor.class));
-    taskQueue.shutdown("id4", "Task [%s] failed to return status, killing 
task", "id4");
+    taskQueue.shutdown("id4", "Task[%s] failed to return status, killing 
task", "id4");
     replayAll();
 
     supervisor.start();
@@ -2386,8 +2385,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
     ).andReturn(Futures.immediateFailedFuture(new 
RuntimeException())).times(2);
     taskQueue.shutdown(
         EasyMock.contains("sequenceName-0"),
-        EasyMock.eq("Task [%s] failed to respond to [set end offsets] in a 
timely manner, killing task"),
-        EasyMock.contains("sequenceName-0")
+        EasyMock.eq("Failed to set end offsets, killing task")
     );
     EasyMock.expectLastCall().times(2);
     
EasyMock.expect(taskQueue.add(EasyMock.capture(captured))).andReturn(true).times(2);
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index 906f7155665..37596bffde9 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -163,24 +163,10 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
 
   private static final EmittingLogger log = new 
EmittingLogger(SeekableStreamSupervisor.class);
 
-  private static final Comparator<ParseExceptionReport> 
PARSE_EXCEPTION_REPORT_COMPARATOR =
-      new Comparator<ParseExceptionReport>()
-      {
-        @Override
-        public int compare(ParseExceptionReport o1, ParseExceptionReport o2)
-        {
-          int timeCompare = Long.compare(o1.getTimeOfExceptionMillis(), 
o2.getTimeOfExceptionMillis());
-          if (timeCompare != 0) {
-            return timeCompare;
-          }
-          int errorTypeCompare = 
StringComparators.LEXICOGRAPHIC.compare(o1.getErrorType(), o2.getErrorType());
-          if (errorTypeCompare != 0) {
-            return errorTypeCompare;
-          }
-
-          return StringComparators.LEXICOGRAPHIC.compare(o1.getInput(), 
o2.getInput());
-        }
-      };
+  private static final Comparator<ParseExceptionReport> 
PARSE_EXCEPTION_REPORT_COMPARATOR
+      = 
Comparator.comparingLong(ParseExceptionReport::getTimeOfExceptionMillis)
+                  .thenComparing(ParseExceptionReport::getErrorType, 
StringComparators.LEXICOGRAPHIC)
+                  .thenComparing(ParseExceptionReport::getInput, 
StringComparators.LEXICOGRAPHIC);
 
   // Internal data structures
   // --------------------------------------------------------
@@ -1890,7 +1876,6 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
 
   }
 
-
   private void killTask(final String id, String reasonFormat, Object... args)
   {
     Optional<TaskQueue> taskQueue = taskMaster.getTaskQueue();
@@ -1978,7 +1963,7 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
         if (!inactivePartitionsInTask.isEmpty()) {
           killTaskWithSuccess(
               taskId,
-              "Task [%s] with partition set [%s] has inactive partitions [%s], 
stopping task.",
+              "Task[%s] with partition set[%s] has inactive partitions[%s], 
stopping task.",
               taskId,
               taskPartitions,
               inactivePartitionsInTask
@@ -2046,9 +2031,8 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
                             SequenceOffsetType sequence = entry.getValue();
                             if (sequence.equals(getEndOfPartitionMarker())) {
                               log.info(
-                                  "Got end of partition marker for partition 
[%s] from task [%s] in discoverTasks, clearing partition offset to refetch from 
metadata..",
-                                  taskId,
-                                  partition
+                                  "Got end-of-partition(EOS) marker for 
partition[%s] from task[%s] in discoverTasks, clearing partition offset to 
refetch from metadata.",
+                                  partition, taskId
                               );
                               endOffsetsAreInvalid = true;
                               partitionOffsets.put(partition, 
getNotSetMarker());
@@ -2080,7 +2064,7 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
                                                                                
   .keySet()) {
                             if 
(!taskGroupId.equals(getTaskGroupIdForPartition(partition))) {
                               log.warn(
-                                  "Stopping task [%s] which does not match the 
expected partition allocation",
+                                  "Stopping task[%s] as it does not match the 
current partition allocation.",
                                   taskId
                               );
 
@@ -2091,10 +2075,7 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
                           // make sure the task's io and tuning configs match 
with the supervisor config
                           // if it is current then only create corresponding 
taskGroup if it does not exist
                           if (!isTaskCurrent(taskGroupId, taskId, 
activeTaskMap)) {
-                            log.info(
-                                "Stopping task [%s] which does not match the 
expected parameters and ingestion spec",
-                                taskId
-                            );
+                            log.info("Stopping task[%s] as it does not match 
the current supervisor spec.", taskId);
 
                             // Returning false triggers a call to stopTask.
                             return false;
@@ -2127,8 +2108,7 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
                             if (prevTaskData != null) {
                               throw new ISE(
                                   "taskGroup[%s] already exists for new 
task[%s]",
-                                  prevTaskData,
-                                  taskId
+                                  prevTaskData, taskId
                               );
                             }
                             
verifySameSequenceNameForAllTasksInGroup(taskGroupId);
@@ -2138,7 +2118,7 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
                       }
                       catch (Throwable t) {
                         stateManager.recordThrowableEvent(t);
-                        log.error(t, "Something bad while discovering task 
[%s]", taskId);
+                        log.error(t, "An error occurred while discovering 
task[%s]", taskId);
                         return null;
                       }
                     }
@@ -2155,13 +2135,13 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
     for (int i = 0; i < results.size(); i++) {
       String taskId = futureTaskIds.get(i);
       if (results.get(i).isError() || results.get(i).valueOrThrow() == null) {
-        killTask(taskId, "Task [%s] failed to return status, killing task", 
taskId);
+        killTask(taskId, "Task[%s] failed to return status, killing task", 
taskId);
       } else if (Boolean.valueOf(false).equals(results.get(i).valueOrThrow())) 
{
         // "return false" above means that we want to stop the task.
         stopFutures.add(stopTask(taskId, false));
       }
     }
-    log.debug("Found [%d] seekablestream indexing tasks for dataSource [%s]", 
taskCount, dataSource);
+    log.debug("Found [%d] seekablestream indexing tasks for datasource[%s]", 
taskCount, dataSource);
 
     if (!stopFutures.isEmpty()) {
       coalesceAndAwait(stopFutures);
@@ -2237,34 +2217,25 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
       }
     }
 
+    final String killMsg =
+        "Killing forcefully as task could not be resumed in the first 
supervisor run after Overlord change.";
     for (Map.Entry<String, ListenableFuture<Boolean>> entry : 
tasksToResume.entrySet()) {
       String taskId = entry.getKey();
       ListenableFuture<Boolean> future = entry.getValue();
       future.addListener(
-          new Runnable()
-          {
-            @Override
-            public void run()
-            {
-              try {
-                if (entry.getValue().get()) {
-                  log.info("Resumed task [%s] in first supervisor run.", 
taskId);
-                } else {
-                  log.warn("Failed to resume task [%s] in first supervisor 
run.", taskId);
-                  killTask(
-                      taskId,
-                      "Killing forcefully as task could not be resumed in the 
first supervisor run after Overlord change."
-                  );
-                }
-              }
-              catch (Exception e) {
-                log.warn(e, "Failed to resume task [%s] in first supervisor 
run.", taskId);
-                killTask(
-                    taskId,
-                    "Killing forcefully as task could not be resumed in the 
first supervisor run after Overlord change."
-                );
+          () -> {
+            try {
+              if (entry.getValue().get()) {
+                log.info("Resumed task[%s] in first supervisor run.", taskId);
+              } else {
+                log.warn("Failed to resume task[%s] in first supervisor run.", 
taskId);
+                killTask(taskId, killMsg);
               }
             }
+            catch (Exception e) {
+              log.warn(e, "Failed to resume task[%s] in first supervisor 
run.", taskId);
+              killTask(taskId, killMsg);
+            }
           },
           workerExec
       );
@@ -3190,7 +3161,7 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
         for (Entry<PartitionIdType, SequenceOffsetType> entry : 
endOffsets.entrySet()) {
           if (entry.getValue().equals(getEndOfPartitionMarker())) {
             log.info(
-                "Got end of partition marker for partition [%s] in 
checkTaskDuration, not updating partition offset.",
+                "Got end-of-partition(EOS) marker for partition[%s] in 
checkTaskDuration, not updating partition offset.",
                 entry.getKey()
             );
             endOffsetsAreInvalid = true;
@@ -3213,7 +3184,7 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
         for (String id : group.taskIds()) {
           killTask(
               id,
-              "All tasks in group [%s] failed to transition to publishing 
state",
+              "All tasks in group[%s] failed to transition to publishing 
state",
               groupId
           );
         }
@@ -3343,9 +3314,8 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
               }
 
               log.info(
-                  "Setting endOffsets for tasks in taskGroup [%d] to %s",
-                  taskGroup.groupId,
-                  endOffsets
+                  "Setting endOffsets for tasks in taskGroup[%d] to [%s]",
+                  taskGroup.groupId, endOffsets
               );
               for (final String taskId : setEndOffsetTaskIds) {
                 setEndOffsetFutures.add(taskClient.setEndOffsetsAsync(taskId, 
endOffsets, finalize));
@@ -3357,22 +3327,18 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
                   log.info("Successfully set endOffsets for task[%s] and 
resumed it", setEndOffsetTaskIds.get(i));
                 } else {
                   String taskId = setEndOffsetTaskIds.get(i);
-                  killTask(
-                      taskId,
-                      "Task [%s] failed to respond to [set end offsets] in a 
timely manner, killing task",
-                      taskId
-                  );
+                  killTask(taskId, "Failed to set end offsets, killing task");
                   taskGroup.tasks.remove(taskId);
                 }
               }
             }
             catch (Exception e) {
-              log.error("Something bad happened [%s]", e.getMessage());
+              log.error("An exception occurred while setting end offsets: 
[%s]", e.getMessage());
               throw new RuntimeException(e);
             }
 
             if (taskGroup.tasks.isEmpty()) {
-              log.info("All tasks in taskGroup [%d] have failed, tasks will be 
re-created", taskGroup.groupId);
+              log.info("All tasks in taskGroup[%d] have failed, tasks will be 
re-created", taskGroup.groupId);
               return null;
             }
 
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
index 23ff4038633..cb653c6aa84 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
@@ -29,7 +29,6 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
-import com.google.errorprone.annotations.concurrent.GuardedBy;
 import org.apache.druid.data.input.impl.ByteEntity;
 import org.apache.druid.data.input.impl.DimensionSchema;
 import org.apache.druid.data.input.impl.DimensionsSpec;
@@ -78,8 +77,8 @@ import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.parsers.JSONPathSpec;
-import org.apache.druid.java.util.emitter.core.Event;
 import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig;
+import org.apache.druid.java.util.metrics.StubServiceEmitter;
 import org.apache.druid.metadata.EntryExistsException;
 import org.apache.druid.query.DruidMetrics;
 import org.apache.druid.query.aggregation.AggregatorFactory;
@@ -88,7 +87,6 @@ import org.apache.druid.segment.TestHelper;
 import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
 import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
-import org.apache.druid.server.metrics.NoopServiceEmitter;
 import org.easymock.EasyMock;
 import org.easymock.EasyMockSupport;
 import org.hamcrest.MatcherAssert;
@@ -104,7 +102,6 @@ import java.io.File;
 import java.io.IOException;
 import java.math.BigInteger;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -117,7 +114,6 @@ import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
 
 public class SeekableStreamSupervisorStateTest extends EasyMockSupport
 {
@@ -142,7 +138,7 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
   private RowIngestionMetersFactory rowIngestionMetersFactory;
   private SupervisorStateManagerConfig supervisorConfig;
 
-  private TestEmitter emitter;
+  private StubServiceEmitter emitter;
 
   @Before
   public void setupTest()
@@ -161,7 +157,7 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
 
     supervisorConfig = new SupervisorStateManagerConfig();
 
-    emitter = new TestEmitter();
+    emitter = new StubServiceEmitter("test-supervisor-state", "localhost");
 
     
EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
 
@@ -839,7 +835,7 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
     Map<String, Object> context = new HashMap<>();
     context.put("checkpoints", new 
ObjectMapper().writeValueAsString(sequenceOffsets));
 
-    SeekableStreamIndexTask id1 = new TestSeekableStreamIndexTask(
+    TestSeekableStreamIndexTask id1 = new TestSeekableStreamIndexTask(
             "id1",
             null,
             getDataSchema(),
@@ -849,7 +845,7 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
             "0"
     );
 
-    SeekableStreamIndexTask id2 = new TestSeekableStreamIndexTask(
+    TestSeekableStreamIndexTask id2 = new TestSeekableStreamIndexTask(
             "id2",
             null,
             getDataSchema(),
@@ -859,31 +855,52 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
             "0"
     );
 
+    TestSeekableStreamIndexTask id3 = new TestSeekableStreamIndexTask(
+        "id3",
+        null,
+        getDataSchema(),
+        taskTuningConfig,
+        taskIoConfig,
+        context,
+        "0"
+    );
+
     final TaskLocation location1 = TaskLocation.create("testHost", 1234, -1);
     final TaskLocation location2 = TaskLocation.create("testHost2", 145, -1);
+    final TaskLocation location3 = TaskLocation.create("testHost3", 145, -1);
 
     Collection workItems = new ArrayList<>();
     workItems.add(new TestTaskRunnerWorkItem(id1, null, location1));
     workItems.add(new TestTaskRunnerWorkItem(id2, null, location2));
+    workItems.add(new TestTaskRunnerWorkItem(id3, null, location3));
 
     
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes();
     EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE))
-            .andReturn(ImmutableList.of(id1, id2))
+            .andReturn(ImmutableList.of(id1, id2, id3))
             .anyTimes();
     
EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes();
     
EasyMock.expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes();
+    
EasyMock.expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes();
     
EasyMock.expect(taskStorage.getTask("id1")).andReturn(Optional.of(id1)).anyTimes();
     
EasyMock.expect(taskStorage.getTask("id2")).andReturn(Optional.of(id2)).anyTimes();
+    
EasyMock.expect(taskStorage.getTask("id3")).andReturn(Optional.of(id2)).anyTimes();
 
     EasyMock.reset(indexerMetadataStorageCoordinator);
-    EasyMock.expect(
-            
indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(new
 TestSeekableStreamDataSourceMetadata(null)
-    ).anyTimes();
-    
EasyMock.expect(indexTaskClient.getStatusAsync("id1")).andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.READING)).anyTimes();
-    
EasyMock.expect(indexTaskClient.getStatusAsync("id2")).andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.READING)).anyTimes();
+    
EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE))
+            .andReturn(new 
TestSeekableStreamDataSourceMetadata(null)).anyTimes();
+    EasyMock.expect(indexTaskClient.getStatusAsync("id1"))
+            
.andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.READING))
+            .anyTimes();
+    EasyMock.expect(indexTaskClient.getStatusAsync("id2"))
+            
.andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.READING))
+            .anyTimes();
+    EasyMock.expect(indexTaskClient.getStatusAsync("id3"))
+            
.andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.PAUSED))
+            .anyTimes();
 
     
EasyMock.expect(indexTaskClient.getStartTimeAsync("id1")).andReturn(Futures.immediateFuture(startTime)).anyTimes();
     
EasyMock.expect(indexTaskClient.getStartTimeAsync("id2")).andReturn(Futures.immediateFuture(startTime)).anyTimes();
+    
EasyMock.expect(indexTaskClient.getStartTimeAsync("id3")).andReturn(Futures.immediateFuture(startTime)).anyTimes();
 
     ImmutableMap<String, String> partitionOffset = ImmutableMap.of("0", "10");
     final TreeMap<Integer, Map<String, String>> checkpoints = new TreeMap<>();
@@ -895,24 +912,36 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
     
EasyMock.expect(indexTaskClient.getCheckpointsAsync(EasyMock.contains("id2"), 
EasyMock.anyBoolean()))
             .andReturn(Futures.immediateFuture(checkpoints))
             .anyTimes();
+    
EasyMock.expect(indexTaskClient.getCheckpointsAsync(EasyMock.contains("id3"), 
EasyMock.anyBoolean()))
+            .andReturn(Futures.immediateFuture(checkpoints))
+            .anyTimes();
     EasyMock.expect(indexTaskClient.pauseAsync("id1"))
             .andReturn(Futures.immediateFuture(partitionOffset))
             .anyTimes();
     EasyMock.expect(indexTaskClient.pauseAsync("id2"))
             .andReturn(Futures.immediateFuture(partitionOffset))
             .anyTimes();
+    EasyMock.expect(indexTaskClient.pauseAsync("id3"))
+            .andReturn(Futures.immediateFuture(partitionOffset))
+            .anyTimes();
     EasyMock.expect(indexTaskClient.setEndOffsetsAsync("id1", partitionOffset, 
false))
             .andReturn(Futures.immediateFuture(true))
             .anyTimes();
     EasyMock.expect(indexTaskClient.setEndOffsetsAsync("id2", partitionOffset, 
false))
             .andReturn(Futures.immediateFuture(true))
             .anyTimes();
+    EasyMock.expect(indexTaskClient.setEndOffsetsAsync("id3", partitionOffset, 
false))
+            .andReturn(Futures.immediateFuture(true))
+            .anyTimes();
     EasyMock.expect(indexTaskClient.resumeAsync("id1"))
             .andReturn(Futures.immediateFuture(true))
             .anyTimes();
     EasyMock.expect(indexTaskClient.resumeAsync("id2"))
             .andReturn(Futures.immediateFuture(true))
             .anyTimes();
+    EasyMock.expect(indexTaskClient.resumeAsync("id3"))
+            .andReturn(Futures.immediateFuture(false))
+            .anyTimes();
     EasyMock.expect(indexTaskClient.stopAsync("id1", false))
             .andReturn(Futures.immediateFuture(true))
             .anyTimes();
@@ -920,6 +949,13 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
             .andReturn(Futures.immediateFuture(true))
             .anyTimes();
 
+    taskQueue.shutdown(
+        "id3",
+        "Killing forcefully as task could not be resumed in the"
+        + " first supervisor run after Overlord change."
+    );
+    EasyMock.expectLastCall().atLeastOnce();
+
     replayAll();
 
     SeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();
@@ -967,29 +1003,14 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
 
 
     latch.await();
-    List<Event> events = emitter.getEvents();
-    List<String> whitelist = Arrays.asList("ingest/test/lag", 
"ingest/test/maxLag",
-        "ingest/test/avgLag", "ingest/test/lag/time", 
"ingest/test/maxLag/time", "ingest/test/avgLag/time");
-    events = filterMetrics(events, whitelist);
-    Assert.assertEquals(6, events.size());
-    Assert.assertEquals("ingest/test/lag", 
events.get(0).toMap().get("metric"));
-    Assert.assertEquals(850L, events.get(0).toMap().get("value"));
-    Assert.assertEquals(METRIC_TAGS, 
events.get(0).toMap().get(DruidMetrics.TAGS));
-    Assert.assertEquals("ingest/test/maxLag", 
events.get(1).toMap().get("metric"));
-    Assert.assertEquals(500L, events.get(1).toMap().get("value"));
-    Assert.assertEquals(METRIC_TAGS, 
events.get(1).toMap().get(DruidMetrics.TAGS));
-    Assert.assertEquals("ingest/test/avgLag", 
events.get(2).toMap().get("metric"));
-    Assert.assertEquals(283L, events.get(2).toMap().get("value"));
-    Assert.assertEquals(METRIC_TAGS, 
events.get(2).toMap().get(DruidMetrics.TAGS));
-    Assert.assertEquals("ingest/test/lag/time", 
events.get(3).toMap().get("metric"));
-    Assert.assertEquals(45000L, events.get(3).toMap().get("value"));
-    Assert.assertEquals(METRIC_TAGS, 
events.get(3).toMap().get(DruidMetrics.TAGS));
-    Assert.assertEquals("ingest/test/maxLag/time", 
events.get(4).toMap().get("metric"));
-    Assert.assertEquals(20000L, events.get(4).toMap().get("value"));
-    Assert.assertEquals(METRIC_TAGS, 
events.get(4).toMap().get(DruidMetrics.TAGS));
-    Assert.assertEquals("ingest/test/avgLag/time", 
events.get(5).toMap().get("metric"));
-    Assert.assertEquals(15000L, events.get(5).toMap().get("value"));
-    Assert.assertEquals(METRIC_TAGS, 
events.get(5).toMap().get(DruidMetrics.TAGS));
+
+    final Map<String, Object> dimFilters = ImmutableMap.of(DruidMetrics.TAGS, 
METRIC_TAGS);
+    emitter.verifyValue("ingest/test/lag", dimFilters, 850L);
+    emitter.verifyValue("ingest/test/maxLag", dimFilters, 500L);
+    emitter.verifyValue("ingest/test/avgLag", dimFilters, 283L);
+    emitter.verifyValue("ingest/test/lag/time", dimFilters, 45000L);
+    emitter.verifyValue("ingest/test/maxLag/time", dimFilters, 20000L);
+    emitter.verifyValue("ingest/test/avgLag/time", dimFilters, 15000L);
     verifyAll();
   }
 
@@ -1016,16 +1037,11 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
 
 
     latch.await();
-    List<Event> events = emitter.getEvents();
-    List<String> whitelist = Arrays.asList("ingest/test/lag", 
"ingest/test/maxLag", "ingest/test/avgLag");
-    events = filterMetrics(events, whitelist);
-    Assert.assertEquals(3, events.size());
-    Assert.assertEquals("ingest/test/lag", 
events.get(0).toMap().get("metric"));
-    Assert.assertEquals(850L, events.get(0).toMap().get("value"));
-    Assert.assertEquals("ingest/test/maxLag", 
events.get(1).toMap().get("metric"));
-    Assert.assertEquals(500L, events.get(1).toMap().get("value"));
-    Assert.assertEquals("ingest/test/avgLag", 
events.get(2).toMap().get("metric"));
-    Assert.assertEquals(283L, events.get(2).toMap().get("value"));
+
+    final Map<String, Object> dimFilters = ImmutableMap.of(DruidMetrics.TAGS, 
METRIC_TAGS);
+    emitter.verifyValue("ingest/test/lag", dimFilters, 850L);
+    emitter.verifyValue("ingest/test/maxLag", dimFilters, 500L);
+    emitter.verifyValue("ingest/test/avgLag", dimFilters, 283L);
     verifyAll();
   }
 
@@ -1053,19 +1069,12 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
 
 
     latch.await();
-    List<Event> events = emitter.getEvents();
-    List<String> whitelist = Arrays.asList("ingest/test/lag/time", 
"ingest/test/maxLag/time", "ingest/test/avgLag/time");
-    events = filterMetrics(events, whitelist);
-    Assert.assertEquals(3, events.size());
-    Assert.assertEquals("ingest/test/lag/time", 
events.get(0).toMap().get("metric"));
-    Assert.assertEquals(45000L, events.get(0).toMap().get("value"));
-    Assert.assertEquals(METRIC_TAGS, 
events.get(0).toMap().get(DruidMetrics.TAGS));
-    Assert.assertEquals("ingest/test/maxLag/time", 
events.get(1).toMap().get("metric"));
-    Assert.assertEquals(20000L, events.get(1).toMap().get("value"));
-    Assert.assertEquals(METRIC_TAGS, 
events.get(1).toMap().get(DruidMetrics.TAGS));
-    Assert.assertEquals("ingest/test/avgLag/time", 
events.get(2).toMap().get("metric"));
-    Assert.assertEquals(15000L, events.get(2).toMap().get("value"));
-    Assert.assertEquals(METRIC_TAGS, 
events.get(2).toMap().get(DruidMetrics.TAGS));
+
+    final Map<String, Object> dimFilters = ImmutableMap.of(DruidMetrics.TAGS, 
METRIC_TAGS);
+    emitter.verifyValue("ingest/test/lag/time", dimFilters, 45000L);
+    emitter.verifyValue("ingest/test/maxLag/time", dimFilters, 20000L);
+    emitter.verifyValue("ingest/test/avgLag/time", dimFilters, 15000L);
+
     verifyAll();
   }
 
@@ -1093,14 +1102,13 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
 
 
     latch.await();
-    List<Event> events = emitter.getEvents();
-    List<String> whitelist = 
Collections.singletonList("ingest/notices/queueSize");
-    events = filterMetrics(events, whitelist);
-    Assert.assertEquals(1, events.size());
-    Assert.assertEquals("ingest/notices/queueSize", 
events.get(0).toMap().get("metric"));
-    Assert.assertEquals(0, events.get(0).toMap().get("value"));
-    Assert.assertEquals(METRIC_TAGS, 
events.get(0).toMap().get(DruidMetrics.TAGS));
-    Assert.assertEquals("testDS", events.get(0).toMap().get("dataSource"));
+
+    final Map<String, Object> dimFilters = ImmutableMap.of(
+        DruidMetrics.TAGS, METRIC_TAGS,
+        DruidMetrics.DATASOURCE, "testDS"
+    );
+    emitter.verifyValue("ingest/notices/queueSize", dimFilters, 0);
+
     verifyAll();
   }
 
@@ -1122,15 +1130,15 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
     Assert.assertTrue(supervisor.stateManager.getExceptionEvents().isEmpty());
     Assert.assertFalse(supervisor.stateManager.isAtLeastOneSuccessfulRun());
     latch.await();
-    List<Event> events = emitter.getEvents();
-    List<String> whitelist = Collections.singletonList("ingest/notices/time");
-    events = filterMetrics(events, whitelist);
-    Assert.assertEquals(1, events.size());
-    Assert.assertEquals("ingest/notices/time", 
events.get(0).toMap().get("metric"));
-    Assert.assertEquals(METRIC_TAGS, 
events.get(0).toMap().get(DruidMetrics.TAGS));
-    Assert.assertTrue(String.valueOf(events.get(0).toMap().get("value")), 
(long) events.get(0).toMap().get("value") > 0);
-    Assert.assertEquals("testDS", events.get(0).toMap().get("dataSource"));
-    Assert.assertEquals("run_notice", events.get(0).toMap().get("noticeType"));
+
+    final Map<String, Object> dimFilters = ImmutableMap.of(
+        DruidMetrics.TAGS, METRIC_TAGS,
+        DruidMetrics.DATASOURCE, "testDS",
+        "noticeType", "run_notice"
+    );
+    long observedNoticeTime = emitter.getValue("ingest/notices/time", 
dimFilters).longValue();
+    Assert.assertTrue(observedNoticeTime > 0);
+
     verifyAll();
   }
 
@@ -1158,11 +1166,14 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
 
 
     latch.await();
-    List<Event> events = emitter.getEvents();
-    List<String> whitelist = Arrays.asList("ingest/test/lag", 
"ingest/test/maxLag",
-        "ingest/test/avgLag", "ingest/test/lag/time", 
"ingest/test/maxLag/time", "ingest/test/avgLag/time");
-    events = filterMetrics(events, whitelist);
-    Assert.assertEquals(0, events.size());
+
+    emitter.verifyNotEmitted("ingest/test/lag");
+    emitter.verifyNotEmitted("ingest/test/maxLag");
+    emitter.verifyNotEmitted("ingest/test/avgLag");
+    emitter.verifyNotEmitted("ingest/test/lag/time");
+    emitter.verifyNotEmitted("ingest/test/maxLag/time");
+    emitter.verifyNotEmitted("ingest/test/avgLag/time");
+
     verifyAll();
   }
 
@@ -1838,14 +1849,6 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
     EasyMock.verify(executorService, spec);
   }
 
-  private List<Event> filterMetrics(List<Event> events, List<String> whitelist)
-  {
-    List<Event> result = events.stream()
-        .filter(e -> whitelist.contains(e.toMap().get("metric").toString()))
-        .collect(Collectors.toList());
-    return result;
-  }
-
   private void expectEmitterSupervisor(boolean suspended) throws 
EntryExistsException
   {
     spec = createMock(SeekableStreamSupervisorSpec.class);
@@ -2401,27 +2404,6 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
     }
   }
 
-  private static class TestEmitter extends NoopServiceEmitter
-  {
-    @GuardedBy("events")
-    private final List<Event> events = new ArrayList<>();
-
-    @Override
-    public void emit(Event event)
-    {
-      synchronized (events) {
-        events.add(event);
-      }
-    }
-
-    public List<Event> getEvents()
-    {
-      synchronized (events) {
-        return ImmutableList.copyOf(events);
-      }
-    }
-  }
-
   private static class TestTaskRunnerWorkItem extends TaskRunnerWorkItem
   {
     private final String taskType;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to