This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch ty-mpp-2
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit bbca5f2d06af2cb4a765eeaaeae74dcdc48efed2
Author: JackieTien97 <[email protected]>
AuthorDate: Mon Mar 28 20:35:22 2022 +0800

    add UT for TimeJoinOperator
---
 .../apache/iotdb/db/utils/EnvironmentUtils.java    |   1 -
 .../db/mpp/operator/SeriesScanOperatorTest.java    | 124 +++++++++--------
 .../db/mpp/operator/TimeJoinOperatorTest.java      | 152 +++++++++++++++++++++
 .../reader/series/SeriesAggregateReaderTest.java   |   3 +-
 .../reader/series/SeriesReaderByTimestampTest.java |   3 +-
 .../db/query/reader/series/SeriesReaderTest.java   |   3 +-
 .../query/reader/series/SeriesReaderTestUtil.java  |  13 +-
 7 files changed, 229 insertions(+), 70 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/utils/EnvironmentUtils.java 
b/server/src/main/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
index 8566608..a069c1e 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
@@ -37,7 +37,6 @@ import 
org.apache.iotdb.db.exception.TriggerManagementException;
 import org.apache.iotdb.db.exception.UDFRegistrationException;
 import org.apache.iotdb.db.metadata.idtable.IDTableManager;
 import org.apache.iotdb.db.metadata.idtable.entry.DeviceIDFactory;
-import org.apache.iotdb.db.mpp.operator.OperatorContext;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.control.FileReaderManager;
 import org.apache.iotdb.db.query.control.QueryResourceManager;
diff --git 
a/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesScanOperatorTest.java
 
b/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesScanOperatorTest.java
index 5891566..1a4b257 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesScanOperatorTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesScanOperatorTest.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.metadata.path.MeasurementPath;
-import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.common.PlanFragmentId;
 import org.apache.iotdb.db.mpp.common.QueryId;
