This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 52ce8855836 Add stopwatch for AbstractConsumeAllOperator
52ce8855836 is described below

commit 52ce885583640b8c8028993825e0711ef305dfe4
Author: Liao Lanyu <[email protected]>
AuthorDate: Thu Oct 12 20:51:06 2023 +0800

    Add stopwatch for AbstractConsumeAllOperator
---
 .../process/AbstractConsumeAllOperator.java        | 101 ++++++++++++++++-----
 .../operator/process/MergeSortOperator.java        |  56 ++++++------
 .../process/join/RowBasedTimeJoinOperator.java     |  57 +++++-------
 .../db/queryengine/execution/DataDriverTest.java   |   1 +
 .../operator/AlignedSeriesScanOperatorTest.java    |   2 +
 .../operator/HorizontallyConcatOperatorTest.java   |   3 +
 .../execution/operator/LimitOperatorTest.java      |   2 +
 .../execution/operator/MergeSortOperatorTest.java  |  26 ++++++
 .../execution/operator/OffsetOperatorTest.java     |   4 +
 .../operator/SingleDeviceViewOperatorTest.java     |   2 +
 .../execution/operator/SortOperatorTest.java       |   3 +
 11 files changed, 171 insertions(+), 86 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AbstractConsumeAllOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AbstractConsumeAllOperator.java
index e677ce5eafa..235ee16aef8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AbstractConsumeAllOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AbstractConsumeAllOperator.java
@@ -28,6 +28,7 @@ import com.google.common.util.concurrent.ListenableFuture;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
 import static com.google.common.util.concurrent.Futures.successfulAsList;
 
@@ -37,11 +38,17 @@ public abstract class AbstractConsumeAllOperator extends 
AbstractOperator
   protected final List<Operator> children;
   protected final int inputOperatorsCount;
   /** TsBlock from child operator. Only one cache now. */
-  protected final TsBlock[] inputTsBlocks;
+  protected TsBlock[] inputTsBlocks;
 
   protected final boolean[] canCallNext;
   protected int readyChildIndex;
 
