This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch beyyes/fix_dop
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 55daa6f6a95f088831b1db5a00cb5e3d754341d6
Author: Beyyes <[email protected]>
AuthorDate: Tue Nov 28 10:48:22 2023 +0800

    fix npe in TopKOperator
---
 .../IoTDBAlignByDeviceWithTemplateIT.java          |  16 ++
 .../execution/operator/process/TopKOperator.java   |  25 +-
 .../queryengine/plan/analyze/TemplatedAnalyze.java |   1 +
 .../execution/operator/TopKOperatorTest.java       | 297 +++++++++++++++++----
 4 files changed, 280 insertions(+), 59 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBAlignByDeviceWithTemplateIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBAlignByDeviceWithTemplateIT.java
index 21ff962ad6a..78cda6b40e6 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBAlignByDeviceWithTemplateIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBAlignByDeviceWithTemplateIT.java
@@ -577,6 +577,22 @@ public class IoTDBAlignByDeviceWithTemplateIT {
         "select count(s1+1) from root.sg1.** align by device;", 
expectedHeader, retArray);
   }
 
+  @Test
+  public void emptyResultTest() {
+    String[] expectedHeader = new String[] {"Time,Device,s3,s1,s2"};
+    String[] retArray = new String[] {};
+    resultSetEqualTest(
+        "SELECT * FROM root.sg1.** where time>=now()-1d and time<=now() "
+            + "ORDER BY TIME DESC ALIGN BY DEVICE;",
+        expectedHeader,
+        retArray);
+    resultSetEqualTest(
+        "SELECT * FROM root.sg2.** where time>=now()-1d and time<=now() "
+            + "ORDER BY TIME DESC ALIGN BY DEVICE;",
+        expectedHeader,
+        retArray);
+  }
+
   private static void insertData() {
     try (Connection connection = EnvFactory.getEnv().getConnection();
         Statement statement = connection.createStatement()) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TopKOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TopKOperator.java
index d14741a31d6..d59067463f9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TopKOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TopKOperator.java
@@ -80,6 +80,8 @@ public class TopKOperator implements ProcessOperator {
   // the data of every childOperator is in order
   private final boolean childrenDataInOrder;
 
+  public static int operatorBatchUpperBound = 100000;
+
   public TopKOperator(
       OperatorContext operatorContext,
       List<Operator> deviceOperators,
@@ -98,7 +100,10 @@ public class TopKOperator implements ProcessOperator {
 
     initResultTsBlock();
 
-    deviceBatchStep = 10000 % topValue == 0 ? 10000 / topValue : 10000 / 
topValue + 1;
+    deviceBatchStep =
+        operatorBatchUpperBound % topValue == 0
+            ? operatorBatchUpperBound / topValue
+            : operatorBatchUpperBound / topValue + 1;
     canCallNext = new boolean[deviceOperators.size()];
   }
 
@@ -137,7 +142,14 @@ public class TopKOperator implements ProcessOperator {
 
   @Override
   public boolean hasNext() throws Exception {
-    return !(deviceIndex >= deviceOperators.size() && resultReturnSize >= 
topKResult.length);
+    if (deviceIndex >= deviceOperators.size()) {
+      if (topKResult == null) {
+        return false;
+      }
+
+      return resultReturnSize < topKResult.length;
+    }
+    return true;
   }
 
   @Override
@@ -289,10 +301,13 @@ public class TopKOperator implements ProcessOperator {
     }
 
     tsBlockBuilder.reset();
+
+    if (topKResult == null || topKResult.length == 0) {
+      return tsBlockBuilder.build();
+    }
+
     ColumnBuilder[] valueColumnBuilders = 
tsBlockBuilder.getValueColumnBuilders();
-    for (int i = resultReturnSize, size = (topKResult == null ? 0 : 
topKResult.length);
-        i < size;
-        i++) {
+    for (int i = resultReturnSize; i < topKResult.length; i++) {
       MergeSortKey mergeSortKey = topKResult[i];
       TsBlock targetBlock = mergeSortKey.tsBlock;
       tsBlockBuilder
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedAnalyze.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedAnalyze.java
index a66b1518c92..5ec7997fdfc 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedAnalyze.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedAnalyze.java
@@ -171,6 +171,7 @@ public class TemplatedAnalyze {
     // generate result set header according to output expressions
     analyzeOutput(analysis, queryStatement, outputExpressions);
 
+    context.generateGlobalTimeFilter(analysis);
     // fetch partition information
     analyzeDataPartition(analysis, schemaTree, partitionFetcher, 
context.getGlobalTimeFilter());
     return true;
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/TopKOperatorTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/TopKOperatorTest.java
index 2a48c94159f..62ba3e68971 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/TopKOperatorTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/TopKOperatorTest.java
@@ -197,9 +197,7 @@ public class TopKOperatorTest {
               timeOrdering,
               SeriesScanOptions.getDefaultSeriesScanOptions(measurementPath1));
       seriesScanOperator1.initQueryDataSource(new 
QueryDataSource(seqResources, unSeqResources));
-      seriesScanOperator1
-          .getOperatorContext()
-          .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
+      OperatorContext.setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
 
       MeasurementPath measurementPath2 =
           new MeasurementPath(TOP_K_OPERATOR_TEST_SG + ".device1.sensor0", 
TSDataType.INT32);
@@ -211,9 +209,7 @@ public class TopKOperatorTest {
               timeOrdering,
               SeriesScanOptions.getDefaultSeriesScanOptions(measurementPath2));
       seriesScanOperator2.initQueryDataSource(new 
QueryDataSource(seqResources, unSeqResources));
-      seriesScanOperator2
-          .getOperatorContext()
-          .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
+      OperatorContext.setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
 
       MeasurementPath measurementPath3 =
           new MeasurementPath(TOP_K_OPERATOR_TEST_SG + ".device1.sensor1", 
TSDataType.INT32);
@@ -225,9 +221,7 @@ public class TopKOperatorTest {
               timeOrdering,
               SeriesScanOptions.getDefaultSeriesScanOptions(measurementPath3));
       seriesScanOperator3.initQueryDataSource(new 
QueryDataSource(seqResources, unSeqResources));
-      seriesScanOperator3
-          .getOperatorContext()
-          .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
+      OperatorContext.setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
 
       MeasurementPath measurementPath4 =
           new MeasurementPath(TOP_K_OPERATOR_TEST_SG + ".device2.sensor0", 
TSDataType.INT32);
@@ -239,9 +233,7 @@ public class TopKOperatorTest {
               timeOrdering,
               SeriesScanOptions.getDefaultSeriesScanOptions(measurementPath4));
       seriesScanOperator4.initQueryDataSource(new 
QueryDataSource(seqResources, unSeqResources));
-      seriesScanOperator4
-          .getOperatorContext()
-          .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
+      OperatorContext.setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
 
       MeasurementPath measurementPath5 =
           new MeasurementPath(TOP_K_OPERATOR_TEST_SG + ".device2.sensor1", 
TSDataType.INT32);
@@ -253,9 +245,7 @@ public class TopKOperatorTest {
               timeOrdering,
               SeriesScanOptions.getDefaultSeriesScanOptions(measurementPath5));
       seriesScanOperator5.initQueryDataSource(new 
QueryDataSource(seqResources, unSeqResources));
-      seriesScanOperator5
-          .getOperatorContext()
-          .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
+      OperatorContext.setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
 
       List<TSDataType> tsDataTypes =
           new LinkedList<>(
@@ -292,9 +282,7 @@ public class TopKOperatorTest {
                           ? new AscTimeComparator()
                           : new DescTimeComparator())),
               timeOrdering == Ordering.ASC ? new AscTimeComparator() : new 
DescTimeComparator());
-      timeJoinOperator1
-          .getOperatorContext()
-          .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
+      OperatorContext.setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
 
       SingleDeviceViewOperator singleDeviceViewOperator2 =
           new SingleDeviceViewOperator(
@@ -322,9 +310,7 @@ public class TopKOperatorTest {
                           ? new AscTimeComparator()
                           : new DescTimeComparator())),
               timeOrdering == Ordering.ASC ? new AscTimeComparator() : new 
DescTimeComparator());
-      timeJoinOperator2
-          .getOperatorContext()
-          .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
+      OperatorContext.setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
 
       SingleDeviceViewOperator singleDeviceViewOperator3 =
           new SingleDeviceViewOperator(
@@ -348,7 +334,7 @@ public class TopKOperatorTest {
                   Arrays.asList(TSDataType.INT64, TSDataType.TEXT)),
               limitValue,
               true);
-      topKOperator.getOperatorContext().setMaxRunTime(new Duration(500, 
TimeUnit.MILLISECONDS));
+      OperatorContext.setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
       return topKOperator;
     } catch (IllegalPathException e) {
       e.printStackTrace();
@@ -913,9 +899,7 @@ public class TopKOperatorTest {
               timeOrdering,
               SeriesScanOptions.getDefaultSeriesScanOptions(measurementPath1));
       seriesScanOperator1.initQueryDataSource(new 
QueryDataSource(seqResources, unSeqResources));
-      seriesScanOperator1
-          .getOperatorContext()
-          .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
+      OperatorContext.setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
 
       MeasurementPath measurementPath2 =
           new MeasurementPath(TOP_K_OPERATOR_TEST_SG + ".device1.sensor0", 
TSDataType.INT32);
@@ -927,9 +911,7 @@ public class TopKOperatorTest {
               timeOrdering,
               SeriesScanOptions.getDefaultSeriesScanOptions(measurementPath2));
       seriesScanOperator2.initQueryDataSource(new 
QueryDataSource(seqResources, unSeqResources));
-      seriesScanOperator2
-          .getOperatorContext()
-          .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
+      OperatorContext.setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
 
       MeasurementPath measurementPath3 =
           new MeasurementPath(TOP_K_OPERATOR_TEST_SG + ".device1.sensor1", 
TSDataType.INT32);
@@ -941,9 +923,7 @@ public class TopKOperatorTest {
               timeOrdering,
               SeriesScanOptions.getDefaultSeriesScanOptions(measurementPath3));
       seriesScanOperator3.initQueryDataSource(new 
QueryDataSource(seqResources, unSeqResources));
-      seriesScanOperator3
-          .getOperatorContext()
-          .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
+      OperatorContext.setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
 
       MeasurementPath measurementPath4 =
           new MeasurementPath(TOP_K_OPERATOR_TEST_SG + ".device2.sensor0", 
TSDataType.INT32);
@@ -955,9 +935,7 @@ public class TopKOperatorTest {
               timeOrdering,
               SeriesScanOptions.getDefaultSeriesScanOptions(measurementPath4));
       seriesScanOperator4.initQueryDataSource(new 
QueryDataSource(seqResources, unSeqResources));
-      seriesScanOperator4
-          .getOperatorContext()
-          .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
+      OperatorContext.setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
 
       MeasurementPath measurementPath5 =
           new MeasurementPath(TOP_K_OPERATOR_TEST_SG + ".device2.sensor1", 
TSDataType.INT32);
@@ -969,9 +947,7 @@ public class TopKOperatorTest {
               timeOrdering,
               SeriesScanOptions.getDefaultSeriesScanOptions(measurementPath5));
       seriesScanOperator5.initQueryDataSource(new 
QueryDataSource(seqResources, unSeqResources));
-      seriesScanOperator5
-          .getOperatorContext()
-          .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
+      OperatorContext.setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
 
       MeasurementPath measurementPath6 =
           new MeasurementPath(TOP_K_OPERATOR_TEST_SG + ".device3.sensor0", 
TSDataType.INT32);
@@ -983,9 +959,7 @@ public class TopKOperatorTest {
               timeOrdering,
               SeriesScanOptions.getDefaultSeriesScanOptions(measurementPath6));
       seriesScanOperator6.initQueryDataSource(new 
QueryDataSource(seqResources, unSeqResources));
-      seriesScanOperator6
-          .getOperatorContext()
-          .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
+      OperatorContext.setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
 
       MeasurementPath measurementPath7 =
           new MeasurementPath(TOP_K_OPERATOR_TEST_SG + ".device3.sensor1", 
TSDataType.INT32);
@@ -997,9 +971,7 @@ public class TopKOperatorTest {
               timeOrdering,
               SeriesScanOptions.getDefaultSeriesScanOptions(measurementPath7));
       seriesScanOperator7.initQueryDataSource(new 
QueryDataSource(seqResources, unSeqResources));
-      seriesScanOperator7
-          .getOperatorContext()
-          .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
+      OperatorContext.setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
 
       List<TSDataType> tsDataTypes =
           new LinkedList<>(Arrays.asList(TSDataType.TEXT, TSDataType.INT32, 
TSDataType.INT32));
@@ -1021,9 +993,7 @@ public class TopKOperatorTest {
                           ? new AscTimeComparator()
                           : new DescTimeComparator())),
               timeOrdering == Ordering.ASC ? new AscTimeComparator() : new 
DescTimeComparator());
-      timeJoinOperator1
-          .getOperatorContext()
-          .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
+      OperatorContext.setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
       RowBasedTimeJoinOperator timeJoinOperator2 =
           new RowBasedTimeJoinOperator(
               driverContext.getOperatorContexts().get(8),
@@ -1042,9 +1012,7 @@ public class TopKOperatorTest {
                           ? new AscTimeComparator()
                           : new DescTimeComparator())),
               timeOrdering == Ordering.ASC ? new AscTimeComparator() : new 
DescTimeComparator());
-      timeJoinOperator2
-          .getOperatorContext()
-          .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
+      OperatorContext.setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
       RowBasedTimeJoinOperator timeJoinOperator3 =
           new RowBasedTimeJoinOperator(
               driverContext.getOperatorContexts().get(9),
@@ -1063,9 +1031,7 @@ public class TopKOperatorTest {
                           ? new AscTimeComparator()
                           : new DescTimeComparator())),
               timeOrdering == Ordering.ASC ? new AscTimeComparator() : new 
DescTimeComparator());
-      timeJoinOperator3
-          .getOperatorContext()
-          .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
+      OperatorContext.setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
       List<String> devices = new ArrayList<>(Arrays.asList(DEVICE0, DEVICE1, 
DEVICE2, DEVICE3));
       if (deviceOrdering == Ordering.DESC) {
         Collections.reverse(devices);
@@ -1111,7 +1077,7 @@ public class TopKOperatorTest {
                   Arrays.asList(TSDataType.TEXT, TSDataType.INT64)),
               limitValue,
               false);
-      topKOperator.getOperatorContext().setMaxRunTime(new Duration(500, 
TimeUnit.MILLISECONDS));
+      OperatorContext.setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
       return topKOperator;
     } catch (IllegalPathException e) {
       e.printStackTrace();
@@ -1343,7 +1309,7 @@ public class TopKOperatorTest {
                 Collections.singletonList(TSDataType.INT64)),
             20,
             true);