@@ -37,6 +36,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.column.IntColumn;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -50,68 +50,74 @@ import java.util.Set;
 import static org.junit.Assert.*;
 
 public class SeriesScanOperatorTest {
-    private static final String SERIES_READER_TEST_SG = 
"root.seriesScanOperatorTest";
-    private final List<String> deviceIds = new ArrayList<>();
-    private final List<MeasurementSchema> measurementSchemas = new 
ArrayList<>();
+  private static final String SERIES_SCAN_OPERATOR_TEST_SG = 
"root.seriesScanOperatorTest";
+  private final List<String> deviceIds = new ArrayList<>();
+  private final List<MeasurementSchema> measurementSchemas = new ArrayList<>();
 
-    private final List<TsFileResource> seqResources = new ArrayList<>();
-    private final List<TsFileResource> unSeqResources = new ArrayList<>();
+  private final List<TsFileResource> seqResources = new ArrayList<>();
+  private final List<TsFileResource> unSeqResources = new ArrayList<>();
 
-    @Before
-    public void setUp() throws MetadataException, IOException, 
WriteProcessException {
-        SeriesReaderTestUtil.setUp(measurementSchemas, deviceIds, 
seqResources, unSeqResources, SERIES_READER_TEST_SG);
-    }
+  @Before
+  public void setUp() throws MetadataException, IOException, 
WriteProcessException {
+    SeriesReaderTestUtil.setUp(
+        measurementSchemas, deviceIds, seqResources, unSeqResources, 
SERIES_SCAN_OPERATOR_TEST_SG);
+  }
 
-    @After
-    public void tearDown() throws IOException {
-        SeriesReaderTestUtil.tearDown(seqResources, unSeqResources);
-    }
+  @After
+  public void tearDown() throws IOException {
+    SeriesReaderTestUtil.tearDown(seqResources, unSeqResources);
+  }
 
-    @Test
-    public void batchTest() {
-        try {
-            MeasurementPath measurementPath = new 
MeasurementPath(SERIES_READER_TEST_SG + ".device0.sensor0", TSDataType.INT32);
-            Set<String> allSensors = new HashSet<>();
-            allSensors.add("sensor0");
-            QueryId queryId = new QueryId("stub_query");
-            FragmentInstanceContext fragmentInstanceContext = new 
FragmentInstanceContext(new FragmentInstanceId(new PlanFragmentId(queryId, 0), 
"stub-instance"));
-            fragmentInstanceContext.addOperatorContext(1, new PlanNodeId("1"), 
SeriesScanOperator.class.getSimpleName());
-            QueryDataSource dataSource = new QueryDataSource(seqResources, 
unSeqResources);
-            QueryUtils.fillOrderIndexes(dataSource, 
measurementPath.getDevice(), true);
-            SeriesScanOperator seriesScanOperator =
-                    new SeriesScanOperator(
-                            measurementPath,
-                            allSensors,
-                            TSDataType.INT32,
-                            
fragmentInstanceContext.getOperatorContexts().get(0),
-                            dataSource,
-                            null,
-                            null,
-                            true);
-            int count = 0;
-            while (seriesScanOperator.hasNext()) {
-                TsBlock tsBlock = seriesScanOperator.next();
-                assertEquals(1, tsBlock.getValueColumnCount());
-                assertTrue(tsBlock.getColumn(0) instanceof IntColumn);
-                assertEquals(20, tsBlock.getPositionCount());
-                for (int i = 0; i < tsBlock.getPositionCount(); i++) {
-                    long expectedTime = i + 20L * count;
-                    assertEquals(expectedTime, tsBlock.getTimeByIndex(i));
-                    if (expectedTime < 200) {
-                        assertEquals(20000 + expectedTime, 
tsBlock.getColumn(0).getInt(i));
-                    } else if (expectedTime < 260
-                            || (expectedTime >= 300 && expectedTime < 380)
-                            || expectedTime >= 400) {
-                        assertEquals(10000 + expectedTime, 
tsBlock.getColumn(0).getInt(i));
-                    } else {
-                        assertEquals(expectedTime, 
tsBlock.getColumn(0).getInt(i));
-                    }
-                }
-                count++;
-            }
-        } catch (IOException | IllegalPathException e) {
-            e.printStackTrace();
-            fail();
+  @Test
+  public void batchTest() {
+    try {
+      MeasurementPath measurementPath =
+          new MeasurementPath(SERIES_SCAN_OPERATOR_TEST_SG + 
".device0.sensor0", TSDataType.INT32);
+      Set<String> allSensors = new HashSet<>();
+      allSensors.add("sensor0");
+      QueryId queryId = new QueryId("stub_query");
+      FragmentInstanceContext fragmentInstanceContext =
+          new FragmentInstanceContext(
+              new FragmentInstanceId(new PlanFragmentId(queryId, 0), 
"stub-instance"));
+      fragmentInstanceContext.addOperatorContext(
+          1, new PlanNodeId("1"), SeriesScanOperator.class.getSimpleName());
+      QueryDataSource dataSource = new QueryDataSource(seqResources, 
unSeqResources);
+      QueryUtils.fillOrderIndexes(dataSource, measurementPath.getDevice(), 
true);
+      SeriesScanOperator seriesScanOperator =
+          new SeriesScanOperator(
+              measurementPath,
+              allSensors,
+              TSDataType.INT32,
+              fragmentInstanceContext.getOperatorContexts().get(0),
+              dataSource,
+              null,
+              null,
+              true);
+      int count = 0;
+      while (seriesScanOperator.hasNext()) {
+        TsBlock tsBlock = seriesScanOperator.next();
+        assertEquals(1, tsBlock.getValueColumnCount());
+        assertTrue(tsBlock.getColumn(0) instanceof IntColumn);
+        assertEquals(20, tsBlock.getPositionCount());
+        for (int i = 0; i < tsBlock.getPositionCount(); i++) {
+          long expectedTime = i + 20L * count;
+          assertEquals(expectedTime, tsBlock.getTimeByIndex(i));
+          if (expectedTime < 200) {
+            assertEquals(20000 + expectedTime, tsBlock.getColumn(0).getInt(i));
+          } else if (expectedTime < 260
+              || (expectedTime >= 300 && expectedTime < 380)
+              || expectedTime >= 400) {
+            assertEquals(10000 + expectedTime, tsBlock.getColumn(0).getInt(i));
+          } else {
+            assertEquals(expectedTime, tsBlock.getColumn(0).getInt(i));
+          }
         }
+        count++;
+      }
+      assertEquals(25, count);
+    } catch (IOException | IllegalPathException e) {
+      e.printStackTrace();
+      fail();
     }
+  }
 }
diff --git 
a/server/src/test/java/org/apache/iotdb/db/mpp/operator/TimeJoinOperatorTest.java
 
b/server/src/test/java/org/apache/iotdb/db/mpp/operator/TimeJoinOperatorTest.java
new file mode 100644
index 0000000..4479d42
--- /dev/null
+++ 
b/server/src/test/java/org/apache/iotdb/db/mpp/operator/TimeJoinOperatorTest.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.operator;
+
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.common.PlanFragmentId;
+import org.apache.iotdb.db.mpp.common.QueryId;
+import org.apache.iotdb.db.mpp.execution.FragmentInstanceContext;
+import org.apache.iotdb.db.mpp.operator.process.TimeJoinOperator;
+import org.apache.iotdb.db.mpp.operator.source.SeriesScanOperator;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
+import org.apache.iotdb.db.query.reader.series.SeriesReaderTestUtil;
+import org.apache.iotdb.db.utils.QueryUtils;
+import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.IntColumn;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.*;
+
+import static org.junit.Assert.*;
+
+public class TimeJoinOperatorTest {
+  private static final String TIME_JOIN_OPERATOR_TEST_SG = 
"root.TimeJoinOperatorTest";
+  private final List<String> deviceIds = new ArrayList<>();
+  private final List<MeasurementSchema> measurementSchemas = new ArrayList<>();
+
+  private final List<TsFileResource> seqResources = new ArrayList<>();
+  private final List<TsFileResource> unSeqResources = new ArrayList<>();
+
+  @Before
+  public void setUp() throws MetadataException, IOException, 
WriteProcessException {
+    SeriesReaderTestUtil.setUp(
+        measurementSchemas, deviceIds, seqResources, unSeqResources, 
TIME_JOIN_OPERATOR_TEST_SG);
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    SeriesReaderTestUtil.tearDown(seqResources, unSeqResources);
+  }
+
+  @Test
+  public void batchTest() {
+    try {
+      MeasurementPath measurementPath1 =
+          new MeasurementPath(TIME_JOIN_OPERATOR_TEST_SG + ".device0.sensor0", 
TSDataType.INT32);
+      Set<String> allSensors = new HashSet<>();
+      allSensors.add("sensor0");
+      allSensors.add("sensor1");
+      QueryId queryId = new QueryId("stub_query");
+      FragmentInstanceContext fragmentInstanceContext =
+          new FragmentInstanceContext(
+              new FragmentInstanceId(new PlanFragmentId(queryId, 0), 
"stub-instance"));
+      fragmentInstanceContext.addOperatorContext(
+          1, new PlanNodeId("1"), SeriesScanOperator.class.getSimpleName());
+      fragmentInstanceContext.addOperatorContext(
+          2, new PlanNodeId("2"), SeriesScanOperator.class.getSimpleName());
+      fragmentInstanceContext.addOperatorContext(
+          3, new PlanNodeId("3"), TimeJoinOperator.class.getSimpleName());
+      QueryDataSource dataSource = new QueryDataSource(seqResources, 
unSeqResources);
+      QueryUtils.fillOrderIndexes(dataSource, measurementPath1.getDevice(), 
true);
+      SeriesScanOperator seriesScanOperator1 =
+          new SeriesScanOperator(
+              measurementPath1,
+              allSensors,
+              TSDataType.INT32,
+              fragmentInstanceContext.getOperatorContexts().get(0),
+              dataSource,
+              null,
+              null,
+              true);
+
+      MeasurementPath measurementPath2 =
+          new MeasurementPath(TIME_JOIN_OPERATOR_TEST_SG + ".device0.sensor1", 
TSDataType.INT32);
+      SeriesScanOperator seriesScanOperator2 =
+          new SeriesScanOperator(
+              measurementPath2,
+              allSensors,
+              TSDataType.INT32,
+              fragmentInstanceContext.getOperatorContexts().get(1),
+              dataSource,
+              null,
+              null,
+              true);
+
+      TimeJoinOperator timeJoinOperator =
+          new TimeJoinOperator(
+              fragmentInstanceContext.getOperatorContexts().get(2),
+              Arrays.asList(seriesScanOperator1, seriesScanOperator2),
+              OrderBy.TIMESTAMP_ASC,
+              2,
+              Arrays.asList(TSDataType.INT32, TSDataType.INT32));
+      int count = 0;
+      while (timeJoinOperator.hasNext()) {
+        TsBlock tsBlock = timeJoinOperator.next();
+        assertEquals(2, tsBlock.getValueColumnCount());
+        assertTrue(tsBlock.getColumn(0) instanceof IntColumn);
+        assertTrue(tsBlock.getColumn(1) instanceof IntColumn);
+        assertEquals(20, tsBlock.getPositionCount());
+        for (int i = 0; i < tsBlock.getPositionCount(); i++) {
+          long expectedTime = i + 20L * count;
+          assertEquals(expectedTime, tsBlock.getTimeByIndex(i));
+          if (expectedTime < 200) {
+            assertEquals(20000 + expectedTime, tsBlock.getColumn(0).getInt(i));
+            assertEquals(20000 + expectedTime, tsBlock.getColumn(1).getInt(i));
+          } else if (expectedTime < 260
+              || (expectedTime >= 300 && expectedTime < 380)
+              || expectedTime >= 400) {
+            assertEquals(10000 + expectedTime, tsBlock.getColumn(0).getInt(i));
+            assertEquals(10000 + expectedTime, tsBlock.getColumn(1).getInt(i));
+          } else {
+            assertEquals(expectedTime, tsBlock.getColumn(0).getInt(i));
+            assertEquals(expectedTime, tsBlock.getColumn(1).getInt(i));
+          }
+        }
+        count++;
+      }
+      assertEquals(25, count);
+    } catch (IOException | IllegalPathException e) {
+      e.printStackTrace();
+      fail();
+    }
+  }
+}
diff --git 
a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReaderTest.java
 
b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReaderTest.java
index b43839f..f992f04 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReaderTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReaderTest.java
@@ -60,7 +60,8 @@ public class SeriesAggregateReaderTest {
   @Before
   public void setUp() throws MetadataException, IOException, 
WriteProcessException {
     EnvironmentUtils.envSetUp();
-    SeriesReaderTestUtil.setUp(measurementSchemas, deviceIds, seqResources, 
unseqResources, SERIES_READER_TEST_SG);
+    SeriesReaderTestUtil.setUp(
+        measurementSchemas, deviceIds, seqResources, unseqResources, 
SERIES_READER_TEST_SG);
   }
 
   @After
diff --git 
a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestampTest.java
 
b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestampTest.java
index 166e605..fd8b261 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestampTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestampTest.java
@@ -51,7 +51,8 @@ public class SeriesReaderByTimestampTest {
   @Before
   public void setUp() throws MetadataException, IOException, 
WriteProcessException {
     EnvironmentUtils.envSetUp();
-    SeriesReaderTestUtil.setUp(measurementSchemas, deviceIds, seqResources, 
unseqResources, SERIES_READER_TEST_SG);
+    SeriesReaderTestUtil.setUp(
+        measurementSchemas, deviceIds, seqResources, unseqResources, 
SERIES_READER_TEST_SG);
   }
 
   @After
diff --git 
a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTest.java
 
b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTest.java
index ac35ca1..54c97cc 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTest.java
@@ -56,7 +56,8 @@ public class SeriesReaderTest {
 
   @Before
   public void setUp() throws MetadataException, IOException, 
WriteProcessException {
-    SeriesReaderTestUtil.setUp(measurementSchemas, deviceIds, seqResources, 
unseqResources, SERIES_READER_TEST_SG);
+    SeriesReaderTestUtil.setUp(
+        measurementSchemas, deviceIds, seqResources, unseqResources, 
SERIES_READER_TEST_SG);
   }
 
   @After
diff --git 
a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTestUtil.java
 
b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTestUtil.java
index 2958355..417c53c 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTestUtil.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTestUtil.java
@@ -86,7 +86,8 @@ public class SeriesReaderTestUtil {
       List<TsFileResource> seqResources,
       List<TsFileResource> unseqResources,
       List<MeasurementSchema> measurementSchemas,
-      List<String> deviceIds, String sgName)
+      List<String> deviceIds,
+      String sgName)
       throws IOException, WriteProcessException {
     for (int i = 0; i < seqFileNum; i++) {
       File file = new File(TestConstant.getTestTsFilePath(sgName, 0, 0, i));
@@ -99,8 +100,7 @@ public class SeriesReaderTestUtil {
       prepareFile(tsFileResource, i * ptNum, ptNum, 0, measurementSchemas, 
deviceIds);
     }
     for (int i = 0; i < unseqFileNum; i++) {
-      File file =
-          new File(TestConstant.getTestTsFilePath(sgName, 0, 0, i + 
seqFileNum));
+      File file = new File(TestConstant.getTestTsFilePath(sgName, 0, 0, i + 
seqFileNum));
       TsFileResource tsFileResource = new TsFileResource(file);
       tsFileResource.setStatus(TsFileResourceStatus.CLOSED);
       tsFileResource.setMinPlanIndex(i + seqFileNum);
@@ -116,9 +116,7 @@ public class SeriesReaderTestUtil {
           deviceIds);
     }
 
-    File file =
-        new File(
-            TestConstant.getTestTsFilePath(sgName, 0, 0, seqFileNum + 
unseqFileNum));
+    File file = new File(TestConstant.getTestTsFilePath(sgName, 0, 0, 
seqFileNum + unseqFileNum));
     TsFileResource tsFileResource = new TsFileResource(file);
     tsFileResource.setStatus(TsFileResourceStatus.CLOSED);
     tsFileResource.setMinPlanIndex(seqFileNum + unseqFileNum);
@@ -171,7 +169,8 @@ public class SeriesReaderTestUtil {
   }
 
   private static void prepareSeries(
-      List<MeasurementSchema> measurementSchemas, List<String> deviceIds, 
String sgName) throws MetadataException {
+      List<MeasurementSchema> measurementSchemas, List<String> deviceIds, 
String sgName)
+      throws MetadataException {
     for (int i = 0; i < measurementNum; i++) {
       measurementSchemas.add(
           new MeasurementSchema(

Reply via email to