This is an automated email from the ASF dual-hosted git repository.

suyue pushed a commit to branch aggregate
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/aggregate by this push:
     new 43fe083  group by
     new 5b8b3b5  merge
43fe083 is described below

commit 43fe083648739c52c8aedd55d9e9eb35292a845b
Author: suyue <[email protected]>
AuthorDate: Thu Mar 21 13:51:22 2019 +0800

    group by
---
 .../db/engine/querycontext/ReadOnlyMemChunk.java   |   4 +-
 .../java/org/apache/iotdb/db/metadata/MTree.java   |   2 +-
 .../iotdb/db/qp/executor/OverflowQPExecutor.java   |  12 +-
 .../iotdb/db/qp/executor/QueryProcessExecutor.java |  75 ++---
 .../iotdb/db/qp/physical/sys/MetadataPlan.java     |   2 +-
 .../iotdb/db/qp/strategy/PhysicalGenerator.java    |   1 +
 .../db/query/aggregation/AggreResultData.java      | 192 +++++++++++
 .../db/query/aggregation/AggregateFunction.java    |  42 ++-
 .../db/query/aggregation/impl/CountAggrFunc.java   |  78 +++--
 .../db/query/aggregation/impl/FirstAggrFunc.java   |  78 +++--
 .../db/query/aggregation/impl/LastAggrFunc.java    |  43 ++-
 .../db/query/aggregation/impl/MaxTimeAggrFunc.java |  90 ++----
 .../query/aggregation/impl/MaxValueAggrFunc.java   |  80 +++--
 .../db/query/aggregation/impl/MeanAggrFunc.java    |  57 +++-
 .../db/query/aggregation/impl/MinTimeAggrFunc.java |  83 +++--
 .../query/aggregation/impl/MinValueAggrFunc.java   |  87 +++--
 .../db/query/aggregation/impl/SumAggrFunc.java     |   8 +-
 ...Reader.java => AggreResultDataPointReader.java} |  18 +-
 .../db/query/executor/AggregateEngineExecutor.java | 108 +++----
 .../executor/EngineExecutorWithTimeGenerator.java  |  44 +--
 .../iotdb/db/query/executor/EngineQueryRouter.java | 124 +++++++-
 .../iotdb/db/query/executor/GroupByEngine.java     | 172 ++++++++++
 .../executor/GroupByWithOnlyTimeFilterDataSet.java | 277 ++++++++++++++++
 .../executor/GroupByWithValueFilterDataSet.java    | 142 +++++++++
 .../db/query/factory/SeriesReaderFactory.java      |  44 +++
 .../apache/iotdb/db/utils/TimeValuePairUtils.java  |  27 ++
 .../org/apache/iotdb/db/utils/TsPrimitiveType.java |   2 +-
 .../iotdb/db/integration/IOTDBGroupByTestIT.java   | 351 +++++++++++++++++++++
 .../db/integration/IoTDBAggregationTestIT.java     |   2 +-
 .../apache/iotdb/db/qp/plan/PhysicalPlanTest.java  |   2 +-
 .../apache/iotdb/db/qp/utils/MemIntQpExecutor.java |   7 +-
 .../iotdb/db/query/executor/GroupByEngineTest.java | 124 ++++++++
 .../tsfile/read/query/dataset/QueryDataSet.java    |   7 +
 33 files changed, 1958 insertions(+), 427 deletions(-)

diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/engine/querycontext/ReadOnlyMemChunk.java
 
