This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch ty-mpp-2 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit bbca5f2d06af2cb4a765eeaaeae74dcdc48efed2 Author: JackieTien97 <[email protected]> AuthorDate: Mon Mar 28 20:35:22 2022 +0800 add UT for TimeJoinOperator --- .../apache/iotdb/db/utils/EnvironmentUtils.java | 1 - .../db/mpp/operator/SeriesScanOperatorTest.java | 124 +++++++++-------- .../db/mpp/operator/TimeJoinOperatorTest.java | 152 +++++++++++++++++++++ .../reader/series/SeriesAggregateReaderTest.java | 3 +- .../reader/series/SeriesReaderByTimestampTest.java | 3 +- .../db/query/reader/series/SeriesReaderTest.java | 3 +- .../query/reader/series/SeriesReaderTestUtil.java | 13 +- 7 files changed, 229 insertions(+), 70 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/utils/EnvironmentUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/EnvironmentUtils.java index 8566608..a069c1e 100644 --- a/server/src/main/java/org/apache/iotdb/db/utils/EnvironmentUtils.java +++ b/server/src/main/java/org/apache/iotdb/db/utils/EnvironmentUtils.java @@ -37,7 +37,6 @@ import org.apache.iotdb.db.exception.TriggerManagementException; import org.apache.iotdb.db.exception.UDFRegistrationException; import org.apache.iotdb.db.metadata.idtable.IDTableManager; import org.apache.iotdb.db.metadata.idtable.entry.DeviceIDFactory; -import org.apache.iotdb.db.mpp.operator.OperatorContext; import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.db.query.control.FileReaderManager; import org.apache.iotdb.db.query.control.QueryResourceManager; diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesScanOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesScanOperatorTest.java index 5891566..1a4b257 100644 --- a/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesScanOperatorTest.java +++ b/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesScanOperatorTest.java @@ -23,7 +23,6 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileResource; import org.apache.iotdb.db.exception.metadata.IllegalPathException; import org.apache.iotdb.db.exception.metadata.MetadataException; import org.apache.iotdb.db.metadata.path.MeasurementPath; -import org.apache.iotdb.db.metadata.path.PartialPath; import org.apache.iotdb.db.mpp.common.FragmentInstanceId; import org.apache.iotdb.db.mpp.common.PlanFragmentId; import org.apache.iotdb.db.mpp.common.QueryId; @@ -37,6 +36,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.common.block.TsBlock; import org.apache.iotdb.tsfile.read.common.block.column.IntColumn; import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; + import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -50,68 +50,74 @@ import java.util.Set; import static org.junit.Assert.*; public class SeriesScanOperatorTest { - private static final String SERIES_READER_TEST_SG = "root.seriesScanOperatorTest"; - private final List<String> deviceIds = new ArrayList<>(); - private final List<MeasurementSchema> measurementSchemas = new ArrayList<>(); + private static final String SERIES_SCAN_OPERATOR_TEST_SG = "root.seriesScanOperatorTest"; + private final List<String> deviceIds = new ArrayList<>(); + private final List<MeasurementSchema> measurementSchemas = new ArrayList<>(); - private final List<TsFileResource> seqResources = new ArrayList<>(); - private final List<TsFileResource> unSeqResources = new ArrayList<>(); + private final List<TsFileResource> seqResources = new ArrayList<>(); + private final List<TsFileResource> unSeqResources = new ArrayList<>(); - @Before - public void setUp() throws MetadataException, IOException, WriteProcessException { - SeriesReaderTestUtil.setUp(measurementSchemas, deviceIds, seqResources, unSeqResources, SERIES_READER_TEST_SG); - } + @Before + public void setUp() throws MetadataException, IOException, WriteProcessException { + SeriesReaderTestUtil.setUp( + measurementSchemas, deviceIds, seqResources, unSeqResources, SERIES_SCAN_OPERATOR_TEST_SG); + } - @After - public void tearDown() throws IOException { - SeriesReaderTestUtil.tearDown(seqResources, unSeqResources); - } + @After + public void tearDown() throws IOException { + SeriesReaderTestUtil.tearDown(seqResources, unSeqResources); + } - @Test - public void batchTest() { - try { - MeasurementPath measurementPath = new MeasurementPath(SERIES_READER_TEST_SG + ".device0.sensor0", TSDataType.INT32); - Set<String> allSensors = new HashSet<>(); - allSensors.add("sensor0"); - QueryId queryId = new QueryId("stub_query"); - FragmentInstanceContext fragmentInstanceContext = new FragmentInstanceContext(new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance")); - fragmentInstanceContext.addOperatorContext(1, new PlanNodeId("1"), SeriesScanOperator.class.getSimpleName()); - QueryDataSource dataSource = new QueryDataSource(seqResources, unSeqResources); - QueryUtils.fillOrderIndexes(dataSource, measurementPath.getDevice(), true); - SeriesScanOperator seriesScanOperator = - new SeriesScanOperator( - measurementPath, - allSensors, - TSDataType.INT32, - fragmentInstanceContext.getOperatorContexts().get(0), - dataSource, - null, - null, - true); - int count = 0; - while (seriesScanOperator.hasNext()) { - TsBlock tsBlock = seriesScanOperator.next(); - assertEquals(1, tsBlock.getValueColumnCount()); - assertTrue(tsBlock.getColumn(0) instanceof IntColumn); - assertEquals(20, tsBlock.getPositionCount()); - for (int i = 0; i < tsBlock.getPositionCount(); i++) { - long expectedTime = i + 20L * count; - assertEquals(expectedTime, tsBlock.getTimeByIndex(i)); - if (expectedTime < 200) { - assertEquals(20000 + expectedTime, tsBlock.getColumn(0).getInt(i)); - } else if (expectedTime < 260 - || (expectedTime >= 300 && expectedTime < 380) - || expectedTime >= 400) { - assertEquals(10000 + expectedTime, tsBlock.getColumn(0).getInt(i)); - } else { - assertEquals(expectedTime, tsBlock.getColumn(0).getInt(i)); - } - } - count++; - } - } catch (IOException | IllegalPathException e) { - e.printStackTrace(); - fail(); + @Test + public void batchTest() { + try { + MeasurementPath measurementPath = + new MeasurementPath(SERIES_SCAN_OPERATOR_TEST_SG + ".device0.sensor0", TSDataType.INT32); + Set<String> allSensors = new HashSet<>(); + allSensors.add("sensor0"); + QueryId queryId = new QueryId("stub_query"); + FragmentInstanceContext fragmentInstanceContext = + new FragmentInstanceContext( + new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance")); + fragmentInstanceContext.addOperatorContext( + 1, new PlanNodeId("1"), SeriesScanOperator.class.getSimpleName()); + QueryDataSource dataSource = new QueryDataSource(seqResources, unSeqResources); + QueryUtils.fillOrderIndexes(dataSource, measurementPath.getDevice(), true); + SeriesScanOperator seriesScanOperator = + new SeriesScanOperator( + measurementPath, + allSensors, + TSDataType.INT32, + fragmentInstanceContext.getOperatorContexts().get(0), + dataSource, + null, + null, + true); + int count = 0; + while (seriesScanOperator.hasNext()) { + TsBlock tsBlock = seriesScanOperator.next(); + assertEquals(1, tsBlock.getValueColumnCount()); + assertTrue(tsBlock.getColumn(0) instanceof IntColumn); + assertEquals(20, tsBlock.getPositionCount()); + for (int i = 0; i < tsBlock.getPositionCount(); i++) { + long expectedTime = i + 20L * count; + assertEquals(expectedTime, tsBlock.getTimeByIndex(i)); + if (expectedTime < 200) { + assertEquals(20000 + expectedTime, tsBlock.getColumn(0).getInt(i)); + } else if (expectedTime < 260 + || (expectedTime >= 300 && expectedTime < 380) + || expectedTime >= 400) { + assertEquals(10000 + expectedTime, tsBlock.getColumn(0).getInt(i)); + } else { + assertEquals(expectedTime, tsBlock.getColumn(0).getInt(i)); + } } + count++; + } + assertEquals(25, count); + } catch (IOException | IllegalPathException e) { + e.printStackTrace(); + fail(); } + } } diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/operator/TimeJoinOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/operator/TimeJoinOperatorTest.java new file mode 100644 index 0000000..4479d42 --- /dev/null +++ b/server/src/test/java/org/apache/iotdb/db/mpp/operator/TimeJoinOperatorTest.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.mpp.operator; + +import org.apache.iotdb.db.engine.querycontext.QueryDataSource; +import org.apache.iotdb.db.engine.storagegroup.TsFileResource; +import org.apache.iotdb.db.exception.metadata.IllegalPathException; +import org.apache.iotdb.db.exception.metadata.MetadataException; +import org.apache.iotdb.db.metadata.path.MeasurementPath; +import org.apache.iotdb.db.mpp.common.FragmentInstanceId; +import org.apache.iotdb.db.mpp.common.PlanFragmentId; +import org.apache.iotdb.db.mpp.common.QueryId; +import org.apache.iotdb.db.mpp.execution.FragmentInstanceContext; +import org.apache.iotdb.db.mpp.operator.process.TimeJoinOperator; +import org.apache.iotdb.db.mpp.operator.source.SeriesScanOperator; +import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId; +import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy; +import org.apache.iotdb.db.query.reader.series.SeriesReaderTestUtil; +import org.apache.iotdb.db.utils.QueryUtils; +import org.apache.iotdb.tsfile.exception.write.WriteProcessException; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.read.common.block.TsBlock; +import org.apache.iotdb.tsfile.read.common.block.column.IntColumn; +import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.*; + +import static org.junit.Assert.*; + +public class TimeJoinOperatorTest { + private static final String TIME_JOIN_OPERATOR_TEST_SG = "root.TimeJoinOperatorTest"; + private final List<String> deviceIds = new ArrayList<>(); + private final List<MeasurementSchema> measurementSchemas = new ArrayList<>(); + + private final List<TsFileResource> seqResources = new ArrayList<>(); + private final List<TsFileResource> unSeqResources = new ArrayList<>(); + + @Before + public void setUp() throws MetadataException, IOException, WriteProcessException { + SeriesReaderTestUtil.setUp( + measurementSchemas, deviceIds, seqResources, unSeqResources, TIME_JOIN_OPERATOR_TEST_SG); + } + + @After + public void tearDown() throws IOException { + SeriesReaderTestUtil.tearDown(seqResources, unSeqResources); + } + + @Test + public void batchTest() { + try { + MeasurementPath measurementPath1 = + new MeasurementPath(TIME_JOIN_OPERATOR_TEST_SG + ".device0.sensor0", TSDataType.INT32); + Set<String> allSensors = new HashSet<>(); + allSensors.add("sensor0"); + allSensors.add("sensor1"); + QueryId queryId = new QueryId("stub_query"); + FragmentInstanceContext fragmentInstanceContext = + new FragmentInstanceContext( + new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance")); + fragmentInstanceContext.addOperatorContext( + 1, new PlanNodeId("1"), SeriesScanOperator.class.getSimpleName()); + fragmentInstanceContext.addOperatorContext( + 2, new PlanNodeId("2"), SeriesScanOperator.class.getSimpleName()); + fragmentInstanceContext.addOperatorContext( + 3, new PlanNodeId("3"), TimeJoinOperator.class.getSimpleName()); + QueryDataSource dataSource = new QueryDataSource(seqResources, unSeqResources); + QueryUtils.fillOrderIndexes(dataSource, measurementPath1.getDevice(), true); + SeriesScanOperator seriesScanOperator1 = + new SeriesScanOperator( + measurementPath1, + allSensors, + TSDataType.INT32, + fragmentInstanceContext.getOperatorContexts().get(0), + dataSource, + null, + null, + true); + + MeasurementPath measurementPath2 = + new MeasurementPath(TIME_JOIN_OPERATOR_TEST_SG + ".device0.sensor1", TSDataType.INT32); + SeriesScanOperator seriesScanOperator2 = + new SeriesScanOperator( + measurementPath2, + allSensors, + TSDataType.INT32, + fragmentInstanceContext.getOperatorContexts().get(1), + dataSource, + null, + null, + true); + + TimeJoinOperator timeJoinOperator = + new TimeJoinOperator( + fragmentInstanceContext.getOperatorContexts().get(2), + Arrays.asList(seriesScanOperator1, seriesScanOperator2), + OrderBy.TIMESTAMP_ASC, + 2, + Arrays.asList(TSDataType.INT32, TSDataType.INT32)); + int count = 0; + while (timeJoinOperator.hasNext()) { + TsBlock tsBlock = timeJoinOperator.next(); + assertEquals(2, tsBlock.getValueColumnCount()); + assertTrue(tsBlock.getColumn(0) instanceof IntColumn); + assertTrue(tsBlock.getColumn(1) instanceof IntColumn); + assertEquals(20, tsBlock.getPositionCount()); + for (int i = 0; i < tsBlock.getPositionCount(); i++) { + long expectedTime = i + 20L * count; + assertEquals(expectedTime, tsBlock.getTimeByIndex(i)); + if (expectedTime < 200) { + assertEquals(20000 + expectedTime, tsBlock.getColumn(0).getInt(i)); + assertEquals(20000 + expectedTime, tsBlock.getColumn(1).getInt(i)); + } else if (expectedTime < 260 + || (expectedTime >= 300 && expectedTime < 380) + || expectedTime >= 400) { + assertEquals(10000 + expectedTime, tsBlock.getColumn(0).getInt(i)); + assertEquals(10000 + expectedTime, tsBlock.getColumn(1).getInt(i)); + } else { + assertEquals(expectedTime, tsBlock.getColumn(0).getInt(i)); + assertEquals(expectedTime, tsBlock.getColumn(1).getInt(i)); + } + } + count++; + } + assertEquals(25, count); + } catch (IOException | IllegalPathException e) { + e.printStackTrace(); + fail(); + } + } +} diff --git a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReaderTest.java b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReaderTest.java index b43839f..f992f04 100644 --- a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReaderTest.java +++ b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReaderTest.java @@ -60,7 +60,8 @@ public class SeriesAggregateReaderTest { @Before public void setUp() throws MetadataException, IOException, WriteProcessException { EnvironmentUtils.envSetUp(); - SeriesReaderTestUtil.setUp(measurementSchemas, deviceIds, seqResources, unseqResources, SERIES_READER_TEST_SG); + SeriesReaderTestUtil.setUp( + measurementSchemas, deviceIds, seqResources, unseqResources, SERIES_READER_TEST_SG); } @After diff --git a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestampTest.java b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestampTest.java index 166e605..fd8b261 100644 --- a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestampTest.java +++ b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestampTest.java @@ -51,7 +51,8 @@ public class SeriesReaderByTimestampTest { @Before public void setUp() throws MetadataException, IOException, WriteProcessException { EnvironmentUtils.envSetUp(); - SeriesReaderTestUtil.setUp(measurementSchemas, deviceIds, seqResources, unseqResources, SERIES_READER_TEST_SG); + SeriesReaderTestUtil.setUp( + measurementSchemas, deviceIds, seqResources, unseqResources, SERIES_READER_TEST_SG); } @After diff --git a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTest.java b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTest.java index ac35ca1..54c97cc 100644 --- a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTest.java +++ b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTest.java @@ -56,7 +56,8 @@ public class SeriesReaderTest { @Before public void setUp() throws MetadataException, IOException, WriteProcessException { - SeriesReaderTestUtil.setUp(measurementSchemas, deviceIds, seqResources, unseqResources, SERIES_READER_TEST_SG); + SeriesReaderTestUtil.setUp( + measurementSchemas, deviceIds, seqResources, unseqResources, SERIES_READER_TEST_SG); } @After diff --git a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTestUtil.java b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTestUtil.java index 2958355..417c53c 100644 --- a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTestUtil.java +++ b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTestUtil.java @@ -86,7 +86,8 @@ public class SeriesReaderTestUtil { List<TsFileResource> seqResources, List<TsFileResource> unseqResources, List<MeasurementSchema> measurementSchemas, - List<String> deviceIds, String sgName) + List<String> deviceIds, + String sgName) throws IOException, WriteProcessException { for (int i = 0; i < seqFileNum; i++) { File file = new File(TestConstant.getTestTsFilePath(sgName, 0, 0, i)); @@ -99,8 +100,7 @@ public class SeriesReaderTestUtil { prepareFile(tsFileResource, i * ptNum, ptNum, 0, measurementSchemas, deviceIds); } for (int i = 0; i < unseqFileNum; i++) { - File file = - new File(TestConstant.getTestTsFilePath(sgName, 0, 0, i + seqFileNum)); + File file = new File(TestConstant.getTestTsFilePath(sgName, 0, 0, i + seqFileNum)); TsFileResource tsFileResource = new TsFileResource(file); tsFileResource.setStatus(TsFileResourceStatus.CLOSED); tsFileResource.setMinPlanIndex(i + seqFileNum); @@ -116,9 +116,7 @@ public class SeriesReaderTestUtil { deviceIds); } - File file = - new File( - TestConstant.getTestTsFilePath(sgName, 0, 0, seqFileNum + unseqFileNum)); + File file = new File(TestConstant.getTestTsFilePath(sgName, 0, 0, seqFileNum + unseqFileNum)); TsFileResource tsFileResource = new TsFileResource(file); tsFileResource.setStatus(TsFileResourceStatus.CLOSED); tsFileResource.setMinPlanIndex(seqFileNum + unseqFileNum); @@ -171,7 +169,8 @@ public class SeriesReaderTestUtil { } private static void prepareSeries( - List<MeasurementSchema> measurementSchemas, List<String> deviceIds, String sgName) throws MetadataException { + List<MeasurementSchema> measurementSchemas, List<String> deviceIds, String sgName) + throws MetadataException { for (int i = 0; i < measurementNum; i++) { measurementSchemas.add( new MeasurementSchema(
