This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 52ce8855836 Add stopwatch for AbstractConsumeAllOperator
52ce8855836 is described below
commit 52ce885583640b8c8028993825e0711ef305dfe4
Author: Liao Lanyu <[email protected]>
AuthorDate: Thu Oct 12 20:51:06 2023 +0800
Add stopwatch for AbstractConsumeAllOperator
---
.../process/AbstractConsumeAllOperator.java | 101 ++++++++++++++++-----
.../operator/process/MergeSortOperator.java | 56 ++++++------
.../process/join/RowBasedTimeJoinOperator.java | 57 +++++-------
.../db/queryengine/execution/DataDriverTest.java | 1 +
.../operator/AlignedSeriesScanOperatorTest.java | 2 +
.../operator/HorizontallyConcatOperatorTest.java | 3 +
.../execution/operator/LimitOperatorTest.java | 2 +
.../execution/operator/MergeSortOperatorTest.java | 26 ++++++
.../execution/operator/OffsetOperatorTest.java | 4 +
.../operator/SingleDeviceViewOperatorTest.java | 2 +
.../execution/operator/SortOperatorTest.java | 3 +
11 files changed, 171 insertions(+), 86 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AbstractConsumeAllOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AbstractConsumeAllOperator.java
index e677ce5eafa..235ee16aef8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AbstractConsumeAllOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AbstractConsumeAllOperator.java
@@ -28,6 +28,7 @@ import com.google.common.util.concurrent.ListenableFuture;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import static com.google.common.util.concurrent.Futures.successfulAsList;
@@ -37,11 +38,17 @@ public abstract class AbstractConsumeAllOperator extends
AbstractOperator
protected final List<Operator> children;
protected final int inputOperatorsCount;
/** TsBlock from child operator. Only one cache now. */
- protected final TsBlock[] inputTsBlocks;
+ protected TsBlock[] inputTsBlocks;
protected final boolean[] canCallNext;
protected int readyChildIndex;
+ /** Index of the child that is currently fetching input */
+ protected int currentChildIndex = 0;
+
+ /** Indicate whether we found an empty child input in one loop */
+ protected boolean hasEmptyChildInput = false;
+
protected AbstractConsumeAllOperator(OperatorContext operatorContext,
List<Operator> children) {
this.operatorContext = operatorContext;
this.children = children;
@@ -84,35 +91,53 @@ public abstract class AbstractConsumeAllOperator extends
AbstractOperator
* @throws Exception errors happened while getting tsblock from children
*/
protected boolean prepareInput() throws Exception {
- boolean allReady = true;
- for (int i = 0; i < inputOperatorsCount; i++) {
- if (!isEmpty(i) || children.get(i) == null) {
+ // start stopwatch
+ long maxRuntime =
operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS);
+ long start = System.nanoTime();
+
+ while (System.nanoTime() - start < maxRuntime && currentChildIndex <
inputOperatorsCount) {
+ if (canSkipCurrentChild(currentChildIndex)) {
+ currentChildIndex++;
continue;
}
- if (canCallNext[i] && children.get(i).hasNextWithTimer()) {
- inputTsBlocks[i] = getNextTsBlock(i);
- canCallNext[i] = false;
- // child operator has next but return an empty TsBlock which means
that it may not
- // finish calculation in given time slice.
- // In such case, TimeJoinOperator can't go on calculating, so we just
return null.
- // We can also use the while loop here to continuously call the
hasNext() and next()
- // methods of the child operator until its hasNext() returns false or
the next() gets
- // the data that is not empty, but this will cause the execution time
of the while loop
- // to be uncontrollable and may exceed all allocated time slice
- if (isEmpty(i)) {
- allReady = false;
+ if (canCallNext[currentChildIndex]) {
+ if (children.get(currentChildIndex).hasNextWithTimer()) {
+ inputTsBlocks[currentChildIndex] = getNextTsBlock(currentChildIndex);
+ canCallNext[currentChildIndex] = false;
+ // child operator has next but return an empty TsBlock which means
that it may not
+ // finish calculation in given time slice.
+ // In such case, TimeJoinOperator can't go on calculating, so we
just return null.
+ // We can also use the while loop here to continuously call the
hasNext() and next()
+ // methods of the child operator until its hasNext() returns false
or the next() gets
+ // the data that is not empty, but this will cause the execution
time of the while loop
+ // to be uncontrollable and may exceed all allocated time slice
+ if (isEmpty(currentChildIndex)) {
+ hasEmptyChildInput = true;
+ } else {
+ processCurrentInputTsBlock(currentChildIndex);
+ }
+ } else {
+ handleFinishedChild(currentChildIndex);
}
} else {
- allReady = false;
- if (canCallNext[i]) {
- // canCallNext[i] == true means children.get(i).hasNext == false
- // we can close the finished children
- children.get(i).close();
- children.set(i, null);
- }
+ hasEmptyChildInput = true;
}
+ currentChildIndex++;
}
- return allReady;
+
+ if (currentChildIndex == inputOperatorsCount) {
+ // start a new loop
+ currentChildIndex = 0;
+ if (!hasEmptyChildInput) {
+ // all children are ready now
+ return true;
+ } else {
+ // In a new loop, previously empty child input could be non-empty now,
and we can skip the
+ // children that have generated input
+ hasEmptyChildInput = false;
+ }
+ }
+ return false;
}
/** If the tsBlock is null or has no more data in the tsBlock, return true;
else return false. */
@@ -120,6 +145,32 @@ public abstract class AbstractConsumeAllOperator extends
AbstractOperator
return inputTsBlocks[index] == null || inputTsBlocks[index].isEmpty();
}
+ // region helper function used in prepareInput, the subclass can have its
own implementation
+
+ /**
+ * @param currentChildIndex the index of the child
+ * @return true if we can skip the currentChild in prepareInput
+ */
+ protected boolean canSkipCurrentChild(int currentChildIndex) {
+ return !isEmpty(currentChildIndex) || children.get(currentChildIndex) ==
null;
+ }
+
+ /** @param currentInputIndex index of the input TsBlock */
+ protected void processCurrentInputTsBlock(int currentInputIndex) {
+ // do nothing here, the subclass have its own implementation
+ }
+
+ /**
+ * @param currentChildIndex the index of the child
+ * @throws Exception Potential Exception thrown by Operator.close()
+ */
+ protected void handleFinishedChild(int currentChildIndex) throws Exception {
+ children.get(currentChildIndex).close();
+ children.set(currentChildIndex, null);
+ }
+
+ // endregion
+
@Override
public void close() throws Exception {
for (Operator child : children) {
@@ -127,6 +178,8 @@ public abstract class AbstractConsumeAllOperator extends
AbstractOperator
child.close();
}
}
+ // friendly for gc
+ inputTsBlocks = null;
}
protected TsBlock getNextTsBlock(int childIndex) throws Exception {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/MergeSortOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/MergeSortOperator.java
index bffef353d5f..cd3ec0711b7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/MergeSortOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/MergeSortOperator.java
@@ -213,40 +213,36 @@ public class MergeSortOperator extends
AbstractConsumeAllOperator {
return currentRetainedSize - minChildReturnSize;
}
+ // region helper function used in prepareInput
+
/**
- * Try to cache one result of each child.
- *
- * @return true if results of all children are ready or have no more
TsBlocks. Return false if
- * some children is blocked or return null.
+ * @param currentChildIndex the index of the child
+ * @return true if we can skip the currentChild in prepareInput
*/
@Override
- protected boolean prepareInput() throws Exception {
- boolean allReady = true;
- for (int i = 0; i < inputOperatorsCount; i++) {
- if (needCallNext(i)) {
- continue;
- }
- if (canCallNext[i]) {
- if (children.get(i).hasNextWithTimer()) {
- inputTsBlocks[i] = getNextTsBlock(i);
- canCallNext[i] = false;
- if (isEmpty(i)) {
- allReady = false;
- } else {
- mergeSortHeap.push(new MergeSortKey(inputTsBlocks[i], 0, i));
- }
- } else {
- noMoreTsBlocks[i] = true;
- inputTsBlocks[i] = null;
- }
- } else {
- allReady = false;
- }
- }
- return allReady;
+ protected boolean canSkipCurrentChild(int currentChildIndex) {
+ return noMoreTsBlocks[currentChildIndex]
+ || !isEmpty(currentChildIndex)
+ || children.get(currentChildIndex) == null;
}
- private boolean needCallNext(int i) {
- return noMoreTsBlocks[i] || !isEmpty(i) || children.get(i) == null;
+ /** @param currentInputIndex index of the input TsBlock */
+ @Override
+ protected void processCurrentInputTsBlock(int currentInputIndex) {
+ mergeSortHeap.push(new MergeSortKey(inputTsBlocks[currentInputIndex], 0,
currentInputIndex));
}
+
+ /**
+ * @param currentChildIndex the index of the child
+ * @throws Exception Potential Exception thrown by Operator.close()
+ */
+ @Override
+ protected void handleFinishedChild(int currentChildIndex) throws Exception {
+ noMoreTsBlocks[currentChildIndex] = true;
+ inputTsBlocks[currentChildIndex] = null;
+ children.get(currentChildIndex).close();
+ children.set(currentChildIndex, null);
+ }
+
+ // endregion
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/RowBasedTimeJoinOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/RowBasedTimeJoinOperator.java
index 0f4de48a92a..65de39f5717 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/RowBasedTimeJoinOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/RowBasedTimeJoinOperator.java
@@ -276,46 +276,39 @@ public class RowBasedTimeJoinOperator extends
AbstractConsumeAllOperator {
timeSelector.add(inputTsBlocks[index].getTimeByIndex(inputIndex[index]));
}
+ // region helper function used in prepareInput
+
/**
- * Try to cache one result of each child.
- *
- * @return true if results of all children are ready or have no more
TsBlocks. Return false if
- * some children is blocked or return null.
+ * @param currentChildIndex the index of the child
+ * @return true if we can skip the currentChild in prepareInput
*/
@Override
- protected boolean prepareInput() throws Exception {
- boolean allReady = true;
- for (int i = 0; i < inputOperatorsCount; i++) {
- if (needCallNext(i)) {
- continue;
- }
- if (canCallNext[i]) {
- if (children.get(i).hasNextWithTimer()) {
- inputTsBlocks[i] = getNextTsBlock(i);
- canCallNext[i] = false;
- if (isEmpty(i)) {
- allReady = false;
- } else {
- updateTimeSelector(i);
- }
- } else {
- noMoreTsBlocks[i] = true;
- inputTsBlocks[i] = null;
- children.get(i).close();
- children.set(i, null);
- }
+ protected boolean canSkipCurrentChild(int currentChildIndex) {
+ return noMoreTsBlocks[currentChildIndex]
+ || !isEmpty(currentChildIndex)
+ || children.get(currentChildIndex) == null;
+ }
- } else {
- allReady = false;
- }
- }
- return allReady;
+ /** @param currentInputIndex index of the input TsBlock */
+ @Override
+ protected void processCurrentInputTsBlock(int currentInputIndex) {
+ updateTimeSelector(currentInputIndex);
}
- private boolean needCallNext(int i) {
- return noMoreTsBlocks[i] || !isEmpty(i) || children.get(i) == null;
+ /**
+ * @param currentChildIndex the index of the child
+ * @throws Exception Potential Exception thrown by Operator.close()
+ */
+ @Override
+ protected void handleFinishedChild(int currentChildIndex) throws Exception {
+ noMoreTsBlocks[currentChildIndex] = true;
+ inputTsBlocks[currentChildIndex] = null;
+ children.get(currentChildIndex).close();
+ children.set(currentChildIndex, null);
}
+ // endregion
+
@TestOnly
public List<Operator> getChildren() {
return children;
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/DataDriverTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/DataDriverTest.java
index 31fbae023ca..10945c30158 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/DataDriverTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/DataDriverTest.java
@@ -164,6 +164,7 @@ public class DataDriverTest {
new SingleColumnMerger(new InputLocation(0, 0), new
AscTimeComparator()),
new SingleColumnMerger(new InputLocation(1, 0), new
AscTimeComparator())),
new AscTimeComparator());
+ timeJoinOperator.getOperatorContext().setMaxRunTime(new Duration(500,
TimeUnit.MILLISECONDS));
LimitOperator limitOperator =
new LimitOperator(driverContext.getOperatorContexts().get(3), 250,
timeJoinOperator);
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/AlignedSeriesScanOperatorTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/AlignedSeriesScanOperatorTest.java
index bc3dbe0cf90..4ab3237998f 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/AlignedSeriesScanOperatorTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/AlignedSeriesScanOperatorTest.java
@@ -398,6 +398,7 @@ public class AlignedSeriesScanOperatorTest {
new SingleColumnMerger(new InputLocation(6, 0), new
AscTimeComparator()),
new SingleColumnMerger(new InputLocation(7, 0), new
AscTimeComparator())),
new AscTimeComparator());
+ timeJoinOperator.getOperatorContext().setMaxRunTime(new Duration(500,
TimeUnit.MILLISECONDS));
int count = 0;
while (timeJoinOperator.isBlocked().isDone() &&
timeJoinOperator.hasNext()) {
TsBlock tsBlock = timeJoinOperator.next();
@@ -688,6 +689,7 @@ public class AlignedSeriesScanOperatorTest {
new SingleColumnMerger(new InputLocation(6, 0), new
DescTimeComparator()),
new SingleColumnMerger(new InputLocation(7, 0), new
DescTimeComparator())),
new DescTimeComparator());
+ timeJoinOperator.getOperatorContext().setMaxRunTime(new Duration(500,
TimeUnit.MILLISECONDS));
int count = 499;
while (timeJoinOperator.isBlocked().isDone() &&
timeJoinOperator.hasNext()) {
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/HorizontallyConcatOperatorTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/HorizontallyConcatOperatorTest.java
index 06f34a5bd43..b71ce780d1f 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/HorizontallyConcatOperatorTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/HorizontallyConcatOperatorTest.java
@@ -182,6 +182,9 @@ public class HorizontallyConcatOperatorTest {
TSDataType.INT64,
TSDataType.DOUBLE,
TSDataType.INT32));
+ horizontallyConcatOperator
+ .getOperatorContext()
+ .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
int count = 0;
while (horizontallyConcatOperator.isBlocked().isDone()
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/LimitOperatorTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/LimitOperatorTest.java
index 41c1696742b..d079e5c8848 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/LimitOperatorTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/LimitOperatorTest.java
@@ -149,9 +149,11 @@ public class LimitOperatorTest {
new SingleColumnMerger(new InputLocation(0, 0), new
AscTimeComparator()),
new SingleColumnMerger(new InputLocation(1, 0), new
AscTimeComparator())),
new AscTimeComparator());
+ timeJoinOperator.getOperatorContext().setMaxRunTime(new Duration(500,
TimeUnit.MILLISECONDS));
LimitOperator limitOperator =
new LimitOperator(driverContext.getOperatorContexts().get(3), 250,
timeJoinOperator);
+
int count = 0;
while (limitOperator.isBlocked().isDone() && limitOperator.hasNext()) {
TsBlock tsBlock = limitOperator.next();
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/MergeSortOperatorTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/MergeSortOperatorTest.java
index b8a268bf698..e846dcceb96 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/MergeSortOperatorTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/MergeSortOperatorTest.java
@@ -294,6 +294,9 @@ public class MergeSortOperatorTest {
? new AscTimeComparator()
: new DescTimeComparator())),
timeOrdering == Ordering.ASC ? new AscTimeComparator() : new
DescTimeComparator());
+ timeJoinOperator1
+ .getOperatorContext()
+ .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
SingleDeviceViewOperator singleDeviceViewOperator2 =
new SingleDeviceViewOperator(
driverContext.getOperatorContexts().get(7),
@@ -320,6 +323,10 @@ public class MergeSortOperatorTest {
? new AscTimeComparator()
: new DescTimeComparator())),
timeOrdering == Ordering.ASC ? new AscTimeComparator() : new
DescTimeComparator());
+ timeJoinOperator2
+ .getOperatorContext()
+ .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
+
SingleDeviceViewOperator singleDeviceViewOperator3 =
new SingleDeviceViewOperator(
driverContext.getOperatorContexts().get(9),
@@ -730,6 +737,9 @@ public class MergeSortOperatorTest {
? new AscTimeComparator()
: new DescTimeComparator())),
timeOrdering == Ordering.ASC ? new AscTimeComparator() : new
DescTimeComparator());
+ timeJoinOperator1
+ .getOperatorContext()
+ .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
RowBasedTimeJoinOperator timeJoinOperator2 =
new RowBasedTimeJoinOperator(
@@ -749,6 +759,9 @@ public class MergeSortOperatorTest {
? new AscTimeComparator()
: new DescTimeComparator())),
timeOrdering == Ordering.ASC ? new AscTimeComparator() : new
DescTimeComparator());
+ timeJoinOperator2
+ .getOperatorContext()
+ .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
RowBasedTimeJoinOperator timeJoinOperator3 =
new RowBasedTimeJoinOperator(
@@ -768,6 +781,10 @@ public class MergeSortOperatorTest {
? new AscTimeComparator()
: new DescTimeComparator())),
timeOrdering == Ordering.ASC ? new AscTimeComparator() : new
DescTimeComparator());
+ timeJoinOperator3
+ .getOperatorContext()
+ .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
+
SingleDeviceViewOperator singleDeviceViewOperator1 =
new SingleDeviceViewOperator(
driverContext.getOperatorContexts().get(10),
@@ -1200,6 +1217,9 @@ public class MergeSortOperatorTest {
? new AscTimeComparator()
: new DescTimeComparator())),
timeOrdering == Ordering.ASC ? new AscTimeComparator() : new
DescTimeComparator());
+ timeJoinOperator1
+ .getOperatorContext()
+ .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
RowBasedTimeJoinOperator timeJoinOperator2 =
new RowBasedTimeJoinOperator(
@@ -1219,6 +1239,9 @@ public class MergeSortOperatorTest {
? new AscTimeComparator()
: new DescTimeComparator())),
timeOrdering == Ordering.ASC ? new AscTimeComparator() : new
DescTimeComparator());
+ timeJoinOperator2
+ .getOperatorContext()
+ .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
RowBasedTimeJoinOperator timeJoinOperator3 =
new RowBasedTimeJoinOperator(
@@ -1238,6 +1261,9 @@ public class MergeSortOperatorTest {
? new AscTimeComparator()
: new DescTimeComparator())),
timeOrdering == Ordering.ASC ? new AscTimeComparator() : new
DescTimeComparator());
+ timeJoinOperator3
+ .getOperatorContext()
+ .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
List<String> devices = new ArrayList<>(Arrays.asList(DEVICE0, DEVICE1,
DEVICE2, DEVICE3));
if (deviceOrdering == Ordering.DESC) Collections.reverse(devices);
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/OffsetOperatorTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/OffsetOperatorTest.java
index 8af0193dc5b..d2451e33729 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/OffsetOperatorTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/OffsetOperatorTest.java
@@ -152,6 +152,7 @@ public class OffsetOperatorTest {
new SingleColumnMerger(new InputLocation(0, 0), new
AscTimeComparator()),
new SingleColumnMerger(new InputLocation(1, 0), new
AscTimeComparator())),
new AscTimeComparator());
+ timeJoinOperator.getOperatorContext().setMaxRunTime(new Duration(500,
TimeUnit.MILLISECONDS));
OffsetOperator offsetOperator =
new OffsetOperator(driverContext.getOperatorContexts().get(3), 100,
timeJoinOperator);
@@ -257,6 +258,7 @@ public class OffsetOperatorTest {
new SingleColumnMerger(new InputLocation(0, 0), new
AscTimeComparator()),
new SingleColumnMerger(new InputLocation(1, 0), new
AscTimeComparator())),
new AscTimeComparator());
+ timeJoinOperator.getOperatorContext().setMaxRunTime(new Duration(500,
TimeUnit.MILLISECONDS));
OffsetOperator offsetOperator =
new OffsetOperator(driverContext.getOperatorContexts().get(3), 0,
timeJoinOperator);
@@ -359,6 +361,7 @@ public class OffsetOperatorTest {
new SingleColumnMerger(new InputLocation(0, 0), new
AscTimeComparator()),
new SingleColumnMerger(new InputLocation(1, 0), new
AscTimeComparator())),
new AscTimeComparator());
+ timeJoinOperator.getOperatorContext().setMaxRunTime(new Duration(500,
TimeUnit.MILLISECONDS));
OffsetOperator offsetOperator =
new OffsetOperator(driverContext.getOperatorContexts().get(3), 500,
timeJoinOperator);
@@ -445,6 +448,7 @@ public class OffsetOperatorTest {
new SingleColumnMerger(new InputLocation(0, 0), new
AscTimeComparator()),
new SingleColumnMerger(new InputLocation(1, 0), new
AscTimeComparator())),
new AscTimeComparator());
+ timeJoinOperator.getOperatorContext().setMaxRunTime(new Duration(500,
TimeUnit.MILLISECONDS));
OffsetOperator offsetOperator =
new OffsetOperator(
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/SingleDeviceViewOperatorTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/SingleDeviceViewOperatorTest.java
index 9912402e063..3f52b273e05 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/SingleDeviceViewOperatorTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/SingleDeviceViewOperatorTest.java
@@ -155,6 +155,8 @@ public class SingleDeviceViewOperatorTest {
new SingleColumnMerger(new InputLocation(0, 0), new
AscTimeComparator()),
new SingleColumnMerger(new InputLocation(1, 0), new
AscTimeComparator())),
new AscTimeComparator());
+ timeJoinOperator.getOperatorContext().setMaxRunTime(new Duration(500,
TimeUnit.MILLISECONDS));
+
SingleDeviceViewOperator singleDeviceViewOperator =
new SingleDeviceViewOperator(
driverContext.getOperatorContexts().get(3),
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/SortOperatorTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/SortOperatorTest.java
index fb6bc7c912b..fa1e3b5364e 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/SortOperatorTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/SortOperatorTest.java
@@ -180,6 +180,9 @@ public class SortOperatorTest {
? new AscTimeComparator()
: new DescTimeComparator())),
timeOrdering == Ordering.ASC ? new AscTimeComparator() : new
DescTimeComparator());
+ timeJoinOperator1
+ .getOperatorContext()
+ .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
if (!getSortOperator) return timeJoinOperator1;