+  /** Index of the child that is currently fetching input */
+  protected int currentChildIndex = 0;
+
+  /** Indicates whether an empty child input was found in the current loop */
+  protected boolean hasEmptyChildInput = false;
+
   protected AbstractConsumeAllOperator(OperatorContext operatorContext, 
List<Operator> children) {
     this.operatorContext = operatorContext;
     this.children = children;
@@ -84,35 +91,53 @@ public abstract class AbstractConsumeAllOperator extends 
AbstractOperator
    * @throws Exception errors happened while getting tsblock from children
    */
   protected boolean prepareInput() throws Exception {
-    boolean allReady = true;
-    for (int i = 0; i < inputOperatorsCount; i++) {
-      if (!isEmpty(i) || children.get(i) == null) {
+    // start stopwatch
+    long maxRuntime = 
operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS);
+    long start = System.nanoTime();
+
+    while (System.nanoTime() - start < maxRuntime && currentChildIndex < 
inputOperatorsCount) {
+      if (canSkipCurrentChild(currentChildIndex)) {
+        currentChildIndex++;
         continue;
       }
-      if (canCallNext[i] && children.get(i).hasNextWithTimer()) {
-        inputTsBlocks[i] = getNextTsBlock(i);
-        canCallNext[i] = false;
-        // child operator has next but return an empty TsBlock which means 
that it may not
-        // finish calculation in given time slice.
-        // In such case, TimeJoinOperator can't go on calculating, so we just 
return null.
-        // We can also use the while loop here to continuously call the 
hasNext() and next()
-        // methods of the child operator until its hasNext() returns false or 
the next() gets
-        // the data that is not empty, but this will cause the execution time 
of the while loop
-        // to be uncontrollable and may exceed all allocated time slice
-        if (isEmpty(i)) {
-          allReady = false;
+      if (canCallNext[currentChildIndex]) {
+        if (children.get(currentChildIndex).hasNextWithTimer()) {
+          inputTsBlocks[currentChildIndex] = getNextTsBlock(currentChildIndex);
+          canCallNext[currentChildIndex] = false;
+          // child operator has next but return an empty TsBlock which means 
that it may not
+          // finish calculation in given time slice.
+          // In such case, TimeJoinOperator can't go on calculating, so we 
just return null.
+          // We can also use the while loop here to continuously call the 
hasNext() and next()
+          // methods of the child operator until its hasNext() returns false 
or the next() gets
+          // the data that is not empty, but this will cause the execution 
time of the while loop
+          // to be uncontrollable and may exceed all allocated time slice
+          if (isEmpty(currentChildIndex)) {
+            hasEmptyChildInput = true;
+          } else {
+            processCurrentInputTsBlock(currentChildIndex);
+          }
+        } else {
+          handleFinishedChild(currentChildIndex);
         }
       } else {
-        allReady = false;
-        if (canCallNext[i]) {
-          // canCallNext[i] == true means children.get(i).hasNext == false
-          // we can close the finished children
-          children.get(i).close();
-          children.set(i, null);
-        }
+        hasEmptyChildInput = true;
       }
+      currentChildIndex++;
     }
-    return allReady;
+
+    if (currentChildIndex == inputOperatorsCount) {
+      // start a new loop
+      currentChildIndex = 0;
+      if (!hasEmptyChildInput) {
+        // all children are ready now
+        return true;
+      } else {
+        // In a new loop, previously empty child input could be non-empty now, 
and we can skip the
+        // children that have generated input
+        hasEmptyChildInput = false;
+      }
+    }
+    return false;
   }
 
   /** If the tsBlock is null or has no more data in the tsBlock, return true; 
else return false. */
@@ -120,6 +145,32 @@ public abstract class AbstractConsumeAllOperator extends 
AbstractOperator
     return inputTsBlocks[index] == null || inputTsBlocks[index].isEmpty();
   }
 
+  // region helper functions used in prepareInput; subclasses can provide their 
own implementations
+
+  /**
+   * @param currentChildIndex the index of the child
+   * @return true if we can skip the currentChild in prepareInput
+   */
+  protected boolean canSkipCurrentChild(int currentChildIndex) {
+    return !isEmpty(currentChildIndex) || children.get(currentChildIndex) == 
null;
+  }
+
+  /** @param currentInputIndex index of the input TsBlock */
+  protected void processCurrentInputTsBlock(int currentInputIndex) {
+    // do nothing here; subclasses may provide their own implementation
+  }
+
+  /**
+   * @param currentChildIndex the index of the child
+   * @throws Exception Potential Exception thrown by Operator.close()
+   */
+  protected void handleFinishedChild(int currentChildIndex) throws Exception {
+    children.get(currentChildIndex).close();
+    children.set(currentChildIndex, null);
+  }
+
+  // endregion
+
   @Override
   public void close() throws Exception {
     for (Operator child : children) {
@@ -127,6 +178,8 @@ public abstract class AbstractConsumeAllOperator extends 
AbstractOperator
         child.close();
       }
     }
+    // friendly for gc
+    inputTsBlocks = null;
   }
 
   protected TsBlock getNextTsBlock(int childIndex) throws Exception {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/MergeSortOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/MergeSortOperator.java
index bffef353d5f..cd3ec0711b7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/MergeSortOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/MergeSortOperator.java
@@ -213,40 +213,36 @@ public class MergeSortOperator extends 
AbstractConsumeAllOperator {
     return currentRetainedSize - minChildReturnSize;
   }
 
+  // region helper function used in prepareInput
+
   /**
-   * Try to cache one result of each child.
-   *
-   * @return true if results of all children are ready or have no more 
TsBlocks. Return false if
-   *     some children is blocked or return null.
+   * @param currentChildIndex the index of the child
+   * @return true if we can skip the currentChild in prepareInput
    */
   @Override
-  protected boolean prepareInput() throws Exception {
-    boolean allReady = true;
-    for (int i = 0; i < inputOperatorsCount; i++) {
-      if (needCallNext(i)) {
-        continue;
-      }
-      if (canCallNext[i]) {
-        if (children.get(i).hasNextWithTimer()) {
-          inputTsBlocks[i] = getNextTsBlock(i);
-          canCallNext[i] = false;
-          if (isEmpty(i)) {
-            allReady = false;
-          } else {
-            mergeSortHeap.push(new MergeSortKey(inputTsBlocks[i], 0, i));
-          }
-        } else {
-          noMoreTsBlocks[i] = true;
-          inputTsBlocks[i] = null;
-        }
-      } else {
-        allReady = false;
-      }
-    }
-    return allReady;
+  protected boolean canSkipCurrentChild(int currentChildIndex) {
+    return noMoreTsBlocks[currentChildIndex]
+        || !isEmpty(currentChildIndex)
+        || children.get(currentChildIndex) == null;
   }
 
-  private boolean needCallNext(int i) {
-    return noMoreTsBlocks[i] || !isEmpty(i) || children.get(i) == null;
+  /** @param currentInputIndex index of the input TsBlock */
+  @Override
+  protected void processCurrentInputTsBlock(int currentInputIndex) {
+    mergeSortHeap.push(new MergeSortKey(inputTsBlocks[currentInputIndex], 0, 
currentInputIndex));
   }
+
+  /**
+   * @param currentChildIndex the index of the child
+   * @throws Exception Potential Exception thrown by Operator.close()
+   */
+  @Override
+  protected void handleFinishedChild(int currentChildIndex) throws Exception {
+    noMoreTsBlocks[currentChildIndex] = true;
+    inputTsBlocks[currentChildIndex] = null;
+    children.get(currentChildIndex).close();
+    children.set(currentChildIndex, null);
+  }
+
+  // endregion
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/RowBasedTimeJoinOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/RowBasedTimeJoinOperator.java
index 0f4de48a92a..65de39f5717 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/RowBasedTimeJoinOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/RowBasedTimeJoinOperator.java
@@ -276,46 +276,39 @@ public class RowBasedTimeJoinOperator extends 
AbstractConsumeAllOperator {
     timeSelector.add(inputTsBlocks[index].getTimeByIndex(inputIndex[index]));
   }
 
+  // region helper function used in prepareInput
+
   /**
-   * Try to cache one result of each child.
-   *
-   * @return true if results of all children are ready or have no more 
TsBlocks. Return false if
-   *     some children is blocked or return null.
+   * @param currentChildIndex the index of the child
+   * @return true if we can skip the currentChild in prepareInput
    */
   @Override
-  protected boolean prepareInput() throws Exception {
-    boolean allReady = true;
-    for (int i = 0; i < inputOperatorsCount; i++) {
-      if (needCallNext(i)) {
-        continue;
-      }
-      if (canCallNext[i]) {
-        if (children.get(i).hasNextWithTimer()) {
-          inputTsBlocks[i] = getNextTsBlock(i);
-          canCallNext[i] = false;
-          if (isEmpty(i)) {
-            allReady = false;
-          } else {
-            updateTimeSelector(i);
-          }
-        } else {
-          noMoreTsBlocks[i] = true;
-          inputTsBlocks[i] = null;
-          children.get(i).close();
-          children.set(i, null);
-        }
+  protected boolean canSkipCurrentChild(int currentChildIndex) {
+    return noMoreTsBlocks[currentChildIndex]
+        || !isEmpty(currentChildIndex)
+        || children.get(currentChildIndex) == null;
+  }
 
-      } else {
-        allReady = false;
-      }
-    }
-    return allReady;
+  /** @param currentInputIndex index of the input TsBlock */
+  @Override
+  protected void processCurrentInputTsBlock(int currentInputIndex) {
+    updateTimeSelector(currentInputIndex);
   }
 
-  private boolean needCallNext(int i) {
-    return noMoreTsBlocks[i] || !isEmpty(i) || children.get(i) == null;
+  /**
+   * @param currentChildIndex the index of the child
+   * @throws Exception Potential Exception thrown by Operator.close()
+   */
+  @Override
+  protected void handleFinishedChild(int currentChildIndex) throws Exception {
+    noMoreTsBlocks[currentChildIndex] = true;
+    inputTsBlocks[currentChildIndex] = null;
+    children.get(currentChildIndex).close();
+    children.set(currentChildIndex, null);
   }
 
+  // endregion
+
   @TestOnly
   public List<Operator> getChildren() {
     return children;
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/DataDriverTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/DataDriverTest.java
index 31fbae023ca..10945c30158 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/DataDriverTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/DataDriverTest.java
@@ -164,6 +164,7 @@ public class DataDriverTest {
                   new SingleColumnMerger(new InputLocation(0, 0), new 
AscTimeComparator()),
                   new SingleColumnMerger(new InputLocation(1, 0), new 
AscTimeComparator())),
               new AscTimeComparator());
+      timeJoinOperator.getOperatorContext().setMaxRunTime(new Duration(500, 
TimeUnit.MILLISECONDS));
 
       LimitOperator limitOperator =
           new LimitOperator(driverContext.getOperatorContexts().get(3), 250, 
timeJoinOperator);
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/AlignedSeriesScanOperatorTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/AlignedSeriesScanOperatorTest.java
index bc3dbe0cf90..4ab3237998f 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/AlignedSeriesScanOperatorTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/AlignedSeriesScanOperatorTest.java
@@ -398,6 +398,7 @@ public class AlignedSeriesScanOperatorTest {
                   new SingleColumnMerger(new InputLocation(6, 0), new 
AscTimeComparator()),
                   new SingleColumnMerger(new InputLocation(7, 0), new 
AscTimeComparator())),
               new AscTimeComparator());
+      timeJoinOperator.getOperatorContext().setMaxRunTime(new Duration(500, 
TimeUnit.MILLISECONDS));
       int count = 0;
       while (timeJoinOperator.isBlocked().isDone() && 
timeJoinOperator.hasNext()) {
         TsBlock tsBlock = timeJoinOperator.next();
@@ -688,6 +689,7 @@ public class AlignedSeriesScanOperatorTest {
                   new SingleColumnMerger(new InputLocation(6, 0), new 
DescTimeComparator()),
                   new SingleColumnMerger(new InputLocation(7, 0), new 
DescTimeComparator())),
               new DescTimeComparator());
+      timeJoinOperator.getOperatorContext().setMaxRunTime(new Duration(500, 
TimeUnit.MILLISECONDS));
 
       int count = 499;
       while (timeJoinOperator.isBlocked().isDone() && 
timeJoinOperator.hasNext()) {
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/HorizontallyConcatOperatorTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/HorizontallyConcatOperatorTest.java
index 06f34a5bd43..b71ce780d1f 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/HorizontallyConcatOperatorTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/HorizontallyConcatOperatorTest.java
@@ -182,6 +182,9 @@ public class HorizontallyConcatOperatorTest {
                   TSDataType.INT64,
                   TSDataType.DOUBLE,
                   TSDataType.INT32));
+      horizontallyConcatOperator
+          .getOperatorContext()
+          .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
 
       int count = 0;
       while (horizontallyConcatOperator.isBlocked().isDone()
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/LimitOperatorTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/LimitOperatorTest.java
index 41c1696742b..d079e5c8848 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/LimitOperatorTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/LimitOperatorTest.java
@@ -149,9 +149,11 @@ public class LimitOperatorTest {
                   new SingleColumnMerger(new InputLocation(0, 0), new 
AscTimeComparator()),
                   new SingleColumnMerger(new InputLocation(1, 0), new 
AscTimeComparator())),
               new AscTimeComparator());
+      timeJoinOperator.getOperatorContext().setMaxRunTime(new Duration(500, 
TimeUnit.MILLISECONDS));
 
       LimitOperator limitOperator =
           new LimitOperator(driverContext.getOperatorContexts().get(3), 250, 
timeJoinOperator);
+
       int count = 0;
       while (limitOperator.isBlocked().isDone() && limitOperator.hasNext()) {
         TsBlock tsBlock = limitOperator.next();
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/MergeSortOperatorTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/MergeSortOperatorTest.java
index b8a268bf698..e846dcceb96 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/MergeSortOperatorTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/MergeSortOperatorTest.java
@@ -294,6 +294,9 @@ public class MergeSortOperatorTest {
                           ? new AscTimeComparator()
                           : new DescTimeComparator())),
               timeOrdering == Ordering.ASC ? new AscTimeComparator() : new 
DescTimeComparator());
+      timeJoinOperator1
+          .getOperatorContext()
+          .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
       SingleDeviceViewOperator singleDeviceViewOperator2 =
           new SingleDeviceViewOperator(
               driverContext.getOperatorContexts().get(7),
@@ -320,6 +323,10 @@ public class MergeSortOperatorTest {
                           ? new AscTimeComparator()
                           : new DescTimeComparator())),
               timeOrdering == Ordering.ASC ? new AscTimeComparator() : new 
DescTimeComparator());
+      timeJoinOperator2
+          .getOperatorContext()
+          .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
+
       SingleDeviceViewOperator singleDeviceViewOperator3 =
           new SingleDeviceViewOperator(
               driverContext.getOperatorContexts().get(9),
@@ -730,6 +737,9 @@ public class MergeSortOperatorTest {
                           ? new AscTimeComparator()
                           : new DescTimeComparator())),
               timeOrdering == Ordering.ASC ? new AscTimeComparator() : new 
DescTimeComparator());
+      timeJoinOperator1
+          .getOperatorContext()
+          .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
 
       RowBasedTimeJoinOperator timeJoinOperator2 =
           new RowBasedTimeJoinOperator(
@@ -749,6 +759,9 @@ public class MergeSortOperatorTest {
                           ? new AscTimeComparator()
                           : new DescTimeComparator())),
               timeOrdering == Ordering.ASC ? new AscTimeComparator() : new 
DescTimeComparator());
+      timeJoinOperator2
+          .getOperatorContext()
+          .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
 
       RowBasedTimeJoinOperator timeJoinOperator3 =
           new RowBasedTimeJoinOperator(
@@ -768,6 +781,10 @@ public class MergeSortOperatorTest {
                           ? new AscTimeComparator()
                           : new DescTimeComparator())),
               timeOrdering == Ordering.ASC ? new AscTimeComparator() : new 
DescTimeComparator());
+      timeJoinOperator3
+          .getOperatorContext()
+          .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
+
       SingleDeviceViewOperator singleDeviceViewOperator1 =
           new SingleDeviceViewOperator(
               driverContext.getOperatorContexts().get(10),
@@ -1200,6 +1217,9 @@ public class MergeSortOperatorTest {
                           ? new AscTimeComparator()
                           : new DescTimeComparator())),
               timeOrdering == Ordering.ASC ? new AscTimeComparator() : new 
DescTimeComparator());
+      timeJoinOperator1
+          .getOperatorContext()
+          .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
 
       RowBasedTimeJoinOperator timeJoinOperator2 =
           new RowBasedTimeJoinOperator(
@@ -1219,6 +1239,9 @@ public class MergeSortOperatorTest {
                           ? new AscTimeComparator()
                           : new DescTimeComparator())),
               timeOrdering == Ordering.ASC ? new AscTimeComparator() : new 
