This is an automated email from the ASF dual-hosted git repository.
suyue pushed a commit to branch aggregate
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/aggregate by this push:
new 3893473 add aggregate function with value filter
3893473 is described below
commit 3893473fa074c890db7771d45d4a9cbafaff8dbe
Author: suyue <[email protected]>
AuthorDate: Tue Mar 12 21:45:11 2019 +0800
add aggregate function with value filter
---
.../db/query/aggregation/AggreFuncFactory.java | 4 +-
.../db/query/aggregation/AggregateFunction.java | 12 +-
.../db/query/aggregation/impl/CountAggrFunc.java | 20 +-
.../db/query/aggregation/impl/FirstAggrFunc.java | 29 ++-
.../db/query/aggregation/impl/LastAggrFunc.java | 71 +++---
.../db/query/aggregation/impl/MaxTimeAggrFunc.java | 33 ++-
.../query/aggregation/impl/MaxValueAggrFunc.java | 25 +-
.../db/query/aggregation/impl/MeanAggrFunc.java | 35 +--
.../db/query/aggregation/impl/MinTimeAggrFunc.java | 20 +-
.../query/aggregation/impl/MinValueAggrFunc.java | 28 +-
.../db/query/aggregation/impl/SumAggrFunc.java | 32 +--
.../db/query/executor/AggregateEngineExecutor.java | 109 +++++++-
iotdb/src/test/java/PerformanceTest.java | 282 ---------------------
.../IoTDBAggregationLargeDataTestIT.java | 46 ++--
.../IoTDBAggregationSmallDataTestIT.java | 20 +-
.../db/integration/IoTDBAggregationTestIT.java | 19 ++
16 files changed, 354 insertions(+), 431 deletions(-)
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/AggreFuncFactory.java
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/AggreFuncFactory.java
index 882993d..a1db207 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/AggreFuncFactory.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/AggreFuncFactory.java
@@ -61,11 +61,11 @@ public class AggreFuncFactory {
case AggregationConstant.COUNT:
return new CountAggrFunc();
case AggregationConstant.MEAN:
- return new MeanAggrFunc();
+ return new MeanAggrFunc(dataType);
case AggregationConstant.FIRST:
return new FirstAggrFunc(dataType);
case AggregationConstant.SUM:
- return new SumAggrFunc();
+ return new SumAggrFunc(dataType);
case AggregationConstant.LAST:
return new LastAggrFunc(dataType);
default:
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateFunction.java
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateFunction.java
index 34d2d75..fe102ef 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateFunction.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateFunction.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.query.aggregation;
import java.io.IOException;
+import java.util.List;
import org.apache.iotdb.db.exception.ProcessorException;
import org.apache.iotdb.db.query.reader.IPointReader;
import org.apache.iotdb.db.query.reader.merge.EngineReaderByTimeStamp;
@@ -36,6 +37,12 @@ public abstract class AggregateFunction {
public TSDataType dataType;
public boolean hasSetValue;
+ /**
+ * construct.
+ *
+ * @param name aggregate function name.
+ * @param dataType series data type.
+ */
public AggregateFunction(String name, TSDataType dataType) {
this.name = name;
this.dataType = dataType;
@@ -83,9 +90,8 @@ public abstract class AggregateFunction {
* @throws IOException TsFile data read error
* @throws ProcessorException wrong aggregation method parameter
*/
- public abstract boolean calcAggregationUsingTimestamps(EngineTimeGenerator
timeGenerator,
- EngineReaderByTimeStamp sequenceReader, EngineReaderByTimeStamp
unsequenceReader)
- throws IOException, ProcessorException;
+ public abstract void calcAggregationUsingTimestamps(List<Long> timestamps,
+ EngineReaderByTimeStamp dataReader) throws IOException,
ProcessorException;
/**
* <p>
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrFunc.java
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrFunc.java
index fb40cb8..9c70142 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrFunc.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrFunc.java
@@ -20,12 +20,13 @@
package org.apache.iotdb.db.query.aggregation.impl;
import java.io.IOException;
+import java.util.List;
import org.apache.iotdb.db.exception.ProcessorException;
import org.apache.iotdb.db.query.aggregation.AggregateFunction;
import org.apache.iotdb.db.query.aggregation.AggregationConstant;
import org.apache.iotdb.db.query.reader.IPointReader;
import org.apache.iotdb.db.query.reader.merge.EngineReaderByTimeStamp;
-import org.apache.iotdb.db.query.timegenerator.EngineTimeGenerator;
+import org.apache.iotdb.db.utils.TsPrimitiveType;
import org.apache.iotdb.tsfile.file.header.PageHeader;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.BatchData;
@@ -98,10 +99,19 @@ public class CountAggrFunc extends AggregateFunction {
}
@Override
- public boolean calcAggregationUsingTimestamps(EngineTimeGenerator
timeGenerator,
- EngineReaderByTimeStamp sequenceReader, EngineReaderByTimeStamp
unsequenceReader)
- throws IOException, ProcessorException {
- return false;
+ public void calcAggregationUsingTimestamps(List<Long> timestamps,
+ EngineReaderByTimeStamp dataReader) throws IOException,
ProcessorException { // Adds to the stored count the number of given timestamps at which dataReader returns a value.
+ int cnt = 0; // hits found in this batch of timestamps only
+ for (long time : timestamps) {
+ TsPrimitiveType value = dataReader.getValueInTimestamp(time);
+ if (value != null) { // a null lookup is not counted
+ cnt++;
+ }
+ }
+
+ long preValue = resultData.getLong(); // current total; presumably seeded before the first call (not visible in this hunk) - TODO confirm
+ preValue += cnt;
+ resultData.setLong(0, preValue); // write the new total back to index 0
+ }
@Override
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstAggrFunc.java
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstAggrFunc.java
index 1b33a6f..fa6e717 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstAggrFunc.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstAggrFunc.java
@@ -20,12 +20,13 @@
package org.apache.iotdb.db.query.aggregation.impl;
import java.io.IOException;
+import java.util.List;
import org.apache.iotdb.db.exception.ProcessorException;
import org.apache.iotdb.db.query.aggregation.AggregateFunction;
import org.apache.iotdb.db.query.aggregation.AggregationConstant;
import org.apache.iotdb.db.query.reader.IPointReader;
import org.apache.iotdb.db.query.reader.merge.EngineReaderByTimeStamp;
-import org.apache.iotdb.db.query.timegenerator.EngineTimeGenerator;
+import org.apache.iotdb.db.utils.TsPrimitiveType;
import org.apache.iotdb.tsfile.file.header.PageHeader;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.BatchData;
@@ -68,19 +69,19 @@ public class FirstAggrFunc extends AggregateFunction {
}
if (dataInThisPage.hasNext() && unsequenceReader.hasNext()) {
if (dataInThisPage.currentTime() >=
unsequenceReader.current().getTimestamp()) {
- resultData.putTime( 0);
+ resultData.putTime(0);
resultData.putAnObject(unsequenceReader.current().getValue().getValue());
unsequenceReader.next();
return;
} else {
- resultData.putTime( 0);
+ resultData.putTime(0);
resultData.putAnObject(dataInThisPage.currentValue());
return;
}
}
if (dataInThisPage.hasNext()) {
- resultData.putTime( 0);
+ resultData.putTime(0);
resultData.putAnObject(dataInThisPage.currentValue());
return;
}
@@ -93,17 +94,27 @@ public class FirstAggrFunc extends AggregateFunction {
return;
}
if (unsequenceReader.hasNext()) {
- resultData.putTime( 0);
+ resultData.putTime(0);
resultData.putAnObject(unsequenceReader.current().getValue().getValue());
return;
}
}
@Override
- public boolean calcAggregationUsingTimestamps(EngineTimeGenerator
timeGenerator,
- EngineReaderByTimeStamp sequenceReader, EngineReaderByTimeStamp
unsequenceReader)
- throws IOException, ProcessorException {
- return false;
+ public void calcAggregationUsingTimestamps(List<Long> timestamps,
+ EngineReaderByTimeStamp dataReader) throws IOException,
ProcessorException { // Stores the value at the first timestamp (in list order) that has data, unless a result is already stored.
+ if (resultData.length() != 0) { // a result is already held: leave it untouched
+ return;
+ }
+
+ for (long time : timestamps) {
+ TsPrimitiveType value = dataReader.getValueInTimestamp(time);
+ if (value != null) {
+ resultData.putTime(0); // result is stored under time 0, not under the matching timestamp
+ resultData.putAnObject(value.getValue());
+ break; // only the first hit is kept
+ }
+ }
+ }
@Override
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastAggrFunc.java
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastAggrFunc.java
index e2d53d4..71142a3 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastAggrFunc.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastAggrFunc.java
@@ -20,13 +20,14 @@
package org.apache.iotdb.db.query.aggregation.impl;
import java.io.IOException;
+import java.util.List;
import org.apache.iotdb.db.exception.ProcessorException;
import org.apache.iotdb.db.query.aggregation.AggregateFunction;
import org.apache.iotdb.db.query.aggregation.AggregationConstant;
import org.apache.iotdb.db.query.reader.IPointReader;
import org.apache.iotdb.db.query.reader.merge.EngineReaderByTimeStamp;
-import org.apache.iotdb.db.query.timegenerator.EngineTimeGenerator;
import org.apache.iotdb.db.utils.TimeValuePair;
+import org.apache.iotdb.db.utils.TsPrimitiveType;
import org.apache.iotdb.tsfile.file.header.PageHeader;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.BatchData;
@@ -53,14 +54,7 @@ public class LastAggrFunc extends AggregateFunction {
@Override
public void calculateValueFromPageHeader(PageHeader pageHeader) throws
ProcessorException {
Object lastVal = pageHeader.getStatistics().getLast();
- if (resultData.length() == 0) {
- resultData.putTime(pageHeader.getMaxTimestamp());
- resultData.putAnObject(lastVal);
- } else {
- if (resultData.currentTime() < pageHeader.getMaxTimestamp()) {
- resultData.setAnObject(0, (Comparable<?>) lastVal);
- }
- }
+ updateLastResult(pageHeader.getMaxTimestamp(), lastVal);
}
@Override
@@ -85,17 +79,9 @@ public class LastAggrFunc extends AggregateFunction {
}
}
- if (resultData.length() == 0) {
- if (time != -1) {
- resultData.putTime(time);
- resultData.putAnObject(lastVal);
- }
- } else {
- //has set value
- if (time != -1 && time > resultData.currentTime()) {
- resultData.setTime(0, time);
- resultData.setAnObject(0, (Comparable<?>) lastVal);
- }
+ //has inited lastVal and time in the batch(dataInThisPage).
+ if (time != -1) {
+ updateLastResult(time, lastVal);
}
}
@@ -106,25 +92,29 @@ public class LastAggrFunc extends AggregateFunction {
while (unsequenceReader.hasNext()) {
pair = unsequenceReader.next();
}
- if (resultData.length() == 0) {
- if (pair != null) {
- resultData.putAnObject(pair.getValue().getValue());
- resultData.putTime( pair.getTimestamp());
- }
- } else {
- if (pair != null && pair.getTimestamp() >= resultData.currentTime()) {
- resultData.setAnObject(0, (Comparable<?>) pair.getValue().getValue());
- resultData.setTime(0, pair.getTimestamp());
- }
+
+ if (pair != null) {
+ updateLastResult(pair.getTimestamp(), pair.getValue().getValue());
}
}
@Override
- public boolean calcAggregationUsingTimestamps(EngineTimeGenerator
timeGenerator,
- EngineReaderByTimeStamp sequenceReader, EngineReaderByTimeStamp
unsequenceReader)
- throws IOException, ProcessorException {
- return false;
+ public void calcAggregationUsingTimestamps(List<Long> timestamps,
+ EngineReaderByTimeStamp dataReader) throws IOException,
ProcessorException { // Finds the last timestamp (in list order) that has a value and offers that (time, value) pair to updateLastResult.
+
+ long time = -1; // -1 is the sentinel for "no timestamp in this batch had a value"
+ Object lastVal = null;
+ for (int i = 0; i < timestamps.size(); i++) {
+ TsPrimitiveType value =
dataReader.getValueInTimestamp(timestamps.get(i));
+ if (value != null) { // later hits overwrite earlier ones, so the final pair is the last hit
+ time = timestamps.get(i);
+ lastVal = value.getValue();
+ }
+ }
+ if(time != -1){
+ updateLastResult(time, lastVal); // pass the real timestamp; a literal -1 fails updateLastResult's >= check once a result is stored, so later batches were ignored
+ }
+ }
@Override
@@ -132,4 +122,17 @@ public class LastAggrFunc extends AggregateFunction {
long intervalEnd, BatchData data) throws ProcessorException {
}
+
+ private void updateLastResult(long time, Object value) { // Stores (time, value) when nothing is stored yet, or when time is not older than the stored time.
+ if (resultData.length() == 0) { // first result: append it
+ resultData.putAnObject(value);
+ resultData.putTime(time);
+ } else {
+ if (time >= resultData.currentTime()) { // an equal time also replaces the stored value
+ resultData.setAnObject(0, (Comparable<?>) value);
+ resultData.setTime(0, time);
+ }
+ }
+ }
+
}
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrFunc.java
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrFunc.java
index c6773db..cd09d5d 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrFunc.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrFunc.java
@@ -20,13 +20,14 @@
package org.apache.iotdb.db.query.aggregation.impl;
import java.io.IOException;
+import java.util.List;
import org.apache.iotdb.db.exception.ProcessorException;
import org.apache.iotdb.db.query.aggregation.AggregateFunction;
import org.apache.iotdb.db.query.aggregation.AggregationConstant;
import org.apache.iotdb.db.query.reader.IPointReader;
import org.apache.iotdb.db.query.reader.merge.EngineReaderByTimeStamp;
-import org.apache.iotdb.db.query.timegenerator.EngineTimeGenerator;
import org.apache.iotdb.db.utils.TimeValuePair;
+import org.apache.iotdb.db.utils.TsPrimitiveType;
import org.apache.iotdb.tsfile.file.header.PageHeader;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.BatchData;
@@ -81,7 +82,7 @@ public class MaxTimeAggrFunc extends AggregateFunction {
}
if (resultData.length() == 0) {
if (time != -1) {
- resultData.putTime( 0);
+ resultData.putTime(0);
resultData.putAnObject(time);
}
} else {
@@ -102,7 +103,7 @@ public class MaxTimeAggrFunc extends AggregateFunction {
if (resultData.length() == 0) {
if (pair != null) {
resultData.putTime(0);
- resultData.putAnObject( pair.getTimestamp());
+ resultData.putAnObject(pair.getTimestamp());
}
} else {
//has set value
@@ -113,10 +114,28 @@ public class MaxTimeAggrFunc extends AggregateFunction {
}
@Override
- public boolean calcAggregationUsingTimestamps(EngineTimeGenerator
timeGenerator,
- EngineReaderByTimeStamp sequenceReader, EngineReaderByTimeStamp
unsequenceReader)
- throws IOException, ProcessorException {
- return false;
+ public void calcAggregationUsingTimestamps(List<Long> timestamps,
+ EngineReaderByTimeStamp dataReader) throws IOException,
ProcessorException { // Raises the stored max time to the last timestamp (in list order) that has a value, when that is larger.
+ long time = -1; // -1 is the sentinel for "no timestamp in this batch had a value"
+ for (int i = 0; i < timestamps.size(); i++) {
+ TsPrimitiveType value =
dataReader.getValueInTimestamp(timestamps.get(i));
+ if (value != null) {
+ time = timestamps.get(i); // keeps the LAST hit, not the numeric maximum; assumes the list is ascending - TODO confirm
+ }
+ }
+
+ if(time == -1){ // nothing found in this batch: stored result stays as is
+ return;
+ }
+
+ if (resultData.length() == 0) { // first result: append it under time 0
+ resultData.putTime(0);
+ resultData.putLong(time);
+ } else {
+ if(resultData.getLong() < time){ // only replace when strictly larger
+ resultData.setLong(0, time);
+ }
+ }
+ }
@Override
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrFunc.java
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrFunc.java
index 8680592..6973485 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrFunc.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrFunc.java
@@ -20,12 +20,14 @@
package org.apache.iotdb.db.query.aggregation.impl;
import java.io.IOException;
+import java.util.List;
import org.apache.iotdb.db.exception.ProcessorException;
import org.apache.iotdb.db.query.aggregation.AggregateFunction;
import org.apache.iotdb.db.query.aggregation.AggregationConstant;
import org.apache.iotdb.db.query.reader.IPointReader;
import org.apache.iotdb.db.query.reader.merge.EngineReaderByTimeStamp;
import org.apache.iotdb.db.query.timegenerator.EngineTimeGenerator;
+import org.apache.iotdb.db.utils.TsPrimitiveType;
import org.apache.iotdb.tsfile.file.header.PageHeader;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.BatchData;
@@ -108,6 +110,22 @@ public class MaxValueAggrFunc extends AggregateFunction {
updateResult(maxVal);
}
+ @Override
+ public void calcAggregationUsingTimestamps(List<Long> timestamps,
+ EngineReaderByTimeStamp dataReader) throws IOException,
ProcessorException { // Finds the largest value at the given timestamps and hands it (or null when none had data) to updateResult.
+ Comparable<Object> maxVal = null; // stays null until the first non-null lookup
+ for (long time : timestamps){
+ TsPrimitiveType value = dataReader.getValueInTimestamp(time);
+ if(value == null){ // no value at this timestamp: skip it
+ continue;
+ }
+ if(maxVal == null || maxVal.compareTo(value.getValue())<0){
+ maxVal = (Comparable<Object>) value.getValue(); // unchecked cast; assumes the series value type is Comparable - TODO confirm
+ }
+ }
+ updateResult(maxVal);
+ }
+
private void updateResult(Comparable<Object> maxVal) {
if (resultData.length() == 0) {
if (maxVal != null) {
@@ -122,13 +140,6 @@ public class MaxValueAggrFunc extends AggregateFunction {
}
@Override
- public boolean calcAggregationUsingTimestamps(EngineTimeGenerator
timeGenerator,
- EngineReaderByTimeStamp sequenceReader, EngineReaderByTimeStamp
unsequenceReader)
- throws IOException, ProcessorException {
- return false;
- }
-
- @Override
public void calcGroupByAggregation(long partitionStart, long partitionEnd,
long intervalStart,
long intervalEnd, BatchData data) throws ProcessorException {
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MeanAggrFunc.java
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MeanAggrFunc.java
index 6858384..202f706 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MeanAggrFunc.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MeanAggrFunc.java
@@ -16,26 +16,31 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.query.aggregation.impl;
import java.io.IOException;
+import java.util.List;
import org.apache.iotdb.db.exception.ProcessorException;
import org.apache.iotdb.db.query.aggregation.AggregateFunction;
import org.apache.iotdb.db.query.aggregation.AggregationConstant;
import org.apache.iotdb.db.query.reader.IPointReader;
import org.apache.iotdb.db.query.reader.merge.EngineReaderByTimeStamp;
-import org.apache.iotdb.db.query.timegenerator.EngineTimeGenerator;
import org.apache.iotdb.db.utils.TimeValuePair;
+import org.apache.iotdb.db.utils.TsPrimitiveType;
import org.apache.iotdb.tsfile.file.header.PageHeader;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.BatchData;
public class MeanAggrFunc extends AggregateFunction {
+
private double sum = 0.0;
private int cnt = 0;
+ private TSDataType seriesDataType;
- public MeanAggrFunc() {
+ public MeanAggrFunc(TSDataType seriesDataType) { // The result type passed to super is always DOUBLE; the series' own type is kept separately.
super(AggregationConstant.MEAN, TSDataType.DOUBLE);
+ this.seriesDataType = seriesDataType; // passed to updateMean by the calculate* methods in this class
}
@Override
@@ -46,7 +51,7 @@ public class MeanAggrFunc extends AggregateFunction {
public BatchData getResult() {
if (cnt > 0) {
resultData.putTime(0);
- resultData.putDouble( sum / cnt);
+ resultData.putDouble(sum / cnt);
}
return resultData;
}
@@ -60,7 +65,6 @@ public class MeanAggrFunc extends AggregateFunction {
@Override
public void calculateValueFromPageData(BatchData dataInThisPage,
IPointReader unsequenceReader)
throws IOException, ProcessorException {
- TSDataType type = dataInThisPage.getDataType();
while (dataInThisPage.hasNext() && unsequenceReader.hasNext()) {
Object sumVal = null;
if (dataInThisPage.currentTime() <
unsequenceReader.current().getTimestamp()) {
@@ -74,11 +78,11 @@ public class MeanAggrFunc extends AggregateFunction {
sumVal = unsequenceReader.current().getValue().getValue();
unsequenceReader.next();
}
- updateMean(type, sumVal);
+ updateMean(seriesDataType, sumVal);
}
while (dataInThisPage.hasNext()) {
- updateMean(type, dataInThisPage.currentValue());
+ updateMean(seriesDataType, dataInThisPage.currentValue());
dataInThisPage.next();
}
}
@@ -108,23 +112,24 @@ public class MeanAggrFunc extends AggregateFunction {
@Override
public void calculateValueFromUnsequenceReader(IPointReader unsequenceReader)
throws IOException, ProcessorException {
- TSDataType type = null;
- if (unsequenceReader.hasNext()) {
- type = unsequenceReader.current().getValue().getDataType();
- }
while (unsequenceReader.hasNext()) {
TimeValuePair pair = unsequenceReader.next();
- updateMean(type, pair.getValue().getValue());
+ updateMean(seriesDataType, pair.getValue().getValue());
}
}
@Override
- public boolean calcAggregationUsingTimestamps(EngineTimeGenerator
timeGenerator,
- EngineReaderByTimeStamp sequenceReader, EngineReaderByTimeStamp
unsequenceReader)
- throws IOException, ProcessorException {
- return false;
+ public void calcAggregationUsingTimestamps(List<Long> timestamps,
+ EngineReaderByTimeStamp dataReader) throws IOException,
ProcessorException { // Passes every value found at the given timestamps to updateMean (defined outside this hunk).
+ for (long time : timestamps) {
+ TsPrimitiveType value = dataReader.getValueInTimestamp(time);
+ if (value != null) { // timestamps with no value are skipped
+ updateMean(seriesDataType, value.getValue());
+ }
+ }
+ }
+
@Override
public void calcGroupByAggregation(long partitionStart, long partitionEnd,
long intervalStart,
long intervalEnd, BatchData data) throws ProcessorException {
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrFunc.java
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrFunc.java
index 4dca740..ad87adb 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrFunc.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrFunc.java
@@ -20,12 +20,13 @@
package org.apache.iotdb.db.query.aggregation.impl;
import java.io.IOException;
+import java.util.List;
import org.apache.iotdb.db.exception.ProcessorException;
import org.apache.iotdb.db.query.aggregation.AggregateFunction;
import org.apache.iotdb.db.query.aggregation.AggregationConstant;
import org.apache.iotdb.db.query.reader.IPointReader;
import org.apache.iotdb.db.query.reader.merge.EngineReaderByTimeStamp;
-import org.apache.iotdb.db.query.timegenerator.EngineTimeGenerator;
+import org.apache.iotdb.db.utils.TsPrimitiveType;
import org.apache.iotdb.tsfile.file.header.PageHeader;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.BatchData;
@@ -93,10 +94,19 @@ public class MinTimeAggrFunc extends AggregateFunction {
}
@Override
- public boolean calcAggregationUsingTimestamps(EngineTimeGenerator
timeGenerator,
- EngineReaderByTimeStamp sequenceReader, EngineReaderByTimeStamp
unsequenceReader)
- throws IOException, ProcessorException {
- return false;
+ public void calcAggregationUsingTimestamps(List<Long> timestamps,
+ EngineReaderByTimeStamp dataReader) throws IOException,
ProcessorException { // Stores the first timestamp (in list order) that has a value, unless a result is already stored.
+ if (resultData.length() > 0) { // a result is already held: leave it untouched
+ return;
+ }
+ for (long time : timestamps) {
+ TsPrimitiveType value = dataReader.getValueInTimestamp(time);
+ if (value != null) {
+ resultData.putTime(0); // entry is keyed by time 0; the found timestamp is the stored long value
+ resultData.putLong(time);
+ return; // only the first hit is kept
+ }
+ }
+ }
@Override
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrFunc.java
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrFunc.java
index b5a7073..82cd753 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrFunc.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrFunc.java
@@ -20,12 +20,13 @@
package org.apache.iotdb.db.query.aggregation.impl;
import java.io.IOException;
+import java.util.List;
import org.apache.iotdb.db.exception.ProcessorException;
import org.apache.iotdb.db.query.aggregation.AggregateFunction;
import org.apache.iotdb.db.query.aggregation.AggregationConstant;
import org.apache.iotdb.db.query.reader.IPointReader;
import org.apache.iotdb.db.query.reader.merge.EngineReaderByTimeStamp;
-import org.apache.iotdb.db.query.timegenerator.EngineTimeGenerator;
+import org.apache.iotdb.db.utils.TsPrimitiveType;
import org.apache.iotdb.tsfile.file.header.PageHeader;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.BatchData;
@@ -109,11 +110,27 @@ public class MinValueAggrFunc extends AggregateFunction {
updateResult(minVal);
}
+ @Override
+ public void calcAggregationUsingTimestamps(List<Long> timestamps,
+ EngineReaderByTimeStamp dataReader) throws IOException,
ProcessorException { // Finds the smallest value at the given timestamps and hands it (or null when none had data) to updateResult.
+ Comparable<Object> minVal = null; // stays null until the first non-null lookup
+ for (long time : timestamps) {
+ TsPrimitiveType value = dataReader.getValueInTimestamp(time);
+ if(value == null){ // no value at this timestamp: skip it
+ continue;
+ }
+ if (minVal == null || minVal.compareTo(value.getValue()) > 0) {
+ minVal = (Comparable<Object>) value.getValue(); // unchecked cast; assumes the series value type is Comparable - TODO confirm
+ }
+ }
+ updateResult(minVal);
+ }
+
private void updateResult(Comparable<Object> minVal) {
if (resultData.length() == 0) {
if (minVal != null) {
resultData.putTime(0);
- resultData.putAnObject( minVal);
+ resultData.putAnObject(minVal);
}
} else {
if (minVal != null && minVal.compareTo(resultData.currentValue()) < 0) {
@@ -123,13 +140,6 @@ public class MinValueAggrFunc extends AggregateFunction {
}
@Override
- public boolean calcAggregationUsingTimestamps(EngineTimeGenerator
timeGenerator,
- EngineReaderByTimeStamp sequenceReader, EngineReaderByTimeStamp
unsequenceReader)
- throws IOException, ProcessorException {
- return false;
- }
-
- @Override
public void calcGroupByAggregation(long partitionStart, long partitionEnd,
long intervalStart,
long intervalEnd, BatchData data) throws ProcessorException {
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/SumAggrFunc.java
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/SumAggrFunc.java
index cf030c4..77a347c 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/SumAggrFunc.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/SumAggrFunc.java
@@ -20,13 +20,14 @@
package org.apache.iotdb.db.query.aggregation.impl;
import java.io.IOException;
+import java.util.List;
import org.apache.iotdb.db.exception.ProcessorException;
import org.apache.iotdb.db.query.aggregation.AggregateFunction;
import org.apache.iotdb.db.query.aggregation.AggregationConstant;
import org.apache.iotdb.db.query.reader.IPointReader;
import org.apache.iotdb.db.query.reader.merge.EngineReaderByTimeStamp;
-import org.apache.iotdb.db.query.timegenerator.EngineTimeGenerator;
import org.apache.iotdb.db.utils.TimeValuePair;
+import org.apache.iotdb.db.utils.TsPrimitiveType;
import org.apache.iotdb.tsfile.file.header.PageHeader;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.BatchData;
@@ -34,9 +35,11 @@ import org.apache.iotdb.tsfile.read.common.BatchData;
public class SumAggrFunc extends AggregateFunction {
private double sum = 0.0;
+ private TSDataType seriesDataType;
- public SumAggrFunc() {
+ public SumAggrFunc(TSDataType seriesDataType) { // The result type passed to super is always DOUBLE; the series' own type is kept separately.
super(AggregationConstant.SUM, TSDataType.DOUBLE);
+ this.seriesDataType = seriesDataType; // passed to updateSum by the calculate* methods in this class
}
@Override
@@ -46,7 +49,7 @@ public class SumAggrFunc extends AggregateFunction {
@Override
public BatchData getResult() {
- resultData.putDouble( sum);
+ resultData.putDouble(sum);
resultData.putTime(0);
return resultData;
}
@@ -59,7 +62,6 @@ public class SumAggrFunc extends AggregateFunction {
@Override
public void calculateValueFromPageData(BatchData dataInThisPage,
IPointReader unsequenceReader)
throws IOException, ProcessorException {
- TSDataType type = dataInThisPage.getDataType();
while (dataInThisPage.hasNext() && unsequenceReader.hasNext()) {
Object sumVal = null;
if (dataInThisPage.currentTime() <
unsequenceReader.current().getTimestamp()) {
@@ -73,11 +75,11 @@ public class SumAggrFunc extends AggregateFunction {
sumVal = unsequenceReader.current().getValue().getValue();
unsequenceReader.next();
}
- updateSum(type, sumVal);
+ updateSum(seriesDataType, sumVal);
}
while (dataInThisPage.hasNext()) {
- updateSum(type, dataInThisPage.currentValue());
+ updateSum(seriesDataType, dataInThisPage.currentValue());
dataInThisPage.next();
}
}
@@ -106,21 +108,21 @@ public class SumAggrFunc extends AggregateFunction {
@Override
public void calculateValueFromUnsequenceReader(IPointReader unsequenceReader)
throws IOException, ProcessorException {
- TSDataType type = null;
- if (unsequenceReader.hasNext()) {
- type = unsequenceReader.current().getValue().getDataType();
- }
while (unsequenceReader.hasNext()) {
TimeValuePair pair = unsequenceReader.next();
- updateSum(type, pair.getValue().getValue());
+ updateSum(seriesDataType, pair.getValue().getValue());
}
}
@Override
- public boolean calcAggregationUsingTimestamps(EngineTimeGenerator
timeGenerator,
- EngineReaderByTimeStamp sequenceReader, EngineReaderByTimeStamp
unsequenceReader)
- throws IOException, ProcessorException {
- return false;
+ public void calcAggregationUsingTimestamps(List<Long> timestamps,
+ EngineReaderByTimeStamp dataReader) throws IOException,
ProcessorException { // Passes every value found at the given timestamps to updateSum (defined outside this hunk).
+ for (long time : timestamps) {
+ TsPrimitiveType value = dataReader.getValueInTimestamp(time);
+ if (value != null) { // timestamps with no value are skipped
+ updateSum(seriesDataType, value.getValue());
+ }
+ }
+ }
@Override
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/AggregateEngineExecutor.java
b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/AggregateEngineExecutor.java
index d43c958..bf07fbd 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/AggregateEngineExecutor.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/AggregateEngineExecutor.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.query.executor;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.exception.FileNodeManagerException;
import org.apache.iotdb.db.exception.PathErrorException;
@@ -36,9 +37,13 @@ import
org.apache.iotdb.db.query.control.QueryDataSourceManager;
import org.apache.iotdb.db.query.control.QueryTokenManager;
import org.apache.iotdb.db.query.dataset.AggregateDataSet;
import org.apache.iotdb.db.query.factory.SeriesReaderFactory;
+import org.apache.iotdb.db.query.reader.AllDataReader;
import org.apache.iotdb.db.query.reader.IPointReader;
+import org.apache.iotdb.db.query.reader.merge.EngineReaderByTimeStamp;
import org.apache.iotdb.db.query.reader.merge.PriorityMergeReader;
+import org.apache.iotdb.db.query.reader.merge.PriorityMergeReaderByTimestamp;
import org.apache.iotdb.db.query.reader.sequence.SequenceDataReader;
+import org.apache.iotdb.db.query.timegenerator.EngineTimeGenerator;
import org.apache.iotdb.tsfile.file.header.PageHeader;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.BatchData;
@@ -46,6 +51,7 @@ import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.expression.IExpression;
import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
public class AggregateEngineExecutor {
@@ -55,6 +61,11 @@ public class AggregateEngineExecutor {
private IExpression expression;
/**
+ * aggregation batch calculation size.
+ **/
+ private int aggregateFetchSize = 10 *
IoTDBDescriptor.getInstance().getConfig().fetchSize;
+
+ /**
* constructor.
*/
public AggregateEngineExecutor(long jobId, List<Path> selectedSeries,
List<String> aggres,
@@ -67,12 +78,8 @@ public class AggregateEngineExecutor {
/**
* execute aggregate function with only time filter or no filter.
+ *
* @param context query context
- * @return
- * @throws FileNodeManagerException
- * @throws IOException
- * @throws PathErrorException
- * @throws ProcessorException
*/
public AggregateDataSet executeWithOutTimeGenerator(QueryContext context)
throws FileNodeManagerException, IOException, PathErrorException,
ProcessorException {
@@ -219,8 +226,96 @@ public class AggregateEngineExecutor {
}
- public AggregateDataSet executeWithTimeGenerator(QueryContext context) {
- return null;
+ /**
+ * execute aggregate function with value filter.
+ * @param context query context.
+ */
+ public AggregateDataSet executeWithTimeGenerator(QueryContext context)
+ throws FileNodeManagerException, PathErrorException, IOException, ProcessorException {
+ QueryTokenManager.getInstance().beginQueryOfGivenQueryPaths(jobId, selectedSeries);
+ QueryTokenManager.getInstance().beginQueryOfGivenExpression(jobId, expression);
+
+ EngineTimeGenerator timestampGenerator = new EngineTimeGenerator(jobId, expression, context);
+ List<EngineReaderByTimeStamp> readersOfSelectedSeries = getReadersOfSelectedPaths(
+ selectedSeries, context);
+
+ List<AggregateFunction> aggregateFunctions = new ArrayList<>();
+ for (int i = 0; i < selectedSeries.size(); i++) {
+ TSDataType type = MManager.getInstance().getSeriesType(selectedSeries.get(i).getFullPath());
+ AggregateFunction function = AggreFuncFactory.getAggrFuncByName(aggres.get(i), type);
+ function.init();
+ aggregateFunctions.add(function);
+ }
+ List<BatchData> batchDatas = aggregateWithTimeGenerator(aggregateFunctions, timestampGenerator,
+ readersOfSelectedSeries);
+ return constructDataSet(batchDatas);
+ }
+
+ private List<BatchData> aggregateWithTimeGenerator(List<AggregateFunction> aggregateFunctions,
+ EngineTimeGenerator timestampGenerator,
+ List<EngineReaderByTimeStamp> readersOfSelectedSeries)
+ throws IOException, ProcessorException {
+
+ while (timestampGenerator.hasNext()) {
+
+ //generate timestamps for aggregate
+ List<Long> timestamps = new ArrayList<>(aggregateFetchSize);
+ for (int cnt = 0; cnt < aggregateFetchSize; cnt++) {
+ if (!timestampGenerator.hasNext()) {
+ break;
+ }
+ timestamps.add(timestampGenerator.next());
+ }
+
+ //cal part of aggregate result
+ for (int i = 0; i < readersOfSelectedSeries.size(); i++) {
+ aggregateFunctions.get(i)
+ .calcAggregationUsingTimestamps(timestamps, readersOfSelectedSeries.get(i));
+ }
+
+ }
+
+ List<BatchData> batchDataList = new ArrayList<>();
+ for (AggregateFunction function : aggregateFunctions) {
+ batchDataList.add(function.getResult());
+ }
+ return batchDataList;
+ }
+
+ private List<EngineReaderByTimeStamp> getReadersOfSelectedPaths(List<Path> paths,
+ QueryContext context)
+ throws IOException, FileNodeManagerException {
+
+ List<EngineReaderByTimeStamp> readersOfSelectedSeries = new ArrayList<>();
+
+ for (Path path : paths) {
+
+ QueryDataSource queryDataSource = QueryDataSourceManager.getQueryDataSource(jobId, path,
+ context);
+
+ PriorityMergeReaderByTimestamp mergeReaderByTimestamp = new PriorityMergeReaderByTimestamp();
+
+ // reader for sequence data
+ SequenceDataReader tsFilesReader = new SequenceDataReader(queryDataSource.getSeqDataSource(),
+ null, context);
+
+ // reader for unSequence data
+ PriorityMergeReader unSeqMergeReader = SeriesReaderFactory.getInstance()
+ .createUnSeqMergeReader(queryDataSource.getOverflowSeriesDataSource(), null);
+
+ if (tsFilesReader == null || !tsFilesReader.hasNext()) {
+ mergeReaderByTimestamp
+ .addReaderWithPriority(unSeqMergeReader, PriorityMergeReader.HIGH_PRIORITY);
+ } else {
+ mergeReaderByTimestamp
+ .addReaderWithPriority(new AllDataReader(tsFilesReader, unSeqMergeReader),
+ PriorityMergeReader.HIGH_PRIORITY);
+ }
+
+ readersOfSelectedSeries.add(mergeReaderByTimestamp);
+ }
+
+ return readersOfSelectedSeries;
}
private AggregateDataSet constructDataSet(List<BatchData> batchDataList)
throws IOException {
diff --git a/iotdb/src/test/java/PerformanceTest.java
b/iotdb/src/test/java/PerformanceTest.java
deleted file mode 100644
index 8bcb228..0000000
--- a/iotdb/src/test/java/PerformanceTest.java
+++ /dev/null
@@ -1,282 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.iotdb.db.exception.FileNodeManagerException;
-import org.apache.iotdb.db.query.executor.EngineQueryRouter;
-import org.apache.iotdb.jdbc.Config;
-import org.apache.iotdb.tsfile.read.common.Path;
-import org.apache.iotdb.tsfile.read.common.RowRecord;
-import org.apache.iotdb.tsfile.read.expression.IExpression;
-import org.apache.iotdb.tsfile.read.expression.QueryExpression;
-import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
-import org.apache.iotdb.tsfile.read.filter.TimeFilter;
-import org.apache.iotdb.tsfile.read.filter.ValueFilter;
-import org.apache.iotdb.tsfile.read.filter.basic.Filter;
-import org.apache.iotdb.tsfile.read.filter.factory.FilterFactory;
-import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
-
-/**
- * Delete this class when submitting pr.
- */
-public class PerformanceTest {
-
- // private static int deviceStart = 5, deviceEnd = 9;
- private static int deviceStart = 9, deviceEnd = 9;
- private static int sensorStart = 8, sensorEnd = 8;
- private static String insertTemplate = "INSERT INTO
root.perform.group_0.d_%s(timestamp,s_%s"
- + ") VALUES(%d,%d)";
-
- public static void main(String[] args) throws IOException,
FileNodeManagerException, SQLException {
- try {
- Class.forName("org.apache.iotdb.jdbc.IoTDBDriver");
- } catch (ClassNotFoundException e) {
- e.printStackTrace();
- }
- //insert();
-
- //singleWithoutFilterTest();
-
- // queryMultiSeriesWithoutFilterTest();
-
- queryMultiSeriesWithFilterTest();
- }
-
- private static void singleWithoutFilterTest() throws IOException,
FileNodeManagerException {
-
- List<Path> selectedPathList = new ArrayList<>();
- selectedPathList.add(getPath(1, 1));
-
- QueryExpression queryExpression = QueryExpression.create(selectedPathList,
null);
-
- EngineQueryRouter queryRouter = new EngineQueryRouter();
-
- long startTime = System.currentTimeMillis();
-
- QueryDataSet queryDataSet = queryRouter.query(queryExpression);
-
- int count = 0;
- while (queryDataSet.hasNext()) {
- RowRecord rowRecord = queryDataSet.next();
- count++;
- // output(count, rowRecord, true);
- }
-
- long endTime = System.currentTimeMillis();
- System.out
- .println(String.format("Time consume : %s, count number : %s", endTime
- startTime, count));
-
- }
-
- public static void queryMultiSeriesWithoutFilterTest()
- throws IOException, FileNodeManagerException {
-
- List<Path> selectedPathList = new ArrayList<>();
- for (int i = deviceStart; i <= deviceEnd; i++) {
- for (int j = sensorStart; j <= sensorEnd; j++) {
- selectedPathList.add(getPath(i, j));
- }
- }
-
- QueryExpression queryExpression = QueryExpression.create(selectedPathList,
null);
-
- EngineQueryRouter queryRouter = new EngineQueryRouter();
-
- long startTime = System.currentTimeMillis();
-
- QueryDataSet queryDataSet = queryRouter.query(queryExpression);
-
- int count = 0;
- while (queryDataSet.hasNext()) {
- RowRecord rowRecord = queryDataSet.next();
- count++;
- }
-
- long endTime = System.currentTimeMillis();
- System.out
- .println(String.format("Time consume : %s, count number : %s", endTime
- startTime, count));
-
- }
-
- public static void queryMultiSeriesWithFilterTest() throws IOException,
FileNodeManagerException {
-
- List<Path> selectedPathList = new ArrayList<>();
- for (int i = deviceStart; i <= deviceEnd; i++) {
- for (int j = sensorStart; j <= sensorEnd; j++) {
- selectedPathList.add(getPath(i, j));
- }
- }
-
- Filter valueFilter = ValueFilter.gtEq(34300);
- Filter timeFilter = FilterFactory
- .and(TimeFilter.gtEq(50000L), TimeFilter.ltEq(100000L));
- Filter filter = FilterFactory.and(timeFilter, valueFilter);
-
- IExpression expression = new SingleSeriesExpression(getPath(9, 9), filter);
- EngineQueryRouter queryRouter = new EngineQueryRouter();
-
- QueryExpression queryExpression = QueryExpression.create(selectedPathList,
expression);
- long startTime = System.currentTimeMillis();
-
- QueryDataSet queryDataSet = queryRouter.query(queryExpression);
-
- int count = 0;
- while (queryDataSet.hasNext()) {
- RowRecord rowRecord = queryDataSet.next();
- count++;
- // if (count % 10000 == 0)
- // System.out.println(rowRecord);
- }
-
- long endTime = System.currentTimeMillis();
- System.out
- .println(String.format("Time consume : %s ms, count number : %s",
endTime - startTime, count));
-
- }
-
- public static void output(int cnt, RowRecord rowRecord, boolean flag) {
- if (!flag) {
- return;
- }
-
- if (cnt % 10000 == 0) {
- System.out.println(cnt + " : " + rowRecord);
- }
-
- if (cnt > 97600) {
- System.out.println("----" + cnt + " : " + rowRecord);
- }
- }
-
- public static Path getPath(int d, int s) {
- return new Path(String.format("root.perform.group_0.d_%s.s_%s", d, s));
- }
-
- private static void insert() throws SQLException {
- int d_start = 0, d_end = 10;
- int s_start = 0, s_end = 10;
- int num = 100000;
- prepareSeries(d_start,d_end,s_start,s_end);
- prepareData(d_start,d_end,s_start,s_end, num);
- }
-
- private static void prepareData(int d_start, int d_end, int s_start, int
s_end, int num) throws SQLException {
- System.out.println("prepareData start!");
- Connection connection = null;
- try {
- connection = DriverManager
- .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root",
- "root");
- Statement statement = connection.createStatement();
- // prepare BufferWrite file
- for (int i = 20000; i <= 30000; i++) {
- insertInTimestamp(statement, d_start,d_end,s_start,s_end, i);
- }
- statement.execute("merge");
- System.out.println("prepareData 20000-30000 end ");
-
- // prepare Unseq-File
- for (int i = 1; i <= 10000; i++) {
- insertInTimestamp(statement, d_start,d_end,s_start,s_end, i);
- }
- statement.execute("merge");
- System.out.println("prepareData 1-10000 end ");
-
- // prepare muilty BufferWrite file
- for (int i = 40000; i <= 65000; i++) {
- insertInTimestamp(statement, d_start,d_end,s_start,s_end, i);
- }
- statement.execute("merge");
- System.out.println("prepareData 40000-65000 end ");
-
- // prepare muilty BufferWrite file
- for (int i = 80000; i <= 87000; i++) {
- insertInTimestamp(statement, d_start,d_end,s_start,s_end, i);
- }
- statement.execute("merge");
- System.out.println("prepareData 80000-87000 end ");
-
- // prepare BufferWrite cache
- for (int i = 90001; i <= 100000; i++) {
- insertInTimestamp(statement, d_start,d_end,s_start,s_end, i);
- }
- System.out.println("prepareData 90001-100000 end ");
-
- // prepare Overflow cache
- for (int i = 10001; i <= 20000; i++) {
- insertInTimestamp(statement, d_start,d_end,s_start,s_end, i);
- }
- System.out.println("prepareData 10001-20000 end ");
- statement.close();
- } finally {
- if (connection != null) {
- connection.close();
- }
- }
- }
-
- private static void insertInTimestamp(Statement statement,int d_start, int
d_end, int s_start, int s_end, int time)
- throws SQLException {
- for(int m = d_start; m <= d_end; m++ ){
- for(int n = s_start; n<= s_end; n++){
- statement.execute(String.format(insertTemplate, m, n, time, time));
- }
- }
- }
-
- private static void prepareSeries(int d_start, int d_end, int s_start, int
s_end) throws SQLException {
- System.out.println("prepareSeries start!");
- Connection connection = null;
- try {
- connection = DriverManager
- .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root",
- "root");
- Statement statement = connection.createStatement();
- for(int i = d_start; i <= d_end; i++ ){
- statement.execute(createStorageGroupSql(i));
- }
- statement.close();
-
- statement = connection.createStatement();
- for(int i = d_start; i <= d_end; i++ ){
- for(int j = s_start; j<= s_end; j++){
- statement.execute(String.format("CREATE TIMESERIES
root.perform.group_0.d_%s.s_%s WITH DATATYPE=INT32, ENCODING=RLE", i, j));//
- }
- }
- statement.close();
-
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- if (connection != null) {
- connection.close();
- }
- }
- }
-
- public static String createStorageGroupSql(int d){
- return String.format("SET STORAGE GROUP TO root.perform.group_0.d_%s", d);
- }
-
-}
diff --git
a/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationLargeDataTestIT.java
b/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationLargeDataTestIT.java
index 7cc23e1..528887f 100644
---
a/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationLargeDataTestIT.java
+++
b/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationLargeDataTestIT.java
@@ -120,7 +120,7 @@ public class IoTDBAggregationLargeDataTestIT {
EnvironmentUtils.cleanEnv();
}
-
+ @Test
public void test() throws ClassNotFoundException, SQLException,
InterruptedException {
insertSQL();
@@ -152,7 +152,6 @@ public class IoTDBAggregationLargeDataTestIT {
"0,9,39,63.0,E,true"
};
- Class.forName(Config.JDBC_DRIVER_NAME);
Connection connection = null;
try {
connection = DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/",
"root", "root");
@@ -187,7 +186,6 @@ public class IoTDBAggregationLargeDataTestIT {
"0,9,39,63.0"
};
- Class.forName(Config.JDBC_DRIVER_NAME);
Connection connection = null;
try {
connection = DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/",
"root", "root");
@@ -217,10 +215,9 @@ public class IoTDBAggregationLargeDataTestIT {
private void sumAggreWithSingleFilterTest() throws ClassNotFoundException,
SQLException {
String[] retArray = new String[]{
- "0,55260.0,156752.0,20262.21"
+ "0,55061.0,156752.0,20254"
};
- Class.forName(Config.JDBC_DRIVER_NAME);
Connection connection = null;
try {
connection = DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/",
"root", "root");
@@ -232,7 +229,7 @@ public class IoTDBAggregationLargeDataTestIT {
int cnt = 0;
while (resultSet.next()) {
String ans = resultSet.getString(TIMESTAMP_STR) + "," +
resultSet.getString(sum(d0s0)) + ","
- + resultSet.getString(sum(d0s1)) + "," +
resultSet.getString(Constant.sum(d0s2));
+ + resultSet.getString(sum(d0s1)) + "," +
Math.round(resultSet.getDouble(Constant.sum(d0s2)));
Assert.assertEquals(ans, retArray[cnt]);
cnt++;
}
@@ -250,10 +247,9 @@ public class IoTDBAggregationLargeDataTestIT {
private void firstAggreWithSingleFilterTest() throws ClassNotFoundException,
SQLException {
String[] retArray = new String[]{
- "0,90,1101,2.22,aaaaa,true"
+ "0,90,1101,2.22,ddddd,true"
};
- Class.forName(Config.JDBC_DRIVER_NAME);
Connection connection = null;
try {
connection = DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/",
"root", "root");
@@ -284,12 +280,11 @@ public class IoTDBAggregationLargeDataTestIT {
private void meanAggreWithSingleFilterTest() throws ClassNotFoundException,
SQLException {
String[] retArray = new String[]{
- "0,75.28610354,211.827027,27.53017663"
+ "0,75,212,28"
};
- Class.forName(Config.JDBC_DRIVER_NAME);
Connection connection = null;
try {
- connection =
DriverManager.getConnection("jdbc:tsfile://127.0.0.1:6667/", "root", "root");
+ connection = DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/",
"root", "root");
Statement statement = connection.createStatement();
boolean hasResultSet = statement.execute("select
mean(s0),mean(s1),mean(s2) from root.vehicle.d0 where s1 >= 0");
//boolean hasResultSet = statement.execute("select count(s3) from
root.vehicle.d0 where s1 >= 0");
@@ -297,8 +292,8 @@ public class IoTDBAggregationLargeDataTestIT {
ResultSet resultSet = statement.getResultSet();
int cnt = 0;
while (resultSet.next()) {
- String ans = resultSet.getString(TIMESTAMP_STR) + "," +
resultSet.getString(mean(d0s0))
- + "," + resultSet.getString(mean(d0s1)) + "," +
resultSet.getString(mean(d0s2));
+ String ans = resultSet.getString(TIMESTAMP_STR) + "," +
Math.round(resultSet.getDouble(mean(d0s0)))
+ + "," + Math.round(resultSet.getDouble(mean(d0s1)))+ "," +
Math.round(resultSet.getDouble(mean(d0s2)));
//System.out.println("!!!!!============ " + ans);
Assert.assertEquals(retArray[cnt], ans);
cnt++;
@@ -317,10 +312,9 @@ public class IoTDBAggregationLargeDataTestIT {
private void countAggreWithSingleFilterTest() throws ClassNotFoundException,
SQLException {
String[] retArray = new String[]{
- "0,734,740,736"
+ "0,733,740,734"
};
- Class.forName(Config.JDBC_DRIVER_NAME);
Connection connection = null;
try {
connection = DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/",
"root", "root");
@@ -351,7 +345,7 @@ public class IoTDBAggregationLargeDataTestIT {
private void minMaxTimeAggreWithSingleFilterTest() throws
ClassNotFoundException, SQLException {
String[] retArray = new String[]{
- "0,104,1,2,60,100"
+ "0,104,1,2,101,100"
};
Class.forName(Config.JDBC_DRIVER_NAME);
@@ -442,7 +436,7 @@ public class IoTDBAggregationLargeDataTestIT {
private void maxValueAggreWithSingleFilterTest() throws
ClassNotFoundException, SQLException {
String[] retArray = new String[]{
- "0,199,40000,122.0,fffff,true"
+ "0,99,40000,122.0,fffff,true"
};
Class.forName(Config.JDBC_DRIVER_NAME);
@@ -479,7 +473,7 @@ public class IoTDBAggregationLargeDataTestIT {
private void meanAggreWithMultiFilterTest() throws ClassNotFoundException,
SQLException {
String[] retArray = new String[]{
- "0,55260.0,734,75.28610354,211.827027,27.53017663"
+ "0,55061.0,733,75,212,28"
};
Class.forName(Config.JDBC_DRIVER_NAME);
@@ -494,8 +488,8 @@ public class IoTDBAggregationLargeDataTestIT {
int cnt = 0;
while (resultSet.next()) {
String ans = resultSet.getString(TIMESTAMP_STR) + "," +
resultSet.getString(sum(d0s0)) + "," + resultSet.getString(count(d0s0))
- + "," + resultSet.getString(mean(d0s0))
- + "," + resultSet.getString(mean(d0s1)) + "," +
resultSet.getString(mean(d0s2));
+ + "," + Math.round(resultSet.getDouble(mean(d0s0)))
+ + "," + Math.round(resultSet.getDouble(mean(d0s1))) + "," +
Math.round(resultSet.getDouble(mean(d0s2)));
//System.out.println("!!!!!============ " + ans);
Assert.assertEquals(retArray[cnt], ans);
cnt++;
@@ -514,7 +508,7 @@ public class IoTDBAggregationLargeDataTestIT {
private void sumAggreWithMultiFilterTest() throws ClassNotFoundException,
SQLException {
String[] retArray = new String[]{
- "0,55260,156752,20262.21"
+ "0,55061.0,156752.0,20262"
};
Class.forName(Config.JDBC_DRIVER_NAME);
@@ -529,7 +523,7 @@ public class IoTDBAggregationLargeDataTestIT {
int cnt = 0;
while (resultSet.next()) {
String ans = resultSet.getString(TIMESTAMP_STR) + "," +
resultSet.getString(sum(d0s0))
- + "," + resultSet.getString(sum(d0s1)) + "," +
resultSet.getString(sum(d0s2));
+ + "," + resultSet.getString(sum(d0s1)) + "," +
Math.round(resultSet.getDouble(sum(d0s2)));
//String ans = resultSet.getString(sum(d0s3));
//System.out.println("!!!!!============ " + ans);
Assert.assertEquals(retArray[cnt], ans);
@@ -549,7 +543,7 @@ public class IoTDBAggregationLargeDataTestIT {
private void firstAggreWithMultiFilterTest() throws ClassNotFoundException,
SQLException {
String[] retArray = new String[]{
- "0,90,1101,2.22,aaaaa,true"
+ "0,90,1101,2.22,ddddd,true"
};
Class.forName(Config.JDBC_DRIVER_NAME);
@@ -585,7 +579,7 @@ public class IoTDBAggregationLargeDataTestIT {
private void countAggreWithMultiFilterTest() throws ClassNotFoundException,
SQLException {
String[] retArray = new String[]{
- "0,734,740,736,485,1"
+ "0,733,740,736,482,1"
};
Class.forName(Config.JDBC_DRIVER_NAME);
@@ -621,7 +615,7 @@ public class IoTDBAggregationLargeDataTestIT {
private void minTimeAggreWithMultiFilterTest() throws
ClassNotFoundException, SQLException {
String[] retArray = new String[]{
- "0,104,1,2,60,100"
+ "0,104,1,2,101,100"
};
Class.forName(Config.JDBC_DRIVER_NAME);
@@ -726,7 +720,7 @@ public class IoTDBAggregationLargeDataTestIT {
private void maxValueAggreWithMultiFilterTest() throws
ClassNotFoundException, SQLException {
String[] retArray = new String[]{
- "0,199,40000,122.0,fffff,true"
+ "0,99,40000,122.0,fffff,true"
};
Class.forName(Config.JDBC_DRIVER_NAME);
diff --git
a/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationSmallDataTestIT.java
b/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationSmallDataTestIT.java
index 912d91e..694e6b1 100644
---
a/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationSmallDataTestIT.java
+++
b/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationSmallDataTestIT.java
@@ -275,6 +275,7 @@ public class IoTDBAggregationSmallDataTestIT {
}
}
+ @Test
public void lastAggreWithSingleFilterTest() throws ClassNotFoundException,
SQLException {
String[] retArray = new String[]{
"0,22222,55555"
@@ -307,6 +308,7 @@ public class IoTDBAggregationSmallDataTestIT {
}
}
+ @Test
public void firstAggreWithSingleFilterTest() throws ClassNotFoundException,
SQLException {
String[] retArray = new String[]{
"0,99,180"
@@ -339,9 +341,10 @@ public class IoTDBAggregationSmallDataTestIT {
}
}
+ @Test
public void sumAggreWithSingleFilterTest() throws ClassNotFoundException,
SQLException {
String[] retArray = new String[]{
- "0,99.0,379.0,28.88"
+ "0,22321.0,55934.0,1029"
};
Class.forName(Config.JDBC_DRIVER_NAME);
Connection connection = null;
@@ -355,7 +358,7 @@ public class IoTDBAggregationSmallDataTestIT {
int cnt = 0;
while (resultSet.next()) {
String ans = resultSet.getString(TIMESTAMP_STR) + "," +
resultSet.getString(sum(d0s0))
- + "," + resultSet.getString(sum(d0s1)) + "," +
resultSet.getString(sum(d0s2));
+ + "," + resultSet.getString(sum(d0s1)) + "," +
Math.round(resultSet.getDouble(sum(d0s2)));
//System.out.println("!!!!!============ " + ans);
Assert.assertEquals(retArray[cnt], ans);
cnt++;
@@ -372,9 +375,10 @@ public class IoTDBAggregationSmallDataTestIT {
}
}
+ @Test
public void meanAggreWithSingleFilterTest() throws ClassNotFoundException,
SQLException {
String[] retArray = new String[]{
- "0,99.0,189.5,7.22"
+ "0,11160.5,18645,206"
};
Class.forName(Config.JDBC_DRIVER_NAME);
Connection connection = null;
@@ -388,7 +392,7 @@ public class IoTDBAggregationSmallDataTestIT {
int cnt = 0;
while (resultSet.next()) {
String ans = resultSet.getString(TIMESTAMP_STR) + "," +
resultSet.getString(mean(d0s0))
- + "," + resultSet.getString(mean(d0s1)) + "," +
resultSet.getString(mean(d0s2));
+ + "," + Math.round(resultSet.getDouble(mean(d0s1))) + "," +
Math.round(resultSet.getDouble(mean(d0s2)));
//System.out.println("!!!!!============ " + ans);
Assert.assertEquals(retArray[cnt], ans);
cnt++;
@@ -405,9 +409,10 @@ public class IoTDBAggregationSmallDataTestIT {
}
}
+ @Test
public void countAggreWithSingleFilterTest() throws ClassNotFoundException,
SQLException {
String[] retArray = new String[]{
- "0,1,2,4,1,0"
+ "0,2,3,5,1,0"
};
Class.forName(Config.JDBC_DRIVER_NAME);
@@ -440,6 +445,7 @@ public class IoTDBAggregationSmallDataTestIT {
}
}
+ @Test
public void minTimeAggreWithSingleFilterTest() throws
ClassNotFoundException, SQLException {
String[] retArray = new String[]{
"0,104,1,2,101,100"
@@ -476,6 +482,7 @@ public class IoTDBAggregationSmallDataTestIT {
}
}
+ @Test
public void maxTimeAggreWithSingleFilterTest() throws
ClassNotFoundException, SQLException {
String[] retArray = new String[]{
"0,105,105,105,102,100"
@@ -512,6 +519,7 @@ public class IoTDBAggregationSmallDataTestIT {
}
}
+ @Test
public void minValueAggreWithSingleFilterTest() throws
ClassNotFoundException, SQLException {
String[] retArray = new String[]{
"0,90,180,2.22,ddddd,true"
@@ -549,6 +557,7 @@ public class IoTDBAggregationSmallDataTestIT {
}
}
+ @Test
public void maxValueAggreWithSingleFilterTest() throws
ClassNotFoundException, SQLException {
String[] retArray = new String[]{
"0,99,50000,11.11,fffff,true"
@@ -585,6 +594,7 @@ public class IoTDBAggregationSmallDataTestIT {
}
}
+ @Test
public void countAggreWithMultiMultiFilterTest() throws
ClassNotFoundException, SQLException {
String[] retArray = new String[]{
"0,2",
diff --git
a/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationTestIT.java
b/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationTestIT.java
index 7a4697a..2d169e3 100644
---
a/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationTestIT.java
+++
b/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationTestIT.java
@@ -1,3 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
package org.apache.iotdb.db.integration;
import static org.apache.iotdb.db.integration.Constant.count;