This is an automated email from the ASF dual-hosted git repository.
JackieTien97 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 16d3669edb1 Fix operator tests for transient null TsBlocks (#17838)
16d3669edb1 is described below
commit 16d3669edb1814f234ffcc5a5173c95befc54278
Author: Caideyipi <[email protected]>
AuthorDate: Thu Jun 4 19:11:31 2026 +0800
Fix operator tests for transient null TsBlocks (#17838)
---
.../AlignedSeriesAggregationScanOperatorTest.java | 33 ++++++-------
.../operator/DeviceViewIntoOperatorTest.java | 16 ++-----
.../execution/operator/FillOperatorTest.java | 7 +--
.../operator/HorizontallyConcatOperatorTest.java | 3 +-
.../operator/MergeTreeSortOperatorTest.java | 3 +-
.../execution/operator/OffsetOperatorTest.java | 20 ++++++--
.../execution/operator/OperatorTestUtils.java | 55 ++++++++++++++++++++++
.../SeriesAggregationScanOperatorTest.java | 33 ++++++-------
.../execution/operator/SeriesScanOperatorTest.java | 3 +-
.../execution/operator/TreeIntoOperatorTest.java | 6 +--
.../operator/TreeLinearFillOperatorTest.java | 5 +-
.../operator/UpdateLastCacheOperatorTest.java | 7 +--
12 files changed, 127 insertions(+), 64 deletions(-)
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/AlignedSeriesAggregationScanOperatorTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/AlignedSeriesAggregationScanOperatorTest.java
index 9a68324a911..ff4f2a5f638 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/AlignedSeriesAggregationScanOperatorTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/AlignedSeriesAggregationScanOperatorTest.java
@@ -69,6 +69,7 @@ import java.util.stream.Collectors;
import static
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
import static
org.apache.iotdb.db.queryengine.execution.operator.AggregationOperatorTest.TEST_TIME_SLICE;
import static
org.apache.iotdb.db.queryengine.execution.operator.AggregationUtil.initTimeRangeIterator;
+import static
org.apache.iotdb.db.queryengine.execution.operator.OperatorTestUtils.nextNonNullOrEmpty;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -122,7 +123,7 @@ public class AlignedSeriesAggregationScanOperatorTest {
initAlignedSeriesAggregationScanOperator(aggregators, null, true,
null);
int count = 0;
while (seriesAggregationScanOperator.hasNext()) {
- TsBlock resultTsBlock = seriesAggregationScanOperator.next();
+ TsBlock resultTsBlock =
nextNonNullOrEmpty(seriesAggregationScanOperator);
for (int i = 0; i < measurementSchemas.size(); i++) {
assertEquals(500, resultTsBlock.getColumn(i).getLong(0));
}
@@ -153,7 +154,7 @@ public class AlignedSeriesAggregationScanOperatorTest {
initAlignedSeriesAggregationScanOperator(aggregators, null, false,
null);
int count = 0;
while (seriesAggregationScanOperator.hasNext()) {
- TsBlock resultTsBlock = seriesAggregationScanOperator.next();
+ TsBlock resultTsBlock =
nextNonNullOrEmpty(seriesAggregationScanOperator);
for (int i = 0; i < measurementSchemas.size(); i++) {
assertEquals(500, resultTsBlock.getColumn(i).getLong(0));
}
@@ -189,7 +190,7 @@ public class AlignedSeriesAggregationScanOperatorTest {
initAlignedSeriesAggregationScanOperator(aggregators, null, true,
null);
int count = 0;
while (seriesAggregationScanOperator.hasNext()) {
- TsBlock resultTsBlock = seriesAggregationScanOperator.next();
+ TsBlock resultTsBlock =
nextNonNullOrEmpty(seriesAggregationScanOperator);
assertEquals(500, resultTsBlock.getColumn(0).getLong(0));
assertEquals(6524750.0, resultTsBlock.getColumn(1).getDouble(0), 0.0001);
count++;
@@ -227,7 +228,7 @@ public class AlignedSeriesAggregationScanOperatorTest {
initAlignedSeriesAggregationScanOperator(aggregators, null, true,
null);
int count = 0;
while (seriesAggregationScanOperator.hasNext()) {
- TsBlock resultTsBlock = seriesAggregationScanOperator.next();
+ TsBlock resultTsBlock =
nextNonNullOrEmpty(seriesAggregationScanOperator);
assertTrue(resultTsBlock.getColumn(0).getBoolean(0));
assertEquals(10499, resultTsBlock.getColumn(1).getInt(0));
assertEquals(20199, resultTsBlock.getColumn(2).getLong(0));
@@ -269,7 +270,7 @@ public class AlignedSeriesAggregationScanOperatorTest {
initAlignedSeriesAggregationScanOperator(aggregators, null, false,
null);
int count = 0;
while (seriesAggregationScanOperator.hasNext()) {
- TsBlock resultTsBlock = seriesAggregationScanOperator.next();
+ TsBlock resultTsBlock =
nextNonNullOrEmpty(seriesAggregationScanOperator);
assertTrue(resultTsBlock.getColumn(0).getBoolean(0));
assertEquals(10499, resultTsBlock.getColumn(1).getInt(0));
assertEquals(20199, resultTsBlock.getColumn(2).getLong(0));
@@ -304,7 +305,7 @@ public class AlignedSeriesAggregationScanOperatorTest {
initAlignedSeriesAggregationScanOperator(aggregators, timeFilter,
true, null);
int count = 0;
while (seriesAggregationScanOperator.hasNext()) {
- TsBlock resultTsBlock = seriesAggregationScanOperator.next();
+ TsBlock resultTsBlock =
nextNonNullOrEmpty(seriesAggregationScanOperator);
for (int i = 0; i < measurementSchemas.size(); i++) {
assertEquals(resultTsBlock.getColumn(i).getLong(0), 380);
}
@@ -337,7 +338,7 @@ public class AlignedSeriesAggregationScanOperatorTest {
initAlignedSeriesAggregationScanOperator(aggregators, timeFilter,
true, null);
int count = 0;
while (seriesAggregationScanOperator.hasNext()) {
- TsBlock resultTsBlock = seriesAggregationScanOperator.next();
+ TsBlock resultTsBlock =
nextNonNullOrEmpty(seriesAggregationScanOperator);
for (int i = 0; i < measurementSchemas.size(); i++) {
assertEquals(resultTsBlock.getColumn(i).getLong(0), 380);
}
@@ -370,7 +371,7 @@ public class AlignedSeriesAggregationScanOperatorTest {
initAlignedSeriesAggregationScanOperator(aggregators, timeFilter,
true, null);
int count = 0;
while (seriesAggregationScanOperator.hasNext()) {
- TsBlock resultTsBlock = seriesAggregationScanOperator.next();
+ TsBlock resultTsBlock =
nextNonNullOrEmpty(seriesAggregationScanOperator);
for (int i = 0; i < measurementSchemas.size(); i++) {
assertEquals(resultTsBlock.getColumn(i).getLong(0), 300);
}
@@ -410,7 +411,7 @@ public class AlignedSeriesAggregationScanOperatorTest {
initAlignedSeriesAggregationScanOperator(aggregators, timeFilter,
true, null);
int count = 0;
while (seriesAggregationScanOperator.hasNext()) {
- TsBlock resultTsBlock = seriesAggregationScanOperator.next();
+ TsBlock resultTsBlock =
nextNonNullOrEmpty(seriesAggregationScanOperator);
assertTrue(resultTsBlock.getColumn(0).getBoolean(0));
assertEquals(399, resultTsBlock.getColumn(1).getInt(0));
assertEquals(20199, resultTsBlock.getColumn(2).getLong(0));
@@ -448,7 +449,7 @@ public class AlignedSeriesAggregationScanOperatorTest {
initAlignedSeriesAggregationScanOperator(aggregators, null, true,
groupByTimeParameter);
int count = 0;
while (seriesAggregationScanOperator.hasNext()) {
- TsBlock resultTsBlock = seriesAggregationScanOperator.next();
+ TsBlock resultTsBlock =
nextNonNullOrEmpty(seriesAggregationScanOperator);
int positionCount = resultTsBlock.getPositionCount();
for (int pos = 0; pos < positionCount; pos++) {
assertEquals(100 * count, resultTsBlock.getTimeColumn().getLong(pos));
@@ -489,7 +490,7 @@ public class AlignedSeriesAggregationScanOperatorTest {
aggregators, timeFilter, true, groupByTimeParameter);
int count = 0;
while (seriesAggregationScanOperator.hasNext()) {
- TsBlock resultTsBlock = seriesAggregationScanOperator.next();
+ TsBlock resultTsBlock =
nextNonNullOrEmpty(seriesAggregationScanOperator);
int positionCount = resultTsBlock.getPositionCount();
for (int pos = 0; pos < positionCount; pos++) {
assertEquals(100 * count, resultTsBlock.getTimeColumn().getLong(pos));
@@ -534,7 +535,7 @@ public class AlignedSeriesAggregationScanOperatorTest {
initAlignedSeriesAggregationScanOperator(aggregators, null, true,
groupByTimeParameter);
int count = 0;
while (seriesAggregationScanOperator.hasNext()) {
- TsBlock resultTsBlock = seriesAggregationScanOperator.next();
+ TsBlock resultTsBlock =
nextNonNullOrEmpty(seriesAggregationScanOperator);
int positionCount = resultTsBlock.getPositionCount();
for (int pos = 0; pos < positionCount; pos++) {
assertEquals(100 * count, resultTsBlock.getTimeColumn().getLong(pos));
@@ -580,7 +581,7 @@ public class AlignedSeriesAggregationScanOperatorTest {
initAlignedSeriesAggregationScanOperator(aggregators, null, false,
groupByTimeParameter);
int count = 0;
while (seriesAggregationScanOperator.hasNext()) {
- TsBlock resultTsBlock = seriesAggregationScanOperator.next();
+ TsBlock resultTsBlock =
nextNonNullOrEmpty(seriesAggregationScanOperator);
int positionCount = resultTsBlock.getPositionCount();
for (int pos = 0; pos < positionCount; pos++) {
assertEquals(100 * (3 - count),
resultTsBlock.getTimeColumn().getLong(pos));
@@ -616,7 +617,7 @@ public class AlignedSeriesAggregationScanOperatorTest {
initAlignedSeriesAggregationScanOperator(aggregators, null, true,
groupByTimeParameter);
int count = 0;
while (seriesAggregationScanOperator.hasNext()) {
- TsBlock resultTsBlock = seriesAggregationScanOperator.next();
+ TsBlock resultTsBlock =
nextNonNullOrEmpty(seriesAggregationScanOperator);
int positionCount = resultTsBlock.getPositionCount();
for (int pos = 0; pos < positionCount; pos++) {
assertEquals(50 * count, resultTsBlock.getTimeColumn().getLong(pos));
@@ -649,7 +650,7 @@ public class AlignedSeriesAggregationScanOperatorTest {
initAlignedSeriesAggregationScanOperator(aggregators, null, true,
groupByTimeParameter);
int count = 0;
while (seriesAggregationScanOperator.hasNext()) {
- TsBlock resultTsBlock = seriesAggregationScanOperator.next();
+ TsBlock resultTsBlock =
nextNonNullOrEmpty(seriesAggregationScanOperator);
int positionCount = resultTsBlock.getPositionCount();
for (int pos = 0; pos < positionCount; pos++) {
assertEquals(timeColumn[count],
resultTsBlock.getTimeColumn().getLong(pos));
@@ -693,7 +694,7 @@ public class AlignedSeriesAggregationScanOperatorTest {
initAlignedSeriesAggregationScanOperator(aggregators, null, true,
groupByTimeParameter);
int count = 0;
while (seriesAggregationScanOperator.hasNext()) {
- TsBlock resultTsBlock = seriesAggregationScanOperator.next();
+ TsBlock resultTsBlock =
nextNonNullOrEmpty(seriesAggregationScanOperator);
int positionCount = resultTsBlock.getPositionCount();
for (int pos = 0; pos < positionCount; pos++) {
assertEquals(timeColumn[count],
resultTsBlock.getTimeColumn().getLong(pos));
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/DeviceViewIntoOperatorTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/DeviceViewIntoOperatorTest.java
index 7252d2341bd..3262bf97f13 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/DeviceViewIntoOperatorTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/DeviceViewIntoOperatorTest.java
@@ -72,6 +72,7 @@ import java.util.concurrent.ExecutorService;
import static com.google.common.util.concurrent.Futures.immediateFuture;
import static
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
+import static
org.apache.iotdb.db.queryengine.execution.operator.OperatorTestUtils.lastNonNullOrEmpty;
import static org.apache.iotdb.rpc.RpcUtils.SUCCESS_STATUS;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
@@ -258,10 +259,7 @@ public class DeviceViewIntoOperatorTest {
prepareDeviceData("device0", 2);
operator = createAndInitOperatorForSingleDevices(2);
- TsBlock result = null;
- while (operator.isBlocked().isDone() && operator.hasNext()) {
- result = operator.next();
- }
+ TsBlock result = lastNonNullOrEmpty(operator);
assertNotNull(result);
assertEquals(2, result.getPositionCount());
@@ -276,10 +274,7 @@ public class DeviceViewIntoOperatorTest {
prepareDeviceData("device0", 10);
operator = createAndInitOperatorForSingleDevices(10);
- TsBlock result = null;
- while (operator.isBlocked().isDone() && operator.hasNext()) {
- result = operator.next();
- }
+ TsBlock result = lastNonNullOrEmpty(operator);
assertNotNull(result);
assertEquals(10, result.getPositionCount());
@@ -295,10 +290,7 @@ public class DeviceViewIntoOperatorTest {
prepareDeviceData("device1", 1);
operator = createAndInitOperatorForMultipleDevices(2, 1);
- TsBlock result = null;
- while (operator.isBlocked().isDone() && operator.hasNext()) {
- result = operator.next();
- }
+ TsBlock result = lastNonNullOrEmpty(operator);
assertNotNull(result);
assertEquals(2, result.getPositionCount());
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/FillOperatorTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/FillOperatorTest.java
index 24a13ea0d7f..a0d3d57a582 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/FillOperatorTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/FillOperatorTest.java
@@ -43,6 +43,7 @@ import org.junit.Test;
import java.util.concurrent.ExecutorService;
import static
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
+import static
org.apache.iotdb.db.queryengine.execution.operator.OperatorTestUtils.nextNonNullOrEmpty;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -210,7 +211,7 @@ public class FillOperatorTest {
}
};
while (fillOperator.hasNext()) {
- TsBlock block = fillOperator.next();
+ TsBlock block = nextNonNullOrEmpty(fillOperator);
for (int i = 0; i < block.getPositionCount(); i++) {
long expectedTime = i + 1 + count * 10000L;
assertEquals(expectedTime, block.getTimeByIndex(i));
@@ -383,7 +384,7 @@ public class FillOperatorTest {
}
};
while (fillOperator.hasNext()) {
- TsBlock block = fillOperator.next();
+ TsBlock block = nextNonNullOrEmpty(fillOperator);
for (int i = 0; i < block.getPositionCount(); i++) {
long expectedTime = i + 1 + count * 10000L;
assertEquals(expectedTime, block.getTimeByIndex(i));
@@ -560,7 +561,7 @@ public class FillOperatorTest {
}
};
while (fillOperator.hasNext()) {
- TsBlock block = fillOperator.next();
+ TsBlock block = nextNonNullOrEmpty(fillOperator);
for (int i = 0; i < block.getPositionCount(); i++) {
long expectedTime = i + 1 + count * 10000L;
assertEquals(expectedTime, block.getTimeByIndex(i));
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/HorizontallyConcatOperatorTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/HorizontallyConcatOperatorTest.java
index 135bb7ff83f..d479f145317 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/HorizontallyConcatOperatorTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/HorizontallyConcatOperatorTest.java
@@ -68,6 +68,7 @@ import java.util.concurrent.TimeUnit;
import static
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
import static
org.apache.iotdb.db.queryengine.execution.operator.AggregationUtil.initTimeRangeIterator;
+import static
org.apache.iotdb.db.queryengine.execution.operator.OperatorTestUtils.nextNonNullOrEmpty;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
@@ -203,7 +204,7 @@ public class HorizontallyConcatOperatorTest {
int count = 0;
while (horizontallyConcatOperator.isBlocked().isDone()
&& horizontallyConcatOperator.hasNext()) {
- TsBlock tsBlock = horizontallyConcatOperator.next();
+ TsBlock tsBlock = nextNonNullOrEmpty(horizontallyConcatOperator);
assertEquals(6, tsBlock.getValueColumnCount());
for (int i = 0; i < tsBlock.getPositionCount(); i++, count++) {
assertEquals(count, tsBlock.getTimeByIndex(i));
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/MergeTreeSortOperatorTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/MergeTreeSortOperatorTest.java
index b6cdefd4194..cfbcf0e2630 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/MergeTreeSortOperatorTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/MergeTreeSortOperatorTest.java
@@ -93,6 +93,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
+import static
org.apache.iotdb.db.queryengine.execution.operator.OperatorTestUtils.nextNonNullOrEmpty;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -1794,7 +1795,7 @@ public class MergeTreeSortOperatorTest {
int index = 0;
while (treeMergeSortOperator.isBlocked().isDone() &&
treeMergeSortOperator.hasNext()) {
- TsBlock result = treeMergeSortOperator.next();
+ TsBlock result = nextNonNullOrEmpty(treeMergeSortOperator);
for (int i = 0; i < result.getPositionCount(); i++) {
long time = result.getTimeByIndex(i);
assertEquals(time, ans[index++]);
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/OffsetOperatorTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/OffsetOperatorTest.java
index 526577d61bb..12f8555d0ec 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/OffsetOperatorTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/OffsetOperatorTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.queryengine.execution.operator;
+import org.apache.iotdb.calc.execution.operator.Operator;
import org.apache.iotdb.calc.execution.operator.process.LimitOperator;
import org.apache.iotdb.calc.execution.operator.process.OffsetOperator;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
@@ -65,6 +66,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import static
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
+import static
org.apache.iotdb.db.queryengine.execution.operator.OperatorTestUtils.nextNonNullOrEmpty;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -168,7 +170,7 @@ public class OffsetOperatorTest {
new LimitOperator(driverContext.getOperatorContexts().get(4), 250,
offsetOperator);
int count = 100;
while (limitOperator.isBlocked().isDone() && limitOperator.hasNext()) {
- TsBlock tsBlock = limitOperator.next();
+ TsBlock tsBlock = nextNonNullOrEmpty(limitOperator);
assertEquals(2, tsBlock.getValueColumnCount());
assertTrue(tsBlock.getColumn(0) instanceof IntColumn);
assertTrue(tsBlock.getColumn(1) instanceof IntColumn);
@@ -276,7 +278,7 @@ public class OffsetOperatorTest {
int count = 0;
while (offsetOperator.isBlocked().isDone() && offsetOperator.hasNext()) {
- TsBlock tsBlock = offsetOperator.next();
+ TsBlock tsBlock = nextNonNullOrEmpty(offsetOperator);
assertEquals(2, tsBlock.getValueColumnCount());
assertTrue(tsBlock.getColumn(0) instanceof IntColumn);
assertTrue(tsBlock.getColumn(1) instanceof IntColumn);
@@ -382,7 +384,7 @@ public class OffsetOperatorTest {
new OffsetOperator(driverContext.getOperatorContexts().get(3), 500,
timeJoinOperator);
while (offsetOperator.isBlocked().isDone() && offsetOperator.hasNext()) {
- TsBlock tsBlock = offsetOperator.next();
+ TsBlock tsBlock = nextNonNull(offsetOperator);
assertEquals(2, tsBlock.getValueColumnCount());
assertTrue(tsBlock.getColumn(0) instanceof IntColumn);
assertTrue(tsBlock.getColumn(1) instanceof IntColumn);
@@ -474,7 +476,7 @@ public class OffsetOperatorTest {
driverContext.getOperatorContexts().get(3), 98_784_247_808L,
timeJoinOperator);
while (offsetOperator.isBlocked().isDone() && offsetOperator.hasNext()) {
- TsBlock tsBlock = offsetOperator.next();
+ TsBlock tsBlock = nextNonNull(offsetOperator);
assertEquals(2, tsBlock.getValueColumnCount());
assertTrue(tsBlock.getColumn(0) instanceof IntColumn);
assertTrue(tsBlock.getColumn(1) instanceof IntColumn);
@@ -487,4 +489,14 @@ public class OffsetOperatorTest {
instanceNotificationExecutor.shutdown();
}
}
+
+ private static TsBlock nextNonNull(Operator operator) throws Exception {
+ while (operator.hasNext()) {
+ TsBlock result = operator.next();
+ if (result != null) {
+ return result;
+ }
+ }
+ throw new AssertionError("Expected a non-null TsBlock from operator");
+ }
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorTestUtils.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorTestUtils.java
new file mode 100644
index 00000000000..77d1eddb4c0
--- /dev/null
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorTestUtils.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.queryengine.execution.operator;
+
+import org.apache.iotdb.calc.execution.operator.Operator;
+
+import org.apache.tsfile.read.common.block.TsBlock;
+
+public final class OperatorTestUtils {
+
+ private OperatorTestUtils() {
+ // Utility class.
+ }
+
+ public static TsBlock nextNonNullOrEmpty(Operator operator) throws Exception {
+ while (operator.hasNext()) {
+ TsBlock result = operator.next();
+ if (!isNullOrEmpty(result)) {
+ return result;
+ }
+ }
+ throw new AssertionError("Expected a non-null and non-empty TsBlock from operator");
+ }
+
+ public static TsBlock lastNonNullOrEmpty(Operator operator) throws Exception {
+ TsBlock result = null;
+ while (operator.isBlocked().isDone() && operator.hasNext()) {
+ TsBlock nextResult = operator.next();
+ if (!isNullOrEmpty(nextResult)) {
+ result = nextResult;
+ }
+ }
+ return result;
+ }
+
+ private static boolean isNullOrEmpty(TsBlock tsBlock) {
+ return tsBlock == null || tsBlock.getPositionCount() == 0;
+ }
+}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/SeriesAggregationScanOperatorTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/SeriesAggregationScanOperatorTest.java
index edf22b93a4a..43efba320e9 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/SeriesAggregationScanOperatorTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/SeriesAggregationScanOperatorTest.java
@@ -68,6 +68,7 @@ import java.util.concurrent.ExecutorService;
import static
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
import static
org.apache.iotdb.db.queryengine.execution.operator.AggregationOperatorTest.TEST_TIME_SLICE;
import static
org.apache.iotdb.db.queryengine.execution.operator.AggregationUtil.initTimeRangeIterator;
+import static
org.apache.iotdb.db.queryengine.execution.operator.OperatorTestUtils.nextNonNullOrEmpty;
import static org.junit.Assert.assertEquals;
public class SeriesAggregationScanOperatorTest {
@@ -113,7 +114,7 @@ public class SeriesAggregationScanOperatorTest {
int count = 0;
while (seriesAggregationScanOperator.hasNext()) {
- TsBlock resultTsBlock = seriesAggregationScanOperator.next();
+ TsBlock resultTsBlock =
nextNonNullOrEmpty(seriesAggregationScanOperator);
assertEquals(500, resultTsBlock.getColumn(0).getLong(0));
count++;
}
@@ -137,7 +138,7 @@ public class SeriesAggregationScanOperatorTest {
int count = 0;
while (seriesAggregationScanOperator.hasNext()) {
- TsBlock resultTsBlock = seriesAggregationScanOperator.next();
+ TsBlock resultTsBlock =
nextNonNullOrEmpty(seriesAggregationScanOperator);
assertEquals(500, resultTsBlock.getColumn(0).getLong(0));
count++;
}
@@ -163,7 +164,7 @@ public class SeriesAggregationScanOperatorTest {
int count = 0;
while (seriesAggregationScanOperator.hasNext()) {
- TsBlock resultTsBlock = seriesAggregationScanOperator.next();
+ TsBlock resultTsBlock =
nextNonNullOrEmpty(seriesAggregationScanOperator);
assertEquals(500, resultTsBlock.getColumn(0).getLong(0));
assertEquals(6524750.0, resultTsBlock.getColumn(1).getDouble(0), 0.0001);
count++;
@@ -194,7 +195,7 @@ public class SeriesAggregationScanOperatorTest {
int count = 0;
while (seriesAggregationScanOperator.hasNext()) {
- TsBlock resultTsBlock = seriesAggregationScanOperator.next();
+ TsBlock resultTsBlock =
nextNonNullOrEmpty(seriesAggregationScanOperator);
assertEquals(20000, resultTsBlock.getColumn(0).getInt(0));
assertEquals(10499, resultTsBlock.getColumn(1).getInt(0));
assertEquals(0, resultTsBlock.getColumn(2).getLong(0));
@@ -230,7 +231,7 @@ public class SeriesAggregationScanOperatorTest {
int count = 0;
while (seriesAggregationScanOperator.hasNext()) {
- TsBlock resultTsBlock = seriesAggregationScanOperator.next();
+ TsBlock resultTsBlock =
nextNonNullOrEmpty(seriesAggregationScanOperator);
assertEquals(20000, resultTsBlock.getColumn(0).getInt(0));
assertEquals(10499, resultTsBlock.getColumn(1).getInt(0));
assertEquals(0, resultTsBlock.getColumn(2).getLong(0));
@@ -261,7 +262,7 @@ public class SeriesAggregationScanOperatorTest {
int count = 0;
while (seriesAggregationScanOperator.hasNext()) {
- TsBlock resultTsBlock = seriesAggregationScanOperator.next();
+ TsBlock resultTsBlock =
nextNonNullOrEmpty(seriesAggregationScanOperator);
assertEquals(resultTsBlock.getColumn(0).getLong(0), 380);
count++;
}
@@ -287,7 +288,7 @@ public class SeriesAggregationScanOperatorTest {
int count = 0;
while (seriesAggregationScanOperator.hasNext()) {
- TsBlock resultTsBlock = seriesAggregationScanOperator.next();
+ TsBlock resultTsBlock =
nextNonNullOrEmpty(seriesAggregationScanOperator);
assertEquals(resultTsBlock.getColumn(0).getLong(0), 380);
count++;
}
@@ -313,7 +314,7 @@ public class SeriesAggregationScanOperatorTest {
int count = 0;
while (seriesAggregationScanOperator.hasNext()) {
- TsBlock resultTsBlock = seriesAggregationScanOperator.next();
+ TsBlock resultTsBlock =
nextNonNullOrEmpty(seriesAggregationScanOperator);
assertEquals(resultTsBlock.getColumn(0).getLong(0), 300);
count++;
}
@@ -345,7 +346,7 @@ public class SeriesAggregationScanOperatorTest {
int count = 0;
while (seriesAggregationScanOperator.hasNext()) {
- TsBlock resultTsBlock = seriesAggregationScanOperator.next();
+ TsBlock resultTsBlock =
nextNonNullOrEmpty(seriesAggregationScanOperator);
assertEquals(20100, resultTsBlock.getColumn(0).getInt(0));
assertEquals(399, resultTsBlock.getColumn(1).getInt(0));
assertEquals(100, resultTsBlock.getColumn(2).getLong(0));
@@ -377,7 +378,7 @@ public class SeriesAggregationScanOperatorTest {
int count = 0;
while (seriesAggregationScanOperator.hasNext()) {
- TsBlock resultTsBlock = seriesAggregationScanOperator.next();
+ TsBlock resultTsBlock =
nextNonNullOrEmpty(seriesAggregationScanOperator);
int positionCount = resultTsBlock.getPositionCount();
for (int pos = 0; pos < positionCount; pos++) {
assertEquals(100 * count, resultTsBlock.getTimeColumn().getLong(pos));
@@ -410,7 +411,7 @@ public class SeriesAggregationScanOperatorTest {
int count = 0;
while (seriesAggregationScanOperator.hasNext()) {
- TsBlock resultTsBlock = seriesAggregationScanOperator.next();
+ TsBlock resultTsBlock =
nextNonNullOrEmpty(seriesAggregationScanOperator);
int positionCount = resultTsBlock.getPositionCount();
for (int pos = 0; pos < positionCount; pos++) {
assertEquals(100 * count, resultTsBlock.getTimeColumn().getLong(pos));
@@ -452,7 +453,7 @@ public class SeriesAggregationScanOperatorTest {
int count = 0;
while (seriesAggregationScanOperator.hasNext()) {
- TsBlock resultTsBlock = seriesAggregationScanOperator.next();
+ TsBlock resultTsBlock =
nextNonNullOrEmpty(seriesAggregationScanOperator);
int positionCount = resultTsBlock.getPositionCount();
for (int pos = 0; pos < positionCount; pos++) {
assertEquals(100 * count, resultTsBlock.getTimeColumn().getLong(pos));
@@ -497,7 +498,7 @@ public class SeriesAggregationScanOperatorTest {
int count = 0;
while (seriesAggregationScanOperator.hasNext()) {
- TsBlock resultTsBlock = seriesAggregationScanOperator.next();
+ TsBlock resultTsBlock =
nextNonNullOrEmpty(seriesAggregationScanOperator);
int positionCount = resultTsBlock.getPositionCount();
for (int pos = 0; pos < positionCount; pos++) {
assertEquals(100 * (3 - count),
resultTsBlock.getTimeColumn().getLong(pos));
@@ -532,7 +533,7 @@ public class SeriesAggregationScanOperatorTest {
int count = 0;
while (seriesAggregationScanOperator.hasNext()) {
- TsBlock resultTsBlock = seriesAggregationScanOperator.next();
+ TsBlock resultTsBlock =
nextNonNullOrEmpty(seriesAggregationScanOperator);
int positionCount = resultTsBlock.getPositionCount();
for (int pos = 0; pos < positionCount; pos++) {
assertEquals(50 * count, resultTsBlock.getTimeColumn().getLong(pos));
@@ -565,7 +566,7 @@ public class SeriesAggregationScanOperatorTest {
int count = 0;
while (seriesAggregationScanOperator.hasNext()) {
- TsBlock resultTsBlock = seriesAggregationScanOperator.next();
+ TsBlock resultTsBlock =
nextNonNullOrEmpty(seriesAggregationScanOperator);
int positionCount = resultTsBlock.getPositionCount();
for (int pos = 0; pos < positionCount; pos++) {
assertEquals(timeColumn[count],
resultTsBlock.getTimeColumn().getLong(pos));
@@ -608,7 +609,7 @@ public class SeriesAggregationScanOperatorTest {
int count = 0;
while (seriesAggregationScanOperator.hasNext()) {
- TsBlock resultTsBlock = seriesAggregationScanOperator.next();
+ TsBlock resultTsBlock =
nextNonNullOrEmpty(seriesAggregationScanOperator);
int positionCount = resultTsBlock.getPositionCount();
for (int pos = 0; pos < positionCount; pos++) {
assertEquals(timeColumn[count],
resultTsBlock.getTimeColumn().getLong(pos));
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/SeriesScanOperatorTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/SeriesScanOperatorTest.java
index 92cb35f515a..23dda52f2bd 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/SeriesScanOperatorTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/SeriesScanOperatorTest.java
@@ -58,6 +58,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import static
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
+import static
org.apache.iotdb.db.queryengine.execution.operator.OperatorTestUtils.nextNonNullOrEmpty;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -119,7 +120,7 @@ public class SeriesScanOperatorTest {
int count = 0;
while (seriesScanOperator.hasNext()) {
- TsBlock tsBlock = seriesScanOperator.next();
+ TsBlock tsBlock = nextNonNullOrEmpty(seriesScanOperator);
assertEquals(1, tsBlock.getValueColumnCount());
assertTrue(tsBlock.getColumn(0) instanceof IntColumn);
for (int i = 0; i < tsBlock.getPositionCount(); i++, count++) {
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/TreeIntoOperatorTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/TreeIntoOperatorTest.java
index 690da2ffd19..81cc4c6654a 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/TreeIntoOperatorTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/TreeIntoOperatorTest.java
@@ -69,6 +69,7 @@ import java.util.concurrent.ExecutorService;
import static com.google.common.util.concurrent.Futures.immediateFuture;
import static
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
+import static
org.apache.iotdb.db.queryengine.execution.operator.OperatorTestUtils.lastNonNullOrEmpty;
import static org.apache.iotdb.rpc.RpcUtils.SUCCESS_STATUS;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
@@ -198,10 +199,7 @@ public class TreeIntoOperatorTest {
prepareSourceTargetPairs(2);
operator = createAndInitOperator(2);
- TsBlock result = null;
- while (operator.isBlocked().isDone() && operator.hasNext()) {
- result = operator.next();
- }
+ TsBlock result = lastNonNullOrEmpty(operator);
assertNotNull(result);
assertEquals(2, result.getPositionCount());
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/TreeLinearFillOperatorTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/TreeLinearFillOperatorTest.java
index 393a23dc2a7..7d1962fb3f4 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/TreeLinearFillOperatorTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/TreeLinearFillOperatorTest.java
@@ -42,8 +42,8 @@ import org.junit.Test;
import java.util.concurrent.ExecutorService;
import static
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
+import static
org.apache.iotdb.db.queryengine.execution.operator.OperatorTestUtils.nextNonNullOrEmpty;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
public class TreeLinearFillOperatorTest {
@@ -1309,8 +1309,7 @@ public class TreeLinearFillOperatorTest {
};
while (fillOperator.hasNext()) {
- TsBlock block = fillOperator.next();
- assertNotNull(block);
+ TsBlock block = nextNonNullOrEmpty(fillOperator);
for (int i = 0; i < block.getPositionCount(); i++) {
long expectedTime = i + count;
assertEquals(expectedTime, block.getTimeByIndex(i));
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/UpdateLastCacheOperatorTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/UpdateLastCacheOperatorTest.java
index c9688bbc2af..2454ffb4fa8 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/UpdateLastCacheOperatorTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/UpdateLastCacheOperatorTest.java
@@ -63,6 +63,7 @@ import java.util.concurrent.ExecutorService;
import static
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
import static
org.apache.iotdb.db.queryengine.execution.operator.AggregationOperatorTest.TEST_TIME_SLICE;
import static
org.apache.iotdb.db.queryengine.execution.operator.AggregationUtil.initTimeRangeIterator;
+import static
org.apache.iotdb.db.queryengine.execution.operator.OperatorTestUtils.nextNonNullOrEmpty;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@@ -104,7 +105,7 @@ public class UpdateLastCacheOperatorTest {
assertTrue(updateLastCacheOperator.isBlocked().isDone());
assertTrue(updateLastCacheOperator.hasNext());
- TsBlock result = updateLastCacheOperator.next();
+ TsBlock result = nextNonNullOrEmpty(updateLastCacheOperator);
assertEquals(1, result.getPositionCount());
assertEquals(3, result.getValueColumnCount());
@@ -134,7 +135,7 @@ public class UpdateLastCacheOperatorTest {
assertTrue(updateLastCacheOperator.isBlocked().isDone());
assertTrue(updateLastCacheOperator.hasNext());
- TsBlock result = updateLastCacheOperator.next();
+ TsBlock result = nextNonNullOrEmpty(updateLastCacheOperator);
assertEquals(1, result.getPositionCount());
assertEquals(3, result.getValueColumnCount());
@@ -164,7 +165,7 @@ public class UpdateLastCacheOperatorTest {
assertTrue(updateLastCacheOperator.isBlocked().isDone());
assertTrue(updateLastCacheOperator.hasNext());
- TsBlock result = updateLastCacheOperator.next();
+ TsBlock result = nextNonNullOrEmpty(updateLastCacheOperator);
assertEquals(1, result.getPositionCount());
assertEquals(3, result.getValueColumnCount());