This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 67dafed0e6 Add Driver.failed() call in FragmentInstanceScheduler (#5593)
67dafed0e6 is described below

commit 67dafed0e6f1ae7f9bff2d01acd39ca659302719
Author: BaiJian <[email protected]>
AuthorDate: Wed Apr 20 17:24:05 2022 +0800

    Add Driver.failed() call in FragmentInstanceScheduler (#5593)
---
 .../schedule/FragmentInstanceAbortedException.java | 35 ++++++++++++
 .../db/mpp/schedule/FragmentInstanceScheduler.java | 13 ++++-
 .../mpp/schedule/FragmentInstanceTaskExecutor.java |  1 +
 .../schedule/FragmentInstanceTimeoutSentinel.java  |  1 +
 .../db/mpp/schedule/queue/L1PriorityQueue.java     | 37 +++++++-----
 .../db/mpp/schedule/queue/L2PriorityQueue.java     | 66 ++++++++++++++--------
 .../db/mpp/schedule/task/FragmentInstanceTask.java | 10 ++++
 .../mpp/schedule/task/FragmentInstanceTaskID.java  | 10 ++--
 .../db/mpp/schedule/DefaultTaskSchedulerTest.java  | 18 ++++++
 .../schedule/FragmentInstanceSchedulerTest.java    | 20 +++++++
 .../FragmentInstanceTimeoutSentinelTest.java       | 55 ++++++++++--------
 .../db/mpp/schedule/queue/L1PriorityQueueTest.java | 22 ++++++++
 .../db/mpp/schedule/queue/L2PriorityQueueTest.java | 27 +++++++++
 13 files changed, 249 insertions(+), 66 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceAbortedException.java b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceAbortedException.java
new file mode 100644
index 0000000000..20017340f6
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceAbortedException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.schedule;
+
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.execution.Driver;
+
+/** A common exception to pass to {@link Driver#failed(Throwable)} */
+public class FragmentInstanceAbortedException extends Exception {
+
+  public static final String BY_TIMEOUT = "timeout";
+  public static final String BY_FRAGMENT_ABORT_CALLED = "fragment abort called";
+  public static final String BY_QUERY_CASCADING_ABORTED = "query cascading aborted";
+  public static final String BY_ALREADY_BEING_CANCELLED = "already being cancelled";
+
+  public FragmentInstanceAbortedException(FragmentInstanceId id, String causeMsg) {
+    super(String.format("FragmentInstance %s is aborted by %s", id.toString(), causeMsg));
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceScheduler.java
index 875894f47f..4422e81ba8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceScheduler.java
@@ -78,9 +78,7 @@ public class FragmentInstanceScheduler implements IFragmentInstanceScheduler, IS
             new FragmentInstanceTask());
     this.timeoutQueue =
         new L1PriorityQueue<>(
-            MAX_CAPACITY,
-            new FragmentInstanceTask.SchedulePriorityComparator(),
-            new FragmentInstanceTask());
+            MAX_CAPACITY, new FragmentInstanceTask.TimeoutComparator(), new FragmentInstanceTask());
     this.queryMap = new ConcurrentHashMap<>();
     this.blockedTasks = Collections.synchronizedSet(new HashSet<>());
     this.scheduler = new Scheduler();
@@ -154,6 +152,7 @@ public class FragmentInstanceScheduler implements IFragmentInstanceScheduler, IS
       for (FragmentInstanceTask task : queryRelatedTasks) {
         task.lock();
         try {
+          task.setAbortCause(FragmentInstanceAbortedException.BY_QUERY_CASCADING_ABORTED);
           clearFragmentInstanceTask(task);
         } finally {
           task.unlock();
@@ -170,6 +169,7 @@ public class FragmentInstanceScheduler implements IFragmentInstanceScheduler, IS
     }
     task.lock();
     try {
+      task.setAbortCause(FragmentInstanceAbortedException.BY_FRAGMENT_ABORT_CALLED);
       clearFragmentInstanceTask(task);
     } finally {
       task.unlock();
@@ -190,6 +190,12 @@ public class FragmentInstanceScheduler implements IFragmentInstanceScheduler, IS
     if (task.getStatus() != FragmentInstanceTaskStatus.FINISHED) {
       task.setStatus(FragmentInstanceTaskStatus.ABORTED);
     }
+    if (task.getAbortCause() != null) {
+      task.getFragmentInstance()
+          .failed(
+              new FragmentInstanceAbortedException(
+                  task.getFragmentInstance().getInfo(), task.getAbortCause()));
+    }
     if (task.getStatus() == FragmentInstanceTaskStatus.ABORTED) {
       blockManager.forceDeregisterFragmentInstance(
           new TFragmentInstanceId(
@@ -345,6 +351,7 @@ public class FragmentInstanceScheduler implements IFragmentInstanceScheduler, IS
           }
           otherTask.lock();
           try {
+            otherTask.setAbortCause(FragmentInstanceAbortedException.BY_QUERY_CASCADING_ABORTED);
             clearFragmentInstanceTask(otherTask);
           } finally {
             otherTask.unlock();
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTaskExecutor.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTaskExecutor.java
index ed704c4ca4..fd19b67ee0 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTaskExecutor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTaskExecutor.java
@@ -59,6 +59,7 @@ public class FragmentInstanceTaskExecutor extends AbstractExecutor {
     // long cost = System.nanoTime() - startTime;
     // If the future is cancelled, the task is in an error and should be thrown.
     if (future.isCancelled()) {
+      task.setAbortCause(FragmentInstanceAbortedException.BY_ALREADY_BEING_CANCELLED);
       scheduler.toAborted(task);
       return;
     }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTimeoutSentinel.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTimeoutSentinel.java
index c1327a3db7..e7aaaf4e47 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTimeoutSentinel.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTimeoutSentinel.java
@@ -50,6 +50,7 @@ public class FragmentInstanceTimeoutSentinel extends 
AbstractExecutor {
       // After this time, the task must be timeout.
       Thread.sleep(waitTime);
     }
+    task.setAbortCause(FragmentInstanceAbortedException.BY_TIMEOUT);
     scheduler.toAborted(task);
   }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/queue/L1PriorityQueue.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/queue/L1PriorityQueue.java
index 997bbc9296..ccbf9144e2 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/queue/L1PriorityQueue.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/queue/L1PriorityQueue.java
@@ -19,8 +19,10 @@
 package org.apache.iotdb.db.mpp.schedule.queue;
 
 import java.util.Comparator;
-import java.util.SortedMap;
-import java.util.TreeMap;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
 
 /**
  * An efficient subclass of {@link IndexedBlockingQueue} with 1-level priority 
groups.
@@ -36,9 +38,8 @@ import java.util.TreeMap;
  */
 public class L1PriorityQueue<E extends IDIndexedAccessible> extends 
IndexedBlockingQueue<E> {
 
-  // Here we use a map not a set to act as a queue because we need to get the 
element reference
-  // after it was removed.
-  private final SortedMap<E, E> elements;
+  private final SortedSet<E> sortedElements; // Used for accessing in order
+  private final Map<ID, E> keyedElements; // Used for accessing randomly
 
   /**
    * Init the queue with max capacity and specified comparator.
@@ -51,41 +52,51 @@ public class L1PriorityQueue<E extends IDIndexedAccessible> 
extends IndexedBlock
    */
   public L1PriorityQueue(int maxCapacity, Comparator<E> comparator, E 
queryHolder) {
     super(maxCapacity, queryHolder);
-    this.elements = new TreeMap<>(comparator);
+    this.sortedElements = new TreeSet<>(comparator);
+    this.keyedElements = new HashMap<>();
   }
 
   @Override
   protected boolean isEmpty() {
-    return elements.isEmpty();
+    return keyedElements.isEmpty();
   }
 
   @Override
   protected E pollFirst() {
-    return elements.remove(elements.firstKey());
+    E element = sortedElements.first();
+    sortedElements.remove(element);
+    keyedElements.remove(element.getId());
+    return element;
   }
 
   @Override
   protected void pushToQueue(E element) {
-    elements.put(element, element);
+    keyedElements.put(element.getId(), element);
+    sortedElements.add(element);
   }
 
   @Override
   protected E remove(E element) {
-    return elements.remove(element);
+    E e = keyedElements.remove(element.getId());
+    if (e != null) {
+      sortedElements.remove(e);
+    }
+    return e;
   }
 
   @Override
   protected boolean contains(E element) {
-    return elements.containsKey(element);
+    return keyedElements.containsKey(element.getId());
   }
 
   @Override
   protected E get(E element) {
-    return elements.get(element);
+    return keyedElements.get(element.getId());
   }
 
   @Override
   protected void clearAllElements() {
-    elements.clear();
+    sortedElements.clear();
+    keyedElements.clear();
   }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/queue/L2PriorityQueue.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/queue/L2PriorityQueue.java
index 50cfd1c29d..90d2c45004 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/queue/L2PriorityQueue.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/queue/L2PriorityQueue.java
@@ -19,8 +19,10 @@
 package org.apache.iotdb.db.mpp.schedule.queue;
 
 import java.util.Comparator;
-import java.util.SortedMap;
-import java.util.TreeMap;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
 
 /**
  * An efficient subclass of {@link IndexedBlockingQueue} with 2-level priority 
groups. The
@@ -38,10 +40,10 @@ import java.util.TreeMap;
  */
 public class L2PriorityQueue<E extends IDIndexedAccessible> extends 
IndexedBlockingQueue<E> {
 
-  // Here we use a map not a set to act as a queue because we need to get the 
element reference
-  // after it was removed.
-  private SortedMap<E, E> workingElements;
-  private SortedMap<E, E> idleElements;
+  private SortedSet<E> workingSortedElements;
+  private SortedSet<E> idleSortedElements;
+  private Map<ID, E> workingKeyedElements;
+  private Map<ID, E> idleKeyedElements;
 
   /**
    * Init the queue with max capacity and specified comparator.
@@ -54,56 +56,74 @@ public class L2PriorityQueue<E extends IDIndexedAccessible> 
extends IndexedBlock
    */
   public L2PriorityQueue(int maxCapacity, Comparator<E> comparator, E 
queryHolder) {
     super(maxCapacity, queryHolder);
-    this.workingElements = new TreeMap<>(comparator);
-    this.idleElements = new TreeMap<>(comparator);
+    this.workingSortedElements = new TreeSet<>(comparator);
+    this.idleSortedElements = new TreeSet<>(comparator);
+    this.workingKeyedElements = new HashMap<>();
+    this.idleKeyedElements = new HashMap<>();
   }
 
   @Override
   protected boolean isEmpty() {
-    return workingElements.isEmpty() && idleElements.isEmpty();
+    return workingKeyedElements.isEmpty() && idleKeyedElements.isEmpty();
   }
 
   @Override
   protected E pollFirst() {
-    if (workingElements.isEmpty()) {
-      SortedMap<E, E> tmp = workingElements;
-      workingElements = idleElements;
-      idleElements = tmp;
+    if (workingKeyedElements.isEmpty()) {
+      // Switch the two queues
+      Map<ID, E> tmp = workingKeyedElements;
+      workingKeyedElements = idleKeyedElements;
+      idleKeyedElements = tmp;
+      SortedSet<E> tmpSet = workingSortedElements;
+      workingSortedElements = idleSortedElements;
+      idleSortedElements = tmpSet;
     }
-    return workingElements.remove(workingElements.firstKey());
+    E element = workingSortedElements.first();
+    workingSortedElements.remove(element);
+    workingKeyedElements.remove(element.getId());
+    return element;
   }
 
   @Override
   protected void pushToQueue(E element) {
-    idleElements.put(element, element);
+    idleKeyedElements.put(element.getId(), element);
+    idleSortedElements.add(element);
   }
 
   @Override
   protected E remove(E element) {
-    E e = workingElements.remove(element);
-    if (e == null) {
-      e = idleElements.remove(element);
+    E e = workingKeyedElements.remove(element.getId());
+    if (e != null) {
+      workingSortedElements.remove(e);
+      return e;
+    }
+    e = idleKeyedElements.remove(element.getId());
+    if (e != null) {
+      idleSortedElements.remove(e);
     }
     return e;
   }
 
   @Override
   protected boolean contains(E element) {
-    return workingElements.containsKey(element) || 
idleElements.containsKey(element);
+    return workingKeyedElements.containsKey(element.getId())
+        || idleKeyedElements.containsKey(element.getId());
   }
 
   @Override
   protected E get(E element) {
-    E e = workingElements.get(element);
+    E e = workingKeyedElements.get(element.getId());
     if (e != null) {
       return e;
     }
-    return idleElements.get(element);
+    return idleKeyedElements.get(element.getId());
   }
 
   @Override
   protected void clearAllElements() {
-    workingElements.clear();
-    idleElements.clear();
+    workingKeyedElements.clear();
+    workingSortedElements.clear();
+    idleKeyedElements.clear();
+    idleSortedElements.clear();
   }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/FragmentInstanceTask.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/FragmentInstanceTask.java
index e30b1f15bc..abebdaf30d 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/FragmentInstanceTask.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/FragmentInstanceTask.java
@@ -53,6 +53,8 @@ public class FragmentInstanceTask implements 
IDIndexedAccessible {
   // Running stats
   private long cpuWallNano;
 
+  private String abortCause;
+
   /** Initialize a dummy instance for queryHolder */
   public FragmentInstanceTask() {
     this(new StubFragmentInstance(), 0L, null);
@@ -139,6 +141,14 @@ public class FragmentInstanceTask implements 
IDIndexedAccessible {
     return o instanceof FragmentInstanceTask && ((FragmentInstanceTask) 
o).getId().equals(id);
   }
 
+  public String getAbortCause() {
+    return abortCause;
+  }
+
+  public void setAbortCause(String abortCause) {
+    this.abortCause = abortCause;
+  }
+
   /** a comparator of ddl, the less the ddl is, the low order it has. */
   public static class TimeoutComparator implements 
Comparator<FragmentInstanceTask> {
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/FragmentInstanceTaskID.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/FragmentInstanceTaskID.java
index 6ea8b7d4d2..a0081b177c 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/FragmentInstanceTaskID.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/FragmentInstanceTaskID.java
@@ -36,10 +36,12 @@ public class FragmentInstanceTaskID implements ID, 
Comparable<FragmentInstanceTa
 
   @Override
   public boolean equals(Object o) {
-    return o instanceof FragmentInstanceTaskID
-        && id.getQueryId().equals(((FragmentInstanceTaskID) o).getQueryId())
-        && id.getFragmentId().getId() == ((FragmentInstanceTaskID) 
o).getFragmentId().getId()
-        && id.getInstanceId().equals(((FragmentInstanceTaskID) 
o).getInstanceId());
+    return o instanceof FragmentInstanceTaskID && ((FragmentInstanceTaskID) 
o).id.equals(id);
+  }
+
+  @Override
+  public int hashCode() {
+    return id.hashCode();
   }
 
   public String toString() {
diff --git 
a/server/src/test/java/org/apache/iotdb/db/mpp/schedule/DefaultTaskSchedulerTest.java
 
b/server/src/test/java/org/apache/iotdb/db/mpp/schedule/DefaultTaskSchedulerTest.java
index e73529aa06..82ca5dfd2c 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/mpp/schedule/DefaultTaskSchedulerTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/mpp/schedule/DefaultTaskSchedulerTest.java
@@ -94,6 +94,7 @@ public class DefaultTaskSchedulerTest {
     Assert.assertNotNull(manager.getTimeoutQueue().get(testTask.getId()));
     Assert.assertTrue(manager.getQueryMap().containsKey(queryId));
     Assert.assertTrue(manager.getQueryMap().get(queryId).contains(testTask));
+    Mockito.verify(mockDriver, Mockito.never()).failed(Mockito.any());
     clear();
   }
 
@@ -141,6 +142,7 @@ public class DefaultTaskSchedulerTest {
     Assert.assertNotNull(manager.getTimeoutQueue().get(testTask.getId()));
     Assert.assertTrue(manager.getQueryMap().containsKey(queryId));
     Assert.assertTrue(manager.getQueryMap().get(queryId).contains(testTask));
+    Mockito.verify(mockDriver, Mockito.never()).failed(Mockito.any());
     clear();
   }
 
@@ -193,6 +195,7 @@ public class DefaultTaskSchedulerTest {
     Assert.assertNotNull(manager.getTimeoutQueue().get(testTask.getId()));
     Assert.assertTrue(manager.getQueryMap().containsKey(queryId));
     Assert.assertTrue(manager.getQueryMap().get(queryId).contains(testTask));
+    Mockito.verify(mockDriver, Mockito.never()).failed(Mockito.any());
     clear();
   }
 
@@ -245,6 +248,7 @@ public class DefaultTaskSchedulerTest {
     Assert.assertNotNull(manager.getTimeoutQueue().get(testTask.getId()));
     Assert.assertTrue(manager.getQueryMap().containsKey(queryId));
     Assert.assertTrue(manager.getQueryMap().get(queryId).contains(testTask));
+    Mockito.verify(mockDriver, Mockito.never()).failed(Mockito.any());
     clear();
   }
 
@@ -296,6 +300,7 @@ public class DefaultTaskSchedulerTest {
     Assert.assertNull(manager.getReadyQueue().get(testTask.getId()));
     Assert.assertNull(manager.getTimeoutQueue().get(testTask.getId()));
     Assert.assertFalse(manager.getQueryMap().containsKey(queryId));
+    Mockito.verify(mockDriver, Mockito.never()).failed(Mockito.any());
     clear();
   }
 
@@ -342,6 +347,9 @@ public class DefaultTaskSchedulerTest {
       Assert.assertTrue(manager.getQueryMap().containsKey(queryId));
       
Assert.assertTrue(manager.getQueryMap().get(queryId).contains(testTask1));
       
Assert.assertTrue(manager.getQueryMap().get(queryId).contains(testTask2));
+
+      Mockito.verify(mockDriver1, Mockito.never()).failed(Mockito.any());
+      Mockito.verify(mockDriver2, Mockito.never()).failed(Mockito.any());
       clear();
     }
     FragmentInstanceTaskStatus[] validStates =
@@ -351,6 +359,11 @@ public class DefaultTaskSchedulerTest {
           FragmentInstanceTaskStatus.BLOCKED,
         };
     for (FragmentInstanceTaskStatus status : validStates) {
+      Mockito.reset(mockDriver1);
+      Mockito.when(mockDriver1.getInfo()).thenReturn(instanceId1);
+      Mockito.reset(mockDriver2);
+      Mockito.when(mockDriver2.getInfo()).thenReturn(instanceId2);
+
       FragmentInstanceTask testTask1 = new FragmentInstanceTask(mockDriver1, 
100L, status);
 
       FragmentInstanceTask testTask2 =
@@ -377,6 +390,11 @@ public class DefaultTaskSchedulerTest {
       Assert.assertNull(manager.getTimeoutQueue().get(testTask1.getId()));
       Assert.assertNull(manager.getTimeoutQueue().get(testTask2.getId()));
       Assert.assertFalse(manager.getQueryMap().containsKey(queryId));
+
+      // The mockDriver1.failed() will be called outside the scheduler
+      Mockito.verify(mockDriver1, Mockito.never()).failed(Mockito.any());
+      Mockito.verify(mockDriver2, Mockito.times(1)).failed(Mockito.any());
+
       clear();
     }
   }
diff --git 
a/server/src/test/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceSchedulerTest.java
 
b/server/src/test/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceSchedulerTest.java
index 233908ca44..56435496ab 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceSchedulerTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceSchedulerTest.java
@@ -117,6 +117,8 @@ public class FragmentInstanceSchedulerTest {
     Assert.assertEquals(FragmentInstanceTaskStatus.READY, task4.getStatus());
 
     // Abort one FragmentInstance
+    Mockito.reset(mockDriver1);
+    Mockito.when(mockDriver1.getInfo()).thenReturn(instanceId1);
     manager.abortFragmentInstance(instanceId1);
     Mockito.verify(mockDataBlockManager, Mockito.times(1))
         .forceDeregisterFragmentInstance(Mockito.any());
@@ -129,9 +131,18 @@ public class FragmentInstanceSchedulerTest {
     Assert.assertEquals(FragmentInstanceTaskStatus.READY, task2.getStatus());
     Assert.assertEquals(FragmentInstanceTaskStatus.READY, task3.getStatus());
     Assert.assertEquals(FragmentInstanceTaskStatus.READY, task4.getStatus());
+    Mockito.verify(mockDriver1, Mockito.times(1)).failed(Mockito.any());
+    Assert.assertEquals(
+        FragmentInstanceAbortedException.BY_FRAGMENT_ABORT_CALLED, 
task1.getAbortCause());
 
     // Abort the whole query
     Mockito.reset(mockDataBlockManager);
+    Mockito.reset(mockDriver1);
+    Mockito.when(mockDriver1.getInfo()).thenReturn(instanceId1);
+    Mockito.reset(mockDriver2);
+    Mockito.when(mockDriver2.getInfo()).thenReturn(instanceId2);
+    Mockito.reset(mockDriver3);
+    Mockito.when(mockDriver3.getInfo()).thenReturn(instanceId3);
     manager.abortQuery(queryId);
     Mockito.verify(mockDataBlockManager, Mockito.times(2))
         .forceDeregisterFragmentInstance(Mockito.any());
@@ -144,5 +155,14 @@ public class FragmentInstanceSchedulerTest {
     Assert.assertEquals(FragmentInstanceTaskStatus.ABORTED, task2.getStatus());
     Assert.assertEquals(FragmentInstanceTaskStatus.ABORTED, task3.getStatus());
     Assert.assertEquals(FragmentInstanceTaskStatus.READY, task4.getStatus());
+    Mockito.verify(mockDriver1, Mockito.never()).failed(Mockito.any());
+    Mockito.verify(mockDriver2, Mockito.times(1)).failed(Mockito.any());
+    Mockito.verify(mockDriver3, Mockito.times(1)).failed(Mockito.any());
+    Mockito.verify(mockDriver4, Mockito.never()).failed(Mockito.any());
+    Assert.assertEquals(
+        FragmentInstanceAbortedException.BY_QUERY_CASCADING_ABORTED, 
task2.getAbortCause());
+    Assert.assertEquals(
+        FragmentInstanceAbortedException.BY_QUERY_CASCADING_ABORTED, 
task3.getAbortCause());
+    Assert.assertNull(task4.getAbortCause());
   }
 }
diff --git 
a/server/src/test/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTimeoutSentinelTest.java
 
b/server/src/test/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTimeoutSentinelTest.java
index 87d1de0870..862f4ca207 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTimeoutSentinelTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTimeoutSentinelTest.java
@@ -69,29 +69,33 @@ public class FragmentInstanceTimeoutSentinelTest {
         new FragmentInstanceTask(mockDriver, 100L, 
FragmentInstanceTaskStatus.FINISHED);
     executor.execute(testTask);
     Assert.assertEquals(FragmentInstanceTaskStatus.FINISHED, 
testTask.getStatus());
-    Mockito.verify(mockDriver, Mockito.times(0)).processFor(Mockito.any());
+    Mockito.verify(mockDriver, Mockito.never()).processFor(Mockito.any());
+    Mockito.verify(mockDriver, Mockito.never()).failed(Mockito.any());
 
     // ABORTED status test
     testTask = new FragmentInstanceTask(mockDriver, 100L, 
FragmentInstanceTaskStatus.ABORTED);
     executor.execute(testTask);
     Assert.assertEquals(FragmentInstanceTaskStatus.ABORTED, 
testTask.getStatus());
-    Mockito.verify(mockDriver, Mockito.times(0)).processFor(Mockito.any());
+    Mockito.verify(mockDriver, Mockito.never()).processFor(Mockito.any());
+    Mockito.verify(mockDriver, Mockito.never()).failed(Mockito.any());
 
     // RUNNING status test
     testTask = new FragmentInstanceTask(mockDriver, 100L, 
FragmentInstanceTaskStatus.RUNNING);
     executor.execute(testTask);
     Assert.assertEquals(FragmentInstanceTaskStatus.RUNNING, 
testTask.getStatus());
-    Mockito.verify(mockDriver, Mockito.times(0)).processFor(Mockito.any());
+    Mockito.verify(mockDriver, Mockito.never()).processFor(Mockito.any());
+    Mockito.verify(mockDriver, Mockito.never()).failed(Mockito.any());
 
     // BLOCKED status test
     testTask = new FragmentInstanceTask(mockDriver, 100L, 
FragmentInstanceTaskStatus.BLOCKED);
     executor.execute(testTask);
     Assert.assertEquals(FragmentInstanceTaskStatus.BLOCKED, 
testTask.getStatus());
-    Mockito.verify(mockDriver, Mockito.times(0)).processFor(Mockito.any());
-    Mockito.verify(mockScheduler, Mockito.times(0)).toAborted(Mockito.any());
-    Mockito.verify(mockScheduler, 
Mockito.times(0)).runningToBlocked(Mockito.any(), Mockito.any());
-    Mockito.verify(mockScheduler, 
Mockito.times(0)).runningToFinished(Mockito.any(), Mockito.any());
-    Mockito.verify(mockScheduler, 
Mockito.times(0)).blockedToReady(Mockito.any());
+    Mockito.verify(mockDriver, Mockito.never()).processFor(Mockito.any());
+    Assert.assertNull(testTask.getAbortCause());
+    Mockito.verify(mockScheduler, Mockito.never()).toAborted(Mockito.any());
+    Mockito.verify(mockScheduler, 
Mockito.never()).runningToBlocked(Mockito.any(), Mockito.any());
+    Mockito.verify(mockScheduler, 
Mockito.never()).runningToFinished(Mockito.any(), Mockito.any());
+    Mockito.verify(mockScheduler, 
Mockito.never()).blockedToReady(Mockito.any());
   }
 
   @Test
@@ -127,11 +131,13 @@ public class FragmentInstanceTimeoutSentinelTest {
         new FragmentInstanceTask(mockDriver, 100L, 
FragmentInstanceTaskStatus.READY);
     executor.execute(testTask);
     Mockito.verify(mockDriver, Mockito.times(1)).processFor(Mockito.any());
+    Assert.assertEquals(
+        FragmentInstanceAbortedException.BY_ALREADY_BEING_CANCELLED, 
testTask.getAbortCause());
     Mockito.verify(mockScheduler, Mockito.times(1)).toAborted(Mockito.any());
-    Mockito.verify(mockScheduler, 
Mockito.times(0)).runningToReady(Mockito.any(), Mockito.any());
-    Mockito.verify(mockScheduler, 
Mockito.times(0)).runningToBlocked(Mockito.any(), Mockito.any());
-    Mockito.verify(mockScheduler, 
Mockito.times(0)).runningToFinished(Mockito.any(), Mockito.any());
-    Mockito.verify(mockScheduler, 
Mockito.times(0)).blockedToReady(Mockito.any());
+    Mockito.verify(mockScheduler, 
Mockito.never()).runningToReady(Mockito.any(), Mockito.any());
+    Mockito.verify(mockScheduler, 
Mockito.never()).runningToBlocked(Mockito.any(), Mockito.any());
+    Mockito.verify(mockScheduler, 
Mockito.never()).runningToFinished(Mockito.any(), Mockito.any());
+    Mockito.verify(mockScheduler, 
Mockito.never()).blockedToReady(Mockito.any());
   }
 
   @Test
@@ -166,11 +172,12 @@ public class FragmentInstanceTimeoutSentinelTest {
         new FragmentInstanceTask(mockDriver, 100L, 
FragmentInstanceTaskStatus.READY);
     executor.execute(testTask);
     Mockito.verify(mockDriver, Mockito.times(1)).processFor(Mockito.any());
-    Mockito.verify(mockScheduler, Mockito.times(0)).toAborted(Mockito.any());
-    Mockito.verify(mockScheduler, 
Mockito.times(0)).runningToReady(Mockito.any(), Mockito.any());
-    Mockito.verify(mockScheduler, 
Mockito.times(0)).runningToBlocked(Mockito.any(), Mockito.any());
+    Assert.assertNull(testTask.getAbortCause());
+    Mockito.verify(mockScheduler, Mockito.never()).toAborted(Mockito.any());
+    Mockito.verify(mockScheduler, 
Mockito.never()).runningToReady(Mockito.any(), Mockito.any());
+    Mockito.verify(mockScheduler, 
Mockito.never()).runningToBlocked(Mockito.any(), Mockito.any());
     Mockito.verify(mockScheduler, 
Mockito.times(1)).runningToFinished(Mockito.any(), Mockito.any());
-    Mockito.verify(mockScheduler, 
Mockito.times(0)).blockedToReady(Mockito.any());
+    Mockito.verify(mockScheduler, 
Mockito.never()).blockedToReady(Mockito.any());
   }
 
   @Test
@@ -216,10 +223,11 @@ public class FragmentInstanceTimeoutSentinelTest {
         new FragmentInstanceTask(mockDriver, 100L, 
FragmentInstanceTaskStatus.READY);
     executor.execute(testTask);
     Mockito.verify(mockDriver, Mockito.times(1)).processFor(Mockito.any());
-    Mockito.verify(mockScheduler, Mockito.times(0)).toAborted(Mockito.any());
-    Mockito.verify(mockScheduler, 
Mockito.times(0)).runningToReady(Mockito.any(), Mockito.any());
+    Assert.assertNull(testTask.getAbortCause());
+    Mockito.verify(mockScheduler, Mockito.never()).toAborted(Mockito.any());
+    Mockito.verify(mockScheduler, 
Mockito.never()).runningToReady(Mockito.any(), Mockito.any());
     Mockito.verify(mockScheduler, 
Mockito.times(1)).runningToBlocked(Mockito.any(), Mockito.any());
-    Mockito.verify(mockScheduler, 
Mockito.times(0)).runningToFinished(Mockito.any(), Mockito.any());
+    Mockito.verify(mockScheduler, 
Mockito.never()).runningToFinished(Mockito.any(), Mockito.any());
     Mockito.verify(mockScheduler, 
Mockito.times(1)).blockedToReady(Mockito.any());
   }
 
@@ -266,10 +274,11 @@ public class FragmentInstanceTimeoutSentinelTest {
         new FragmentInstanceTask(mockDriver, 100L, 
FragmentInstanceTaskStatus.READY);
     executor.execute(testTask);
     Mockito.verify(mockDriver, Mockito.times(1)).processFor(Mockito.any());
-    Mockito.verify(mockScheduler, Mockito.times(0)).toAborted(Mockito.any());
+    Assert.assertNull(testTask.getAbortCause());
+    Mockito.verify(mockScheduler, Mockito.never()).toAborted(Mockito.any());
     Mockito.verify(mockScheduler, 
Mockito.times(1)).runningToReady(Mockito.any(), Mockito.any());
-    Mockito.verify(mockScheduler, 
Mockito.times(0)).runningToBlocked(Mockito.any(), Mockito.any());
-    Mockito.verify(mockScheduler, 
Mockito.times(0)).runningToFinished(Mockito.any(), Mockito.any());
-    Mockito.verify(mockScheduler, 
Mockito.times(0)).blockedToReady(Mockito.any());
+    Mockito.verify(mockScheduler, 
Mockito.never()).runningToBlocked(Mockito.any(), Mockito.any());
+    Mockito.verify(mockScheduler, 
Mockito.never()).runningToFinished(Mockito.any(), Mockito.any());
+    Mockito.verify(mockScheduler, 
Mockito.never()).blockedToReady(Mockito.any());
   }
 }
diff --git 
a/server/src/test/java/org/apache/iotdb/db/mpp/schedule/queue/L1PriorityQueueTest.java
 
b/server/src/test/java/org/apache/iotdb/db/mpp/schedule/queue/L1PriorityQueueTest.java
index b668b91b36..a03fefda0a 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/mpp/schedule/queue/L1PriorityQueueTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/mpp/schedule/queue/L1PriorityQueueTest.java
@@ -132,6 +132,28 @@ public class L1PriorityQueueTest {
     }
   }
 
+  @Test
+  public void testRemoveElement() {
+    IndexedBlockingQueue<QueueElement> queue =
+        new L1PriorityQueue<>(
+            10,
+            (o1, o2) -> {
+              if (o1.equals(o2)) {
+                return 0;
+              }
+              return Integer.compare(o1.getValue(), o2.getValue());
+            },
+            new QueueElement(new QueueElement.QueueElementID(0), 0));
+    QueueElement e1 = new QueueElement(new QueueElement.QueueElementID(1), 5);
+    queue.push(e1);
+    Assert.assertEquals(1, queue.size());
+    QueueElement e2 = new QueueElement(new QueueElement.QueueElementID(2), 10);
+    queue.push(e2);
+    Assert.assertEquals(2, queue.size());
+    Assert.assertEquals(e2, queue.remove(new QueueElement.QueueElementID(2)));
+    Assert.assertEquals(1, queue.size());
+  }
+
   @Test
   public void testClear() {
     IndexedBlockingQueue<QueueElement> queue =
diff --git 
a/server/src/test/java/org/apache/iotdb/db/mpp/schedule/queue/L2PriorityQueueTest.java
 
b/server/src/test/java/org/apache/iotdb/db/mpp/schedule/queue/L2PriorityQueueTest.java
index d31a5b4d36..da3a6a1a4c 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/mpp/schedule/queue/L2PriorityQueueTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/mpp/schedule/queue/L2PriorityQueueTest.java
@@ -141,6 +141,33 @@ public class L2PriorityQueueTest {
     }
   }
 
+  @Test
+  public void testRemoveElement() {
+    IndexedBlockingQueue<QueueElement> queue =
+        new L2PriorityQueue<>(
+            10,
+            (o1, o2) -> {
+              if (o1.equals(o2)) {
+                return 0;
+              }
+              int res = Integer.compare(o1.getValue(), o2.getValue());
+              if (res != 0) {
+                return res;
+              }
+              return String.CASE_INSENSITIVE_ORDER.compare(
+                  o1.getId().toString(), o2.getId().toString());
+            },
+            new QueueElement(new QueueElement.QueueElementID(0), 0));
+    QueueElement e1 = new QueueElement(new QueueElement.QueueElementID(1), 5);
+    queue.push(e1);
+    Assert.assertEquals(1, queue.size());
+    QueueElement e2 = new QueueElement(new QueueElement.QueueElementID(2), 10);
+    queue.push(e2);
+    Assert.assertEquals(2, queue.size());
+    Assert.assertEquals(e2, queue.remove(new QueueElement.QueueElementID(2)));
+    Assert.assertEquals(1, queue.size());
+  }
+
   @Test
   public void testClear() {
     IndexedBlockingQueue<QueueElement> queue =

Reply via email to