This is an automated email from the ASF dual-hosted git repository.

suyue pushed a commit to branch aggregate
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/aggregate by this push:
     new 41a572f  add groupby test and fix groupBy bug
41a572f is described below

commit 41a572f34cd3f5a843003dbd395b769125c661c3
Author: suyue <[email protected]>
AuthorDate: Thu Mar 21 20:57:01 2019 +0800

    add groupby test and fix groupBy bug
---
 .../iotdb/db/query/executor/GroupByEngine.java     |   6 +-
 .../executor/GroupByWithOnlyTimeFilterDataSet.java |  42 ++++----
 .../executor/GroupByWithValueFilterDataSet.java    |   2 +-
 .../iotdb/db/integration/IOTDBGroupByTestIT.java   | 120 +++++++++++++++++++++
 .../iotdb/db/query/executor/GroupByEngineTest.java |  31 +++++-
 5 files changed, 176 insertions(+), 25 deletions(-)

diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/GroupByEngine.java 
b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/GroupByEngine.java
index 5430f9e..a4effd6 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/GroupByEngine.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/GroupByEngine.java
@@ -72,8 +72,8 @@ public abstract class GroupByEngine extends QueryDataSet {
     this.endTime = -1;
   }
 
-  protected void initAggreFuction(QueryContext context, List<String> aggres, 
IExpression expression)
-      throws FileNodeManagerException, PathErrorException, ProcessorException, 
IOException {
+  protected void initAggreFuction(List<String> aggres)
+      throws PathErrorException, ProcessorException{
 
     List<TSDataType> types = new ArrayList<>();
     //construct AggregateFunctions
@@ -122,7 +122,7 @@ public abstract class GroupByEngine extends QueryDataSet {
     }
 
     //current interval is not covered yet
-    if (endTime < mergedIntervals.get(usedIndex).right) {
+    if (endTime <= mergedIntervals.get(usedIndex).right) {
       startTime = endTime;
       endTime += unit;
       hasCachedTimeInterval = true;
diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/GroupByWithOnlyTimeFilterDataSet.java
 
b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/GroupByWithOnlyTimeFilterDataSet.java
index 67d72d9..922148c 100644
--- 
a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/GroupByWithOnlyTimeFilterDataSet.java
+++ 
b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/GroupByWithOnlyTimeFilterDataSet.java
@@ -76,7 +76,7 @@ public class GroupByWithOnlyTimeFilterDataSet extends 
GroupByEngine {
    */
   public void initGroupBy(QueryContext context, List<String> aggres, 
IExpression expression)
       throws FileNodeManagerException, PathErrorException, ProcessorException, 
IOException {
-    initAggreFuction(context, aggres, expression);
+    initAggreFuction(aggres);
     //init reader
     QueryTokenManager.getInstance().beginQueryOfGivenQueryPaths(jobId, 
selectedSeries);
     if (expression != null) {
@@ -128,16 +128,14 @@ public class GroupByWithOnlyTimeFilterDataSet extends 
GroupByEngine {
     IPointReader unsequenceReader = unSequenceReaderList.get(idx);
     IAggregateReader sequenceReader = sequenceReaderList.get(idx);
     AggregateFunction function = functions.get(idx);
-    BatchData batchData = batchDataList.get(idx);
-    boolean hasCachedSequenceData = hasCachedSequenceDataList.get(idx);
-
     function.init();
-
     boolean finishCheckSequenceData = false;
 
     //skip the points with timestamp less than startTime
     skipExessData(idx, sequenceReader, unsequenceReader);
 
+    BatchData batchData = batchDataList.get(idx);
+    boolean hasCachedSequenceData = hasCachedSequenceDataList.get(idx);
     //there was unprocessed data in last batch
     if (hasCachedSequenceData && batchData.hasNext()) {
       function.calculateValueFromPageData(batchData, unsequenceReader, 
endTime);
@@ -183,6 +181,7 @@ public class GroupByWithOnlyTimeFilterDataSet extends 
GroupByEngine {
       if (canUseHeader(minTime, maxTime, unsequenceReader, function)) {
         //cal using page header
         function.calculateValueFromPageHeader(pageHeader);
+        sequenceReader.skipPageData();
       } else {
         //cal using page data
         batchData = sequenceReader.nextBatch();
@@ -202,54 +201,59 @@ public class GroupByWithOnlyTimeFilterDataSet extends 
GroupByEngine {
   }
 
   //skip the points with timestamp less than startTime
-  private void skipExessData(int idx, IAggregateReader sequenceReader, 
IPointReader unsequenceReader)
+  private void skipExessData(int idx, IAggregateReader sequenceReader,
+      IPointReader unsequenceReader)
       throws IOException {
     BatchData batchData = batchDataList.get(idx);
     boolean hasCachedSequenceData = hasCachedSequenceDataList.get(idx);
 
     //skip the unsequenceReader points with timestamp less than startTime
-    while (unsequenceReader.hasNext() && 
unsequenceReader.current().getTimestamp() < startTime){
+    while (unsequenceReader.hasNext() && 
unsequenceReader.current().getTimestamp() < startTime) {
       unsequenceReader.next();
     }
 
     //skip the cached batch data points with timestamp less than startTime
-    if(hasCachedSequenceData){
-      while (batchData.hasNext() && batchData.currentTime() < startTime){
+    if (hasCachedSequenceData) {
+      while (batchData.hasNext() && batchData.currentTime() < startTime) {
         batchData.next();
       }
     }
-    if(hasCachedSequenceData && !batchData.hasNext()){
+    if (hasCachedSequenceData && !batchData.hasNext()) {
       hasCachedSequenceData = false;
-    }
-    else {
+    } else {
       return;
     }
 
     //skip the points in sequenceReader data whose timestamp are less than 
startTime
-    while (sequenceReader.hasNext()){
+    while (sequenceReader.hasNext()) {
       PageHeader pageHeader = sequenceReader.nextPageHeader();
       //memory data
-      if(pageHeader == null){
+      if (pageHeader == null) {
         batchData = sequenceReader.nextBatch();
         hasCachedSequenceData = true;
-        while (batchData.hasNext() && batchData.currentTime() < startTime){
+        while (batchData.hasNext() && batchData.currentTime() < startTime) {
           batchData.next();
         }
-        continue;
+        if (batchData.hasNext()) {
+          break;
+        } else {
+          hasCachedSequenceData = false;
+          continue;
+        }
       }
       //timestamps of all points in the page are less than startTime
-      if(pageHeader.getMaxTimestamp() < startTime){
+      if (pageHeader.getMaxTimestamp() < startTime) {
         sequenceReader.skipPageData();
         continue;
       }
       //timestamps of all points in the page are greater or equal to 
startTime, don't need to skip
-      if(pageHeader.getMinTimestamp() >= startTime){
+      if (pageHeader.getMinTimestamp() >= startTime) {
         break;
       }
       //the page has overlap with startTime
       batchData = sequenceReader.nextBatch();
       hasCachedSequenceData = true;
-      while (batchData.hasNext() && batchData.currentTime() < startTime){
+      while (batchData.hasNext() && batchData.currentTime() < startTime) {
         batchData.next();
       }
       break;
diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/GroupByWithValueFilterDataSet.java
 
b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/GroupByWithValueFilterDataSet.java
index 077905d..3fbcdd8 100644
--- 
a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/GroupByWithValueFilterDataSet.java
+++ 
b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/GroupByWithValueFilterDataSet.java
@@ -64,7 +64,7 @@ public class GroupByWithValueFilterDataSet extends 
GroupByEngine {
    */
   public void initGroupBy(QueryContext context, List<String> aggres, 
IExpression expression)
       throws FileNodeManagerException, PathErrorException, ProcessorException, 
IOException {
-    initAggreFuction(context, aggres, expression);
+    initAggreFuction(aggres);
 
     QueryTokenManager.getInstance().beginQueryOfGivenExpression(jobId, 
expression);
     QueryTokenManager.getInstance().beginQueryOfGivenQueryPaths(jobId, 
selectedSeries);
diff --git 
a/iotdb/src/test/java/org/apache/iotdb/db/integration/IOTDBGroupByTestIT.java 
b/iotdb/src/test/java/org/apache/iotdb/db/integration/IOTDBGroupByTestIT.java
index dc2fd4c..bf2f755 100644
--- 
a/iotdb/src/test/java/org/apache/iotdb/db/integration/IOTDBGroupByTestIT.java
+++ 
b/iotdb/src/test/java/org/apache/iotdb/db/integration/IOTDBGroupByTestIT.java
@@ -326,6 +326,126 @@ public class IOTDBGroupByTestIT {
     }
   }
 
+  @Test
+  public void largeIntervalTest() throws SQLException {
+    String[] retArray1 = new String[]{
+        "2,4.4,4,20,4",
+        "30,30.3,16,610,30",
+        "620,500.5,1,620,620"
+    };
+    String[] retArray2 = new String[]{
+        "2,3.3,5,20,3",
+        "30,30.3,16,610,30",
+        "620,500.5,1,620,620"
+    };
+
+    Connection connection = null;
+    try {
+      connection = DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", 
"root", "root");
+      Statement statement = connection.createStatement();
+      boolean hasResultSet = statement.execute("select min_value(temperature), 
count(temperature), max_time(temperature), min_time(temperature) from 
root.ln.wf01.wt01 where time > 3 "
+          + "GROUP BY (590ms, 30, [2, 30], [30, 120], [100, 120], [123, 125], 
[155, 550], [540, 680])");
+
+      Assert.assertTrue(hasResultSet);
+      ResultSet resultSet = statement.getResultSet();
+      int cnt = 0;
+      while (resultSet.next()) {
+        String ans = resultSet.getString(TIMESTAMP_STR) + "," + 
resultSet.getString(min_value("root.ln.wf01.wt01.temperature"))
+            + "," + 
resultSet.getString(count("root.ln.wf01.wt01.temperature"))+ "," + 
resultSet.getString(max_time("root.ln.wf01.wt01.temperature"))
+            + "," + 
resultSet.getString(min_time("root.ln.wf01.wt01.temperature"));
+        Assert.assertEquals(retArray1[cnt], ans);
+        cnt++;
+      }
+      Assert.assertEquals(retArray1.length, cnt);
+      statement.close();
+
+      statement = connection.createStatement();
+      hasResultSet = statement.execute("select min_value(temperature), count 
(temperature), max_time(temperature), min_time(temperature) from 
root.ln.wf01.wt01 where temperature > 3 "
+          + "GROUP BY (590ms, 30, [2, 30], [30, 120], [100, 120], [123, 125], 
[155, 550],[540, 680])");
+
+      Assert.assertTrue(hasResultSet);
+      resultSet = statement.getResultSet();
+      cnt = 0;
+      while (resultSet.next()) {
+        String ans = resultSet.getString(TIMESTAMP_STR) + "," + 
resultSet.getString(min_value("root.ln.wf01.wt01.temperature"))
+            + "," + 
resultSet.getString(count("root.ln.wf01.wt01.temperature"))+ "," + 
resultSet.getString(max_time("root.ln.wf01.wt01.temperature"))
+            + "," + 
resultSet.getString(min_time("root.ln.wf01.wt01.temperature"));
+        Assert.assertEquals(retArray2[cnt], ans);
+        cnt++;
+        //System.out.println(ans);
+      }
+      Assert.assertEquals(retArray2.length, cnt);
+      statement.close();
+
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    } finally {
+      if (connection != null) {
+        connection.close();
+      }
+    }
+  }
+
+  @Test
+  public void smallPartitionTest() throws SQLException {
+    String[] retArray1 = new String[]{
+        "50,100.1,50.5,150.6",
+        "615,500.5,500.5,500.5"
+
+    };
+    String[] retArray2 = new String[]{
+        "50,100.1,50.5,150.6",
+        "585,null,null,0.0",
+        "590,500.5,200.2,700.7"
+    };
+    Connection connection = null;
+    try {
+      connection = DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", 
"root", "root");
+      Statement statement = connection.createStatement();
+      boolean hasResultSet = statement.execute("select last(temperature), 
first(temperature), sum(temperature) from root.ln.wf01.wt01 where time > 3 "
+          + "GROUP BY (80ms, 30,[50,100], [615, 650])");
+
+      Assert.assertTrue(hasResultSet);
+      ResultSet resultSet = statement.getResultSet();
+      int cnt = 0;
+      while (resultSet.next()) {
+        String ans = resultSet.getString(TIMESTAMP_STR) + "," + 
resultSet.getString(last("root.ln.wf01.wt01.temperature"))
+            + "," + 
resultSet.getString(first("root.ln.wf01.wt01.temperature"))+ "," + 
resultSet.getString(sum("root.ln.wf01.wt01.temperature"));
+        System.out.println(ans);
+        Assert.assertEquals(retArray1[cnt], ans);
+        cnt++;
+      }
+      Assert.assertEquals(retArray1.length, cnt);
+      statement.close();
+
+      statement = connection.createStatement();
+      hasResultSet = statement.execute("select first(temperature), 
last(temperature), sum(temperature) from root.ln.wf01.wt01 where temperature > 
3 "
+          + "GROUP BY (80ms, 30,[50,100], [585,590], [615, 650])");
+
+      Assert.assertTrue(hasResultSet);
+      resultSet = statement.getResultSet();
+      cnt = 0;
+      while (resultSet.next()) {
+        String ans = resultSet.getString(TIMESTAMP_STR) + "," + 
resultSet.getString(last("root.ln.wf01.wt01.temperature"))
+            + "," + 
resultSet.getString(first("root.ln.wf01.wt01.temperature"))+ "," + 
resultSet.getString(sum("root.ln.wf01.wt01.temperature"));
+        System.out.println(ans);
+        Assert.assertEquals(retArray2[cnt], ans);
+        cnt++;
+      }
+      Assert.assertEquals(retArray2.length, cnt);
+      statement.close();
+
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    } finally {
+      if (connection != null) {
+        connection.close();
+      }
+    }
+  }
+
   public void prepareData() throws SQLException {
     Connection connection = null;
     try {
diff --git 
a/iotdb/src/test/java/org/apache/iotdb/db/query/executor/GroupByEngineTest.java 
b/iotdb/src/test/java/org/apache/iotdb/db/query/executor/GroupByEngineTest.java
index ee2488b..ed00b16 100644
--- 
a/iotdb/src/test/java/org/apache/iotdb/db/query/executor/GroupByEngineTest.java
+++ 
b/iotdb/src/test/java/org/apache/iotdb/db/query/executor/GroupByEngineTest.java
@@ -46,6 +46,7 @@ public class GroupByEngineTest {
       Assert.assertEquals(endTimeArray[cnt], pair.right);
       cnt++;
     }
+    Assert.assertEquals(startTimeArray.length, cnt);
   }
 
   @Test
@@ -70,6 +71,7 @@ public class GroupByEngineTest {
       Assert.assertEquals(endTimeArray[cnt], pair.right);
       cnt++;
     }
+    Assert.assertEquals(startTimeArray.length, cnt);
   }
 
   @Test
@@ -83,8 +85,8 @@ public class GroupByEngineTest {
     pairList.add(new Pair<>(858L,860L));
     pairList.add(new Pair<>(1200L,1220L));
 
-    long[] startTimeArray = {805, 820, 850, 1200, 1210};
-    long[] endTimeArray = {820, 840, 860, 1210, 1230};
+    long[] startTimeArray = {805, 820, 850, 860, 1200, 1220};
+    long[] endTimeArray = {820, 840, 860, 880, 1220, 1240};
     GroupByEngine groupByEngine = new GroupByWithValueFilterDataSet(jobId, 
null, unit, startTimePoint, pairList);
     int cnt = 0;
     while (groupByEngine.hasNext()){
@@ -94,6 +96,7 @@ public class GroupByEngineTest {
       Assert.assertEquals(endTimeArray[cnt], pair.right);
       cnt++;
     }
+    Assert.assertEquals(startTimeArray.length, cnt);
   }
 
   @Test
@@ -118,7 +121,31 @@ public class GroupByEngineTest {
       Assert.assertEquals(endTimeArray[cnt], pair.right);
       cnt++;
     }
+    Assert.assertEquals(startTimeArray.length, cnt);
   }
 
+//(80ms, 30,[50,100], [585,590], [615, 650])
+  @Test
+  public void test5() {
+    long jobId = 1000L;
+    long unit = 80;
+    long startTimePoint = 30;
+    List<Pair<Long, Long>> pairList = new ArrayList<>();
+    pairList.add(new Pair<>(50L,100L));
+    pairList.add(new Pair<>(585L,590L));
+    pairList.add(new Pair<>(615L,650L));
 
+    long[] startTimeArray = {50, 585, 590};
+    long[] endTimeArray = {110, 590, 670};
+    GroupByEngine groupByEngine = new GroupByWithValueFilterDataSet(jobId, 
null, unit, startTimePoint, pairList);
+    int cnt = 0;
+    while (groupByEngine.hasNext()){
+      Pair pair = groupByEngine.nextTimePartition();
+      Assert.assertTrue(cnt < startTimeArray.length);
+      Assert.assertEquals(startTimeArray[cnt], pair.left);
+      Assert.assertEquals(endTimeArray[cnt], pair.right);
+      cnt++;
+    }
+    Assert.assertEquals(startTimeArray.length, cnt);
+  }
 }
\ No newline at end of file

Reply via email to