This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new fda55b56e2 [IOTDB-3082] Implementation of OffsetOperator (#5779)
fda55b56e2 is described below
commit fda55b56e250109f1af561303fc98033b3ca0fda
Author: Jackie Tien <[email protected]>
AuthorDate: Tue May 3 20:03:09 2022 +0800
[IOTDB-3082] Implementation of OffsetOperator (#5779)
---
.../execution/operator/process/OffsetOperator.java | 34 +-
.../db/mpp/plan/planner/LocalExecutionPlanner.java | 10 +-
.../mpp/execution/operator/OffsetOperatorTest.java | 391 +++++++++++++++++++++
3 files changed, 428 insertions(+), 7 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/OffsetOperator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/OffsetOperator.java
index 280d372500..04a7915e52 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/OffsetOperator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/OffsetOperator.java
@@ -18,40 +18,62 @@
*/
package org.apache.iotdb.db.mpp.execution.operator.process;
+import org.apache.iotdb.db.mpp.execution.operator.Operator;
import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import com.google.common.util.concurrent.ListenableFuture;
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Drops the first {@code offset} rows produced by the child operator and passes every later row
+ * through.
+ *
+ * <p>Rows are skipped block by block: while rows remain to be skipped, {@link #next()} returns
+ * only the tail region of the child's block, which may contain zero rows.
+ */
public class OffsetOperator implements ProcessOperator {
+  private final OperatorContext operatorContext;
+  /** Number of rows that still have to be dropped before rows are passed through. */
+  private long remainingOffset;
+  private final Operator child;
+
+  /**
+   * @param operatorContext context returned by {@link #getOperatorContext()}; must not be null
+   * @param offset number of leading rows to drop; must be at least zero
+   * @param child operator supplying the rows; must not be null
+   * @throws NullPointerException if {@code operatorContext} or {@code child} is null
+   * @throws IllegalArgumentException if {@code offset} is negative
+   */
+  public OffsetOperator(OperatorContext operatorContext, long offset, Operator child) {
+    this.operatorContext = requireNonNull(operatorContext, "operatorContext is null");
+    checkArgument(offset >= 0, "offset must be at least zero");
+    this.remainingOffset = offset;
+    this.child = requireNonNull(child, "child operator is null");
+  }
+
  @Override
  public OperatorContext getOperatorContext() {
-    return null;
+    return operatorContext;
  }
  @Override
  public ListenableFuture<Void> isBlocked() {
-    return ProcessOperator.super.isBlocked();
+    // This operator adds no blocking of its own; it is blocked exactly when its child is.
+    return child.isBlocked();
  }
  @Override
  public TsBlock next() {
-    return null;
+    TsBlock block = child.next();
+    if (remainingOffset > 0) {
+      // Take the minimum in long arithmetic BEFORE narrowing. Casting remainingOffset to int
+      // first wraps for offsets above Integer.MAX_VALUE (2^32 becomes 0, 2^31 becomes negative),
+      // which would skip nothing or hand a negative position to getRegion. The minimum is bounded
+      // by getPositionCount(), an int, so the cast below cannot lose information.
+      int offset = (int) Math.min(remainingOffset, block.getPositionCount());
+      remainingOffset -= offset;
+      // Keep only the rows after the skipped prefix; the region is empty when the whole block
+      // falls inside the offset.
+      return block.getRegion(offset, block.getPositionCount() - offset);
+    } else {
+      return block;
+    }
  }
  @Override
  public boolean hasNext() {
-    return false;
+    return child.hasNext();
  }
  @Override
  public void close() throws Exception {
-    ProcessOperator.super.close();
+    child.close();
  }
  @Override
  public boolean isFinished() {
-    return false;
+    return child.isFinished();
  }
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
index 4f42b6871c..9f127c883c 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
@@ -34,6 +34,7 @@ import
org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
import org.apache.iotdb.db.mpp.execution.operator.Operator;
import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
import org.apache.iotdb.db.mpp.execution.operator.process.LimitOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.OffsetOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.TimeJoinOperator;
import
org.apache.iotdb.db.mpp.execution.operator.process.merge.AscTimeComparator;
import org.apache.iotdb.db.mpp.execution.operator.process.merge.ColumnMerger;
@@ -371,7 +372,14 @@ public class LocalExecutionPlanner {
@Override
public Operator visitOffset(OffsetNode node, LocalExecutionPlanContext context) {
-  return super.visitOffset(node, context);
+  // Plan the child first; the operator it yields becomes the input of the offset operator.
+  Operator input = node.getChild().accept(this, context);
+  // Register an operator context for the offset operator under this plan node's id.
+  OperatorContext offsetContext =
+      context.instanceContext.addOperatorContext(
+          context.getNextOperatorId(),
+          node.getPlanNodeId(),
+          OffsetOperator.class.getSimpleName());
+  return new OffsetOperator(offsetContext, node.getOffset(), input);
}
@Override
diff --git
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OffsetOperatorTest.java
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OffsetOperatorTest.java
new file mode 100644
index 0000000000..7fb1ae715a
--- /dev/null
+++
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OffsetOperatorTest.java
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.common.PlanFragmentId;
+import org.apache.iotdb.db.mpp.common.QueryId;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
+import org.apache.iotdb.db.mpp.execution.operator.process.LimitOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.OffsetOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.TimeJoinOperator;
+import
org.apache.iotdb.db.mpp.execution.operator.process.merge.AscTimeComparator;
+import
org.apache.iotdb.db.mpp.execution.operator.process.merge.SingleColumnMerger;
+import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
+import org.apache.iotdb.db.mpp.plan.statement.component.OrderBy;
+import org.apache.iotdb.db.query.reader.series.SeriesReaderTestUtil;
+import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.IntColumn;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+
+import static
org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests {@link OffsetOperator} stacked on a {@link TimeJoinOperator} over two INT32 series scans
+ * ({@code device0.sensor0} and {@code device0.sensor1}) of the data set generated by {@link
+ * SeriesReaderTestUtil}.
+ */
+public class OffsetOperatorTest {
+
+  /** Storage group that holds the data generated for this test class. */
+  private static final String OFFSET_OPERATOR_TEST_SG = "root.OffsetOperatorTest";
+
+  /** Row count of every full block the time join emits for the generated data set. */
+  private static final int ROWS_PER_BLOCK = 20;
+
+  private final List<String> deviceIds = new ArrayList<>();
+  private final List<MeasurementSchema> measurementSchemas = new ArrayList<>();
+
+  private final List<TsFileResource> seqResources = new ArrayList<>();
+  private final List<TsFileResource> unSeqResources = new ArrayList<>();
+
+  @Before
+  public void setUp() throws MetadataException, IOException, WriteProcessException {
+    SeriesReaderTestUtil.setUp(
+        measurementSchemas, deviceIds, seqResources, unSeqResources, OFFSET_OPERATOR_TEST_SG);
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    SeriesReaderTestUtil.tearDown(seqResources, unSeqResources);
+  }
+
+  /**
+   * offset is 100 and limit is 250: the first 5 blocks come back empty, then 12 full blocks and a
+   * final block of 10 rows
+   */
+  @Test
+  public void batchTest1() {
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+    try {
+      FragmentInstanceContext fragmentInstanceContext =
+          newFragmentInstanceContext(instanceNotificationExecutor);
+      OffsetOperator offsetOperator = newOffsetOperator(fragmentInstanceContext, 100);
+      LimitOperator limitOperator =
+          new LimitOperator(
+              fragmentInstanceContext.getOperatorContexts().get(4), 250, offsetOperator);
+
+      int count = 0;
+      while (limitOperator.hasNext()) {
+        TsBlock tsBlock = limitOperator.next();
+        assertTwoIntColumns(tsBlock);
+        if (count < 5) {
+          assertEquals(0, tsBlock.getPositionCount());
+        } else if (count < 17) {
+          assertEquals(ROWS_PER_BLOCK, tsBlock.getPositionCount());
+        } else {
+          assertEquals(10, tsBlock.getPositionCount());
+        }
+        assertRowValues(tsBlock, count);
+        count++;
+      }
+      assertEquals(18, count);
+    } catch (IllegalPathException e) {
+      fail("Unexpected IllegalPathException: " + e);
+    } finally {
+      instanceNotificationExecutor.shutdown();
+    }
+  }
+
+  /** offset is 0 in which case we will get all data */
+  @Test
+  public void batchTest2() {
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+    try {
+      OffsetOperator offsetOperator =
+          newOffsetOperator(newFragmentInstanceContext(instanceNotificationExecutor), 0);
+
+      int count = 0;
+      while (offsetOperator.hasNext()) {
+        TsBlock tsBlock = offsetOperator.next();
+        assertTwoIntColumns(tsBlock);
+        assertEquals(ROWS_PER_BLOCK, tsBlock.getPositionCount());
+        assertRowValues(tsBlock, count);
+        count++;
+      }
+      assertEquals(25, count);
+    } catch (IllegalPathException e) {
+      fail("Unexpected IllegalPathException: " + e);
+    } finally {
+      instanceNotificationExecutor.shutdown();
+    }
+  }
+
+  /** offset is larger than max row number in which case we will get no data */
+  @Test
+  public void batchTest3() {
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+    try {
+      OffsetOperator offsetOperator =
+          newOffsetOperator(newFragmentInstanceContext(instanceNotificationExecutor), 500);
+
+      int count = 0;
+      while (offsetOperator.hasNext()) {
+        TsBlock tsBlock = offsetOperator.next();
+        assertTwoIntColumns(tsBlock);
+        assertEquals(0, tsBlock.getPositionCount());
+        count++;
+      }
+      assertEquals(25, count);
+    } catch (IllegalPathException e) {
+      fail("Unexpected IllegalPathException: " + e);
+    } finally {
+      instanceNotificationExecutor.shutdown();
+    }
+  }
+
+  /** Creates a stub fragment instance context whose state machine notifies on the executor. */
+  private static FragmentInstanceContext newFragmentInstanceContext(
+      ExecutorService instanceNotificationExecutor) {
+    QueryId queryId = new QueryId("stub_query");
+    FragmentInstanceId instanceId =
+        new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+    FragmentInstanceStateMachine stateMachine =
+        new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
+    return createFragmentInstanceContext(instanceId, stateMachine);
+  }
+
+  /**
+   * Registers the five operator contexts (two scans, time join, offset, limit) on the given
+   * context, in that order, and builds the scan -> time join -> offset pipeline.
+   *
+   * <p>The limit context is registered for every test so that index 4 of {@code
+   * getOperatorContexts()} is available to tests that add a {@link LimitOperator} on top.
+   */
+  private OffsetOperator newOffsetOperator(
+      FragmentInstanceContext fragmentInstanceContext, long offset) throws IllegalPathException {
+    PlanNodeId planNodeId1 = new PlanNodeId("1");
+    fragmentInstanceContext.addOperatorContext(
+        1, planNodeId1, SeriesScanOperator.class.getSimpleName());
+    PlanNodeId planNodeId2 = new PlanNodeId("2");
+    fragmentInstanceContext.addOperatorContext(
+        2, planNodeId2, SeriesScanOperator.class.getSimpleName());
+    fragmentInstanceContext.addOperatorContext(
+        3, new PlanNodeId("3"), TimeJoinOperator.class.getSimpleName());
+    fragmentInstanceContext.addOperatorContext(
+        4, new PlanNodeId("4"), OffsetOperator.class.getSimpleName());
+    fragmentInstanceContext.addOperatorContext(
+        5, new PlanNodeId("5"), LimitOperator.class.getSimpleName());
+
+    Set<String> allSensors = new HashSet<>();
+    allSensors.add("sensor0");
+    allSensors.add("sensor1");
+
+    SeriesScanOperator seriesScanOperator1 =
+        newSeriesScanOperator(fragmentInstanceContext, planNodeId1, 0, "sensor0", allSensors);
+    SeriesScanOperator seriesScanOperator2 =
+        newSeriesScanOperator(fragmentInstanceContext, planNodeId2, 1, "sensor1", allSensors);
+
+    TimeJoinOperator timeJoinOperator =
+        new TimeJoinOperator(
+            fragmentInstanceContext.getOperatorContexts().get(2),
+            Arrays.asList(seriesScanOperator1, seriesScanOperator2),
+            OrderBy.TIMESTAMP_ASC,
+            Arrays.asList(TSDataType.INT32, TSDataType.INT32),
+            Arrays.asList(
+                new SingleColumnMerger(new InputLocation(0, 0), new AscTimeComparator()),
+                new SingleColumnMerger(new InputLocation(1, 0), new AscTimeComparator())),
+            new AscTimeComparator());
+
+    return new OffsetOperator(
+        fragmentInstanceContext.getOperatorContexts().get(3), offset, timeJoinOperator);
+  }
+
+  /** Builds an INT32 scan of {@code device0.<sensor>} backed by the generated test resources. */
+  private SeriesScanOperator newSeriesScanOperator(
+      FragmentInstanceContext fragmentInstanceContext,
+      PlanNodeId planNodeId,
+      int operatorContextIndex,
+      String sensor,
+      Set<String> allSensors)
+      throws IllegalPathException {
+    MeasurementPath measurementPath =
+        new MeasurementPath(OFFSET_OPERATOR_TEST_SG + ".device0." + sensor, TSDataType.INT32);
+    SeriesScanOperator seriesScanOperator =
+        new SeriesScanOperator(
+            planNodeId,
+            measurementPath,
+            allSensors,
+            TSDataType.INT32,
+            fragmentInstanceContext.getOperatorContexts().get(operatorContextIndex),
+            null,
+            null,
+            true);
+    seriesScanOperator.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
+    return seriesScanOperator;
+  }
+
+  /** Asserts that the block carries exactly the two INT32 value columns of the joined series. */
+  private static void assertTwoIntColumns(TsBlock tsBlock) {
+    assertEquals(2, tsBlock.getValueColumnCount());
+    assertTrue(tsBlock.getColumn(0) instanceof IntColumn);
+    assertTrue(tsBlock.getColumn(1) instanceof IntColumn);
+  }
+
+  /**
+   * Asserts timestamps and values of every row in the block, given the zero-based index of the
+   * block in the operator's output. Row {@code i} is expected at time {@code i + 20 * blockIndex};
+   * the value added to the timestamp depends on the time range, matching the generated data set.
+   */
+  private static void assertRowValues(TsBlock tsBlock, int blockIndex) {
+    for (int i = 0; i < tsBlock.getPositionCount(); i++) {
+      long expectedTime = i + (long) ROWS_PER_BLOCK * blockIndex;
+      assertEquals(expectedTime, tsBlock.getTimeByIndex(i));
+      if (expectedTime < 200) {
+        assertEquals(20000 + expectedTime, tsBlock.getColumn(0).getInt(i));
+        assertEquals(20000 + expectedTime, tsBlock.getColumn(1).getInt(i));
+      } else if (expectedTime < 260
+          || (expectedTime >= 300 && expectedTime < 380)
+          || expectedTime >= 400) {
+        assertEquals(10000 + expectedTime, tsBlock.getColumn(0).getInt(i));
+        assertEquals(10000 + expectedTime, tsBlock.getColumn(1).getInt(i));
+      } else {
+        assertEquals(expectedTime, tsBlock.getColumn(0).getInt(i));
+        assertEquals(expectedTime, tsBlock.getColumn(1).getInt(i));
+      }
+    }
+  }
+}