This is an automated email from the ASF dual-hosted git repository.
suyue pushed a commit to branch aggregate
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/aggregate by this push:
new 41a572f add groupby test and fix groupBy bug
41a572f is described below
commit 41a572f34cd3f5a843003dbd395b769125c661c3
Author: suyue <[email protected]>
AuthorDate: Thu Mar 21 20:57:01 2019 +0800
add groupby test and fix groupBy bug
---
.../iotdb/db/query/executor/GroupByEngine.java | 6 +-
.../executor/GroupByWithOnlyTimeFilterDataSet.java | 42 ++++----
.../executor/GroupByWithValueFilterDataSet.java | 2 +-
.../iotdb/db/integration/IOTDBGroupByTestIT.java | 120 +++++++++++++++++++++
.../iotdb/db/query/executor/GroupByEngineTest.java | 31 +++++-
5 files changed, 176 insertions(+), 25 deletions(-)
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/GroupByEngine.java
b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/GroupByEngine.java
index 5430f9e..a4effd6 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/GroupByEngine.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/GroupByEngine.java
@@ -72,8 +72,8 @@ public abstract class GroupByEngine extends QueryDataSet {
this.endTime = -1;
}
- protected void initAggreFuction(QueryContext context, List<String> aggres,
IExpression expression)
- throws FileNodeManagerException, PathErrorException, ProcessorException,
IOException {
+ protected void initAggreFuction(List<String> aggres)
+ throws PathErrorException, ProcessorException{
List<TSDataType> types = new ArrayList<>();
//construct AggregateFunctions
@@ -122,7 +122,7 @@ public abstract class GroupByEngine extends QueryDataSet {
}
//current interval is not covered yet
- if (endTime < mergedIntervals.get(usedIndex).right) {
+ if (endTime <= mergedIntervals.get(usedIndex).right) {
startTime = endTime;
endTime += unit;
hasCachedTimeInterval = true;
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/GroupByWithOnlyTimeFilterDataSet.java
b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/GroupByWithOnlyTimeFilterDataSet.java
index 67d72d9..922148c 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/GroupByWithOnlyTimeFilterDataSet.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/GroupByWithOnlyTimeFilterDataSet.java
@@ -76,7 +76,7 @@ public class GroupByWithOnlyTimeFilterDataSet extends
GroupByEngine {
*/
public void initGroupBy(QueryContext context, List<String> aggres,
IExpression expression)
throws FileNodeManagerException, PathErrorException, ProcessorException,
IOException {
- initAggreFuction(context, aggres, expression);
+ initAggreFuction(aggres);
//init reader
QueryTokenManager.getInstance().beginQueryOfGivenQueryPaths(jobId,
selectedSeries);
if (expression != null) {
@@ -128,16 +128,14 @@ public class GroupByWithOnlyTimeFilterDataSet extends
GroupByEngine {
IPointReader unsequenceReader = unSequenceReaderList.get(idx);
IAggregateReader sequenceReader = sequenceReaderList.get(idx);
AggregateFunction function = functions.get(idx);
- BatchData batchData = batchDataList.get(idx);
- boolean hasCachedSequenceData = hasCachedSequenceDataList.get(idx);
-
function.init();
-
boolean finishCheckSequenceData = false;
//skip the points with timestamp less than startTime
skipExessData(idx, sequenceReader, unsequenceReader);
+ BatchData batchData = batchDataList.get(idx);
+ boolean hasCachedSequenceData = hasCachedSequenceDataList.get(idx);
//there was unprocessed data in last batch
if (hasCachedSequenceData && batchData.hasNext()) {
function.calculateValueFromPageData(batchData, unsequenceReader,
endTime);
@@ -183,6 +181,7 @@ public class GroupByWithOnlyTimeFilterDataSet extends
GroupByEngine {
if (canUseHeader(minTime, maxTime, unsequenceReader, function)) {
//cal using page header
function.calculateValueFromPageHeader(pageHeader);
+ sequenceReader.skipPageData();
} else {
//cal using page data
batchData = sequenceReader.nextBatch();
@@ -202,54 +201,59 @@ public class GroupByWithOnlyTimeFilterDataSet extends
GroupByEngine {
}
//skip the points with timestamp less than startTime
- private void skipExessData(int idx, IAggregateReader sequenceReader,
IPointReader unsequenceReader)
+ private void skipExessData(int idx, IAggregateReader sequenceReader,
+ IPointReader unsequenceReader)
throws IOException {
BatchData batchData = batchDataList.get(idx);
boolean hasCachedSequenceData = hasCachedSequenceDataList.get(idx);
//skip the unsequenceReader points with timestamp less than startTime
- while (unsequenceReader.hasNext() &&
unsequenceReader.current().getTimestamp() < startTime){
+ while (unsequenceReader.hasNext() &&
unsequenceReader.current().getTimestamp() < startTime) {
unsequenceReader.next();
}
//skip the cached batch data points with timestamp less than startTime
- if(hasCachedSequenceData){
- while (batchData.hasNext() && batchData.currentTime() < startTime){
+ if (hasCachedSequenceData) {
+ while (batchData.hasNext() && batchData.currentTime() < startTime) {
batchData.next();
}
}
- if(hasCachedSequenceData && !batchData.hasNext()){
+ if (hasCachedSequenceData && !batchData.hasNext()) {
hasCachedSequenceData = false;
- }
- else {
+ } else {
return;
}
//skip the points in sequenceReader data whose timestamp are less than
startTime
- while (sequenceReader.hasNext()){
+ while (sequenceReader.hasNext()) {
PageHeader pageHeader = sequenceReader.nextPageHeader();
//memory data
- if(pageHeader == null){
+ if (pageHeader == null) {
batchData = sequenceReader.nextBatch();
hasCachedSequenceData = true;
- while (batchData.hasNext() && batchData.currentTime() < startTime){
+ while (batchData.hasNext() && batchData.currentTime() < startTime) {
batchData.next();
}
- continue;
+ if (batchData.hasNext()) {
+ break;
+ } else {
+ hasCachedSequenceData = false;
+ continue;
+ }
}
//timestamps of all points in the page are less than startTime
- if(pageHeader.getMaxTimestamp() < startTime){
+ if (pageHeader.getMaxTimestamp() < startTime) {
sequenceReader.skipPageData();
continue;
}
//timestamps of all points in the page are greater or equal to
startTime, don't need to skip
- if(pageHeader.getMinTimestamp() >= startTime){
+ if (pageHeader.getMinTimestamp() >= startTime) {
break;
}
//the page has overlap with startTime
batchData = sequenceReader.nextBatch();
hasCachedSequenceData = true;
- while (batchData.hasNext() && batchData.currentTime() < startTime){
+ while (batchData.hasNext() && batchData.currentTime() < startTime) {
batchData.next();
}
break;
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/GroupByWithValueFilterDataSet.java
b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/GroupByWithValueFilterDataSet.java
index 077905d..3fbcdd8 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/GroupByWithValueFilterDataSet.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/GroupByWithValueFilterDataSet.java
@@ -64,7 +64,7 @@ public class GroupByWithValueFilterDataSet extends
GroupByEngine {
*/
public void initGroupBy(QueryContext context, List<String> aggres,
IExpression expression)
throws FileNodeManagerException, PathErrorException, ProcessorException,
IOException {
- initAggreFuction(context, aggres, expression);
+ initAggreFuction(aggres);
QueryTokenManager.getInstance().beginQueryOfGivenExpression(jobId,
expression);
QueryTokenManager.getInstance().beginQueryOfGivenQueryPaths(jobId,
selectedSeries);
diff --git
a/iotdb/src/test/java/org/apache/iotdb/db/integration/IOTDBGroupByTestIT.java
b/iotdb/src/test/java/org/apache/iotdb/db/integration/IOTDBGroupByTestIT.java
index dc2fd4c..bf2f755 100644
---
a/iotdb/src/test/java/org/apache/iotdb/db/integration/IOTDBGroupByTestIT.java
+++
b/iotdb/src/test/java/org/apache/iotdb/db/integration/IOTDBGroupByTestIT.java
@@ -326,6 +326,126 @@ public class IOTDBGroupByTestIT {
}
}
+ @Test
+ public void largeIntervalTest() throws SQLException {
+ String[] retArray1 = new String[]{
+ "2,4.4,4,20,4",
+ "30,30.3,16,610,30",
+ "620,500.5,1,620,620"
+ };
+ String[] retArray2 = new String[]{
+ "2,3.3,5,20,3",
+ "30,30.3,16,610,30",
+ "620,500.5,1,620,620"
+ };
+
+ Connection connection = null;
+ try {
+ connection = DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/",
"root", "root");
+ Statement statement = connection.createStatement();
+ boolean hasResultSet = statement.execute("select min_value(temperature),
count(temperature), max_time(temperature), min_time(temperature) from
root.ln.wf01.wt01 where time > 3 "
+ + "GROUP BY (590ms, 30, [2, 30], [30, 120], [100, 120], [123, 125],
[155, 550], [540, 680])");
+
+ Assert.assertTrue(hasResultSet);
+ ResultSet resultSet = statement.getResultSet();
+ int cnt = 0;
+ while (resultSet.next()) {
+ String ans = resultSet.getString(TIMESTAMP_STR) + "," +
resultSet.getString(min_value("root.ln.wf01.wt01.temperature"))
+ + "," +
resultSet.getString(count("root.ln.wf01.wt01.temperature"))+ "," +
resultSet.getString(max_time("root.ln.wf01.wt01.temperature"))
+ + "," +
resultSet.getString(min_time("root.ln.wf01.wt01.temperature"));
+ Assert.assertEquals(retArray1[cnt], ans);
+ cnt++;
+ }
+ Assert.assertEquals(retArray1.length, cnt);
+ statement.close();
+
+ statement = connection.createStatement();
+ hasResultSet = statement.execute("select min_value(temperature), count
(temperature), max_time(temperature), min_time(temperature) from
root.ln.wf01.wt01 where temperature > 3 "
+ + "GROUP BY (590ms, 30, [2, 30], [30, 120], [100, 120], [123, 125],
[155, 550],[540, 680])");
+
+ Assert.assertTrue(hasResultSet);
+ resultSet = statement.getResultSet();
+ cnt = 0;
+ while (resultSet.next()) {
+ String ans = resultSet.getString(TIMESTAMP_STR) + "," +
resultSet.getString(min_value("root.ln.wf01.wt01.temperature"))
+ + "," +
resultSet.getString(count("root.ln.wf01.wt01.temperature"))+ "," +
resultSet.getString(max_time("root.ln.wf01.wt01.temperature"))
+ + "," +
resultSet.getString(min_time("root.ln.wf01.wt01.temperature"));
+ Assert.assertEquals(retArray2[cnt], ans);
+ cnt++;
+ //System.out.println(ans);
+ }
+ Assert.assertEquals(retArray2.length, cnt);
+ statement.close();
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ } finally {
+ if (connection != null) {
+ connection.close();
+ }
+ }
+ }
+
+ @Test
+ public void smallPartitionTest() throws SQLException {
+ String[] retArray1 = new String[]{
+ "50,100.1,50.5,150.6",
+ "615,500.5,500.5,500.5"
+
+ };
+ String[] retArray2 = new String[]{
+ "50,100.1,50.5,150.6",
+ "585,null,null,0.0",
+ "590,500.5,200.2,700.7"
+ };
+ Connection connection = null;
+ try {
+ connection = DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/",
"root", "root");
+ Statement statement = connection.createStatement();
+ boolean hasResultSet = statement.execute("select last(temperature),
first(temperature), sum(temperature) from root.ln.wf01.wt01 where time > 3 "
+ + "GROUP BY (80ms, 30,[50,100], [615, 650])");
+
+ Assert.assertTrue(hasResultSet);
+ ResultSet resultSet = statement.getResultSet();
+ int cnt = 0;
+ while (resultSet.next()) {
+ String ans = resultSet.getString(TIMESTAMP_STR) + "," +
resultSet.getString(last("root.ln.wf01.wt01.temperature"))
+ + "," +
resultSet.getString(first("root.ln.wf01.wt01.temperature"))+ "," +
resultSet.getString(sum("root.ln.wf01.wt01.temperature"));
+ System.out.println(ans);
+ Assert.assertEquals(retArray1[cnt], ans);
+ cnt++;
+ }
+ Assert.assertEquals(retArray1.length, cnt);
+ statement.close();
+
+ statement = connection.createStatement();
+ hasResultSet = statement.execute("select first(temperature),
last(temperature), sum(temperature) from root.ln.wf01.wt01 where temperature >
3 "
+ + "GROUP BY (80ms, 30,[50,100], [585,590], [615, 650])");
+
+ Assert.assertTrue(hasResultSet);
+ resultSet = statement.getResultSet();
+ cnt = 0;
+ while (resultSet.next()) {
+ String ans = resultSet.getString(TIMESTAMP_STR) + "," +
resultSet.getString(last("root.ln.wf01.wt01.temperature"))
+ + "," +
resultSet.getString(first("root.ln.wf01.wt01.temperature"))+ "," +
resultSet.getString(sum("root.ln.wf01.wt01.temperature"));
+ System.out.println(ans);
+ Assert.assertEquals(retArray2[cnt], ans);
+ cnt++;
+ }
+ Assert.assertEquals(retArray2.length, cnt);
+ statement.close();
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ } finally {
+ if (connection != null) {
+ connection.close();
+ }
+ }
+ }
+
public void prepareData() throws SQLException {
Connection connection = null;
try {
diff --git
a/iotdb/src/test/java/org/apache/iotdb/db/query/executor/GroupByEngineTest.java
b/iotdb/src/test/java/org/apache/iotdb/db/query/executor/GroupByEngineTest.java
index ee2488b..ed00b16 100644
---
a/iotdb/src/test/java/org/apache/iotdb/db/query/executor/GroupByEngineTest.java
+++
b/iotdb/src/test/java/org/apache/iotdb/db/query/executor/GroupByEngineTest.java
@@ -46,6 +46,7 @@ public class GroupByEngineTest {
Assert.assertEquals(endTimeArray[cnt], pair.right);
cnt++;
}
+ Assert.assertEquals(startTimeArray.length, cnt);
}
@Test
@@ -70,6 +71,7 @@ public class GroupByEngineTest {
Assert.assertEquals(endTimeArray[cnt], pair.right);
cnt++;
}
+ Assert.assertEquals(startTimeArray.length, cnt);
}
@Test
@@ -83,8 +85,8 @@ public class GroupByEngineTest {
pairList.add(new Pair<>(858L,860L));
pairList.add(new Pair<>(1200L,1220L));
- long[] startTimeArray = {805, 820, 850, 1200, 1210};
- long[] endTimeArray = {820, 840, 860, 1210, 1230};
+ long[] startTimeArray = {805, 820, 850, 860, 1200, 1220};
+ long[] endTimeArray = {820, 840, 860, 880, 1220, 1240};
GroupByEngine groupByEngine = new GroupByWithValueFilterDataSet(jobId,
null, unit, startTimePoint, pairList);
int cnt = 0;
while (groupByEngine.hasNext()){
@@ -94,6 +96,7 @@ public class GroupByEngineTest {
Assert.assertEquals(endTimeArray[cnt], pair.right);
cnt++;
}
+ Assert.assertEquals(startTimeArray.length, cnt);
}
@Test
@@ -118,7 +121,31 @@ public class GroupByEngineTest {
Assert.assertEquals(endTimeArray[cnt], pair.right);
cnt++;
}
+ Assert.assertEquals(startTimeArray.length, cnt);
}
+//(80ms, 30,[50,100], [585,590], [615, 650])
+ @Test
+ public void test5() {
+ long jobId = 1000L;
+ long unit = 80;
+ long startTimePoint = 30;
+ List<Pair<Long, Long>> pairList = new ArrayList<>();
+ pairList.add(new Pair<>(50L,100L));
+ pairList.add(new Pair<>(585L,590L));
+ pairList.add(new Pair<>(615L,650L));
+ long[] startTimeArray = {50, 585, 590};
+ long[] endTimeArray = {110, 590, 670};
+ GroupByEngine groupByEngine = new GroupByWithValueFilterDataSet(jobId,
null, unit, startTimePoint, pairList);
+ int cnt = 0;
+ while (groupByEngine.hasNext()){
+ Pair pair = groupByEngine.nextTimePartition();
+ Assert.assertTrue(cnt < startTimeArray.length);
+ Assert.assertEquals(startTimeArray[cnt], pair.left);
+ Assert.assertEquals(endTimeArray[cnt], pair.right);
+ cnt++;
+ }
+ Assert.assertEquals(startTimeArray.length, cnt);
+ }
}
\ No newline at end of file