DescTimeComparator());
+      timeJoinOperator2
+          .getOperatorContext()
+          .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
 
       RowBasedTimeJoinOperator timeJoinOperator3 =
           new RowBasedTimeJoinOperator(
@@ -1238,6 +1261,9 @@ public class MergeSortOperatorTest {
                           ? new AscTimeComparator()
                           : new DescTimeComparator())),
               timeOrdering == Ordering.ASC ? new AscTimeComparator() : new 
DescTimeComparator());
+      timeJoinOperator3
+          .getOperatorContext()
+          .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
 
       List<String> devices = new ArrayList<>(Arrays.asList(DEVICE0, DEVICE1, 
DEVICE2, DEVICE3));
       if (deviceOrdering == Ordering.DESC) Collections.reverse(devices);
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/OffsetOperatorTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/OffsetOperatorTest.java
index 8af0193dc5b..d2451e33729 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/OffsetOperatorTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/OffsetOperatorTest.java
@@ -152,6 +152,7 @@ public class OffsetOperatorTest {
                   new SingleColumnMerger(new InputLocation(0, 0), new 
AscTimeComparator()),
                   new SingleColumnMerger(new InputLocation(1, 0), new 
AscTimeComparator())),
               new AscTimeComparator());
+      timeJoinOperator.getOperatorContext().setMaxRunTime(new Duration(500, 
TimeUnit.MILLISECONDS));
 
       OffsetOperator offsetOperator =
           new OffsetOperator(driverContext.getOperatorContexts().get(3), 100, 
timeJoinOperator);
@@ -257,6 +258,7 @@ public class OffsetOperatorTest {
                   new SingleColumnMerger(new InputLocation(0, 0), new 
AscTimeComparator()),
                   new SingleColumnMerger(new InputLocation(1, 0), new 
AscTimeComparator())),
               new AscTimeComparator());