-    topKOperator.getOperatorContext().setMaxRunTime(new Duration(500, 
TimeUnit.MILLISECONDS));
+    OperatorContext.setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
 
     int index = 0;
     while (topKOperator.isBlocked().isDone() && topKOperator.hasNext()) {
@@ -1358,4 +1324,227 @@ public class TopKOperatorTest {
     }
     assertEquals(index, ans.length);
   }
+
+  // 
----------------------------------------------------------------------------------------------
+  //                     order by time - all result of SeriesScan is empty
+  // 
----------------------------------------------------------------------------------------------
+  //                                     TopKOperator
+  //                              ____________|_______________
+  //                              /           |               \
+  //           SingleDeviceViewOperator SingleDeviceViewOperator 
SingleDeviceViewOperator
+  //                     /                     |                              \
+  //        SeriesScanOperator      TimeJoinOperator                
TimeJoinOperator
+  //                                  /                \              /        
       \
+  //                  SeriesScanOperator SeriesScanOperator SeriesScanOperator 
  SeriesScanOperator
+  // 
----------------------------------------------------------------------------------------------
+  public TopKOperator emptyTopKOperatorTest(
+      Ordering timeOrdering, Ordering deviceOrdering, int limitValue) {
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, 
"test-instance-notification");
+    try {
+      // Construct operator tree
+      QueryId queryId = new QueryId("stub_query");
+
+      FragmentInstanceId instanceId =
+          new FragmentInstanceId(new PlanFragmentId(queryId, 0), 
"stub-instance");
+      FragmentInstanceStateMachine stateMachine =
+          new FragmentInstanceStateMachine(instanceId, 
instanceNotificationExecutor);
+      FragmentInstanceContext fragmentInstanceContext =
+          createFragmentInstanceContext(instanceId, stateMachine);
+      DriverContext driverContext = new DriverContext(fragmentInstanceContext, 
0);
+      PlanNodeId planNodeId1 = new PlanNodeId("1");
+      driverContext.addOperatorContext(1, planNodeId1, 
SeriesScanOperator.class.getSimpleName());
+      PlanNodeId planNodeId2 = new PlanNodeId("2");
+      driverContext.addOperatorContext(2, planNodeId2, 
SeriesScanOperator.class.getSimpleName());
+      PlanNodeId planNodeId3 = new PlanNodeId("3");
+      driverContext.addOperatorContext(3, planNodeId3, 
SeriesScanOperator.class.getSimpleName());
+      PlanNodeId planNodeId4 = new PlanNodeId("4");
+      driverContext.addOperatorContext(4, planNodeId4, 
SeriesScanOperator.class.getSimpleName());
+      PlanNodeId planNodeId5 = new PlanNodeId("5");
+      driverContext.addOperatorContext(5, planNodeId5, 
SeriesScanOperator.class.getSimpleName());
+      driverContext.addOperatorContext(
+          6, new PlanNodeId("6"), 
SingleDeviceViewOperator.class.getSimpleName());
+      driverContext.addOperatorContext(
+          7, new PlanNodeId("7"), 
RowBasedTimeJoinOperator.class.getSimpleName());
+      driverContext.addOperatorContext(
+          8, new PlanNodeId("8"), 
SingleDeviceViewOperator.class.getSimpleName());
+      driverContext.addOperatorContext(
+          9, new PlanNodeId("9"), 
RowBasedTimeJoinOperator.class.getSimpleName());
+      driverContext.addOperatorContext(
+          10, new PlanNodeId("10"), 
SingleDeviceViewOperator.class.getSimpleName());
+      driverContext.addOperatorContext(
+          11, new PlanNodeId("11"), TopKOperator.class.getSimpleName());
+
+      MeasurementPath measurementPath1 =
+          new MeasurementPath(TOP_K_OPERATOR_TEST_SG + ".device0.sensor0", 
TSDataType.INT32);
+      SeriesScanOperator seriesScanOperator1 =
+          new SeriesScanOperator(
+              driverContext.getOperatorContexts().get(0),
+              planNodeId1,
+              measurementPath1,
+              timeOrdering,
+              SeriesScanOptions.getDefaultSeriesScanOptions(measurementPath1));
+      seriesScanOperator1.initQueryDataSource(
+          new QueryDataSource(Collections.emptyList(), 
Collections.emptyList()));
+      OperatorContext.setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
+
+      MeasurementPath measurementPath2 =
+          new MeasurementPath(TOP_K_OPERATOR_TEST_SG + ".device1.sensor0", 
TSDataType.INT32);
+      SeriesScanOperator seriesScanOperator2 =
+          new SeriesScanOperator(
+              driverContext.getOperatorContexts().get(1),
+              planNodeId2,
+              measurementPath2,
+              timeOrdering,
+              SeriesScanOptions.getDefaultSeriesScanOptions(measurementPath2));
+      seriesScanOperator2.initQueryDataSource(
+          new QueryDataSource(Collections.emptyList(), 
Collections.emptyList()));
+      OperatorContext.setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
+
+      MeasurementPath measurementPath3 =
+          new MeasurementPath(TOP_K_OPERATOR_TEST_SG + ".device1.sensor1", 
TSDataType.INT32);
+      SeriesScanOperator seriesScanOperator3 =
+          new SeriesScanOperator(
+              driverContext.getOperatorContexts().get(2),
+              planNodeId3,
+              measurementPath3,
+              timeOrdering,
+              SeriesScanOptions.getDefaultSeriesScanOptions(measurementPath3));
+      seriesScanOperator3.initQueryDataSource(
+          new QueryDataSource(Collections.emptyList(), 
Collections.emptyList()));
+      OperatorContext.setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
+
+      MeasurementPath measurementPath4 =
+          new MeasurementPath(TOP_K_OPERATOR_TEST_SG + ".device2.sensor0", 
TSDataType.INT32);
+      SeriesScanOperator seriesScanOperator4 =
+          new SeriesScanOperator(
+              driverContext.getOperatorContexts().get(3),
+              planNodeId4,
+              measurementPath4,
+              timeOrdering,
+              SeriesScanOptions.getDefaultSeriesScanOptions(measurementPath4));
+      seriesScanOperator4.initQueryDataSource(
+          new QueryDataSource(Collections.emptyList(), 
Collections.emptyList()));
+      OperatorContext.setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
+
+      MeasurementPath measurementPath5 =
+          new MeasurementPath(TOP_K_OPERATOR_TEST_SG + ".device2.sensor1", 
TSDataType.INT32);
+      SeriesScanOperator seriesScanOperator5 =
+          new SeriesScanOperator(
+              driverContext.getOperatorContexts().get(4),
+              planNodeId5,
+              measurementPath5,
+              timeOrdering,
+              SeriesScanOptions.getDefaultSeriesScanOptions(measurementPath5));
+      seriesScanOperator5.initQueryDataSource(
+          new QueryDataSource(Collections.emptyList(), 
Collections.emptyList()));
+      OperatorContext.setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
+
+      List<TSDataType> tsDataTypes =
+          new LinkedList<>(
+              Arrays.asList(
+                  TSDataType.TEXT,
+                  TSDataType.INT32,
+                  TSDataType.INT32,
+                  TSDataType.INT32,
+                  TSDataType.INT32,
+                  TSDataType.INT32));
+      SingleDeviceViewOperator singleDeviceViewOperator1 =
+          new SingleDeviceViewOperator(
+              driverContext.getOperatorContexts().get(5),
+              DEVICE0,
+              seriesScanOperator1,
+              Collections.singletonList(1),
+              tsDataTypes);
+
+      RowBasedTimeJoinOperator timeJoinOperator1 =
+          new RowBasedTimeJoinOperator(
+              driverContext.getOperatorContexts().get(6),
+              Arrays.asList(seriesScanOperator2, seriesScanOperator3),
+              timeOrdering,
+              Arrays.asList(TSDataType.INT32, TSDataType.INT32),
+              Arrays.asList(
+                  new SingleColumnMerger(
+                      new InputLocation(0, 0),
+                      timeOrdering == Ordering.ASC
+                          ? new AscTimeComparator()
+                          : new DescTimeComparator()),
+                  new SingleColumnMerger(
+                      new InputLocation(1, 0),
+                      timeOrdering == Ordering.ASC
+                          ? new AscTimeComparator()
+                          : new DescTimeComparator())),
+              timeOrdering == Ordering.ASC ? new AscTimeComparator() : new 
DescTimeComparator());
+      OperatorContext.setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
+
+      SingleDeviceViewOperator singleDeviceViewOperator2 =
+          new SingleDeviceViewOperator(
+              driverContext.getOperatorContexts().get(7),
+              DEVICE1,
+              timeJoinOperator1,
+              Arrays.asList(2, 3),
+              tsDataTypes);
+
+      RowBasedTimeJoinOperator timeJoinOperator2 =
+          new RowBasedTimeJoinOperator(
+              driverContext.getOperatorContexts().get(8),
+              Arrays.asList(seriesScanOperator4, seriesScanOperator5),
+              timeOrdering,
+              Arrays.asList(TSDataType.INT32, TSDataType.INT32),
+              Arrays.asList(
+                  new SingleColumnMerger(
+                      new InputLocation(0, 0),
+                      timeOrdering == Ordering.ASC
+                          ? new AscTimeComparator()
+                          : new DescTimeComparator()),
+                  new SingleColumnMerger(
+                      new InputLocation(1, 0),
+                      timeOrdering == Ordering.ASC
+                          ? new AscTimeComparator()
+                          : new DescTimeComparator())),
+              timeOrdering == Ordering.ASC ? new AscTimeComparator() : new 
DescTimeComparator());
+      OperatorContext.setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
+
+      SingleDeviceViewOperator singleDeviceViewOperator3 =
+          new SingleDeviceViewOperator(
+              driverContext.getOperatorContexts().get(9),
+              DEVICE2,
+              timeJoinOperator2,
+              Arrays.asList(4, 5),
+              tsDataTypes);
+
+      TopKOperator topKOperator =
+          new TopKOperator(
+              driverContext.getOperatorContexts().get(10),
+              Arrays.asList(
+                  singleDeviceViewOperator1, singleDeviceViewOperator2, 
singleDeviceViewOperator3),
+              tsDataTypes,
+              MergeSortComparator.getComparator(
+                  Arrays.asList(
+                      new SortItem(OrderByKey.TIME, timeOrdering),
+                      new SortItem(OrderByKey.DEVICE, deviceOrdering)),
+                  Arrays.asList(-1, 0),
+                  Arrays.asList(TSDataType.INT64, TSDataType.TEXT)),
+              limitValue,
+              true);
+      OperatorContext.setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
+      return topKOperator;
+    } catch (IllegalPathException e) {
+      e.printStackTrace();
+      fail();
+      return null;
+    }
+  }
+
+  @Test
+  public void testEmptyTopKOperator() throws Exception {
+    TopKOperator topKOperator = emptyTopKOperatorTest(Ordering.ASC, 
Ordering.ASC, limitValue);
+    while (topKOperator.isBlocked().isDone() && topKOperator.hasNext()) {
+      TsBlock tsBlock = topKOperator.next();
+      if (tsBlock == null || tsBlock.isEmpty()) {
+        continue;
+      }
+      fail();
+    }
+  }
 }

Reply via email to