This is an automated email from the ASF dual-hosted git repository.
JackieTien97 pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new b8852699a49 Fix operator tests for transient null TsBlocks (#17838) (#17847)
b8852699a49 is described below
commit b8852699a495458169e7078a5554d92275e06545
Author: Caideyipi <[email protected]>
AuthorDate: Fri Jun 5 11:44:40 2026 +0800
Fix operator tests for transient null TsBlocks (#17838) (#17847)
---
.../AlignedSeriesAggregationScanOperatorTest.java | 33 +++++++-------
.../execution/operator/FillOperatorTest.java | 7 +--
.../operator/HorizontallyConcatOperatorTest.java | 3 +-
.../execution/operator/LinearFillOperatorTest.java | 5 +-
.../execution/operator/MergeSortOperatorTest.java | 3 +-
.../execution/operator/OffsetOperatorTest.java | 19 ++++++--
.../execution/operator/OperatorTestUtils.java | 53 ++++++++++++++++++++++
.../SeriesAggregationScanOperatorTest.java | 33 +++++++-------
.../execution/operator/SeriesScanOperatorTest.java | 3 +-
.../operator/UpdateLastCacheOperatorTest.java | 7 +--
10 files changed, 118 insertions(+), 48 deletions(-)
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/AlignedSeriesAggregationScanOperatorTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/AlignedSeriesAggregationScanOperatorTest.java
index 2a5c5c13d0b..bc81047b17a 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/AlignedSeriesAggregationScanOperatorTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/AlignedSeriesAggregationScanOperatorTest.java
@@ -69,6 +69,7 @@ import java.util.stream.Collectors;
import static
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
import static
org.apache.iotdb.db.queryengine.execution.operator.AggregationOperatorTest.TEST_TIME_SLICE;
import static
org.apache.iotdb.db.queryengine.execution.operator.AggregationUtil.initTimeRangeIterator;
+import static
org.apache.iotdb.db.queryengine.execution.operator.OperatorTestUtils.nextNonNullOrEmpty;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -122,7 +123,7 @@ public class AlignedSeriesAggregationScanOperatorTest {
initAlignedSeriesAggregationScanOperator(aggregators, null, true,
null);
int count = 0;
while (seriesAggregationScanOperator.hasNext()) {
- TsBlock resultTsBlock = seriesAggregationScanOperator.next();
+ TsBlock resultTsBlock =
nextNonNullOrEmpty(seriesAggregationScanOperator);
for (int i = 0; i < measurementSchemas.size(); i++) {
assertEquals(500, resultTsBlock.getColumn(i).getLong(0));
}
@@ -153,7 +154,7 @@ public class AlignedSeriesAggregationScanOperatorTest {
initAlignedSeriesAggregationScanOperator(aggregators, null, false,
null);
int count = 0;
while (seriesAggregationScanOperator.hasNext()) {
- TsBlock resultTsBlock = seriesAggregationScanOperator.next();
+ TsBlock resultTsBlock =
nextNonNullOrEmpty(seriesAggregationScanOperator);
for (int i = 0; i < measurementSchemas.size(); i++) {
assertEquals(500, resultTsBlock.getColumn(i).getLong(0));
}
@@ -189,7 +190,7 @@ public class AlignedSeriesAggregationScanOperatorTest {
initAlignedSeriesAggregationScanOperator(aggregators, null, true,
null);
int count = 0;
while (seriesAggregationScanOperator.hasNext()) {
- TsBlock resultTsBlock = seriesAggregationScanOperator.next();
+ TsBlock resultTsBlock =
nextNonNullOrEmpty(seriesAggregationScanOperator);
assertEquals(500, resultTsBlock.getColumn(0).getLong(0));
assertEquals(6524750.0, resultTsBlock.getColumn(1).getDouble(0), 0.0001);
count++;
@@ -227,7 +228,7 @@ public class AlignedSeriesAggregationScanOperatorTest {
initAlignedSeriesAggregationScanOperator(aggregators, null, true,
null);
int count = 0;
while (seriesAggregationScanOperator.hasNext()) {
- TsBlock resultTsBlock = seriesAggregationScanOperator.next();
+ TsBlock resultTsBlock =
nextNonNullOrEmpty(seriesAggregationScanOperator);
assertTrue(resultTsBlock.getColumn(0).getBoolean(0));
assertEquals(10499, resultTsBlock.getColumn(1).getInt(0));
assertEquals(20199, resultTsBlock.getColumn(2).getLong(0));
@@ -269,7 +270,7 @@ public class AlignedSeriesAggregationScanOperatorTest {
initAlignedSeriesAggregationScanOperator(aggregators, null, false,
null);
int count = 0;
while (seriesAggregationScanOperator.hasNext()) {
- TsBlock resultTsBlock = seriesAggregationScanOperator.next();
+ TsBlock resultTsBlock =
nextNonNullOrEmpty(seriesAggregationScanOperator);
assertTrue(resultTsBlock.getColumn(0).getBoolean(0));
assertEquals(10499, resultTsBlock.getColumn(1).getInt(0));
assertEquals(20199, resultTsBlock.getColumn(2).getLong(0));
@@ -304,7 +305,7 @@ public class AlignedSeriesAggregationScanOperatorTest {
initAlignedSeriesAggregationScanOperator(aggregators, timeFilter,
true, null);
int count = 0;
while (seriesAggregationScanOperator.hasNext()) {
- TsBlock resultTsBlock = seriesAggregationScanOperator.next();
+ TsBlock resultTsBlock =
nextNonNullOrEmpty(seriesAggregationScanOperator);
for (int i = 0; i < measurementSchemas.size(); i++) {
assertEquals(resultTsBlock.getColumn(i).getLong(0), 380);
}
@@ -337,7 +338,7 @@ public class AlignedSeriesAggregationScanOperatorTest {
initAlignedSeriesAggregationScanOperator(aggregators, timeFilter,
true, null);
int count = 0;
while (seriesAggregationScanOperator.hasNext()) {
- TsBlock resultTsBlock = seriesAggregationScanOperator.next();
+ TsBlock resultTsBlock =
nextNonNullOrEmpty(seriesAggregationScanOperator);
for (int i = 0; i < measurementSchemas.size(); i++) {
assertEquals(resultTsBlock.getColumn(i).getLong(0), 380);
}
@@ -370,7 +371,7 @@ public class AlignedSeriesAggregationScanOperatorTest {
initAlignedSeriesAggregationScanOperator(aggregators, timeFilter,
true, null);
int count = 0;
while (seriesAggregationScanOperator.hasNext()) {
- TsBlock resultTsBlock = seriesAggregationScanOperator.next();
+ TsBlock resultTsBlock =
nextNonNullOrEmpty(seriesAggregationScanOperator);
for (int i = 0; i < measurementSchemas.size(); i++) {
assertEquals(resultTsBlock.getColumn(i).getLong(0), 300);
}
@@ -410,7 +411,7 @@ public class AlignedSeriesAggregationScanOperatorTest {
initAlignedSeriesAggregationScanOperator(aggregators, timeFilter,
true, null);
int count = 0;
while (seriesAggregationScanOperator.hasNext()) {
- TsBlock resultTsBlock = seriesAggregationScanOperator.next();
+ TsBlock resultTsBlock =
nextNonNullOrEmpty(seriesAggregationScanOperator);
assertTrue(resultTsBlock.getColumn(0).getBoolean(0));
assertEquals(399, resultTsBlock.getColumn(1).getInt(0));
assertEquals(20199, resultTsBlock.getColumn(2).getLong(0));
@@ -448,7 +449,7 @@ public class AlignedSeriesAggregationScanOperatorTest {
initAlignedSeriesAggregationScanOperator(aggregators, null, true,
groupByTimeParameter);
int count = 0;
while (seriesAggregationScanOperator.hasNext()) {
- TsBlock resultTsBlock = seriesAggregationScanOperator.next();
+ TsBlock resultTsBlock =
nextNonNullOrEmpty(seriesAggregationScanOperator);
int positionCount = resultTsBlock.getPositionCount();
for (int pos = 0; pos < positionCount; pos++) {
assertEquals(100 * count, resultTsBlock.getTimeColumn().getLong(pos));
@@ -489,7 +490,7 @@ public class AlignedSeriesAggregationScanOperatorTest {
aggregators, timeFilter, true, groupByTimeParameter);
int count = 0;
while (seriesAggregationScanOperator.hasNext()) {
- TsBlock resultTsBlock = seriesAggregationScanOperator.next();
+ TsBlock resultTsBlock =
nextNonNullOrEmpty(seriesAggregationScanOperator);
int positionCount = resultTsBlock.getPositionCount();
for (int pos = 0; pos < positionCount; pos++) {
assertEquals(100 * count, resultTsBlock.getTimeColumn().getLong(pos));
@@ -533,7 +534,7 @@ public class AlignedSeriesAggregationScanOperatorTest {
initAlignedSeriesAggregationScanOperator(aggregators, null, true,
groupByTimeParameter);
int count = 0;
while (seriesAggregationScanOperator.hasNext()) {
- TsBlock resultTsBlock = seriesAggregationScanOperator.next();
+ TsBlock resultTsBlock =
nextNonNullOrEmpty(seriesAggregationScanOperator);
int positionCount = resultTsBlock.getPositionCount();
for (int pos = 0; pos < positionCount; pos++) {
assertEquals(100 * count, resultTsBlock.getTimeColumn().getLong(pos));
@@ -578,7 +579,7 @@ public class AlignedSeriesAggregationScanOperatorTest {
initAlignedSeriesAggregationScanOperator(aggregators, null, false,
groupByTimeParameter);
int count = 0;
while (seriesAggregationScanOperator.hasNext()) {
- TsBlock resultTsBlock = seriesAggregationScanOperator.next();
+ TsBlock resultTsBlock =
nextNonNullOrEmpty(seriesAggregationScanOperator);
int positionCount = resultTsBlock.getPositionCount();
for (int pos = 0; pos < positionCount; pos++) {
assertEquals(100 * (3 - count),
resultTsBlock.getTimeColumn().getLong(pos));
@@ -613,7 +614,7 @@ public class AlignedSeriesAggregationScanOperatorTest {
initAlignedSeriesAggregationScanOperator(aggregators, null, true,
groupByTimeParameter);
int count = 0;
while (seriesAggregationScanOperator.hasNext()) {
- TsBlock resultTsBlock = seriesAggregationScanOperator.next();
+ TsBlock resultTsBlock =
nextNonNullOrEmpty(seriesAggregationScanOperator);
int positionCount = resultTsBlock.getPositionCount();
for (int pos = 0; pos < positionCount; pos++) {
assertEquals(50 * count, resultTsBlock.getTimeColumn().getLong(pos));
@@ -645,7 +646,7 @@ public class AlignedSeriesAggregationScanOperatorTest {
initAlignedSeriesAggregationScanOperator(aggregators, null, true,
groupByTimeParameter);
int count = 0;
while (seriesAggregationScanOperator.hasNext()) {
- TsBlock resultTsBlock = seriesAggregationScanOperator.next();
+ TsBlock resultTsBlock =
nextNonNullOrEmpty(seriesAggregationScanOperator);
int positionCount = resultTsBlock.getPositionCount();
for (int pos = 0; pos < positionCount; pos++) {
assertEquals(timeColumn[count],
resultTsBlock.getTimeColumn().getLong(pos));
@@ -688,7 +689,7 @@ public class AlignedSeriesAggregationScanOperatorTest {
initAlignedSeriesAggregationScanOperator(aggregators, null, true,
groupByTimeParameter);
int count = 0;
while (seriesAggregationScanOperator.hasNext()) {
- TsBlock resultTsBlock = seriesAggregationScanOperator.next();
+ TsBlock resultTsBlock =
nextNonNullOrEmpty(seriesAggregationScanOperator);
int positionCount = resultTsBlock.getPositionCount();
for (int pos = 0; pos < positionCount; pos++) {
assertEquals(timeColumn[count],
resultTsBlock.getTimeColumn().getLong(pos));
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/FillOperatorTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/FillOperatorTest.java
index e2bb5a655ec..ac3c83a8364 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/FillOperatorTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/FillOperatorTest.java
@@ -42,6 +42,7 @@ import org.junit.Test;
import java.util.concurrent.ExecutorService;
import static
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
+import static
org.apache.iotdb.db.queryengine.execution.operator.OperatorTestUtils.nextNonNullOrEmpty;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -209,7 +210,7 @@ public class FillOperatorTest {
}
};
while (fillOperator.hasNext()) {
- TsBlock block = fillOperator.next();
+ TsBlock block = nextNonNullOrEmpty(fillOperator);
for (int i = 0; i < block.getPositionCount(); i++) {
long expectedTime = i + 1 + count * 10000L;
assertEquals(expectedTime, block.getTimeByIndex(i));
@@ -386,7 +387,7 @@ public class FillOperatorTest {
}
};
while (fillOperator.hasNext()) {
- TsBlock block = fillOperator.next();
+ TsBlock block = nextNonNullOrEmpty(fillOperator);
for (int i = 0; i < block.getPositionCount(); i++) {
long expectedTime = i + 1 + count * 10000L;
assertEquals(expectedTime, block.getTimeByIndex(i));
@@ -563,7 +564,7 @@ public class FillOperatorTest {
}
};
while (fillOperator.hasNext()) {
- TsBlock block = fillOperator.next();
+ TsBlock block = nextNonNullOrEmpty(fillOperator);
for (int i = 0; i < block.getPositionCount(); i++) {
long expectedTime = i + 1 + count * 10000L;
assertEquals(expectedTime, block.getTimeByIndex(i));
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/HorizontallyConcatOperatorTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/HorizontallyConcatOperatorTest.java
index bad50a7e1c9..ec258a36c0e 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/HorizontallyConcatOperatorTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/HorizontallyConcatOperatorTest.java
@@ -66,6 +66,7 @@ import java.util.concurrent.TimeUnit;
import static
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
import static
org.apache.iotdb.db.queryengine.execution.operator.AggregationUtil.initTimeRangeIterator;
+import static
org.apache.iotdb.db.queryengine.execution.operator.OperatorTestUtils.nextNonNullOrEmpty;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
@@ -197,7 +198,7 @@ public class HorizontallyConcatOperatorTest {
int count = 0;
while (horizontallyConcatOperator.isBlocked().isDone()
&& horizontallyConcatOperator.hasNext()) {
- TsBlock tsBlock = horizontallyConcatOperator.next();
+ TsBlock tsBlock = nextNonNullOrEmpty(horizontallyConcatOperator);
assertEquals(6, tsBlock.getValueColumnCount());
for (int i = 0; i < tsBlock.getPositionCount(); i++, count++) {
assertEquals(count, tsBlock.getTimeByIndex(i));
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/LinearFillOperatorTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/LinearFillOperatorTest.java
index 08859dc127b..8ea63f132e9 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/LinearFillOperatorTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/LinearFillOperatorTest.java
@@ -41,8 +41,8 @@ import org.junit.Test;
import java.util.concurrent.ExecutorService;
import static
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
+import static
org.apache.iotdb.db.queryengine.execution.operator.OperatorTestUtils.nextNonNullOrEmpty;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
public class LinearFillOperatorTest {
@@ -1301,8 +1301,7 @@ public class LinearFillOperatorTest {
};
while (fillOperator.hasNext()) {
- TsBlock block = fillOperator.next();
- assertNotNull(block);
+ TsBlock block = nextNonNullOrEmpty(fillOperator);
for (int i = 0; i < block.getPositionCount(); i++) {
long expectedTime = i + count;
assertEquals(expectedTime, block.getTimeByIndex(i));
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/MergeSortOperatorTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/MergeSortOperatorTest.java
index 76ec370ad98..9afc9c7c540 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/MergeSortOperatorTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/MergeSortOperatorTest.java
@@ -88,6 +88,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
+import static
org.apache.iotdb.db.queryengine.execution.operator.OperatorTestUtils.nextNonNullOrEmpty;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -1765,7 +1766,7 @@ public class MergeSortOperatorTest {
int index = 0;
while (mergeSortOperator.isBlocked().isDone() &&
mergeSortOperator.hasNext()) {
- TsBlock result = mergeSortOperator.next();
+ TsBlock result = nextNonNullOrEmpty(mergeSortOperator);
for (int i = 0; i < result.getPositionCount(); i++) {
long time = result.getTimeByIndex(i);
assertEquals(time, ans[index++]);
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/OffsetOperatorTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/OffsetOperatorTest.java
index a1188becf10..6602c65ff98 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/OffsetOperatorTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/OffsetOperatorTest.java
@@ -62,6 +62,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import static
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
+import static
org.apache.iotdb.db.queryengine.execution.operator.OperatorTestUtils.nextNonNullOrEmpty;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -161,7 +162,7 @@ public class OffsetOperatorTest {
new LimitOperator(driverContext.getOperatorContexts().get(4), 250,
offsetOperator);
int count = 100;
while (limitOperator.isBlocked().isDone() && limitOperator.hasNext()) {
- TsBlock tsBlock = limitOperator.next();
+ TsBlock tsBlock = nextNonNullOrEmpty(limitOperator);
assertEquals(2, tsBlock.getValueColumnCount());
assertTrue(tsBlock.getColumn(0) instanceof IntColumn);
assertTrue(tsBlock.getColumn(1) instanceof IntColumn);
@@ -265,7 +266,7 @@ public class OffsetOperatorTest {
int count = 0;
while (offsetOperator.isBlocked().isDone() && offsetOperator.hasNext()) {
- TsBlock tsBlock = offsetOperator.next();
+ TsBlock tsBlock = nextNonNullOrEmpty(offsetOperator);
assertEquals(2, tsBlock.getValueColumnCount());
assertTrue(tsBlock.getColumn(0) instanceof IntColumn);
assertTrue(tsBlock.getColumn(1) instanceof IntColumn);
@@ -367,7 +368,7 @@ public class OffsetOperatorTest {
new OffsetOperator(driverContext.getOperatorContexts().get(3), 500,
timeJoinOperator);
while (offsetOperator.isBlocked().isDone() && offsetOperator.hasNext()) {
- TsBlock tsBlock = offsetOperator.next();
+ TsBlock tsBlock = nextNonNull(offsetOperator);
assertEquals(2, tsBlock.getValueColumnCount());
assertTrue(tsBlock.getColumn(0) instanceof IntColumn);
assertTrue(tsBlock.getColumn(1) instanceof IntColumn);
@@ -455,7 +456,7 @@ public class OffsetOperatorTest {
driverContext.getOperatorContexts().get(3), 98_784_247_808L,
timeJoinOperator);
while (offsetOperator.isBlocked().isDone() && offsetOperator.hasNext()) {
- TsBlock tsBlock = offsetOperator.next();
+ TsBlock tsBlock = nextNonNull(offsetOperator);
assertEquals(2, tsBlock.getValueColumnCount());
assertTrue(tsBlock.getColumn(0) instanceof IntColumn);
assertTrue(tsBlock.getColumn(1) instanceof IntColumn);
@@ -468,4 +469,14 @@ public class OffsetOperatorTest {
instanceNotificationExecutor.shutdown();
}
}
+
+  /**
+   * Pulls from {@code operator} until it yields a non-null TsBlock and returns that block. Only
+   * {@code null} results are skipped; a block with zero positions is returned as-is.
+   *
+   * @param operator the operator to pull from
+   * @return the first non-null TsBlock produced by {@code operator.next()}
+   * @throws AssertionError if {@code operator.hasNext()} becomes false before a non-null block
+   *     has been produced
+   * @throws Exception if {@code hasNext()} or {@code next()} throws
+   */
+  private static TsBlock nextNonNull(Operator operator) throws Exception {
+    TsBlock block = null;
+    // Stop as soon as a block arrives so hasNext() is not consulted again after a hit.
+    while (block == null && operator.hasNext()) {
+      block = operator.next();
+    }
+    if (block == null) {
+      throw new AssertionError("Expected a non-null TsBlock from operator");
+    }
+    return block;
+  }
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorTestUtils.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorTestUtils.java
new file mode 100644
index 00000000000..37b979e87be
--- /dev/null
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorTestUtils.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.queryengine.execution.operator;
+
+import org.apache.tsfile.read.common.block.TsBlock;
+
+/**
+ * Static helpers for operator tests that pull {@link TsBlock}s from an {@link Operator} while
+ * skipping results that are {@code null} or that contain zero positions.
+ */
+public final class OperatorTestUtils {
+
+  private OperatorTestUtils() {
+    // Utility class.
+  }
+
+  /**
+   * Calls {@code operator.next()} for as long as {@code operator.hasNext()} is true and returns
+   * the first result that is neither {@code null} nor empty.
+   *
+   * <p>Unlike {@link #lastNonNullOrEmpty(Operator)}, this method does not consult {@code
+   * operator.isBlocked()} before pulling.
+   *
+   * @param operator the operator to pull from
+   * @return the first non-null TsBlock whose position count is not zero
+   * @throws AssertionError if {@code hasNext()} becomes false before such a TsBlock is produced
+   * @throws Exception if {@code hasNext()} or {@code next()} throws
+   */
+  public static TsBlock nextNonNullOrEmpty(Operator operator) throws Exception {
+    while (operator.hasNext()) {
+      TsBlock result = operator.next();
+      if (!isNullOrEmpty(result)) {
+        return result;
+      }
+    }
+    throw new AssertionError("Expected a non-null and non-empty TsBlock from operator");
+  }
+
+  /**
+   * Drains {@code operator} while {@code operator.isBlocked().isDone()} and {@code
+   * operator.hasNext()} both hold, remembering the most recent result that is neither {@code
+   * null} nor empty.
+   *
+   * @param operator the operator to drain
+   * @return the last non-null, non-empty TsBlock seen, or {@code null} if the loop ended without
+   *     seeing one (this method does not throw in that case)
+   * @throws Exception if {@code isBlocked()}, {@code hasNext()} or {@code next()} throws
+   */
+  public static TsBlock lastNonNullOrEmpty(Operator operator) throws Exception {
+    TsBlock result = null;
+    while (operator.isBlocked().isDone() && operator.hasNext()) {
+      TsBlock nextResult = operator.next();
+      if (!isNullOrEmpty(nextResult)) {
+        result = nextResult;
+      }
+    }
+    return result;
+  }
+
+  /** Returns true when {@code tsBlock} is {@code null} or its position count is zero. */
+  private static boolean isNullOrEmpty(TsBlock tsBlock) {
+    return tsBlock == null || tsBlock.getPositionCount() == 0;
+  }
+}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/SeriesAggregationScanOperatorTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/SeriesAggregationScanOperatorTest.java
index bbf96e89fb4..afa08223d64 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/SeriesAggregationScanOperatorTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/SeriesAggregationScanOperatorTest.java
@@ -66,6 +66,7 @@ import java.util.concurrent.ExecutorService;
import static
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
import static
org.apache.iotdb.db.queryengine.execution.operator.AggregationOperatorTest.TEST_TIME_SLICE;
import static
org.apache.iotdb.db.queryengine.execution.operator.AggregationUtil.initTimeRangeIterator;
+import static
org.apache.iotdb.db.queryengine.execution.operator.OperatorTestUtils.nextNonNullOrEmpty;
import static org.junit.Assert.assertEquals;
public class SeriesAggregationScanOperatorTest {
@@ -111,7 +112,7 @@ public class SeriesAggregationScanOperatorTest {
int count = 0;
while (seriesAggregationScanOperator.hasNext()) {
- TsBlock resultTsBlock = seriesAggregationScanOperator.next();
+ TsBlock resultTsBlock =
nextNonNullOrEmpty(seriesAggregationScanOperator);
assertEquals(500, resultTsBlock.getColumn(0).getLong(0));
count++;
}
@@ -135,7 +136,7 @@ public class SeriesAggregationScanOperatorTest {
int count = 0;
while (seriesAggregationScanOperator.hasNext()) {
- TsBlock resultTsBlock = seriesAggregationScanOperator.next();
+ TsBlock resultTsBlock =
nextNonNullOrEmpty(seriesAggregationScanOperator);
assertEquals(500, resultTsBlock.getColumn(0).getLong(0));
count++;
}
@@ -161,7 +162,7 @@ public class SeriesAggregationScanOperatorTest {
int count = 0;
while (seriesAggregationScanOperator.hasNext()) {
- TsBlock resultTsBlock = seriesAggregationScanOperator.next();
+ TsBlock resultTsBlock =
nextNonNullOrEmpty(seriesAggregationScanOperator);
assertEquals(500, resultTsBlock.getColumn(0).getLong(0));
assertEquals(6524750.0, resultTsBlock.getColumn(1).getDouble(0), 0.0001);
count++;
@@ -192,7 +193,7 @@ public class SeriesAggregationScanOperatorTest {
int count = 0;
while (seriesAggregationScanOperator.hasNext()) {
- TsBlock resultTsBlock = seriesAggregationScanOperator.next();
+ TsBlock resultTsBlock =
nextNonNullOrEmpty(seriesAggregationScanOperator);
assertEquals(20000, resultTsBlock.getColumn(0).getInt(0));
assertEquals(10499, resultTsBlock.getColumn(1).getInt(0));
assertEquals(0, resultTsBlock.getColumn(2).getLong(0));
@@ -228,7 +229,7 @@ public class SeriesAggregationScanOperatorTest {
int count = 0;
while (seriesAggregationScanOperator.hasNext()) {
- TsBlock resultTsBlock = seriesAggregationScanOperator.next();
+ TsBlock resultTsBlock =
nextNonNullOrEmpty(seriesAggregationScanOperator);
assertEquals(20000, resultTsBlock.getColumn(0).getInt(0));
assertEquals(10499, resultTsBlock.getColumn(1).getInt(0));
assertEquals(0, resultTsBlock.getColumn(2).getLong(0));
@@ -259,7 +260,7 @@ public class SeriesAggregationScanOperatorTest {
int count = 0;
while (seriesAggregationScanOperator.hasNext()) {
- TsBlock resultTsBlock = seriesAggregationScanOperator.next();
+ TsBlock resultTsBlock =
nextNonNullOrEmpty(seriesAggregationScanOperator);
assertEquals(resultTsBlock.getColumn(0).getLong(0), 380);
count++;
}
@@ -285,7 +286,7 @@ public class SeriesAggregationScanOperatorTest {
int count = 0;
while (seriesAggregationScanOperator.hasNext()) {
- TsBlock resultTsBlock = seriesAggregationScanOperator.next();
+ TsBlock resultTsBlock =
nextNonNullOrEmpty(seriesAggregationScanOperator);
assertEquals(resultTsBlock.getColumn(0).getLong(0), 380);
count++;
}
@@ -311,7 +312,7 @@ public class SeriesAggregationScanOperatorTest {
int count = 0;
while (seriesAggregationScanOperator.hasNext()) {
- TsBlock resultTsBlock = seriesAggregationScanOperator.next();
+ TsBlock resultTsBlock =
nextNonNullOrEmpty(seriesAggregationScanOperator);
assertEquals(resultTsBlock.getColumn(0).getLong(0), 300);
count++;
}
@@ -343,7 +344,7 @@ public class SeriesAggregationScanOperatorTest {
int count = 0;
while (seriesAggregationScanOperator.hasNext()) {
- TsBlock resultTsBlock = seriesAggregationScanOperator.next();
+ TsBlock resultTsBlock =
nextNonNullOrEmpty(seriesAggregationScanOperator);
assertEquals(20100, resultTsBlock.getColumn(0).getInt(0));
assertEquals(399, resultTsBlock.getColumn(1).getInt(0));
assertEquals(100, resultTsBlock.getColumn(2).getLong(0));
@@ -375,7 +376,7 @@ public class SeriesAggregationScanOperatorTest {
int count = 0;
while (seriesAggregationScanOperator.hasNext()) {
- TsBlock resultTsBlock = seriesAggregationScanOperator.next();
+ TsBlock resultTsBlock =
nextNonNullOrEmpty(seriesAggregationScanOperator);
int positionCount = resultTsBlock.getPositionCount();
for (int pos = 0; pos < positionCount; pos++) {
assertEquals(100 * count, resultTsBlock.getTimeColumn().getLong(pos));
@@ -408,7 +409,7 @@ public class SeriesAggregationScanOperatorTest {
int count = 0;
while (seriesAggregationScanOperator.hasNext()) {
- TsBlock resultTsBlock = seriesAggregationScanOperator.next();
+ TsBlock resultTsBlock =
nextNonNullOrEmpty(seriesAggregationScanOperator);
int positionCount = resultTsBlock.getPositionCount();
for (int pos = 0; pos < positionCount; pos++) {
assertEquals(100 * count, resultTsBlock.getTimeColumn().getLong(pos));
@@ -450,7 +451,7 @@ public class SeriesAggregationScanOperatorTest {
int count = 0;
while (seriesAggregationScanOperator.hasNext()) {
- TsBlock resultTsBlock = seriesAggregationScanOperator.next();
+ TsBlock resultTsBlock =
nextNonNullOrEmpty(seriesAggregationScanOperator);
int positionCount = resultTsBlock.getPositionCount();
for (int pos = 0; pos < positionCount; pos++) {
assertEquals(100 * count, resultTsBlock.getTimeColumn().getLong(pos));
@@ -495,7 +496,7 @@ public class SeriesAggregationScanOperatorTest {
int count = 0;
while (seriesAggregationScanOperator.hasNext()) {
- TsBlock resultTsBlock = seriesAggregationScanOperator.next();
+ TsBlock resultTsBlock =
nextNonNullOrEmpty(seriesAggregationScanOperator);
int positionCount = resultTsBlock.getPositionCount();
for (int pos = 0; pos < positionCount; pos++) {
assertEquals(100 * (3 - count),
resultTsBlock.getTimeColumn().getLong(pos));
@@ -530,7 +531,7 @@ public class SeriesAggregationScanOperatorTest {
int count = 0;
while (seriesAggregationScanOperator.hasNext()) {
- TsBlock resultTsBlock = seriesAggregationScanOperator.next();
+ TsBlock resultTsBlock =
nextNonNullOrEmpty(seriesAggregationScanOperator);
int positionCount = resultTsBlock.getPositionCount();
for (int pos = 0; pos < positionCount; pos++) {
assertEquals(50 * count, resultTsBlock.getTimeColumn().getLong(pos));
@@ -563,7 +564,7 @@ public class SeriesAggregationScanOperatorTest {
int count = 0;
while (seriesAggregationScanOperator.hasNext()) {
- TsBlock resultTsBlock = seriesAggregationScanOperator.next();
+ TsBlock resultTsBlock =
nextNonNullOrEmpty(seriesAggregationScanOperator);
int positionCount = resultTsBlock.getPositionCount();
for (int pos = 0; pos < positionCount; pos++) {
assertEquals(timeColumn[count],
resultTsBlock.getTimeColumn().getLong(pos));
@@ -606,7 +607,7 @@ public class SeriesAggregationScanOperatorTest {
int count = 0;
while (seriesAggregationScanOperator.hasNext()) {
- TsBlock resultTsBlock = seriesAggregationScanOperator.next();
+ TsBlock resultTsBlock =
nextNonNullOrEmpty(seriesAggregationScanOperator);
int positionCount = resultTsBlock.getPositionCount();
for (int pos = 0; pos < positionCount; pos++) {
assertEquals(timeColumn[count],
resultTsBlock.getTimeColumn().getLong(pos));
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/SeriesScanOperatorTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/SeriesScanOperatorTest.java
index 11daae6f1c5..39455ab4208 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/SeriesScanOperatorTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/SeriesScanOperatorTest.java
@@ -55,6 +55,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import static
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
+import static
org.apache.iotdb.db.queryengine.execution.operator.OperatorTestUtils.nextNonNullOrEmpty;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -114,7 +115,7 @@ public class SeriesScanOperatorTest {
int count = 0;
while (seriesScanOperator.hasNext()) {
- TsBlock tsBlock = seriesScanOperator.next();
+ TsBlock tsBlock = nextNonNullOrEmpty(seriesScanOperator);
assertEquals(1, tsBlock.getValueColumnCount());
assertTrue(tsBlock.getColumn(0) instanceof IntColumn);
for (int i = 0; i < tsBlock.getPositionCount(); i++, count++) {
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/UpdateLastCacheOperatorTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/UpdateLastCacheOperatorTest.java
index 0c9c47815ae..30604831e26 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/UpdateLastCacheOperatorTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/UpdateLastCacheOperatorTest.java
@@ -62,6 +62,7 @@ import java.util.concurrent.ExecutorService;
import static
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
import static
org.apache.iotdb.db.queryengine.execution.operator.AggregationOperatorTest.TEST_TIME_SLICE;
import static
org.apache.iotdb.db.queryengine.execution.operator.AggregationUtil.initTimeRangeIterator;
+import static
org.apache.iotdb.db.queryengine.execution.operator.OperatorTestUtils.nextNonNullOrEmpty;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@@ -103,7 +104,7 @@ public class UpdateLastCacheOperatorTest {
assertTrue(updateLastCacheOperator.isBlocked().isDone());
assertTrue(updateLastCacheOperator.hasNext());
- TsBlock result = updateLastCacheOperator.next();
+ TsBlock result = nextNonNullOrEmpty(updateLastCacheOperator);
assertEquals(1, result.getPositionCount());
assertEquals(3, result.getValueColumnCount());
@@ -133,7 +134,7 @@ public class UpdateLastCacheOperatorTest {
assertTrue(updateLastCacheOperator.isBlocked().isDone());
assertTrue(updateLastCacheOperator.hasNext());
- TsBlock result = updateLastCacheOperator.next();
+ TsBlock result = nextNonNullOrEmpty(updateLastCacheOperator);
assertEquals(1, result.getPositionCount());
assertEquals(3, result.getValueColumnCount());
@@ -163,7 +164,7 @@ public class UpdateLastCacheOperatorTest {
assertTrue(updateLastCacheOperator.isBlocked().isDone());
assertTrue(updateLastCacheOperator.hasNext());
- TsBlock result = updateLastCacheOperator.next();
+ TsBlock result = nextNonNullOrEmpty(updateLastCacheOperator);
assertEquals(1, result.getPositionCount());
assertEquals(3, result.getValueColumnCount());