b/iotdb/src/main/java/org/apache/iotdb/db/engine/querycontext/ReadOnlyMemChunk.java
index d42eef7..2bc08cc 100644
--- 
a/iotdb/src/main/java/org/apache/iotdb/db/engine/querycontext/ReadOnlyMemChunk.java
+++ 
b/iotdb/src/main/java/org/apache/iotdb/db/engine/querycontext/ReadOnlyMemChunk.java
@@ -48,8 +48,8 @@ public class ReadOnlyMemChunk implements TimeValuePairSorter {
   /**
    * init by TSDataType and TimeValuePairSorter.
    */
-//  public ReadOnlyMemChunk(TSDataType dataType, TimeValuePairSorter 
memSeries) {
-//    this(dataType, memSeries, Collections.emptyMap());
+//  public ReadOnlyMemChunk(TSDataType resultDataType, TimeValuePairSorter 
memSeries) {
+//    this(resultDataType, memSeries, Collections.emptyMap());
 //  }
 
   /**
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/metadata/MTree.java 
b/iotdb/src/main/java/org/apache/iotdb/db/metadata/MTree.java
index 76c284e..1592636 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/metadata/MTree.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/metadata/MTree.java
@@ -869,7 +869,7 @@ public class MTree implements Serializable {
     if (node.isLeaf()) {
       if (nodes.length <= idx) {
         String nodePath = parent + node;
-        List<String> tsRow = new ArrayList<>(4);// get [name,storage 
group,dataType,encoding]
+        List<String> tsRow = new ArrayList<>(4);// get [name,storage 
group,resultDataType,encoding]
         tsRow.add(nodePath);
         MeasurementSchema measurementSchema = node.getSchema();
         tsRow.add(node.getDataFileName());
diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java 
b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java
index 00ce7ae..6eb69bb 100644
--- 
a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java
+++ 
b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java
@@ -190,10 +190,10 @@ public class OverflowQPExecutor extends 
QueryProcessExecutor {
   // }
 
   @Override
-  public QueryDataSet groupBy(List<Pair<Path, String>> aggres, IExpression 
expression, long unit,
-      long origin,
-      List<Pair<Long, Long>> intervals, int fetchSize) throws 
ProcessorException {
-    throw new ProcessorException("not support");
+  public QueryDataSet groupBy(List<Path> paths, List<String> aggres, 
IExpression expression, long unit,
+      long origin, List<Pair<Long, Long>> intervals)
+      throws ProcessorException, FileNodeManagerException, 
QueryFilterOptimizationException, PathErrorException, IOException {
+    return new EngineQueryRouter().groupBy(paths, aggres, expression, unit, 
origin, intervals);
   }
 
   @Override
@@ -516,7 +516,7 @@ public class OverflowQPExecutor extends 
QueryProcessExecutor {
               if (!columnSchema.getType().equals(dataType)
                   || !columnSchema.getEncodingType().equals(encoding)) {
                 throw new ProcessorException(String.format(
-                    "The dataType or encoding of the last node %s is 
conflicting in the storage group %s",
+                    "The resultDataType or encoding of the last node %s is 
conflicting in the storage group %s",
                     lastNode, fileNodePath));
               }
               mManager.addPathToMTree(path.getFullPath(), dataType, encoding, 
compressor, props);
@@ -531,7 +531,7 @@ public class OverflowQPExecutor extends 
QueryProcessExecutor {
               if (isNewMeasurement) {
                 // add time series to schema
                 fileNodeManager.addTimeSeries(path, dataType, encoding, 
compressor, props);
-                //TODO fileNodeManager.addTimeSeries(path, dataType, encoding, 
compressor, encodingArgs);
+                //TODO fileNodeManager.addTimeSeries(path, resultDataType, 
encoding, compressor, encodingArgs);
               }
               // fileNodeManager.closeOneFileNode(namespacePath);
             } catch (FileNodeManagerException e) {
diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java 
b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java
index 86ae7f1..7d08808 100644
--- 
a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java
+++ 
b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.db.exception.ProcessorException;
 import org.apache.iotdb.db.metadata.MManager;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.qp.physical.crud.AggregationPlan;
+import org.apache.iotdb.db.qp.physical.crud.GroupByPlan;
 import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
 import org.apache.iotdb.db.query.executor.EngineQueryRouter;
 import 
org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
@@ -47,16 +48,24 @@ public abstract class QueryProcessExecutor {
   public QueryProcessExecutor() {
   }
 
-  public QueryDataSet processQuery(PhysicalPlan plan) throws IOException, 
FileNodeManagerException, PathErrorException, QueryFilterOptimizationException, 
ProcessorException {
+  public QueryDataSet processQuery(PhysicalPlan plan)
+      throws IOException, FileNodeManagerException, PathErrorException,
+      QueryFilterOptimizationException, ProcessorException {
     QueryPlan queryPlan = (QueryPlan) plan;
 
     QueryExpression queryExpression = 
QueryExpression.create().setSelectSeries(queryPlan.getPaths())
         .setExpression(queryPlan.getExpression());
-
-    if(plan instanceof AggregationPlan) {
-      return aggregate(plan.getPaths(), plan.getAggregations(), 
((AggregationPlan) plan).getExpression());
+    if (plan instanceof GroupByPlan) {
+      GroupByPlan groupByPlan = (GroupByPlan) plan;
+      return groupBy(groupByPlan.getPaths(), groupByPlan.getAggregations(),
+          groupByPlan.getExpression(), groupByPlan.getUnit(), 
groupByPlan.getOrigin(),
+          groupByPlan.getIntervals());
     }
 
+    if (plan instanceof AggregationPlan) {
+      return aggregate(plan.getPaths(), plan.getAggregations(),
+          ((AggregationPlan) plan).getExpression());
+    }
     return queryRouter.query(queryExpression);
   }
 
@@ -79,25 +88,22 @@ public abstract class QueryProcessExecutor {
     this.fetchSize.set(fetchSize);
   }
 
-  public abstract QueryDataSet aggregate(List<Path> paths, List<String> 
aggres, IExpression expression)
-          throws ProcessorException, IOException, PathErrorException, 
FileNodeManagerException, QueryFilterOptimizationException;
+  public abstract QueryDataSet aggregate(List<Path> paths, List<String> aggres,
+      IExpression expression) throws ProcessorException, IOException, 
PathErrorException,
+      FileNodeManagerException, QueryFilterOptimizationException;
 
-  public abstract QueryDataSet groupBy(List<Pair<Path, String>> aggres, 
IExpression expression,
-      long unit,
-      long origin, List<Pair<Long, Long>> intervals, int fetchSize)
-      throws ProcessorException, IOException, PathErrorException;
+  public abstract QueryDataSet groupBy(List<Path> paths, List<String> aggres,
+      IExpression expression, long unit, long origin, List<Pair<Long, Long>> 
intervals)
+      throws ProcessorException, IOException, PathErrorException, 
FileNodeManagerException,
+      QueryFilterOptimizationException;
 
   /**
    * executeWithGlobalTimeFilter update command and return whether the 
operator is successful.
    *
-   * @param path
-   *            : update series seriesPath
-   * @param startTime
-   *            start time in update command
-   * @param endTime
-   *            end time in update command
-   * @param value
-   *            - in type of string
+   * @param path : update series seriesPath
+   * @param startTime start time in update command
+   * @param endTime end time in update command
+   * @param value - in type of string
    * @return - whether the operator is successful.
    */
   public abstract boolean update(Path path, long startTime, long endTime, 
String value)
@@ -106,10 +112,8 @@ public abstract class QueryProcessExecutor {
   /**
    * executeWithGlobalTimeFilter delete command and return whether the 
operator is successful.
    *
-   * @param paths
-   *            : delete series paths
-   * @param deleteTime
-   *            end time in delete command
+   * @param paths : delete series paths
+   * @param deleteTime end time in delete command
    * @return - whether the operator is successful.
    */
   public boolean delete(List<Path> paths, long deleteTime) throws 
ProcessorException {
@@ -143,10 +147,8 @@ public abstract class QueryProcessExecutor {
   /**
    * executeWithGlobalTimeFilter delete command and return whether the 
operator is successful.
    *
-   * @param path
-   *            : delete series seriesPath
-   * @param deleteTime
-   *            end time in delete command
+   * @param path : delete series seriesPath
+   * @param deleteTime end time in delete command
    * @return - whether the operator is successful.
    */
   protected abstract boolean delete(Path path, long deleteTime) throws 
ProcessorException;
@@ -154,12 +156,9 @@ public abstract class QueryProcessExecutor {
   /**
    * insert a single value. Only used in test
    *
-   * @param path
-   *            seriesPath to be inserted
-   * @param insertTime
-   *            - it's time point but not a range
-   * @param value
-   *            value to be inserted
+   * @param path seriesPath to be inserted
+   * @param insertTime - it's time point but not a range
+   * @param value value to be inserted
    * @return - Operate Type.
    */
   public abstract int insert(Path path, long insertTime, String value) throws 
ProcessorException;
@@ -167,14 +166,10 @@ public abstract class QueryProcessExecutor {
   /**
    * executeWithGlobalTimeFilter insert command and return whether the 
operator is successful.
    *
-   * @param deviceId
-   *            deviceId to be inserted
-   * @param insertTime
-   *            - it's time point but not a range
-   * @param measurementList
-   *            measurements to be inserted
-   * @param insertValues
-   *            values to be inserted
+   * @param deviceId deviceId to be inserted
+   * @param insertTime - it's time point but not a range
+   * @param measurementList measurements to be inserted
+   * @param insertValues values to be inserted
    * @return - Operate Type.
    */
   public abstract int multiInsert(String deviceId, long insertTime, 
List<String> measurementList,
diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/sys/MetadataPlan.java 
b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/sys/MetadataPlan.java
index df0c566..1841c36 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/sys/MetadataPlan.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/physical/sys/MetadataPlan.java
@@ -113,7 +113,7 @@ public class MetadataPlan extends PhysicalPlan {
 
   @Override
   public String toString() {
-    String ret = String.format("seriesPath: %s\ndataType: %s\nencoding: 
%s\nnamespace type: %s\nargs: ", path, dataType, encoding, namespaceType);
+    String ret = String.format("seriesPath: %s\nresultDataType: %s\nencoding: 
%s\nnamespace type: %s\nargs: ", path, dataType, encoding, namespaceType);
     StringBuilder stringBuilder = new StringBuilder(ret.length()+50);
     stringBuilder.append(ret);
     for (Map.Entry prop : props.entrySet()) {
diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java 
b/iotdb/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
index 2caac94..290d777 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
@@ -242,6 +242,7 @@ public class PhysicalGenerator {
       ((GroupByPlan) queryPlan).setUnit(queryOperator.getUnit());
       ((GroupByPlan) queryPlan).setOrigin(queryOperator.getOrigin());
       ((GroupByPlan) queryPlan).setIntervals(queryOperator.getIntervals());
+      ((GroupByPlan) 
queryPlan).setAggregations(queryOperator.getSelectOperator().getAggregations());
     } else if (queryOperator.isFill()) {
       queryPlan = new FillQueryPlan();
       FilterOperator timeFilter = queryOperator.getFilterOperator();
diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/AggreResultData.java
 
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/AggreResultData.java
new file mode 100644
index 0000000..0efae01
--- /dev/null
+++ 
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/AggreResultData.java
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.aggregation;
+
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.Binary;
+
+public class AggreResultData {
+
+  private long timestamp;
+  private TSDataType dataType;
+
+  private boolean booleanRet;
+  private int intRet;
+  private long longRet;
+  private float floatRet;
+  private double doubleRet;
+  private Binary binaryRet;
+
+  private boolean isSetValue;
+  private boolean isSetTime;
+
+  public AggreResultData(TSDataType dataType) {
+    this.dataType = dataType;
+    this.isSetTime = false;
+    this.isSetValue = false;
+  }
+
+  public void reSet() {
+    isSetValue = false;
+    isSetTime = false;
+  }
+
+  public void putTimeAndValue(long timestamp, Object v) {
+    setTimestamp(timestamp);
+    setAnObject((Comparable<?>) v);
+  }
+
+  public Object getValue() {
+    switch (dataType) {
+      case BOOLEAN:
+        return booleanRet;
+      case DOUBLE:
+        return doubleRet;
+      case TEXT:
+        return binaryRet;
+      case FLOAT:
+        return floatRet;
+      case INT32:
+        return intRet;
+      case INT64:
+        return longRet;
+      default:
+        throw new UnSupportedDataTypeException(String.valueOf(dataType));
+    }
+  }
+
+  /**
+   * set an object.
+   *
+   * @param v object value
+   */
+  public void setAnObject(Comparable<?> v) {
+    isSetValue = true;
+    switch (dataType) {
+      case BOOLEAN:
+        booleanRet = (Boolean) v;
+        break;
+      case DOUBLE:
+        doubleRet = (Double) v;
+        break;
+      case TEXT:
+        binaryRet = (Binary) v;
+        break;
+      case FLOAT:
+        floatRet = (Float) v;
+        break;
+      case INT32:
+        intRet = (Integer) v;
+        break;
+      case INT64:
+        longRet = (Long) v;
+        break;
+      default:
+        throw new UnSupportedDataTypeException(String.valueOf(dataType));
+    }
+  }
+
+  public long getTimestamp() {
+    return timestamp;
+  }
+
+  public void setTimestamp(long timestamp) {
+    isSetTime = true;
+    this.timestamp = timestamp;
+  }
+
+  public TSDataType getDataType() {
+    return dataType;
+  }
+
+  public boolean isBooleanRet() {
+    return booleanRet;
+  }
+
+  public void setBooleanRet(boolean booleanRet) {
+    this.isSetValue = true;
+    this.booleanRet = booleanRet;
+  }
+
+  public int getIntRet() {
+    return intRet;
+  }
+
+  public void setIntRet(int intRet) {
+    this.isSetValue = true;
+    this.intRet = intRet;
+  }
+
+  public long getLongRet() {
+    return longRet;
+  }
+
+  public void setLongRet(long longRet) {
+    this.isSetValue = true;
+    this.longRet = longRet;
+  }
+
+  public float getFloatRet() {
+    return floatRet;
+  }
+
+  public void setFloatRet(float floatRet) {
+    this.isSetValue = true;
+    this.floatRet = floatRet;
+  }
+
+  public double getDoubleRet() {
+    return doubleRet;
+  }
+
+  public void setDoubleRet(double doubleRet) {
+    this.isSetValue = true;
+    this.doubleRet = doubleRet;
+  }
+
+  public Binary getBinaryRet() {
+    return binaryRet;
+  }
+
+  public void setBinaryRet(Binary binaryRet) {
+    this.isSetValue = true;
+    this.binaryRet = binaryRet;
+  }
+
+  public boolean isSetValue() {
+    return isSetValue;
+  }
+
+  public boolean isSetTime() {
+    return isSetTime;
+  }
+
+  public AggreResultData deepCopy() {
+    AggreResultData aggreResultData = new AggreResultData(this.dataType);
+    if (isSetValue) {
+      aggreResultData.setAnObject((Comparable<?>) this.getValue());
+    }
+    if (isSetTime) {
+      aggreResultData.setTimestamp(this.getTimestamp());
+    }
+    return aggreResultData;
+  }
+}
diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateFunction.java
 
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateFunction.java
index 5981b58..bb225e5 100644
--- 
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateFunction.java
+++ 
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateFunction.java
@@ -31,24 +31,24 @@ import org.apache.iotdb.tsfile.read.common.BatchData;
 public abstract class AggregateFunction {
 
   protected String name;
-  protected BatchData resultData;
-  protected TSDataType dataType;
+  protected AggreResultData resultData;
+  protected TSDataType resultDataType;
 
   /**
    * construct.
    *
    * @param name aggregate function name.
-   * @param dataType series data type.
+   * @param dataType result data type.
    */
   public AggregateFunction(String name, TSDataType dataType) {
     this.name = name;
-    this.dataType = dataType;
-    resultData = new BatchData(dataType, true, true);
+    this.resultDataType = dataType;
+    resultData = new AggreResultData(dataType);
   }
 
   public abstract void init();
 
-  public abstract BatchData getResult();
+  public abstract AggreResultData getResult();
 
   /**
    * <p>
@@ -74,6 +74,20 @@ public abstract class AggregateFunction {
   public abstract void calculateValueFromPageData(BatchData dataInThisPage,
       IPointReader unsequenceReader) throws IOException, ProcessorException;
 
+  /**
+   * <p>
+   * Could not calculate using <method>calculateValueFromPageHeader</method> 
directly. Calculate the
+   * aggregation according to all decompressed data in this page.
+   * </p>
+   *
+   * @param dataInThisPage the data in the DataPage
+   * @param unsequenceReader unsequence data reader
+   * @param bound the time upper bounder of data in unsequence data reader
+   * @throws IOException TsFile data read exception
+   * @throws ProcessorException wrong aggregation method parameter
+   */
+  public abstract void calculateValueFromPageData(BatchData dataInThisPage,
+      IPointReader unsequenceReader, long bound) throws IOException, 
ProcessorException;
 
   /**
    * <p>
@@ -106,21 +120,21 @@ public abstract class AggregateFunction {
    * @throws ProcessorException wrong aggregation method parameter
    */
   public abstract void calcAggregationUsingTimestamps(List<Long> timestamps,
-      EngineReaderByTimeStamp dataReader) throws IOException, 
ProcessorException;
+      EngineReaderByTimeStamp dataReader) throws IOException;
 
   /**
-   * Judge if aggregation results have been calculated.
+   * Judge if aggregation results have been calculated. In other words, if the 
aggregated result
+   * does not need to compute the remaining data, it returns true.
    * @return If the aggregation result has been calculated return true, else 
return false.
    */
   public abstract boolean isCalculatedAggregationResult();
 
   /**
-   * <p>
-   * This method is calculate the group by function.
-   * </p>
+   * Return data type of aggregation function result data.
+   * @return
    */
-  public abstract void calcGroupByAggregation(long partitionStart, long 
partitionEnd,
-      long intervalStart, long intervalEnd,
-      BatchData data) throws ProcessorException;
+  public TSDataType getResultDataType() {
+    return resultDataType;
+  }
 
 }
diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrFunc.java
 
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrFunc.java
index 2967495..1964875 100644
--- 
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrFunc.java
+++ 
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrFunc.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.query.aggregation.impl;
 import java.io.IOException;
 import java.util.List;
 import org.apache.iotdb.db.exception.ProcessorException;
+import org.apache.iotdb.db.query.aggregation.AggreResultData;
 import org.apache.iotdb.db.query.aggregation.AggregateFunction;
 import org.apache.iotdb.db.query.aggregation.AggregationConstant;
 import org.apache.iotdb.db.query.reader.IPointReader;
@@ -43,14 +44,13 @@ public class CountAggrFunc extends AggregateFunction {
 
   @Override
   public void init() {
-    if (resultData.length() == 0) {
-      resultData.putTime(0);
-      resultData.putLong(0);
-    }
+    resultData.reSet();
+    resultData.setTimestamp(0);
+    resultData.setLongRet(0);
   }
 
   @Override
-  public BatchData getResult() {
+  public AggreResultData getResult() {
     return resultData;
   }
 
@@ -58,9 +58,9 @@ public class CountAggrFunc extends AggregateFunction {
   public void calculateValueFromPageHeader(PageHeader pageHeader) {
     LOGGER.debug("PageHeader>>>>>>>>>>>>num of rows:{}, minTimeStamp:{}, 
maxTimeStamp{}",
         pageHeader.getNumOfValues(), pageHeader.getMinTimestamp(), 
pageHeader.getMaxTimestamp());
-    long preValue = resultData.getLong();
+    long preValue = resultData.getLongRet();
     preValue += pageHeader.getNumOfValues();
-    resultData.setLong(0, preValue);
+    resultData.setLongRet(preValue);
 
   }
 
@@ -76,29 +76,63 @@ public class CountAggrFunc extends AggregateFunction {
       } else {
         unsequenceReader.next();
       }
-      long preValue = resultData.getLong();
+      long preValue = resultData.getLongRet();
       preValue += 1;
-      resultData.setLong(0, preValue);
+      resultData.setLongRet(preValue);
     }
 
     if (dataInThisPage.hasNext()) {
-      long preValue = resultData.getLong();
+      long preValue = resultData.getLongRet();
       preValue += (dataInThisPage.length() - dataInThisPage.getCurIdx());
-      resultData.setLong(0, preValue);
+      resultData.setLongRet(preValue);
+    }
+  }
+
+  @Override
+  public void calculateValueFromPageData(BatchData dataInThisPage, 
IPointReader unsequenceReader,
+      long bound) throws IOException {
+    int cnt = 0;
+    while (dataInThisPage.hasNext() && unsequenceReader.hasNext()) {
+      if (dataInThisPage.currentTime() == 
unsequenceReader.current().getTimestamp()) {
+        if (dataInThisPage.currentTime() >= bound) {
+          break;
+        }
+        dataInThisPage.next();
+        unsequenceReader.next();
+      } else if (dataInThisPage.currentTime() < 
unsequenceReader.current().getTimestamp()) {
+        if (dataInThisPage.currentTime() >= bound) {
+          break;
+        }
+        dataInThisPage.next();
+      } else {
+        if (unsequenceReader.current().getTimestamp() >= bound) {
+          break;
+        }
+        unsequenceReader.next();
+      }
+      cnt++;
     }
+
+    while (dataInThisPage.hasNext() && dataInThisPage.currentTime() < bound) {
+      dataInThisPage.next();
+      cnt++;
+    }
+    long preValue = resultData.getLongRet();
+    preValue += cnt;
+    resultData.setLongRet(preValue);
   }
 
   @Override
   public void calculateValueFromUnsequenceReader(IPointReader unsequenceReader)
-      throws IOException, ProcessorException {
+      throws IOException {
     int cnt = 0;
     while (unsequenceReader.hasNext()) {
       unsequenceReader.next();
       cnt++;
     }
-    long preValue = resultData.getLong();
+    long preValue = resultData.getLongRet();
     preValue += cnt;
-    resultData.setLong(0, preValue);
+    resultData.setLongRet(preValue);
   }
 
   @Override
@@ -109,14 +143,14 @@ public class CountAggrFunc extends AggregateFunction {
       unsequenceReader.next();
       cnt++;
     }
-    long preValue = resultData.getLong();
+    long preValue = resultData.getLongRet();
     preValue += cnt;
-    resultData.setLong(0, preValue);
+    resultData.setLongRet(preValue);
   }
 
   @Override
   public void calcAggregationUsingTimestamps(List<Long> timestamps,
-      EngineReaderByTimeStamp dataReader) throws IOException, 
ProcessorException {
+      EngineReaderByTimeStamp dataReader) throws IOException {
     int cnt = 0;
     for (long time : timestamps) {
       TsPrimitiveType value = dataReader.getValueInTimestamp(time);
@@ -125,19 +159,13 @@ public class CountAggrFunc extends AggregateFunction {
       }
     }
 
-    long preValue = resultData.getLong();
+    long preValue = resultData.getLongRet();
     preValue += cnt;
-    resultData.setLong(0, preValue);
+    resultData.setLongRet(preValue);
   }
 
   @Override
   public boolean isCalculatedAggregationResult() {
     return false;
   }
-
-  @Override
-  public void calcGroupByAggregation(long partitionStart, long partitionEnd, 
long intervalStart,
-      long intervalEnd, BatchData data) throws ProcessorException {
-
-  }
 }
diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstAggrFunc.java
 
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstAggrFunc.java
index a2e4297..99d7a6a 100644
--- 
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstAggrFunc.java
+++ 
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstAggrFunc.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.query.aggregation.impl;
 import java.io.IOException;
 import java.util.List;
 import org.apache.iotdb.db.exception.ProcessorException;
+import org.apache.iotdb.db.query.aggregation.AggreResultData;
 import org.apache.iotdb.db.query.aggregation.AggregateFunction;
 import org.apache.iotdb.db.query.aggregation.AggregationConstant;
 import org.apache.iotdb.db.query.reader.IPointReader;
@@ -39,17 +40,17 @@ public class FirstAggrFunc extends AggregateFunction {
 
   @Override
   public void init() {
-
+    resultData.reSet();
   }
 
   @Override
-  public BatchData getResult() {
+  public AggreResultData getResult() {
     return resultData;
   }
 
   @Override
   public void calculateValueFromPageHeader(PageHeader pageHeader) throws 
ProcessorException {
-    if (resultData.length() != 0) {
+    if (resultData.isSetTime()) {
       return;
     }
 
@@ -57,45 +58,70 @@ public class FirstAggrFunc extends AggregateFunction {
     if (firstVal == null) {
       throw new ProcessorException("PageHeader contains no FIRST value");
     }
-    resultData.putTime(0);
-    resultData.putAnObject(firstVal);
+    resultData.putTimeAndValue(0, firstVal);
   }
 
   @Override
   public void calculateValueFromPageData(BatchData dataInThisPage, 
IPointReader unsequenceReader)
       throws IOException, ProcessorException {
-    if (resultData.length() != 0) {
+    if (resultData.isSetTime()) {
       return;
     }
     if (dataInThisPage.hasNext() && unsequenceReader.hasNext()) {
       if (dataInThisPage.currentTime() >= 
unsequenceReader.current().getTimestamp()) {
-        resultData.putTime(0);
-        
resultData.putAnObject(unsequenceReader.current().getValue().getValue());
+        resultData.putTimeAndValue(0, 
unsequenceReader.current().getValue().getValue());
         unsequenceReader.next();
         return;
       } else {
-        resultData.putTime(0);
-        resultData.putAnObject(dataInThisPage.currentValue());
+        resultData.putTimeAndValue(0, dataInThisPage.currentValue());
+        dataInThisPage.next();
         return;
       }
     }
 
     if (dataInThisPage.hasNext()) {
-      resultData.putTime(0);
-      resultData.putAnObject(dataInThisPage.currentValue());
+      resultData.putTimeAndValue(0, dataInThisPage.currentValue());
+      return;
+    }
+  }
+
+  @Override
+  public void calculateValueFromPageData(BatchData dataInThisPage, 
IPointReader unsequenceReader,
+      long bound) throws IOException, ProcessorException {
+    if (resultData.isSetTime()) {
+      return;
+    }
+    if (dataInThisPage.hasNext() && unsequenceReader.hasNext()) {
+      if (dataInThisPage.currentTime() >= 
unsequenceReader.current().getTimestamp()) {
+        if (unsequenceReader.current().getTimestamp() < bound) {
+          resultData.putTimeAndValue(0, 
unsequenceReader.current().getValue().getValue());
+          unsequenceReader.next();
+          return;
+        }
+      } else {
+        if (dataInThisPage.currentTime() < bound) {
+          resultData.putTimeAndValue(0, dataInThisPage.currentValue());
+          dataInThisPage.next();
+          return;
+        }
+      }
+    }
+
+    if (dataInThisPage.hasNext() && dataInThisPage.currentTime() < bound) {
+      resultData.putTimeAndValue(0, dataInThisPage.currentValue());
+      dataInThisPage.next();
       return;
     }
   }
 
   @Override
   public void calculateValueFromUnsequenceReader(IPointReader unsequenceReader)
-      throws IOException, ProcessorException {
-    if (resultData.length() != 0) {
+      throws IOException {
+    if (resultData.isSetTime()) {
       return;
     }
     if (unsequenceReader.hasNext()) {
-      resultData.putTime(0);
-      resultData.putAnObject(unsequenceReader.current().getValue().getValue());
+      resultData.putTimeAndValue(0, 
unsequenceReader.current().getValue().getValue());
       return;
     }
   }
@@ -103,28 +129,26 @@ public class FirstAggrFunc extends AggregateFunction {
   @Override
   public void calculateValueFromUnsequenceReader(IPointReader 
unsequenceReader, long bound)
       throws IOException {
-    if (resultData.length() != 0) {
+    if (resultData.isSetTime()) {
       return;
     }
     if (unsequenceReader.hasNext() && 
unsequenceReader.current().getTimestamp() < bound) {
-      resultData.putTime(0);
-      resultData.putAnObject(unsequenceReader.current().getValue().getValue());
+      resultData.putTimeAndValue(0, 
unsequenceReader.current().getValue().getValue());
       return;
     }
   }
 
   @Override
   public void calcAggregationUsingTimestamps(List<Long> timestamps,
-      EngineReaderByTimeStamp dataReader) throws IOException, 
ProcessorException {
-    if (resultData.length() != 0) {
+      EngineReaderByTimeStamp dataReader) throws IOException {
+    if (resultData.isSetTime()) {
       return;
     }
 
     for (long time : timestamps) {
       TsPrimitiveType value = dataReader.getValueInTimestamp(time);
       if (value != null) {
-        resultData.putTime(0);
-        resultData.putAnObject(value.getValue());
+        resultData.putTimeAndValue(0, value.getValue());
         break;
       }
     }
@@ -132,12 +156,6 @@ public class FirstAggrFunc extends AggregateFunction {
 
   @Override
   public boolean isCalculatedAggregationResult() {
-    return resultData.length() != 0;
-  }
-
-  @Override
-  public void calcGroupByAggregation(long partitionStart, long partitionEnd, 
long intervalStart,
-      long intervalEnd, BatchData data) throws ProcessorException {
-
+    return resultData.isSetTime();
   }
 }
diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastAggrFunc.java
 
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastAggrFunc.java
index 4630a58..151dba5 100644
--- 
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastAggrFunc.java
+++ 
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastAggrFunc.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.query.aggregation.impl;
 import java.io.IOException;
 import java.util.List;
 import org.apache.iotdb.db.exception.ProcessorException;
+import org.apache.iotdb.db.query.aggregation.AggreResultData;
 import org.apache.iotdb.db.query.aggregation.AggregateFunction;
 import org.apache.iotdb.db.query.aggregation.AggregationConstant;
 import org.apache.iotdb.db.query.reader.IPointReader;
@@ -40,13 +41,13 @@ public class LastAggrFunc extends AggregateFunction {
 
   @Override
   public void init() {
-
+    resultData.reSet();
   }
 
   @Override
-  public BatchData getResult() {
-    if (resultData.length() != 0) {
-      resultData.setTime(0, 0);
+  public AggreResultData getResult() {
+    if (resultData.isSetTime()) {
+      resultData.setTimestamp(0);
     }
     return resultData;
   }
@@ -60,14 +61,20 @@ public class LastAggrFunc extends AggregateFunction {
   @Override
   public void calculateValueFromPageData(BatchData dataInThisPage, 
IPointReader unsequenceReader)
       throws IOException, ProcessorException {
+    calculateValueFromPageData(dataInThisPage, unsequenceReader, 
Long.MAX_VALUE);
+  }
+
+  @Override
+  public void calculateValueFromPageData(BatchData dataInThisPage, 
IPointReader unsequenceReader,
+      long bound) throws IOException, ProcessorException {
     long time = -1;
     Object lastVal = null;
-    int maxIndex = dataInThisPage.length() - 1;
-    if (maxIndex < 0) {
-      return;
+    while (dataInThisPage.hasNext() && dataInThisPage.currentTime() < bound) {
+      time = dataInThisPage.currentTime();
+      lastVal = dataInThisPage.currentValue();
+      dataInThisPage.next();
     }
-    time = dataInThisPage.getTimeByIndex(maxIndex);
-    lastVal = dataInThisPage.getValueByIndex(maxIndex);
+
     while (unsequenceReader.hasNext()) {
       if (unsequenceReader.current().getTimestamp() < time) {
         unsequenceReader.next();
@@ -114,7 +121,7 @@ public class LastAggrFunc extends AggregateFunction {
 
   @Override
   public void calcAggregationUsingTimestamps(List<Long> timestamps,
-      EngineReaderByTimeStamp dataReader) throws IOException, 
ProcessorException {
+      EngineReaderByTimeStamp dataReader) throws IOException {
 
     long time = -1;
     Object lastVal = null;
@@ -135,20 +142,12 @@ public class LastAggrFunc extends AggregateFunction {
     return false;
   }
 
-  @Override
-  public void calcGroupByAggregation(long partitionStart, long partitionEnd, 
long intervalStart,
-      long intervalEnd, BatchData data) throws ProcessorException {
-
-  }
-
   private void updateLastResult(long time, Object value) {
-    if (resultData.length() == 0) {
-      resultData.putAnObject(value);
-      resultData.putTime(time);
+    if (!resultData.isSetTime()) {
+      resultData.putTimeAndValue(time, value);
     } else {
-      if (time >= resultData.currentTime()) {
-        resultData.setAnObject(0, (Comparable<?>) value);
-        resultData.setTime(0, time);
+      if (time >= resultData.getTimestamp()) {
+        resultData.putTimeAndValue(time, value);
       }
     }
   }
diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrFunc.java
 
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrFunc.java
index e47aaf7..f6c6fa9 100644
--- 
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrFunc.java
+++ 
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrFunc.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.query.aggregation.impl;
 import java.io.IOException;
 import java.util.List;
 import org.apache.iotdb.db.exception.ProcessorException;
+import org.apache.iotdb.db.query.aggregation.AggreResultData;
 import org.apache.iotdb.db.query.aggregation.AggregateFunction;
 import org.apache.iotdb.db.query.aggregation.AggregationConstant;
 import org.apache.iotdb.db.query.reader.IPointReader;
@@ -40,50 +41,48 @@ public class MaxTimeAggrFunc extends AggregateFunction {
 
   @Override
   public void init() {
-
+    resultData.reSet();
   }
 
   @Override
-  public BatchData getResult() {
+  public AggreResultData getResult() {
     return resultData;
   }
 
   @Override
   public void calculateValueFromPageHeader(PageHeader pageHeader) throws 
ProcessorException {
     long maxTimestamp = pageHeader.getMaxTimestamp();
-
-    //has not set value
-    if (resultData.length() == 0) {
-      resultData.putTime(0);
-      resultData.putLong(maxTimestamp);
-      return;
-    }
-
-    if (resultData.getLong() < maxTimestamp) {
-      resultData.setLong(0, maxTimestamp);
-    }
+    updateMaxTimeResult(0, maxTimestamp);
   }
 
   @Override
   public void calculateValueFromPageData(BatchData dataInThisPage, 
IPointReader unsequenceReader)
       throws IOException, ProcessorException {
-    long time = -1;
+
     int maxIndex = dataInThisPage.length() - 1;
     if (maxIndex < 0) {
       return;
     }
-    time = dataInThisPage.getTimeByIndex(maxIndex);
-    if (resultData.length() == 0) {
-      if (time != -1) {
-        resultData.putTime(0);
-        resultData.putAnObject(time);
+    long time = dataInThisPage.getTimeByIndex(maxIndex);
+    updateMaxTimeResult(0, time);
+  }
+
+  @Override
+  public void calculateValueFromPageData(BatchData dataInThisPage, 
IPointReader unsequenceReader,
+      long bound) throws IOException, ProcessorException {
+    long time = -1;
+    while (dataInThisPage.hasNext()) {
+      if(dataInThisPage.currentTime() < bound){
+        time = dataInThisPage.currentTime();
+        dataInThisPage.next();
       }
-    } else {
-      //has set value
-      if (time != -1 && time > resultData.getLong()) {
-        resultData.setAnObject(0, time);
+      else{
+        break;
       }
     }
+    if (time != -1) {
+      updateMaxTimeResult(0, time);
+    }
   }
 
   @Override
@@ -93,16 +92,8 @@ public class MaxTimeAggrFunc extends AggregateFunction {
     while (unsequenceReader.hasNext()) {
       pair = unsequenceReader.next();
     }
-    if (resultData.length() == 0) {
-      if (pair != null) {
-        resultData.putTime(0);
-        resultData.putAnObject(pair.getTimestamp());
-      }
-    } else {
-      //has set value
-      if (pair != null && pair.getTimestamp() > resultData.getLong()) {
-        resultData.setAnObject(0, pair.getTimestamp());
-      }
+    if (pair != null) {
+      updateMaxTimeResult(0, pair.getTimestamp());
     }
   }
 
@@ -113,23 +104,15 @@ public class MaxTimeAggrFunc extends AggregateFunction {
     while (unsequenceReader.hasNext() && 
unsequenceReader.current().getTimestamp() < bound) {
       pair = unsequenceReader.next();
     }
-    if (resultData.length() == 0) {
-      if (pair != null) {
-        resultData.putTime(0);
-        resultData.putAnObject(pair.getTimestamp());
-      }
-    } else {
-      //has set value
-      if (pair != null && pair.getTimestamp() > resultData.getLong()) {
-        resultData.setAnObject(0, pair.getTimestamp());
-      }
+    if (pair != null) {
+      updateMaxTimeResult(0, pair.getTimestamp());
     }
   }
 
   //TODO Consider how to reverse order in dataReader(EngineReaderByTimeStamp)
   @Override
   public void calcAggregationUsingTimestamps(List<Long> timestamps,
-      EngineReaderByTimeStamp dataReader) throws IOException, 
ProcessorException {
+      EngineReaderByTimeStamp dataReader) throws IOException {
     long time = -1;
     for (int i = 0; i < timestamps.size(); i++) {
       TsPrimitiveType value = 
dataReader.getValueInTimestamp(timestamps.get(i));
@@ -141,15 +124,7 @@ public class MaxTimeAggrFunc extends AggregateFunction {
     if (time == -1) {
       return;
     }
-
-    if (resultData.length() == 0) {
-      resultData.putTime(0);
-      resultData.putLong(time);
-    } else {
-      if (resultData.getLong() < time) {
-        resultData.setLong(0, time);
-      }
-    }
+    updateMaxTimeResult(0, time);
   }
 
   @Override
@@ -157,9 +132,10 @@ public class MaxTimeAggrFunc extends AggregateFunction {
     return false;
   }
 
-  @Override
-  public void calcGroupByAggregation(long partitionStart, long partitionEnd, 
long intervalStart,
-      long intervalEnd, BatchData data) throws ProcessorException {
-
+  private void updateMaxTimeResult(long time, long value) {
+    if (!resultData.isSetValue() || value >= resultData.getLongRet()) {
+      resultData.setTimestamp(time);
+      resultData.setLongRet(value);
+    }
   }
 }
diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrFunc.java
 
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrFunc.java
index 98d8d83..ceee209 100644
--- 
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrFunc.java
+++ 
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrFunc.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.query.aggregation.impl;
 import java.io.IOException;
 import java.util.List;
 import org.apache.iotdb.db.exception.ProcessorException;
+import org.apache.iotdb.db.query.aggregation.AggreResultData;
 import org.apache.iotdb.db.query.aggregation.AggregateFunction;
 import org.apache.iotdb.db.query.aggregation.AggregationConstant;
 import org.apache.iotdb.db.query.reader.IPointReader;
@@ -39,25 +40,18 @@ public class MaxValueAggrFunc extends AggregateFunction {
 
   @Override
   public void init() {
-
+    resultData.reSet();
   }
 
   @Override
-  public BatchData getResult() {
+  public AggreResultData getResult() {
     return resultData;
   }
 
   @Override
   public void calculateValueFromPageHeader(PageHeader pageHeader) throws 
ProcessorException {
     Comparable<Object> maxVal = (Comparable<Object>) 
pageHeader.getStatistics().getMax();
-    if (resultData.length() == 0) {
-      resultData.putTime(0);
-      resultData.putAnObject(maxVal);
-    } else {
-      if (maxVal.compareTo(resultData.currentValue()) > 0) {
-        resultData.setAnObject(0, maxVal);
-      }
-    }
+    updateResult(maxVal);
   }
 
   @Override
@@ -96,6 +90,50 @@ public class MaxValueAggrFunc extends AggregateFunction {
   }
 
   @Override
+  public void calculateValueFromPageData(BatchData dataInThisPage, 
IPointReader unsequenceReader,
+      long bound) throws IOException, ProcessorException {
+    Comparable<Object> maxVal = null;
+    while (dataInThisPage.hasNext() && unsequenceReader.hasNext()) {
+      if (dataInThisPage.currentTime() < 
unsequenceReader.current().getTimestamp()) {
+        if (dataInThisPage.currentTime() >= bound) {
+          break;
+        }
+        if (maxVal == null || maxVal.compareTo(dataInThisPage.currentValue()) 
< 0) {
+          maxVal = (Comparable<Object>) dataInThisPage.currentValue();
+        }
+        dataInThisPage.next();
+      } else if (dataInThisPage.currentTime() == 
unsequenceReader.current().getTimestamp()) {
+        if (dataInThisPage.currentTime() >= bound) {
+          break;
+        }
+        if (maxVal == null
+            || 
maxVal.compareTo(unsequenceReader.current().getValue().getValue()) < 0) {
+          maxVal = (Comparable<Object>) 
unsequenceReader.current().getValue().getValue();
+        }
+        dataInThisPage.next();
+        unsequenceReader.next();
+      } else {
+        if (unsequenceReader.current().getTimestamp() >= bound) {
+          break;
+        }
+        if (maxVal == null
+            || 
maxVal.compareTo(unsequenceReader.current().getValue().getValue()) < 0) {
+          maxVal = (Comparable<Object>) 
unsequenceReader.current().getValue().getValue();
+        }
+        unsequenceReader.next();
+      }
+    }
+
+    while (dataInThisPage.hasNext() && dataInThisPage.currentTime() < bound) {
+      if (maxVal == null || maxVal.compareTo(dataInThisPage.currentValue()) < 
0) {
+        maxVal = (Comparable<Object>) dataInThisPage.currentValue();
+      }
+      dataInThisPage.next();
+    }
+    updateResult(maxVal);
+  }
+
+  @Override
   public void calculateValueFromUnsequenceReader(IPointReader unsequenceReader)
       throws IOException, ProcessorException {
     Comparable<Object> maxVal = null;
@@ -125,7 +163,7 @@ public class MaxValueAggrFunc extends AggregateFunction {
 
   @Override
   public void calcAggregationUsingTimestamps(List<Long> timestamps,
-      EngineReaderByTimeStamp dataReader) throws IOException, 
ProcessorException {
+      EngineReaderByTimeStamp dataReader) throws IOException {
     Comparable<Object> maxVal = null;
     for (long time : timestamps) {
       TsPrimitiveType value = dataReader.getValueInTimestamp(time);
@@ -145,21 +183,11 @@ public class MaxValueAggrFunc extends AggregateFunction {
   }
 
   private void updateResult(Comparable<Object> maxVal) {
-    if (resultData.length() == 0) {
-      if (maxVal != null) {
-        resultData.putTime(0);
-        resultData.putAnObject(maxVal);
-      }
-    } else {
-      if (maxVal != null && maxVal.compareTo(resultData.currentValue()) > 0) {
-        resultData.setAnObject(0, maxVal);
-      }
+    if (maxVal == null) {
+      return;
+    }
+    if (!resultData.isSetValue() || maxVal.compareTo(resultData.getValue()) > 
0) {
+      resultData.putTimeAndValue(0, maxVal);
     }
-  }
-
-  @Override
-  public void calcGroupByAggregation(long partitionStart, long partitionEnd, 
long intervalStart,
-      long intervalEnd, BatchData data) throws ProcessorException {
-
   }
 }
diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MeanAggrFunc.java
 
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MeanAggrFunc.java
index 6a17285..36f7337 100644
--- 
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MeanAggrFunc.java
+++ 
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MeanAggrFunc.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.query.aggregation.impl;
 import java.io.IOException;
 import java.util.List;
 import org.apache.iotdb.db.exception.ProcessorException;
+import org.apache.iotdb.db.query.aggregation.AggreResultData;
 import org.apache.iotdb.db.query.aggregation.AggregateFunction;
 import org.apache.iotdb.db.query.reader.IPointReader;
 import org.apache.iotdb.db.query.reader.merge.EngineReaderByTimeStamp;
@@ -44,13 +45,16 @@ public class MeanAggrFunc extends AggregateFunction {
 
   @Override
   public void init() {
+    resultData.reSet();
+    sum = 0.0;
+    cnt = 0;
   }
 
   @Override
-  public BatchData getResult() {
+  public AggreResultData getResult() {
     if (cnt > 0) {
-      resultData.putTime(0);
-      resultData.putDouble(sum / cnt);
+      resultData.setTimestamp(0);
+      resultData.setDoubleRet(sum / cnt);
     }
     return resultData;
   }
@@ -86,7 +90,41 @@ public class MeanAggrFunc extends AggregateFunction {
     }
   }
 
-  private void updateMean(TSDataType type, Object sumVal) throws 
ProcessorException {
+  @Override
+  public void calculateValueFromPageData(BatchData dataInThisPage, 
IPointReader unsequenceReader,
+      long bound) throws IOException, ProcessorException {
+    while (dataInThisPage.hasNext() && unsequenceReader.hasNext()) {
+      Object sumVal = null;
+      if (dataInThisPage.currentTime() < 
unsequenceReader.current().getTimestamp()) {
+        if (dataInThisPage.currentTime() >= bound) {
+          break;
+        }
+        sumVal = dataInThisPage.currentValue();
+        dataInThisPage.next();
+      } else if (dataInThisPage.currentTime() == 
unsequenceReader.current().getTimestamp()) {
+        if (dataInThisPage.currentTime() >= bound) {
+          break;
+        }
+        sumVal = unsequenceReader.current().getValue().getValue();
+        dataInThisPage.next();
+        unsequenceReader.next();
+      } else {
+        if (unsequenceReader.current().getTimestamp() >= bound) {
+          break;
+        }
+        sumVal = unsequenceReader.current().getValue().getValue();
+        unsequenceReader.next();
+      }
+      updateMean(seriesDataType, sumVal);
+    }
+
+    while (dataInThisPage.hasNext() && dataInThisPage.currentTime() < bound) {
+      updateMean(seriesDataType, dataInThisPage.currentValue());
+      dataInThisPage.next();
+    }
+  }
+
+  private void updateMean(TSDataType type, Object sumVal) throws IOException {
     switch (type) {
       case INT32:
         sum += (int) sumVal;
@@ -103,7 +141,7 @@ public class MeanAggrFunc extends AggregateFunction {
       case TEXT:
       case BOOLEAN:
       default:
-        throw new ProcessorException("Unsupported data type in aggregation 
MEAN : " + type);
+        throw new IOException("Unsupported data type in aggregation MEAN : " + 
type);
     }
     cnt++;
   }
@@ -128,7 +166,7 @@ public class MeanAggrFunc extends AggregateFunction {
 
   @Override
   public void calcAggregationUsingTimestamps(List<Long> timestamps,
-      EngineReaderByTimeStamp dataReader) throws IOException, 
ProcessorException {
+      EngineReaderByTimeStamp dataReader) throws IOException {
     for (long time : timestamps) {
       TsPrimitiveType value = dataReader.getValueInTimestamp(time);
       if (value != null) {
@@ -141,11 +179,4 @@ public class MeanAggrFunc extends AggregateFunction {
   public boolean isCalculatedAggregationResult() {
     return false;
   }
-
-
-  @Override
-  public void calcGroupByAggregation(long partitionStart, long partitionEnd, 
long intervalStart,
-      long intervalEnd, BatchData data) throws ProcessorException {
-
-  }
 }
diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrFunc.java
 
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrFunc.java
index 7d8afd7..f6bc642 100644
--- 
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrFunc.java
+++ 
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrFunc.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.query.aggregation.impl;
 import java.io.IOException;
 import java.util.List;
 import org.apache.iotdb.db.exception.ProcessorException;
+import org.apache.iotdb.db.query.aggregation.AggreResultData;
 import org.apache.iotdb.db.query.aggregation.AggregateFunction;
 import org.apache.iotdb.db.query.aggregation.AggregationConstant;
 import org.apache.iotdb.db.query.reader.IPointReader;
@@ -39,83 +40,113 @@ public class MinTimeAggrFunc extends AggregateFunction {
 
   @Override
   public void init() {
-
+    resultData.reSet();
   }
 
   @Override
-  public BatchData getResult() {
+  public AggreResultData getResult() {
     return resultData;
   }
 
   @Override
   public void calculateValueFromPageHeader(PageHeader pageHeader) throws 
ProcessorException {
-    if (resultData.length() > 0) {
+    if (resultData.isSetValue()) {
       return;
     }
     long time = pageHeader.getMinTimestamp();
-    resultData.putTime(0);
-    resultData.putLong(time);
+    resultData.putTimeAndValue(0, time);
   }
 
   @Override
   public void calculateValueFromPageData(BatchData dataInThisPage, 
IPointReader unsequenceReader)
       throws IOException, ProcessorException {
-    if (resultData.length() > 0) {
+    if (resultData.isSetValue()) {
       return;
     }
 
     if (dataInThisPage.hasNext() && unsequenceReader.hasNext()) {
       if (dataInThisPage.currentTime() < 
unsequenceReader.current().getTimestamp()) {
-        resultData.putTime(0);
-        resultData.putLong(dataInThisPage.currentTime());
+        resultData.setTimestamp(0);
+        resultData.setLongRet(dataInThisPage.currentTime());
       } else {
-        resultData.putTime(0);
-        resultData.putLong(unsequenceReader.current().getTimestamp());
+        resultData.setTimestamp(0);
+        resultData.setLongRet(unsequenceReader.current().getTimestamp());
       }
       return;
     }
 
     if (dataInThisPage.hasNext()) {
-      resultData.putTime(0);
-      resultData.putLong(dataInThisPage.currentTime());
+      resultData.setTimestamp(0);
+      resultData.setLongRet(dataInThisPage.currentTime());
+    }
+  }
+
+  @Override
+  public void calculateValueFromPageData(BatchData dataInThisPage, 
IPointReader unsequenceReader,
+      long bound) throws IOException, ProcessorException {
+    if (resultData.isSetValue()) {
+      return;
+    }
+
+    if (dataInThisPage.hasNext() && unsequenceReader.hasNext()) {
+      if (dataInThisPage.currentTime() < 
unsequenceReader.current().getTimestamp()) {
+        if (dataInThisPage.currentTime() >= bound) {
+          return;
+        }
+        resultData.setTimestamp(0);
+        resultData.setLongRet(dataInThisPage.currentTime());
+      } else {
+        if (unsequenceReader.current().getTimestamp() >= bound) {
+          return;
+        }
+        resultData.setTimestamp(0);
+        resultData.setLongRet(unsequenceReader.current().getTimestamp());
+      }
+      return;
+    }
+
+    if (dataInThisPage.hasNext() && dataInThisPage.currentTime() < bound) {
+      resultData.setTimestamp(0);
+      resultData.setLongRet(dataInThisPage.currentTime());
+      dataInThisPage.next();
     }
   }
 
   @Override
   public void calculateValueFromUnsequenceReader(IPointReader unsequenceReader)
       throws IOException, ProcessorException {
-    if (resultData.length() > 0) {
+    if (resultData.isSetValue()) {
       return;
     }
     if (unsequenceReader.hasNext()) {
-      resultData.putTime(0);
-      resultData.putLong(unsequenceReader.current().getTimestamp());
+      resultData.setTimestamp(0);
+      resultData.setLongRet(unsequenceReader.current().getTimestamp());
     }
   }
 
   @Override
   public void calculateValueFromUnsequenceReader(IPointReader 
unsequenceReader, long bound)
       throws IOException, ProcessorException {
-    if (resultData.length() > 0) {
+    if (resultData.isSetValue()) {
       return;
     }
     if (unsequenceReader.hasNext() && 
unsequenceReader.current().getTimestamp() < bound) {
-      resultData.putTime(0);
-      resultData.putLong(unsequenceReader.current().getTimestamp());
+      resultData.setTimestamp(0);
+      resultData.setLongRet(unsequenceReader.current().getTimestamp());
     }
   }
 
   @Override
   public void calcAggregationUsingTimestamps(List<Long> timestamps,
-      EngineReaderByTimeStamp dataReader) throws IOException, 
ProcessorException {
-    if (resultData.length() > 0) {
+      EngineReaderByTimeStamp dataReader) throws IOException {
+    if (resultData.isSetValue()) {
       return;
     }
     for (long time : timestamps) {
       TsPrimitiveType value = dataReader.getValueInTimestamp(time);
       if (value != null) {
-        resultData.putTime(0);
-        resultData.putLong(time);
+        resultData.setTimestamp(0);
+        resultData.setLongRet(time);
         return;
       }
     }
@@ -123,13 +154,7 @@ public class MinTimeAggrFunc extends AggregateFunction {
 
   @Override
   public boolean isCalculatedAggregationResult() {
-    return resultData.length() > 0;
-  }
-
-  @Override
-  public void calcGroupByAggregation(long partitionStart, long partitionEnd, 
long intervalStart,
-      long intervalEnd, BatchData data) throws ProcessorException {
-
+    return resultData.isSetValue();
   }
 
 }
diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrFunc.java
 
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrFunc.java
index ed030f1..3184c33 100644
--- 
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrFunc.java
+++ 
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrFunc.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.query.aggregation.impl;
 import java.io.IOException;
 import java.util.List;
 import org.apache.iotdb.db.exception.ProcessorException;
+import org.apache.iotdb.db.query.aggregation.AggreResultData;
 import org.apache.iotdb.db.query.aggregation.AggregateFunction;
 import org.apache.iotdb.db.query.aggregation.AggregationConstant;
 import org.apache.iotdb.db.query.reader.IPointReader;
@@ -39,30 +40,23 @@ public class MinValueAggrFunc extends AggregateFunction {
 
   @Override
   public void init() {
-
+    resultData.reSet();
   }
 
   @Override
-  public BatchData getResult() {
+  public AggreResultData getResult() {
     return resultData;
   }
 
   @Override
   public void calculateValueFromPageHeader(PageHeader pageHeader) throws 
ProcessorException {
     Comparable<Object> minVal = (Comparable<Object>) 
pageHeader.getStatistics().getMin();
-    if (resultData.length() == 0) {
-      resultData.putTime(0);
-      resultData.putAnObject(minVal);
-    } else {
-      if (minVal.compareTo(resultData.currentValue()) < 0) {
-        resultData.setAnObject(0, minVal);
-      }
-    }
+    updateResult(minVal);
   }
 
   @Override
   public void calculateValueFromPageData(BatchData dataInThisPage, 
IPointReader unsequenceReader)
-      throws IOException, ProcessorException {
+      throws IOException {
     Comparable<Object> minVal = null;
     while (dataInThisPage.hasNext() && unsequenceReader.hasNext()) {
       if (dataInThisPage.currentTime() < 
unsequenceReader.current().getTimestamp()) {
@@ -97,8 +91,53 @@ public class MinValueAggrFunc extends AggregateFunction {
   }
 
   @Override
+  public void calculateValueFromPageData(BatchData dataInThisPage, 
IPointReader unsequenceReader,
+      long bound) throws IOException, ProcessorException {
+    Comparable<Object> minVal = null;
+    while (dataInThisPage.hasNext() && unsequenceReader.hasNext()) {
+      if (dataInThisPage.currentTime() < 
unsequenceReader.current().getTimestamp()) {
+        if (dataInThisPage.currentTime() >= bound) {
+          break;
+        }
+        if (minVal == null || minVal.compareTo(dataInThisPage.currentValue()) 
> 0) {
+          minVal = (Comparable<Object>) dataInThisPage.currentValue();
+        }
+        dataInThisPage.next();
+      } else if (dataInThisPage.currentTime() == 
unsequenceReader.current().getTimestamp()) {
+        if (dataInThisPage.currentTime() >= bound) {
+          break;
+        }
+        if (minVal == null
+            || 
minVal.compareTo(unsequenceReader.current().getValue().getValue()) > 0) {
+          minVal = (Comparable<Object>) 
unsequenceReader.current().getValue().getValue();
+        }
+        dataInThisPage.next();
+        unsequenceReader.next();
+      } else {
+        if (unsequenceReader.current().getTimestamp() >= bound) {
+          break;
+        }
+        if (minVal == null
+            || 
minVal.compareTo(unsequenceReader.current().getValue().getValue()) > 0) {
+          minVal = (Comparable<Object>) 
unsequenceReader.current().getValue().getValue();
+        }
+        unsequenceReader.next();
+      }
+    }
+
+    while (dataInThisPage.hasNext() && dataInThisPage.currentTime() < bound) {
+      if (minVal == null
+          || minVal.compareTo(dataInThisPage.currentValue()) > 0) {
+        minVal = (Comparable<Object>) dataInThisPage.currentValue();
+      }
+      dataInThisPage.next();
+    }
+    updateResult(minVal);
+  }
+
+  @Override
   public void calculateValueFromUnsequenceReader(IPointReader unsequenceReader)
-      throws IOException, ProcessorException {
+      throws IOException {
     Comparable<Object> minVal = null;
     while (unsequenceReader.hasNext()) {
       if (minVal == null
@@ -112,7 +151,7 @@ public class MinValueAggrFunc extends AggregateFunction {
 
   @Override
   public void calculateValueFromUnsequenceReader(IPointReader 
unsequenceReader, long bound)
-      throws IOException, ProcessorException {
+      throws IOException {
     Comparable<Object> minVal = null;
     while (unsequenceReader.hasNext() && 
unsequenceReader.current().getTimestamp() < bound) {
       if (minVal == null
@@ -126,7 +165,7 @@ public class MinValueAggrFunc extends AggregateFunction {
 
   @Override
   public void calcAggregationUsingTimestamps(List<Long> timestamps,
-      EngineReaderByTimeStamp dataReader) throws IOException, 
ProcessorException {
+      EngineReaderByTimeStamp dataReader) throws IOException {
     Comparable<Object> minVal = null;
     for (long time : timestamps) {
       TsPrimitiveType value = dataReader.getValueInTimestamp(time);
@@ -146,22 +185,12 @@ public class MinValueAggrFunc extends AggregateFunction {
   }
 
   private void updateResult(Comparable<Object> minVal) {
-    if (resultData.length() == 0) {
-      if (minVal != null) {
-        resultData.putTime(0);
-        resultData.putAnObject(minVal);
-      }
-    } else {
-      if (minVal != null && minVal.compareTo(resultData.currentValue()) < 0) {
-        resultData.setAnObject(0, minVal);
-      }
+    if (minVal == null) {
+      return;
+    }
+    if (!resultData.isSetValue() || minVal.compareTo(resultData.getValue()) < 
0) {
+      resultData.putTimeAndValue(0, minVal);
     }
-  }
-
-  @Override
-  public void calcGroupByAggregation(long partitionStart, long partitionEnd, 
long intervalStart,
-      long intervalEnd, BatchData data) throws ProcessorException {
-
   }
 
 }
diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/SumAggrFunc.java
 
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/SumAggrFunc.java
index 2f2eb2d..e24641e 100644
--- 
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/SumAggrFunc.java
+++ 
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/SumAggrFunc.java
@@ -19,8 +19,8 @@
 
 package org.apache.iotdb.db.query.aggregation.impl;
 
+import org.apache.iotdb.db.query.aggregation.AggreResultData;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.read.common.BatchData;
 
 public class SumAggrFunc extends MeanAggrFunc {
 
@@ -29,9 +29,9 @@ public class SumAggrFunc extends MeanAggrFunc {
   }
 
   @Override
-  public BatchData getResult() {
-    resultData.putDouble(sum);
-    resultData.putTime(0);
+  public AggreResultData getResult() {
+    resultData.setDoubleRet(sum);
+    resultData.setTimestamp(0);
     return resultData;
   }
 }
diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/BatchDataPointReader.java
 
b/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/AggreResultDataPointReader.java
similarity index 78%
rename from 
iotdb/src/main/java/org/apache/iotdb/db/query/dataset/BatchDataPointReader.java
rename to 
iotdb/src/main/java/org/apache/iotdb/db/query/dataset/AggreResultDataPointReader.java
index 2fbf4a1..8af94d0 100644
--- 
a/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/BatchDataPointReader.java
+++ 
b/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/AggreResultDataPointReader.java
@@ -16,34 +16,34 @@
 package org.apache.iotdb.db.query.dataset;
 
 import java.io.IOException;
+import org.apache.iotdb.db.query.aggregation.AggreResultData;
 import org.apache.iotdb.db.query.reader.IPointReader;
 import org.apache.iotdb.db.utils.TimeValuePair;
 import org.apache.iotdb.db.utils.TimeValuePairUtils;
-import org.apache.iotdb.tsfile.read.common.BatchData;
 
-public class BatchDataPointReader implements IPointReader {
+public class AggreResultDataPointReader implements IPointReader {
 
-  private BatchData batchData;
+  private AggreResultData aggreResultData;
 
-  public BatchDataPointReader(BatchData batchData) {
-    this.batchData = batchData;
+  public AggreResultDataPointReader(AggreResultData aggreResultData) {
+    this.aggreResultData = aggreResultData;
   }
 
   @Override
   public boolean hasNext() throws IOException {
-    return batchData.hasNext();
+    return aggreResultData.isSetValue();
   }
 
   @Override
   public TimeValuePair next() throws IOException {
-    TimeValuePair timeValuePair = 
TimeValuePairUtils.getCurrentTimeValuePair(batchData);
-    batchData.next();
+    TimeValuePair timeValuePair = 
TimeValuePairUtils.getCurrentTimeValuePair(aggreResultData);
+    aggreResultData.reSet();
     return timeValuePair;
   }
 
   @Override
   public TimeValuePair current() throws IOException {
-    TimeValuePair timeValuePair = 
TimeValuePairUtils.getCurrentTimeValuePair(batchData);
+    TimeValuePair timeValuePair = 
TimeValuePairUtils.getCurrentTimeValuePair(aggreResultData);
     return timeValuePair;
   }
 
diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/AggregateEngineExecutor.java
 
b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/AggregateEngineExecutor.java
index d5a36fc..8348433 100644
--- 
a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/AggregateEngineExecutor.java
+++ 
b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/AggregateEngineExecutor.java
@@ -29,20 +29,19 @@ import org.apache.iotdb.db.exception.PathErrorException;
 import org.apache.iotdb.db.exception.ProcessorException;
 import org.apache.iotdb.db.metadata.MManager;
 import org.apache.iotdb.db.query.aggregation.AggreFuncFactory;
+import org.apache.iotdb.db.query.aggregation.AggreResultData;
 import org.apache.iotdb.db.query.aggregation.AggregateFunction;
 import org.apache.iotdb.db.query.aggregation.impl.LastAggrFunc;
 import org.apache.iotdb.db.query.aggregation.impl.MaxTimeAggrFunc;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.control.QueryDataSourceManager;
 import org.apache.iotdb.db.query.control.QueryTokenManager;
-import org.apache.iotdb.db.query.dataset.BatchDataPointReader;
+import org.apache.iotdb.db.query.dataset.AggreResultDataPointReader;
 import org.apache.iotdb.db.query.dataset.EngineDataSetWithoutTimeGenerator;
 import org.apache.iotdb.db.query.factory.SeriesReaderFactory;
-import org.apache.iotdb.db.query.reader.AllDataReader;
 import org.apache.iotdb.db.query.reader.IPointReader;
 import org.apache.iotdb.db.query.reader.merge.EngineReaderByTimeStamp;
 import org.apache.iotdb.db.query.reader.merge.PriorityMergeReader;
-import org.apache.iotdb.db.query.reader.merge.PriorityMergeReaderByTimestamp;
 import org.apache.iotdb.db.query.reader.sequence.SequenceDataReader;
 import org.apache.iotdb.db.query.timegenerator.EngineTimeGenerator;
 import org.apache.iotdb.tsfile.file.header.PageHeader;
@@ -95,7 +94,7 @@ public class AggregateEngineExecutor {
     List<IPointReader> readersOfUnSequenceData = new ArrayList<>();
     List<AggregateFunction> aggregateFunctions = new ArrayList<>();
     for (int i = 0; i < selectedSeries.size(); i++) {
-      //construct AggregateFunction
+      // construct AggregateFunction
       TSDataType tsDataType = MManager.getInstance()
           .getSeriesType(selectedSeries.get(i).getFullPath());
       AggregateFunction function = 
AggreFuncFactory.getAggrFuncByName(aggres.get(i), tsDataType);
@@ -106,7 +105,7 @@ public class AggregateEngineExecutor {
           .getQueryDataSource(jobId, selectedSeries.get(i), context);
 
       // sequence reader for sealed tsfile, unsealed tsfile, memory
-      SequenceDataReader sequenceReader = null;
+      SequenceDataReader sequenceReader;
       if (function instanceof MaxTimeAggrFunc || function instanceof 
LastAggrFunc) {
         sequenceReader = new 
SequenceDataReader(queryDataSource.getSeqDataSource(), timeFilter,
             context, true);
@@ -121,18 +120,25 @@ public class AggregateEngineExecutor {
       readersOfSequenceData.add(sequenceReader);
       readersOfUnSequenceData.add(unSeqMergeReader);
     }
-
-    List<BatchData> batchDatas = new ArrayList<BatchData>();
+    List<AggreResultData> aggreResultDataList = new ArrayList<>();
     //TODO use multi-thread
     for (int i = 0; i < selectedSeries.size(); i++) {
-      BatchData batchData = 
aggregateWithOutTimeGenerator(aggregateFunctions.get(i),
+      AggreResultData aggreResultData = 
aggregateWithOutTimeGenerator(aggregateFunctions.get(i),
           readersOfSequenceData.get(i), readersOfUnSequenceData.get(i), 
timeFilter);
-      batchDatas.add(batchData);
+      aggreResultDataList.add(aggreResultData);
     }
-    return constructDataSet(batchDatas);
+    return constructDataSet(aggreResultDataList);
   }
 
-  private BatchData aggregateWithOutTimeGenerator(AggregateFunction function,
+  /**
+   * calculation aggregate result with only time filter or no filter for one 
series.
+   * @param function aggregate function
+   * @param sequenceReader sequence data reader
+   * @param unSequenceReader unsequence data reader
+   * @param filter time filter or null
+   * @return one series aggregate result data
+   */
+  private AggreResultData aggregateWithOutTimeGenerator(AggregateFunction 
function,
       SequenceDataReader sequenceReader, IPointReader unSequenceReader, Filter 
filter)
       throws IOException, ProcessorException {
     if (function instanceof MaxTimeAggrFunc || function instanceof 
LastAggrFunc) {
@@ -164,6 +170,9 @@ public class AggregateEngineExecutor {
     return function.getResult();
   }
 
+  /**
+   * determine whether pageHeader can be used to compute aggregation results.
+   */
   private boolean canUseHeader(AggregateFunction function, PageHeader 
pageHeader,
       IPointReader unSequenceReader, Filter filter)
       throws IOException, ProcessorException {
@@ -198,7 +207,7 @@ public class AggregateEngineExecutor {
    * @param unSequenceReader unsequence data reader
    * @return BatchData-aggregate result
    */
-  private BatchData handleLastMaxTimeWithOutTimeGenerator(AggregateFunction 
function,
+  private AggreResultData 
handleLastMaxTimeWithOutTimeGenerator(AggregateFunction function,
       SequenceDataReader sequenceReader, IPointReader unSequenceReader, Filter 
timeFilter)
       throws IOException, ProcessorException {
     long lastBatchTimeStamp = Long.MIN_VALUE;
@@ -252,8 +261,8 @@ public class AggregateEngineExecutor {
     QueryTokenManager.getInstance().beginQueryOfGivenExpression(jobId, 
expression);
 
     EngineTimeGenerator timestampGenerator = new EngineTimeGenerator(jobId, 
expression, context);
-    List<EngineReaderByTimeStamp> readersOfSelectedSeries = 
getReadersOfSelectedPaths(
-        selectedSeries, context);
+    List<EngineReaderByTimeStamp> readersOfSelectedSeries = SeriesReaderFactory
+        .getByTimestampReadersOfSelectedPaths(jobId, selectedSeries, context);
 
     List<AggregateFunction> aggregateFunctions = new ArrayList<>();
     for (int i = 0; i < selectedSeries.size(); i++) {
@@ -262,12 +271,17 @@ public class AggregateEngineExecutor {
       function.init();
       aggregateFunctions.add(function);
     }
-    List<BatchData> batchDatas = 
aggregateWithTimeGenerator(aggregateFunctions, timestampGenerator,
+    List<AggreResultData> batchDataList = 
aggregateWithTimeGenerator(aggregateFunctions,
+        timestampGenerator,
         readersOfSelectedSeries);
-    return constructDataSet(batchDatas);
+    return constructDataSet(batchDataList);
   }
 
-  private List<BatchData> aggregateWithTimeGenerator(List<AggregateFunction> 
aggregateFunctions,
+  /**
+   * calculation aggregate result with value filter.
+   */
+  private List<AggreResultData> aggregateWithTimeGenerator(
+      List<AggregateFunction> aggregateFunctions,
       EngineTimeGenerator timestampGenerator,
       List<EngineReaderByTimeStamp> readersOfSelectedSeries)
       throws IOException, ProcessorException {
@@ -288,59 +302,27 @@ public class AggregateEngineExecutor {
         aggregateFunctions.get(i)
             .calcAggregationUsingTimestamps(timestamps, 
readersOfSelectedSeries.get(i));
       }
-
     }
 
-    List<BatchData> batchDataList = new ArrayList<>();
+    List<AggreResultData> aggreResultDataArrayList = new ArrayList<>();
     for (AggregateFunction function : aggregateFunctions) {
-      batchDataList.add(function.getResult());
-    }
-    return batchDataList;
-  }
-
-  private List<EngineReaderByTimeStamp> getReadersOfSelectedPaths(List<Path> 
paths,
-      QueryContext context)
-      throws IOException, FileNodeManagerException {
-
-    List<EngineReaderByTimeStamp> readersOfSelectedSeries = new ArrayList<>();
-
-    for (Path path : paths) {
-
-      QueryDataSource queryDataSource = 
QueryDataSourceManager.getQueryDataSource(jobId, path,
-          context);
-
-      PriorityMergeReaderByTimestamp mergeReaderByTimestamp = new 
PriorityMergeReaderByTimestamp();
-
-      // reader for sequence data
-      SequenceDataReader tsFilesReader = new 
SequenceDataReader(queryDataSource.getSeqDataSource(),
-          null, context);
-
-      // reader for unSequence data
-      PriorityMergeReader unSeqMergeReader = SeriesReaderFactory.getInstance()
-          
.createUnSeqMergeReader(queryDataSource.getOverflowSeriesDataSource(), null);
-
-      if (tsFilesReader == null || !tsFilesReader.hasNext()) {
-        mergeReaderByTimestamp
-            .addReaderWithPriority(unSeqMergeReader, 
PriorityMergeReader.HIGH_PRIORITY);
-      } else {
-        mergeReaderByTimestamp
-            .addReaderWithPriority(new AllDataReader(tsFilesReader, 
unSeqMergeReader),
-                PriorityMergeReader.HIGH_PRIORITY);
-      }
-
-      readersOfSelectedSeries.add(mergeReaderByTimestamp);
+      aggreResultDataArrayList.add(function.getResult());
     }
-
-    return readersOfSelectedSeries;
+    return aggreResultDataArrayList;
   }
 
-  private QueryDataSet constructDataSet(List<BatchData> batchDataList) throws 
IOException {
+  /**
+   * using aggregate result data list construct QueryDataSet.
+   * @param aggreResultDataList aggregate result data list
+   */
+  private QueryDataSet constructDataSet(List<AggreResultData> 
aggreResultDataList)
+      throws IOException {
     List<TSDataType> dataTypes = new ArrayList<>();
-    List<IPointReader> batchDataPointReaders = new ArrayList<>();
-    for (BatchData batchData : batchDataList) {
-      dataTypes.add(batchData.getDataType());
-      batchDataPointReaders.add(new BatchDataPointReader(batchData));
+    List<IPointReader> resultDataPointReaders = new ArrayList<>();
+    for (AggreResultData resultData : aggreResultDataList) {
+      dataTypes.add(resultData.getDataType());
+      resultDataPointReaders.add(new AggreResultDataPointReader(resultData));
     }
-    return new EngineDataSetWithoutTimeGenerator(selectedSeries, dataTypes, 
batchDataPointReaders);
+    return new EngineDataSetWithoutTimeGenerator(selectedSeries, dataTypes, 
resultDataPointReaders);
   }
 }
diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithTimeGenerator.java
 
b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithTimeGenerator.java
index d5b1740..f6bebfd 100644
--- 
a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithTimeGenerator.java
+++ 
b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithTimeGenerator.java
@@ -22,20 +22,14 @@ package org.apache.iotdb.db.query.executor;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.exception.FileNodeManagerException;
 import org.apache.iotdb.db.exception.PathErrorException;
 import org.apache.iotdb.db.metadata.MManager;
 import org.apache.iotdb.db.query.context.QueryContext;
-import org.apache.iotdb.db.query.control.QueryDataSourceManager;
 import org.apache.iotdb.db.query.control.QueryTokenManager;
 import org.apache.iotdb.db.query.dataset.EngineDataSetWithTimeGenerator;
 import org.apache.iotdb.db.query.factory.SeriesReaderFactory;
-import org.apache.iotdb.db.query.reader.AllDataReader;
 import org.apache.iotdb.db.query.reader.merge.EngineReaderByTimeStamp;
-import org.apache.iotdb.db.query.reader.merge.PriorityMergeReader;
-import org.apache.iotdb.db.query.reader.merge.PriorityMergeReaderByTimestamp;
-import org.apache.iotdb.db.query.reader.sequence.SequenceDataReader;
 import org.apache.iotdb.db.query.timegenerator.EngineTimeGenerator;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.Path;
@@ -73,7 +67,7 @@ public class EngineExecutorWithTimeGenerator {
     List<EngineReaderByTimeStamp> readersOfSelectedSeries;
     try {
       timestampGenerator = new EngineTimeGenerator(jobId, 
queryExpression.getExpression(), context);
-      readersOfSelectedSeries = 
getReadersOfSelectedPaths(queryExpression.getSelectedSeries(),
+      readersOfSelectedSeries = 
SeriesReaderFactory.getByTimestampReadersOfSelectedPaths(jobId, 
queryExpression.getSelectedSeries(),
           context);
     } catch (IOException ex) {
       throw new FileNodeManagerException(ex);
@@ -94,40 +88,4 @@ public class EngineExecutorWithTimeGenerator {
         readersOfSelectedSeries);
   }
 
-  private List<EngineReaderByTimeStamp> getReadersOfSelectedPaths(List<Path> 
paths,
-      QueryContext context)
-      throws IOException, FileNodeManagerException {
-
-    List<EngineReaderByTimeStamp> readersOfSelectedSeries = new ArrayList<>();
-
-    for (Path path : paths) {
-
-      QueryDataSource queryDataSource = 
QueryDataSourceManager.getQueryDataSource(jobId, path,
-          context);
-
-      PriorityMergeReaderByTimestamp mergeReaderByTimestamp = new 
PriorityMergeReaderByTimestamp();
-
-      // reader for sequence data
-      SequenceDataReader tsFilesReader = new 
SequenceDataReader(queryDataSource.getSeqDataSource(),
-          null, context);
-
-      // reader for unSequence data
-      PriorityMergeReader unSeqMergeReader = SeriesReaderFactory.getInstance()
-          
.createUnSeqMergeReader(queryDataSource.getOverflowSeriesDataSource(), null);
-
-      if (tsFilesReader == null || !tsFilesReader.hasNext()) {
-        mergeReaderByTimestamp
-            .addReaderWithPriority(unSeqMergeReader, 
PriorityMergeReader.HIGH_PRIORITY);
-      } else {
-        mergeReaderByTimestamp
-            .addReaderWithPriority(new AllDataReader(tsFilesReader, 
unSeqMergeReader),
-                PriorityMergeReader.HIGH_PRIORITY);
-      }
-
-      readersOfSelectedSeries.add(mergeReaderByTimestamp);
-    }
-
-    return readersOfSelectedSeries;
-  }
-
 }
diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java 
b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java
index c36d5c2..d7f6cf0 100644
--- 
a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java
+++ 
b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java
@@ -19,9 +19,10 @@
 
 package org.apache.iotdb.db.query.executor;
 
-import static 
org.apache.iotdb.tsfile.read.expression.ExpressionType.GLOBAL_TIME;
-
 import java.io.IOException;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.iotdb.db.exception.FileNodeManagerException;
@@ -32,10 +33,15 @@ import 
org.apache.iotdb.db.query.control.OpenedFilePathsManager;
 import org.apache.iotdb.db.query.control.QueryTokenManager;
 import 
org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
 import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.expression.ExpressionType;
 import org.apache.iotdb.tsfile.read.expression.IExpression;
 import org.apache.iotdb.tsfile.read.expression.QueryExpression;
+import org.apache.iotdb.tsfile.read.expression.impl.BinaryExpression;
+import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
 import org.apache.iotdb.tsfile.read.expression.util.ExpressionOptimizer;
+import org.apache.iotdb.tsfile.read.filter.TimeFilter;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+import org.apache.iotdb.tsfile.utils.Pair;
 
 /**
  * Query entrance class of IoTDB query process. All query clause will be 
transformed to physical
@@ -68,10 +74,9 @@ public class EngineQueryRouter {
             .optimize(queryExpression.getExpression(), 
queryExpression.getSelectedSeries());
         queryExpression.setExpression(optimizedExpression);
 
-        if (optimizedExpression.getType() == GLOBAL_TIME) {
+        if (optimizedExpression.getType() == ExpressionType.GLOBAL_TIME) {
           EngineExecutorWithoutTimeGenerator engineExecutor =
               new EngineExecutorWithoutTimeGenerator(
-
                   nextJobId, queryExpression);
           return engineExecutor.executeWithGlobalTimeFilter(context);
         } else {
@@ -96,8 +101,8 @@ public class EngineQueryRouter {
    * execute aggregation query.
    */
   public QueryDataSet aggregate(List<Path> selectedSeries, List<String> aggres,
-      IExpression expression)
-      throws QueryFilterOptimizationException, FileNodeManagerException, 
IOException, PathErrorException, ProcessorException {
+      IExpression expression) throws QueryFilterOptimizationException, 
FileNodeManagerException,
+      IOException, PathErrorException, ProcessorException {
 
     long nextJobId = getNextJobId();
     QueryTokenManager.getInstance().setJobIdForCurrentRequestThread(nextJobId);
@@ -110,7 +115,7 @@ public class EngineQueryRouter {
           .optimize(expression, selectedSeries);
       AggregateEngineExecutor engineExecutor = new 
AggregateEngineExecutor(nextJobId,
           selectedSeries, aggres, optimizedExpression);
-      if (optimizedExpression.getType() == GLOBAL_TIME) {
+      if (optimizedExpression.getType() == ExpressionType.GLOBAL_TIME) {
         return engineExecutor.executeWithOutTimeGenerator(context);
       } else {
         return engineExecutor.executeWithTimeGenerator(context);
@@ -122,6 +127,111 @@ public class EngineQueryRouter {
     }
   }
 
+  /**
+   * execute groupBy query.
+   * @param selectedSeries select path list
+   * @param aggres aggregation name list
+   * @param expression filter expression
+   * @param unit time granularity for interval partitioning, unit is ms.
+   * @param origin the datum time point for interval division is divided into 
a time interval
+   *        for each TimeUnit time from this point forward and backward.
+   * @param intervals time intervals, closed interval.
+   * @return
+   * @throws ProcessorException
+   * @throws QueryFilterOptimizationException
+   * @throws FileNodeManagerException
+   * @throws PathErrorException
+   * @throws IOException
+   */
+  public QueryDataSet groupBy(List<Path> selectedSeries, List<String> aggres,
+      IExpression expression, long unit, long origin, List<Pair<Long, Long>> 
intervals)
+      throws ProcessorException, QueryFilterOptimizationException, 
FileNodeManagerException,
+      PathErrorException, IOException {
+
+    long nextJobId = getNextJobId();
+    QueryTokenManager.getInstance().setJobIdForCurrentRequestThread(nextJobId);
+    
OpenedFilePathsManager.getInstance().setJobIdForCurrentRequestThread(nextJobId);
+    QueryContext context = new QueryContext();
+
+    //check the legitimacy of intervals
+    for (Pair<Long, Long> pair : intervals) {
+      if (!(pair.left > 0 && pair.right > 0)) {
+        throw new ProcessorException(
+            String.format("Time interval<%d, %d> must be greater than 0.", 
pair.left, pair.right));
+      }
+      if (pair.right < pair.left) {
+        throw new ProcessorException(String.format(
+            "Interval starting time must be greater than the interval ending 
time, "
+                + "found error interval<%d, %d>", pair.left, pair.right));
+      }
+    }
+    //merge intervals
+    List<Pair<Long, Long>> mergedIntervalList = mergeInterval(intervals);
+
+    //construct groupBy intervals filter
+    BinaryExpression intervalFilter = null;
+    for (Pair<Long, Long> pair : mergedIntervalList) {
+      BinaryExpression pairFilter = BinaryExpression
+          .and(new GlobalTimeExpression(TimeFilter.gtEq(pair.left)),
+              new GlobalTimeExpression(TimeFilter.ltEq(pair.right)));
+      if (intervalFilter != null) {
+        intervalFilter = BinaryExpression.or(intervalFilter, pairFilter);
+      } else {
+        intervalFilter = pairFilter;
+      }
+    }
+
+    //merge interval filter and filtering conditions after where statements
+    if (expression == null) {
+      expression = intervalFilter;
+    } else {
+      expression = BinaryExpression.and(expression, intervalFilter);
+    }
+
+    IExpression optimizedExpression = ExpressionOptimizer.getInstance()
+        .optimize(expression, selectedSeries);
+    if (optimizedExpression.getType() == ExpressionType.GLOBAL_TIME) {
+      GroupByWithOnlyTimeFilterDataSet groupByEngine = new 
GroupByWithOnlyTimeFilterDataSet(
+          nextJobId, selectedSeries, unit, origin, mergedIntervalList);
+      groupByEngine.initGroupBy(context, aggres, optimizedExpression);
+      return groupByEngine;
+    } else {
+      GroupByWithValueFilterDataSet groupByEngine = new 
GroupByWithValueFilterDataSet(nextJobId,
+          selectedSeries, unit, origin, mergedIntervalList);
+      groupByEngine.initGroupBy(context, aggres, optimizedExpression);
+      return groupByEngine;
+    }
+  }
+
+  /**
+   * sort intervals by start time and merge overlapping intervals.
+   *
+   * @param intervals time interval
+   */
+  private List<Pair<Long, Long>> mergeInterval(List<Pair<Long, Long>> 
intervals) {
+    Collections.sort(intervals, new Comparator<Pair<Long, Long>>() {
+      @Override
+      public int compare(Pair<Long, Long> o1, Pair<Long, Long> o2) {
+        /*sort by interval start time.*/
+        return (int) (o1.left - o2.left);
+      }
+    });
+
+    LinkedList<Pair<Long, Long>> merged = new LinkedList<>();
+    for (Pair<Long, Long> interval : intervals) {
+      // if the list of merged intervals is empty or
+      // if the current interval does not overlap with the previous, simply 
append it.
+      if (merged.isEmpty() || merged.getLast().right < interval.left) {
+        merged.add(interval);
+      }
+      // otherwise, there is overlap, so we merge the current and previous 
intervals.
+      else {
+        merged.getLast().right = Math.max(merged.getLast().right, 
interval.right);
+      }
+    }
+    return merged;
+  }
+
   private synchronized long getNextJobId() {
     return jobIdGenerator.incrementAndGet();
   }
diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/GroupByEngine.java 
b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/GroupByEngine.java
new file mode 100644
index 0000000..5430f9e
--- /dev/null
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/GroupByEngine.java
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.executor;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.iotdb.db.exception.FileNodeManagerException;
+import org.apache.iotdb.db.exception.PathErrorException;
+import org.apache.iotdb.db.exception.ProcessorException;
+import org.apache.iotdb.db.metadata.MManager;
+import org.apache.iotdb.db.query.aggregation.AggreFuncFactory;
+import org.apache.iotdb.db.query.aggregation.AggreResultData;
+import org.apache.iotdb.db.query.aggregation.AggregateFunction;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.Field;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.expression.IExpression;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+public abstract class GroupByEngine extends QueryDataSet {
+
+  protected long jobId;
+  protected List<Path> selectedSeries;
+  private long unit;
+  private long origin;
+  protected List<Pair<Long, Long>> mergedIntervals;
+
+  protected long startTime;
+  protected long endTime;
+  protected int usedIndex;
+  protected List<AggregateFunction> functions;
+  protected boolean hasCachedTimeInterval;
+
+  /**
+   * groupBy query.
+   */
+  public GroupByEngine(long jobId, List<Path> paths, long unit, long origin,
+      List<Pair<Long, Long>> mergedIntervals) {
+    super(paths);
+    this.jobId = jobId;
+    this.selectedSeries = paths;
+    this.unit = unit;
+    this.origin = origin;
+    this.mergedIntervals = mergedIntervals;
+
+    this.functions = new ArrayList<>();
+
+    //init group by time partition
+    this.usedIndex = 0;
+    this.hasCachedTimeInterval = false;
+    this.endTime = -1;
+  }
+
+  protected void initAggreFuction(QueryContext context, List<String> aggres, 
IExpression expression)
+      throws FileNodeManagerException, PathErrorException, ProcessorException, 
IOException {
+
+    List<TSDataType> types = new ArrayList<>();
+    //construct AggregateFunctions
+    for (int i = 0; i < paths.size(); i++) {
+      TSDataType tsDataType = MManager.getInstance()
+          .getSeriesType(selectedSeries.get(i).getFullPath());
+      AggregateFunction function = 
AggreFuncFactory.getAggrFuncByName(aggres.get(i), tsDataType);
+      function.init();
+      functions.add(function);
+      types.add(function.getResultDataType());
+    }
+    super.setDataTypes(types);
+  }
+
+  @Override
+  public boolean hasNext() {
+    //has cached
+    if (hasCachedTimeInterval) {
+      return true;
+    }
+
+    //end
+    if (usedIndex >= mergedIntervals.size()) {
+      return false;
+    }
+
+    //skip the intervals in coverage of last time-partition
+    while (usedIndex < mergedIntervals.size() && 
mergedIntervals.get(usedIndex).right < endTime) {
+      usedIndex++;
+    }
+    if (usedIndex >= mergedIntervals.size()) {
+      return false;
+    }
+
+    //initialize the start-end time of next interval
+    if (endTime < mergedIntervals.get(usedIndex).left) {
+      //interval start time
+      startTime = mergedIntervals.get(usedIndex).left;
+      if (origin > startTime) {
+        endTime = origin - (origin - startTime) / unit * unit;
+      } else {
+        endTime = origin + (startTime - origin) / unit * unit + unit;
+      }
+      hasCachedTimeInterval = true;
+      return true;
+    }
+
+    //current interval is not covered yet
+    if (endTime < mergedIntervals.get(usedIndex).right) {
+      startTime = endTime;
+      endTime += unit;
+      hasCachedTimeInterval = true;
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * this method is only used in the test class to get the next time partition.
+   */
+  public Pair<Long, Long> nextTimePartition() {
+    hasCachedTimeInterval = false;
+    return new Pair<>(startTime, endTime);
+  }
+
+  protected Field getField(AggreResultData aggreResultData) {
+    if (!aggreResultData.isSetValue()) {
+      return new Field(null);
+    }
+    Field field = new Field(aggreResultData.getDataType());
+    switch (aggreResultData.getDataType()) {
+      case INT32:
+        field.setIntV(aggreResultData.getIntRet());
+        break;
+      case INT64:
+        field.setLongV(aggreResultData.getLongRet());
+        break;
+      case FLOAT:
+        field.setFloatV(aggreResultData.getFloatRet());
+        break;
+      case DOUBLE:
+        field.setDoubleV(aggreResultData.getDoubleRet());
+        break;
+      case BOOLEAN:
+        field.setBoolV(aggreResultData.isBooleanRet());
+        break;
+      case TEXT:
+        field.setBinaryV(aggreResultData.getBinaryRet());
+        break;
+      default:
+        throw new UnSupportedDataTypeException("UnSupported: " + 
aggreResultData.getDataType());
+    }
+    return field;
+  }
+
+}
diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/GroupByWithOnlyTimeFilterDataSet.java
 
b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/GroupByWithOnlyTimeFilterDataSet.java
new file mode 100644
index 0000000..67d72d9
--- /dev/null
+++ 
b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/GroupByWithOnlyTimeFilterDataSet.java
@@ -0,0 +1,277 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.executor;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.exception.FileNodeManagerException;
+import org.apache.iotdb.db.exception.PathErrorException;
+import org.apache.iotdb.db.exception.ProcessorException;
+import org.apache.iotdb.db.query.aggregation.AggreResultData;
+import org.apache.iotdb.db.query.aggregation.AggregateFunction;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.QueryDataSourceManager;
+import org.apache.iotdb.db.query.control.QueryTokenManager;
+import org.apache.iotdb.db.query.factory.SeriesReaderFactory;
+import org.apache.iotdb.db.query.reader.IAggregateReader;
+import org.apache.iotdb.db.query.reader.IPointReader;
+import org.apache.iotdb.db.query.reader.merge.PriorityMergeReader;
+import org.apache.iotdb.db.query.reader.sequence.SequenceDataReader;
+import org.apache.iotdb.tsfile.file.header.PageHeader;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.Field;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+import org.apache.iotdb.tsfile.read.expression.IExpression;
+import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+public class GroupByWithOnlyTimeFilterDataSet extends GroupByEngine {
+
+  protected List<IPointReader> unSequenceReaderList;
+  protected List<IAggregateReader> sequenceReaderList;
+  private List<BatchData> batchDataList;
+  private List<Boolean> hasCachedSequenceDataList;
+  private Filter timeFilter;
+
+  /**
+   * constructor.
+   */
+  public GroupByWithOnlyTimeFilterDataSet(long jobId, List<Path> paths, long 
unit, long origin,
+      List<Pair<Long, Long>> mergedIntervals) {
+    super(jobId, paths, unit, origin, mergedIntervals);
+    this.unSequenceReaderList = new ArrayList<>();
+    this.sequenceReaderList = new ArrayList<>();
+    this.timeFilter = null;
+    this.hasCachedSequenceDataList = new ArrayList<>();
+    this.batchDataList = new ArrayList<>();
+    for (int i = 0; i < paths.size(); i++) {
+      hasCachedSequenceDataList.add(false);
+      batchDataList.add(null);
+    }
+  }
+
+  /**
+   * init reader and aggregate function.
+   */
+  public void initGroupBy(QueryContext context, List<String> aggres, 
IExpression expression)
+      throws FileNodeManagerException, PathErrorException, ProcessorException, 
IOException {
+    initAggreFuction(context, aggres, expression);
+    //init reader
+    QueryTokenManager.getInstance().beginQueryOfGivenQueryPaths(jobId, 
selectedSeries);
+    if (expression != null) {
+      timeFilter = ((GlobalTimeExpression) expression).getFilter();
+    }
+    for (int i = 0; i < selectedSeries.size(); i++) {
+      QueryDataSource queryDataSource = QueryDataSourceManager
+          .getQueryDataSource(jobId, selectedSeries.get(i), context);
+
+      // sequence reader for sealed tsfile, unsealed tsfile, memory
+      SequenceDataReader sequenceReader = new 
SequenceDataReader(queryDataSource.getSeqDataSource(),
+          timeFilter, context, false);
+
+      // unseq reader for all chunk groups in unSeqFile, memory
+      PriorityMergeReader unSeqMergeReader = SeriesReaderFactory.getInstance()
+          
.createUnSeqMergeReader(queryDataSource.getOverflowSeriesDataSource(), 
timeFilter);
+
+      sequenceReaderList.add(sequenceReader);
+      unSequenceReaderList.add(unSeqMergeReader);
+    }
+
+  }
+
+  @Override
+  public RowRecord next() throws IOException {
+    if (!hasCachedTimeInterval) {
+      throw new IOException(
+          "need to call hasNext() before calling next() in 
GroupByWithOnlyTimeFilterDataSet.");
+    }
+    hasCachedTimeInterval = false;
+    RowRecord record = new RowRecord(startTime);
+    for (int i = 0; i < functions.size(); i++) {
+      AggreResultData res = null;
+      try {
+        res = nextSeries(i);
+      } catch (ProcessorException e) {
+        throw new IOException(e);
+      }
+      if (res == null) {
+        record.addField(new Field(null));
+      } else {
+        record.addField(getField(res));
+      }
+    }
+    return record;
+  }
+
+  protected AggreResultData nextSeries(int idx) throws IOException, 
ProcessorException {
+    IPointReader unsequenceReader = unSequenceReaderList.get(idx);
+    IAggregateReader sequenceReader = sequenceReaderList.get(idx);
+    AggregateFunction function = functions.get(idx);
+    BatchData batchData = batchDataList.get(idx);
+    boolean hasCachedSequenceData = hasCachedSequenceDataList.get(idx);
+
+    function.init();
+
+    boolean finishCheckSequenceData = false;
+
+    //skip the points with timestamp less than startTime
+    skipExessData(idx, sequenceReader, unsequenceReader);
+
+    //there was unprocessed data in last batch
+    if (hasCachedSequenceData && batchData.hasNext()) {
+      function.calculateValueFromPageData(batchData, unsequenceReader, 
endTime);
+    }
+
+    if (hasCachedSequenceData && batchData.hasNext()) {
+      finishCheckSequenceData = true;
+    } else {
+      hasCachedSequenceData = false;
+    }
+
+    if (finishCheckSequenceData) {
+      //check unsequence data
+      function.calculateValueFromUnsequenceReader(unsequenceReader, endTime);
+      return function.getResult().deepCopy();
+    }
+
+    //continue checking sequence data
+    while (sequenceReader.hasNext()) {
+      PageHeader pageHeader = sequenceReader.nextPageHeader();
+
+      //memory data
+      if (pageHeader == null) {
+        batchData = sequenceReader.nextBatch();
+        function.calculateValueFromPageData(batchData, unsequenceReader, 
endTime);
+        //no point in sequence data with a timestamp less than endTime
+        if (batchData.hasNext()) {
+          hasCachedSequenceData = true;
+          break;
+        }
+      }
+
+      //page data
+      long minTime = pageHeader.getMinTimestamp();
+      long maxTime = pageHeader.getMaxTimestamp();
+      //no point in sequence data with a timestamp less than endTime
+      if (minTime >= endTime) {
+        hasCachedSequenceData = true;
+        batchData = sequenceReader.nextBatch();
+        break;
+      }
+
+      if (canUseHeader(minTime, maxTime, unsequenceReader, function)) {
+        //cal using page header
+        function.calculateValueFromPageHeader(pageHeader);
+      } else {
+        //cal using page data
+        batchData = sequenceReader.nextBatch();
+        function.calculateValueFromPageData(batchData, unsequenceReader, 
endTime);
+        if (batchData.hasNext()) {
+          hasCachedSequenceData = true;
+          break;
+        }
+      }
+
+    }
+
+    function.calculateValueFromUnsequenceReader(unsequenceReader, endTime);
+    hasCachedSequenceDataList.set(idx, hasCachedSequenceData);
+    batchDataList.set(idx, batchData);
+    return function.getResult().deepCopy();
+  }
+
+  //skip the points with timestamp less than startTime
+  private void skipExessData(int idx, IAggregateReader sequenceReader, 
IPointReader unsequenceReader)
+      throws IOException {
+    BatchData batchData = batchDataList.get(idx);
+    boolean hasCachedSequenceData = hasCachedSequenceDataList.get(idx);
+
+    //skip the unsequenceReader points with timestamp less than startTime
+    while (unsequenceReader.hasNext() && 
unsequenceReader.current().getTimestamp() < startTime){
+      unsequenceReader.next();
+    }
+
+    //skip the cached batch data points with timestamp less than startTime
+    if(hasCachedSequenceData){
+      while (batchData.hasNext() && batchData.currentTime() < startTime){
+        batchData.next();
+      }
+    }
+    if(hasCachedSequenceData && !batchData.hasNext()){
+      hasCachedSequenceData = false;
+    }
+    else {
+      return;
+    }
+
+    //skip the points in sequenceReader data whose timestamp are less than 
startTime
+    while (sequenceReader.hasNext()){
+      PageHeader pageHeader = sequenceReader.nextPageHeader();
+      //memory data
+      if(pageHeader == null){
+        batchData = sequenceReader.nextBatch();
+        hasCachedSequenceData = true;
+        while (batchData.hasNext() && batchData.currentTime() < startTime){
+          batchData.next();
+        }
+        continue;
+      }
+      //timestamps of all points in the page are less than startTime
+      if(pageHeader.getMaxTimestamp() < startTime){
+        sequenceReader.skipPageData();
+        continue;
+      }
+      //timestamps of all points in the page are greater or equal to 
startTime, don't need to skip
+      if(pageHeader.getMinTimestamp() >= startTime){
+        break;
+      }
+      //the page has overlap with startTime
+      batchData = sequenceReader.nextBatch();
+      hasCachedSequenceData = true;
+      while (batchData.hasNext() && batchData.currentTime() < startTime){
+        batchData.next();
+      }
+      break;
+    }
+
+    batchDataList.set(idx, batchData);
+    hasCachedSequenceDataList.set(idx, hasCachedSequenceData);
+  }
+
+  private boolean canUseHeader(long minTime, long maxTime, IPointReader 
unSequenceReader,
+      AggregateFunction function)
+      throws IOException, ProcessorException {
+    if (timeFilter != null && !timeFilter.containStartEndTime(minTime, 
maxTime)) {
+      return false;
+    }
+
+    //cal unsequence data with timestamps between pages.
+    function.calculateValueFromUnsequenceReader(unSequenceReader, minTime);
+
+    if (unSequenceReader.hasNext() && 
unSequenceReader.current().getTimestamp() <= maxTime) {
+      return false;
+    }
+    return true;
+  }
+}
diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/GroupByWithValueFilterDataSet.java
 
b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/GroupByWithValueFilterDataSet.java
new file mode 100644
index 0000000..077905d
--- /dev/null
+++ 
b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/GroupByWithValueFilterDataSet.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.executor;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.FileNodeManagerException;
+import org.apache.iotdb.db.exception.PathErrorException;
+import org.apache.iotdb.db.exception.ProcessorException;
+import org.apache.iotdb.db.query.aggregation.AggregateFunction;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.QueryTokenManager;
+import org.apache.iotdb.db.query.factory.SeriesReaderFactory;
+import org.apache.iotdb.db.query.reader.merge.EngineReaderByTimeStamp;
+import org.apache.iotdb.db.query.timegenerator.EngineTimeGenerator;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+import org.apache.iotdb.tsfile.read.expression.IExpression;
+import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+public class GroupByWithValueFilterDataSet extends GroupByEngine {
+
+
+  private List<EngineReaderByTimeStamp> allDataReaderList;
+  private TimeGenerator timestampGenerator;
+  private long timestamp;
+  private boolean hasCachedTimestamp;
+
+  //group by batch calculation size.
+  private int timeStampFetchSize;
+
+  /**
+   * constructor.
+   */
+  public GroupByWithValueFilterDataSet(long jobId, List<Path> paths, long 
unit, long origin,
+      List<Pair<Long, Long>> mergedIntervals) {
+    super(jobId, paths, unit, origin, mergedIntervals);
+    this.allDataReaderList = new ArrayList<>();
+    this.timeStampFetchSize = 10 * 
IoTDBDescriptor.getInstance().getConfig().getFetchSize();
+  }
+
+  /**
+   * init reader and aggregate function.
+   */
+  public void initGroupBy(QueryContext context, List<String> aggres, 
IExpression expression)
+      throws FileNodeManagerException, PathErrorException, ProcessorException, 
IOException {
+    initAggreFuction(context, aggres, expression);
+
+    QueryTokenManager.getInstance().beginQueryOfGivenExpression(jobId, 
expression);
+    QueryTokenManager.getInstance().beginQueryOfGivenQueryPaths(jobId, 
selectedSeries);
+    this.timestampGenerator = new EngineTimeGenerator(jobId, expression, 
context);
+    this.allDataReaderList = SeriesReaderFactory
+        .getByTimestampReadersOfSelectedPaths(jobId, selectedSeries, context);
+  }
+
+  @Override
+  public RowRecord next() throws IOException {
+    if (!hasCachedTimeInterval) {
+      throw new IOException(
+          "need to call hasNext() before calling next() in 
GroupByWithOnlyTimeFilterDataSet.");
+    }
+    hasCachedTimeInterval = false;
+    for (AggregateFunction function : functions) {
+      function.init();
+    }
+
+    List<Long> timestampList = new ArrayList<>(timeStampFetchSize);
+    if (hasCachedTimestamp) {
+      if (timestamp < endTime) {
+        hasCachedTimestamp = false;
+        timestampList.add(timestamp);
+      } else {
+        //所有域均为空
+        return constructRowRecord();
+      }
+    }
+
+    while (timestampGenerator.hasNext()) {
+      //construct timestamp list
+      for (int cnt = 1; cnt < timeStampFetchSize; cnt++) {
+        if (!timestampGenerator.hasNext()) {
+          break;
+        }
+        timestamp = timestampGenerator.next();
+        if (timestamp < endTime) {
+          timestampList.add(timestamp);
+        } else {
+          hasCachedTimestamp = true;
+          break;
+        }
+      }
+
+      //cal result using timestamp list
+      for (int i = 0; i < selectedSeries.size(); i++) {
+        functions.get(i).calcAggregationUsingTimestamps(timestampList, 
allDataReaderList.get(i));
+      }
+
+      timestampList.clear();
+      //judge if it's end
+      if (timestamp >= endTime) {
+        hasCachedTimestamp = true;
+        break;
+      }
+    }
+
+    if(!timestampList.isEmpty()){
+      //cal result using timestamp list
+      for (int i = 0; i < selectedSeries.size(); i++) {
+        functions.get(i).calcAggregationUsingTimestamps(timestampList, 
allDataReaderList.get(i));
+      }
+    }
+    return constructRowRecord();
+  }
+
+  private RowRecord constructRowRecord() {
+    RowRecord record = new RowRecord(startTime);
+    for (int i = 0; i < functions.size(); i++) {
+      record.addField(getField(functions.get(i).getResult()));
+    }
+    return record;
+  }
+}
diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactory.java
 
b/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactory.java
index 8a959da..9ad6d52 100644
--- 
a/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactory.java
+++ 
b/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactory.java
@@ -20,29 +20,38 @@
 package org.apache.iotdb.db.query.factory;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 import org.apache.iotdb.db.engine.filenode.TsFileResource;
 import org.apache.iotdb.db.engine.modification.Modification;
 import org.apache.iotdb.db.engine.querycontext.OverflowInsertFile;
 import org.apache.iotdb.db.engine.querycontext.OverflowSeriesDataSource;
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.exception.FileNodeManagerException;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.control.FileReaderManager;
+import org.apache.iotdb.db.query.control.QueryDataSourceManager;
 import org.apache.iotdb.db.query.reader.AllDataReader;
 import org.apache.iotdb.db.query.reader.IBatchReader;
 import org.apache.iotdb.db.query.reader.IPointReader;
 import org.apache.iotdb.db.query.reader.IReader;
 import org.apache.iotdb.db.query.reader.mem.MemChunkReader;
+import org.apache.iotdb.db.query.reader.merge.EngineReaderByTimeStamp;
 import org.apache.iotdb.db.query.reader.merge.PriorityMergeReader;
+import org.apache.iotdb.db.query.reader.merge.PriorityMergeReaderByTimestamp;
 import org.apache.iotdb.db.query.reader.sequence.SealedTsFilesReader;
+import org.apache.iotdb.db.query.reader.sequence.SequenceDataReader;
 import org.apache.iotdb.db.query.reader.unsequence.EngineChunkReader;
 import org.apache.iotdb.db.utils.QueryUtils;
 import org.apache.iotdb.tsfile.common.constant.StatisticConstant;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
 import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
 import org.apache.iotdb.tsfile.read.common.Chunk;
+import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.read.controller.ChunkLoaderImpl;
 import org.apache.iotdb.tsfile.read.controller.MetadataQuerier;
 import org.apache.iotdb.tsfile.read.controller.MetadataQuerierByFileImpl;
+import org.apache.iotdb.tsfile.read.expression.IExpression;
 import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
 import org.apache.iotdb.tsfile.read.filter.DigestForFilter;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
@@ -175,6 +184,41 @@ public class SeriesReaderFactory {
     return new SealedTsFilesReader(seriesInTsFileReader, context);
   }
 
+  public static List<EngineReaderByTimeStamp> 
getByTimestampReadersOfSelectedPaths(long jobId , List<Path> paths,
+      QueryContext context) throws IOException, FileNodeManagerException {
+
+    List<EngineReaderByTimeStamp> readersOfSelectedSeries = new ArrayList<>();
+
+    for (Path path : paths) {
+
+      QueryDataSource queryDataSource = 
QueryDataSourceManager.getQueryDataSource(jobId, path,
+          context);
+
+      PriorityMergeReaderByTimestamp mergeReaderByTimestamp = new 
PriorityMergeReaderByTimestamp();
+
+      // reader for sequence data
+      SequenceDataReader tsFilesReader = new 
SequenceDataReader(queryDataSource.getSeqDataSource(),
+          null, context);
+
+      // reader for unSequence data
+      PriorityMergeReader unSeqMergeReader = SeriesReaderFactory.getInstance()
+          
.createUnSeqMergeReader(queryDataSource.getOverflowSeriesDataSource(), null);
+
+      if (tsFilesReader == null || !tsFilesReader.hasNext()) {
+        mergeReaderByTimestamp
+            .addReaderWithPriority(unSeqMergeReader, 
PriorityMergeReader.HIGH_PRIORITY);
+      } else {
+        mergeReaderByTimestamp
+            .addReaderWithPriority(new AllDataReader(tsFilesReader, 
unSeqMergeReader),
+                PriorityMergeReader.HIGH_PRIORITY);
+      }
+
+      readersOfSelectedSeries.add(mergeReaderByTimestamp);
+    }
+
+    return readersOfSelectedSeries;
+  }
+
   private static class SeriesReaderFactoryHelper {
 
     private static final SeriesReaderFactory INSTANCE = new 
SeriesReaderFactory();
diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/utils/TimeValuePairUtils.java 
b/iotdb/src/main/java/org/apache/iotdb/db/utils/TimeValuePairUtils.java
index 241222a..33a5e4d 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/utils/TimeValuePairUtils.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/utils/TimeValuePairUtils.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.utils;
 
+import org.apache.iotdb.db.query.aggregation.AggreResultData;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.read.common.BatchData;
 
@@ -51,4 +52,30 @@ public class TimeValuePairUtils {
         throw new 
UnSupportedDataTypeException(String.valueOf(data.getDataType()));
     }
   }
+
+  /**
+   * get given data's current (time,value) pair.
+   *
+   * @param data -AggreResultData
+   * @return -given data's (time,value) pair
+   */
+  public static TimeValuePair getCurrentTimeValuePair(AggreResultData data) {
+    switch (data.getDataType()) {
+      case INT32:
+
+        return new TimeValuePair(data.getTimestamp(), new 
TsPrimitiveType.TsInt(data.getIntRet()));
+      case INT64:
+        return new TimeValuePair(data.getTimestamp(), new 
TsPrimitiveType.TsLong(data.getLongRet()));
+      case FLOAT:
+        return new TimeValuePair(data.getTimestamp(), new 
TsPrimitiveType.TsFloat(data.getFloatRet()));
+      case DOUBLE:
+        return new TimeValuePair(data.getTimestamp(), new 
TsPrimitiveType.TsDouble(data.getDoubleRet()));
+      case TEXT:
+        return new TimeValuePair(data.getTimestamp(), new 
TsPrimitiveType.TsBinary(data.getBinaryRet()));
+      case BOOLEAN:
+        return new TimeValuePair(data.getTimestamp(), new 
TsPrimitiveType.TsBoolean(data.isBooleanRet()));
+      default:
+        throw new 
UnSupportedDataTypeException(String.valueOf(data.getDataType()));
+    }
+  }
 }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/utils/TsPrimitiveType.java 
b/iotdb/src/main/java/org/apache/iotdb/db/utils/TsPrimitiveType.java
index ea43c28..d77704e 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/utils/TsPrimitiveType.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/utils/TsPrimitiveType.java
@@ -26,7 +26,7 @@ import org.apache.iotdb.tsfile.utils.Binary;
 public abstract class TsPrimitiveType implements Serializable {
 
   /**
-   * get tsPrimitiveType by dataType.
+   * get tsPrimitiveType by resultDataType.
    *
    * @param dataType -given TsDataType
    * @param v -
diff --git 
a/iotdb/src/test/java/org/apache/iotdb/db/integration/IOTDBGroupByTestIT.java 
b/iotdb/src/test/java/org/apache/iotdb/db/integration/IOTDBGroupByTestIT.java
new file mode 100644
index 0000000..dc2fd4c
--- /dev/null
+++ 
b/iotdb/src/test/java/org/apache/iotdb/db/integration/IOTDBGroupByTestIT.java
@@ -0,0 +1,351 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.integration;
+
+import static org.apache.iotdb.db.integration.Constant.count;
+import static org.apache.iotdb.db.integration.Constant.first;
+import static org.apache.iotdb.db.integration.Constant.last;
+import static org.apache.iotdb.db.integration.Constant.max_time;
+import static org.apache.iotdb.db.integration.Constant.max_value;
+import static org.apache.iotdb.db.integration.Constant.mean;
+import static org.apache.iotdb.db.integration.Constant.min_time;
+import static org.apache.iotdb.db.integration.Constant.min_value;
+import static org.apache.iotdb.db.integration.Constant.sum;
+import static org.junit.Assert.fail;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import org.apache.iotdb.db.service.IoTDB;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.jdbc.Config;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class IOTDBGroupByTestIT {
+  private static IoTDB daemon;
+
+  private static String[] dataSet1 = new String[]{
+      "SET STORAGE GROUP TO root.ln.wf01.wt01",
+      "CREATE TIMESERIES root.ln.wf01.wt01.status WITH DATATYPE=BOOLEAN, 
ENCODING=PLAIN",
+      "CREATE TIMESERIES root.ln.wf01.wt01.temperature WITH DATATYPE=DOUBLE, 
ENCODING=PLAIN",
+      "CREATE TIMESERIES root.ln.wf01.wt01.hardware WITH DATATYPE=INT32, 
ENCODING=PLAIN",
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) 
values(1, 1.1, false, 11)",
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) 
values(2, 2.2, true, 22)",
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) 
values(3, 3.3, false, 33 )",
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) 
values(4, 4.4, false, 44)",
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) 
values(5, 5.5, false, 55)",
+      "flush",
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) 
values(100, 100.1, false, 110)",
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) 
values(150, 200.2, true, 220)",
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) 
values(200, 300.3, false, 330 )",
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) 
values(250, 400.4, false, 440)",
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) 
values(300, 500.5, false, 550)",
+      "flush",
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) 
values(10, 10.1, false, 110)",
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) 
values(20, 20.2, true, 220)",
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) 
values(30, 30.3, false, 330 )",
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) 
values(40, 40.4, false, 440)",
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) 
values(50, 50.5, false, 550)",
+      "flush",
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) 
values(500, 100.1, false, 110)",
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) 
values(510, 200.2, true, 220)",
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) 
values(520, 300.3, false, 330 )",
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) 
values(530, 400.4, false, 440)",
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) 
values(540, 500.5, false, 550)",
+      "flush",
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) 
values(580, 100.1, false, 110)",
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) 
values(590, 200.2, true, 220)",
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) 
values(600, 300.3, false, 330 )",
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) 
values(610, 400.4, false, 440)",
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) 
values(620, 500.5, false, 550)",
+  };
+
+  private String insertTemplate1 = "INSERT INTO 
root.vehicle.d0(timestamp,s0,s1,s2,s3"
+      + ") VALUES(%d,%d,%d,%f,%s)";
+
+  private static final String TIMESTAMP_STR = "Time";
+  private final String d0s0 = "root.vehicle.d0.s0";
+  private final String d0s1 = "root.vehicle.d0.s1";
+  private final String d0s2 = "root.vehicle.d0.s2";
+  private final String d0s3 = "root.vehicle.d0.s3";
+
+  @Before
+  public void setUp() throws Exception {
+    EnvironmentUtils.closeStatMonitor();
+    EnvironmentUtils.closeMemControl();
+    daemon = IoTDB.getInstance();
+    daemon.active();
+    EnvironmentUtils.envSetUp();
+    Class.forName(Config.JDBC_DRIVER_NAME);
+    prepareData();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    daemon.stop();
+    EnvironmentUtils.cleanEnv();
+  }
+
+  @Test
+  public void countSumMeanTest() throws SQLException {
+    String[] retArray1 = new String[]{
+        "2,1,4.4,4.4",
+        "5,3,35.8,11.933333333333332",
+        "25,1,30.3,30.3",
+        "50,1,50.5,50.5",
+        "65,0,0.0,null",
+        "85,1,100.1,100.1",
+        "105,0,0.0,null",
+        "125,0,0.0,null",
+        "145,1,200.2,200.2",
+        "310,0,0.0,null"
+    };
+    String[] retArray2 = new String[]{
+        "2,2,7.7,3.85",
+        "5,3,35.8,11.933333333333332",
+        "25,1,30.3,30.3",
+        "50,1,50.5,50.5",
+        "65,0,0.0,null",
+        "85,1,100.1,100.1",
+        "105,0,0.0,null",
+        "125,0,0.0,null",
+        "145,1,200.2,200.2",
+        "310,0,0.0,null"
+    };
+    Connection connection = null;
+    try {
+      connection = DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", 
"root", "root");
+      Statement statement = connection.createStatement();
+      boolean hasResultSet = statement.execute("select count(temperature), 
sum(temperature), mean(temperature) from root.ln.wf01.wt01 where time > 3 "
+          + "GROUP BY (20ms, 5,[2,30], [35,37], [50, 160], [310, 314])");
+
+      Assert.assertTrue(hasResultSet);
+      ResultSet resultSet = statement.getResultSet();
+      int cnt = 0;
+      while (resultSet.next()) {
+        String ans = resultSet.getString(TIMESTAMP_STR) + "," + 
resultSet.getString(count("root.ln.wf01.wt01.temperature"))
+            + "," + resultSet.getString(sum("root.ln.wf01.wt01.temperature"))+ 
"," + resultSet.getString(mean("root.ln.wf01.wt01.temperature"));
+        Assert.assertEquals(retArray1[cnt], ans);
+        cnt++;
+      }
+      Assert.assertEquals(retArray1.length, cnt);
+      statement.close();
+
+      statement = connection.createStatement();
+      hasResultSet = statement.execute("select count(temperature), 
sum(temperature), mean(temperature) from root.ln.wf01.wt01 where temperature > 
3 "
+          + "GROUP BY (20ms, 5,[2,30], [35,37], [50, 160], [310, 314])");
+
+      Assert.assertTrue(hasResultSet);
+      resultSet = statement.getResultSet();
+      cnt = 0;
+      while (resultSet.next()) {
+        String ans = resultSet.getString(TIMESTAMP_STR) + "," + 
resultSet.getString(count("root.ln.wf01.wt01.temperature"))
+            + "," + resultSet.getString(sum("root.ln.wf01.wt01.temperature"))+ 
"," + resultSet.getString(mean("root.ln.wf01.wt01.temperature"));
+        Assert.assertEquals(retArray2[cnt], ans);
+        cnt++;
+      }
+      Assert.assertEquals(retArray2.length, cnt);
+      statement.close();
+
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    } finally {
+      if (connection != null) {
+        connection.close();
+      }
+    }
+  }
+
+  @Test
+  public void maxMinValeTimeTest() throws SQLException {
+    String[] retArray1 = new String[]{
+        "2,4.4,4.4,4,4",
+        "5,20.2,5.5,20,5",
+        "25,30.3,30.3,30,30",
+        "50,50.5,50.5,50,50",
+        "65,null,null,null,null",
+        "85,100.1,100.1,100,100",
+        "105,null,null,null,null",
+        "125,null,null,null,null",
+        "145,200.2,200.2,150,150",
+        "310,null,null,null,null"
+    };
+    String[] retArray2 = new String[]{
+        "2,4.4,3.3,4,3",
+        "5,20.2,5.5,20,5",
+        "25,30.3,30.3,30,30",
+        "50,50.5,50.5,50,50",
+        "65,null,null,null,null",
+        "85,100.1,100.1,100,100",
+        "105,null,null,null,null",
+        "125,null,null,null,null",
+        "145,200.2,200.2,150,150",
+        "310,null,null,null,null"
+    };
+    Connection connection = null;
+    try {
+      connection = DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", 
"root", "root");
+      Statement statement = connection.createStatement();
+      boolean hasResultSet = statement.execute("select max_value(temperature), 
min_value(temperature), max_time(temperature), min_time(temperature) from 
root.ln.wf01.wt01 where time > 3 "
+          + "GROUP BY (20ms, 5,[2,30], [35,37], [50, 160], [310, 314])");
+
+      Assert.assertTrue(hasResultSet);
+      ResultSet resultSet = statement.getResultSet();
+      int cnt = 0;
+      while (resultSet.next()) {
+        String ans = resultSet.getString(TIMESTAMP_STR) + "," + 
resultSet.getString(max_value("root.ln.wf01.wt01.temperature"))
+            + "," + 
resultSet.getString(min_value("root.ln.wf01.wt01.temperature"))+ "," + 
resultSet.getString(max_time("root.ln.wf01.wt01.temperature"))
+            + "," + 
resultSet.getString(min_time("root.ln.wf01.wt01.temperature"));
+        Assert.assertEquals(retArray1[cnt], ans);
+        cnt++;
+      }
+      Assert.assertEquals(retArray1.length, cnt);
+      statement.close();
+
+      statement = connection.createStatement();
+      hasResultSet = statement.execute("select max_value(temperature), 
min_value(temperature), max_time(temperature), min_time(temperature) from 
root.ln.wf01.wt01 where temperature > 3 "
+          + "GROUP BY (20ms, 5,[2,30], [35,37], [50, 160], [310, 314])");
+
+      Assert.assertTrue(hasResultSet);
+      resultSet = statement.getResultSet();
+      cnt = 0;
+      while (resultSet.next()) {
+        String ans = resultSet.getString(TIMESTAMP_STR) + "," + 
resultSet.getString(max_value("root.ln.wf01.wt01.temperature"))
+            + "," + 
resultSet.getString(min_value("root.ln.wf01.wt01.temperature"))+ "," + 
resultSet.getString(max_time("root.ln.wf01.wt01.temperature"))
+            + "," + 
resultSet.getString(min_time("root.ln.wf01.wt01.temperature"));
+        Assert.assertEquals(retArray2[cnt], ans);
+        cnt++;
+        //System.out.println(ans);
+      }
+      Assert.assertEquals(retArray2.length, cnt);
+      statement.close();
+
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    } finally {
+      if (connection != null) {
+        connection.close();
+      }
+    }
+  }
+
+  @Test
+  public void firstLastTest() throws SQLException {
+    String[] retArray1 = new String[]{
+        "2,4.4,4.4",
+        "5,20.2,5.5",
+        "25,30.3,30.3",
+        "50,50.5,50.5",
+        "65,null,null",
+        "85,100.1,100.1",
+        "105,null,null",
+        "125,null,null",
+        "145,200.2,200.2",
+        "310,null,null"
+    };
+    String[] retArray2 = new String[]{
+        "2,4.4,3.3",
+        "5,20.2,5.5",
+        "25,30.3,30.3",
+        "50,50.5,50.5",
+        "65,null,null",
+        "85,100.1,100.1",
+        "105,null,null",
+        "125,null,null",
+        "145,200.2,200.2",
+        "310,null,null"
+    };
+    Connection connection = null;
+    try {
+      connection = DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", 
"root", "root");
+      Statement statement = connection.createStatement();
+      boolean hasResultSet = statement.execute("select last(temperature), 
first(temperature) from root.ln.wf01.wt01 where time > 3 "
+          + "GROUP BY (20ms, 5,[2,30], [35,37], [50, 160], [310, 314])");
+
+      Assert.assertTrue(hasResultSet);
+      ResultSet resultSet = statement.getResultSet();
+      int cnt = 0;
+      while (resultSet.next()) {
+        String ans = resultSet.getString(TIMESTAMP_STR) + "," + 
resultSet.getString(last("root.ln.wf01.wt01.temperature"))
+            + "," + 
resultSet.getString(first("root.ln.wf01.wt01.temperature"));
+        System.out.println(ans);
+        Assert.assertEquals(retArray1[cnt], ans);
+        cnt++;
+      }
+      Assert.assertEquals(retArray1.length, cnt);
+      statement.close();
+
+      statement = connection.createStatement();
+      hasResultSet = statement.execute("select first(temperature), 
last(temperature) from root.ln.wf01.wt01 where temperature > 3 "
+          + "GROUP BY (20ms, 5,[2,30], [35,37], [50, 160], [310, 314])");
+
+      Assert.assertTrue(hasResultSet);
+      resultSet = statement.getResultSet();
+      cnt = 0;
+      while (resultSet.next()) {
+        String ans = resultSet.getString(TIMESTAMP_STR) + "," + 
resultSet.getString(last("root.ln.wf01.wt01.temperature"))
+            + "," + 
resultSet.getString(first("root.ln.wf01.wt01.temperature"));
+        System.out.println(ans);
+        Assert.assertEquals(retArray2[cnt], ans);
+        cnt++;
+      }
+      Assert.assertEquals(retArray2.length, cnt);
+      statement.close();
+
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    } finally {
+      if (connection != null) {
+        connection.close();
+      }
+    }
+  }
+
+  public void prepareData() throws SQLException {
+    Connection connection = null;
+    try {
+      connection = DriverManager
+          .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root",
+              "root");
+      Statement statement = connection.createStatement();
+
+      for (String sql : dataSet1) {
+        statement.execute(sql);
+      }
+      statement.close();
+
+    } catch (Exception e) {
+      e.printStackTrace();
+    } finally {
+      if (connection != null) {
+        connection.close();
+      }
+    }
+  }
+
+}
diff --git 
a/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationTestIT.java
 
b/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationTestIT.java
index bd1e938..e1f5f13 100644
--- 
a/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationTestIT.java
+++ 
b/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationTestIT.java
@@ -52,7 +52,7 @@ public class IoTDBAggregationTestIT {
       "CREATE TIMESERIES root.vehicle.d0.s0 WITH DATATYPE=INT32, ENCODING=RLE",
       "CREATE TIMESERIES root.vehicle.d0.s1 WITH DATATYPE=INT64, ENCODING=RLE",
       "CREATE TIMESERIES root.vehicle.d0.s2 WITH DATATYPE=FLOAT, ENCODING=RLE",
-      "CREATE TIMESERIES root.vehicle.d0.s3 WITH DATATYPE=TEXT, 
ENCODING=PLAIN",
+      "CREATE TIMESERIES root.vehicle.d0.s3 WITH DATATYPE=TEXT, ENCODING=PLAIN"
   };
 
   private static String[] dataSet2 = new String[]{
diff --git 
a/iotdb/src/test/java/org/apache/iotdb/db/qp/plan/PhysicalPlanTest.java 
b/iotdb/src/test/java/org/apache/iotdb/db/qp/plan/PhysicalPlanTest.java
index d8ebfa9..3b46f55 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/qp/plan/PhysicalPlanTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/qp/plan/PhysicalPlanTest.java
@@ -81,7 +81,7 @@ public class PhysicalPlanTest {
     String metadata = "create timeseries root.vehicle.d1.s1 with 
datatype=INT32,encoding=RLE";
     QueryProcessor processor = new QueryProcessor(new MemIntQpExecutor());
     MetadataPlan plan = (MetadataPlan) 
processor.parseSQLToPhysicalPlan(metadata);
-    assertEquals("seriesPath: root.vehicle.d1.s1\n" + "dataType: INT32\n" + 
"encoding: RLE\n"
+    assertEquals("seriesPath: root.vehicle.d1.s1\n" + "resultDataType: 
INT32\n" + "encoding: RLE\n"
         + "namespace type: ADD_PATH\n" + "args: ", plan.toString());
   }
 
diff --git 
a/iotdb/src/test/java/org/apache/iotdb/db/qp/utils/MemIntQpExecutor.java 
b/iotdb/src/test/java/org/apache/iotdb/db/qp/utils/MemIntQpExecutor.java
index f2beb7d..469b3cc 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/qp/utils/MemIntQpExecutor.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/qp/utils/MemIntQpExecutor.java
@@ -112,12 +112,13 @@ public class MemIntQpExecutor extends 
QueryProcessExecutor {
   }
 
   @Override
-  public QueryDataSet groupBy(List<Pair<Path, String>> aggres, IExpression 
expression, long unit,
-      long origin,
-      List<Pair<Long, Long>> intervals, int fetchSize) {
+  public QueryDataSet groupBy(List<Path> paths, List<String> aggres, 
IExpression expression,
+      long unit, long origin, List<Pair<Long, Long>> intervals)
+      throws ProcessorException, IOException, PathErrorException, 
FileNodeManagerException, QueryFilterOptimizationException {
     return null;
   }
 
+
   @Override
   public boolean judgePathExists(Path path) {
     if (SQLConstant.isReservedPath(path)) {
diff --git 
a/iotdb/src/test/java/org/apache/iotdb/db/query/executor/GroupByEngineTest.java 
b/iotdb/src/test/java/org/apache/iotdb/db/query/executor/GroupByEngineTest.java
new file mode 100644
index 0000000..ee2488b
--- /dev/null
+++ 
b/iotdb/src/test/java/org/apache/iotdb/db/query/executor/GroupByEngineTest.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.query.executor;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class GroupByEngineTest {
+
+  @Test
+  public void test1() {
+    long jobId = 1000L;
+    long unit = 20;
+    long startTimePoint = 810;
+    List<Pair<Long, Long>> pairList = new ArrayList<>();
+    pairList.add(new Pair<>(805L,811L));
+    pairList.add(new Pair<>(825L,849L));
+
+    long[] startTimeArray = {805, 810, 830};
+    long[] endTimeArray = {810, 830, 850};
+    GroupByEngine groupByEngine = new GroupByWithValueFilterDataSet(jobId, 
null, unit, startTimePoint, pairList);
+    int cnt = 0;
+    while (groupByEngine.hasNext()){
+      Pair pair = groupByEngine.nextTimePartition();
+      Assert.assertTrue(cnt < startTimeArray.length);
+      Assert.assertEquals(startTimeArray[cnt], pair.left);
+      Assert.assertEquals(endTimeArray[cnt], pair.right);
+      cnt++;
+    }
+  }
+
+  @Test
+  public void test2() {
+    long jobId = 1000L;
+    long unit = 20;
+    long startTimePoint = 850;
+    List<Pair<Long, Long>> pairList = new ArrayList<>();
+    pairList.add(new Pair<>(805L,835L));
+    pairList.add(new Pair<>(850L,855L));
+    pairList.add(new Pair<>(858L,860L));
+    pairList.add(new Pair<>(1200L,1220L));
+
+    long[] startTimeArray = {805, 810, 830, 850, 1200, 1210};
+    long[] endTimeArray = {810, 830, 850, 870, 1210, 1230};
+    GroupByEngine groupByEngine = new GroupByWithValueFilterDataSet(jobId, 
null, unit, startTimePoint, pairList);
+    int cnt = 0;
+    while (groupByEngine.hasNext()){
+      Pair pair = groupByEngine.nextTimePartition();
+      Assert.assertTrue(cnt < startTimeArray.length);
+      Assert.assertEquals(startTimeArray[cnt], pair.left);
+      Assert.assertEquals(endTimeArray[cnt], pair.right);
+      cnt++;
+    }
+  }
+
+  @Test
+  public void test3() {
+    long jobId = 1000L;
+    long unit = 20;
+    long startTimePoint = 100;
+    List<Pair<Long, Long>> pairList = new ArrayList<>();
+    pairList.add(new Pair<>(805L,835L));
+    pairList.add(new Pair<>(850L,855L));
+    pairList.add(new Pair<>(858L,860L));
+    pairList.add(new Pair<>(1200L,1220L));
+
+    long[] startTimeArray = {805, 820, 850, 1200, 1210};
+    long[] endTimeArray = {820, 840, 860, 1210, 1230};
+    GroupByEngine groupByEngine = new GroupByWithValueFilterDataSet(jobId, 
null, unit, startTimePoint, pairList);
+    int cnt = 0;
+    while (groupByEngine.hasNext()){
+      Pair pair = groupByEngine.nextTimePartition();
+      Assert.assertTrue(cnt < startTimeArray.length);
+      Assert.assertEquals(startTimeArray[cnt], pair.left);
+      Assert.assertEquals(endTimeArray[cnt], pair.right);
+      cnt++;
+    }
+  }
+
+  @Test
+  public void test4() {
+    long jobId = 1000L;
+    long unit = 200;
+    long startTimePoint = 100;
+    List<Pair<Long, Long>> pairList = new ArrayList<>();
+    pairList.add(new Pair<>(805L,835L));
+    pairList.add(new Pair<>(850L,855L));
+    pairList.add(new Pair<>(858L,860L));
+    pairList.add(new Pair<>(1200L,1220L));
+
+    long[] startTimeArray = {805, 1200};
+    long[] endTimeArray = {900, 1300};
+    GroupByEngine groupByEngine = new GroupByWithValueFilterDataSet(jobId, 
null, unit, startTimePoint, pairList);
+    int cnt = 0;
+    while (groupByEngine.hasNext()){
+      Pair pair = groupByEngine.nextTimePartition();
+      Assert.assertTrue(cnt < startTimeArray.length);
+      Assert.assertEquals(startTimeArray[cnt], pair.left);
+      Assert.assertEquals(endTimeArray[cnt], pair.right);
+      cnt++;
+    }
+  }
+
+
+}
\ No newline at end of file
diff --git 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/dataset/QueryDataSet.java
 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/dataset/QueryDataSet.java
index 4c00641..c46da70 100644
--- 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/dataset/QueryDataSet.java
+++ 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/dataset/QueryDataSet.java
@@ -34,6 +34,10 @@ public abstract class QueryDataSet {
     this.dataTypes = dataTypes;
   }
 
+  public QueryDataSet(List<Path> paths) {
+    this.paths = paths;
+  }
+
   /**
    * This method is used for batch query.
    */
@@ -52,4 +56,7 @@ public abstract class QueryDataSet {
     return dataTypes;
   }
 
+  public void setDataTypes(List<TSDataType> dataTypes) {
+    this.dataTypes = dataTypes;
+  }
 }

Reply via email to