+      timeJoinOperator.getOperatorContext().setMaxRunTime(new Duration(500, 
TimeUnit.MILLISECONDS));
 
       OffsetOperator offsetOperator =
           new OffsetOperator(driverContext.getOperatorContexts().get(3), 0, 
timeJoinOperator);
@@ -359,6 +361,7 @@ public class OffsetOperatorTest {
                   new SingleColumnMerger(new InputLocation(0, 0), new 
AscTimeComparator()),
                   new SingleColumnMerger(new InputLocation(1, 0), new 
AscTimeComparator())),
               new AscTimeComparator());
+      timeJoinOperator.getOperatorContext().setMaxRunTime(new Duration(500, 
TimeUnit.MILLISECONDS));
 
       OffsetOperator offsetOperator =
           new OffsetOperator(driverContext.getOperatorContexts().get(3), 500, 
timeJoinOperator);
@@ -445,6 +448,7 @@ public class OffsetOperatorTest {
                   new SingleColumnMerger(new InputLocation(0, 0), new 
AscTimeComparator()),
                   new SingleColumnMerger(new InputLocation(1, 0), new 
AscTimeComparator())),
               new AscTimeComparator());
+      timeJoinOperator.getOperatorContext().setMaxRunTime(new Duration(500, 
TimeUnit.MILLISECONDS));
 
       OffsetOperator offsetOperator =
           new OffsetOperator(
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/SingleDeviceViewOperatorTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/SingleDeviceViewOperatorTest.java
index 9912402e063..3f52b273e05 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/SingleDeviceViewOperatorTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/SingleDeviceViewOperatorTest.java
@@ -155,6 +155,8 @@ public class SingleDeviceViewOperatorTest {
                   new SingleColumnMerger(new InputLocation(0, 0), new 
AscTimeComparator()),
                   new SingleColumnMerger(new InputLocation(1, 0), new 
AscTimeComparator())),
               new AscTimeComparator());
+      timeJoinOperator.getOperatorContext().setMaxRunTime(new Duration(500, 
TimeUnit.MILLISECONDS));
+
       SingleDeviceViewOperator singleDeviceViewOperator =
           new SingleDeviceViewOperator(
               driverContext.getOperatorContexts().get(3),
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/SortOperatorTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/SortOperatorTest.java
index fb6bc7c912b..fa1e3b5364e 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/SortOperatorTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/SortOperatorTest.java
@@ -180,6 +180,9 @@ public class SortOperatorTest {
                           ? new AscTimeComparator()
                           : new DescTimeComparator())),
               timeOrdering == Ordering.ASC ? new AscTimeComparator() : new 
DescTimeComparator());
+      timeJoinOperator1
+          .getOperatorContext()
+          .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
 
       if (!getSortOperator) return timeJoinOperator1;
 

Reply via email to