This is an automated email from the ASF dual-hosted git repository.

suyue pushed a commit to branch aggregate
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit cfa2b65f7cc9b6946d95aca21655437e14d20ddf
Author: suyue <[email protected]>
AuthorDate: Sun Mar 10 16:27:56 2019 +0800

    realize aggregate function interface
---
 .../db/query/aggregation/AggreFuncFactory.java     |  76 ++++++++++
 .../db/query/aggregation/AggregateFunction.java    | 130 +++++++++++++++++
 .../db/query/aggregation/AggregationConstant.java  |  40 ++++++
 .../db/query/aggregation/impl/CountAggrFunc.java   | 112 +++++++++++++++
 .../db/query/aggregation/impl/FirstAggrFunc.java   | 114 +++++++++++++++
 .../db/query/aggregation/impl/LastAggrFunc.java    | 135 ++++++++++++++++++
 .../db/query/aggregation/impl/MaxTimeAggrFunc.java | 127 +++++++++++++++++
 .../query/aggregation/impl/MaxValueAggrFunc.java   | 137 ++++++++++++++++++
 .../db/query/aggregation/impl/MeanAggrFunc.java    | 133 +++++++++++++++++
 .../db/query/aggregation/impl/MinTimeAggrFunc.java | 108 ++++++++++++++
 .../query/aggregation/impl/MinValueAggrFunc.java   | 138 ++++++++++++++++++
 .../db/query/aggregation/impl/SumAggrFunc.java     | 130 +++++++++++++++++
 .../iotdb/db/query/dataset/AggregateDataSet.java   | 138 ++++++++++++++++++
 .../db/query/executor/AggregateEngineExecutor.java | 158 +++++++++++++++++++++
 .../iotdb/db/query/executor/EngineQueryRouter.java |  35 +++++
 .../apache/iotdb/tsfile/read/common/BatchData.java |  58 ++++++++
 16 files changed, 1769 insertions(+)

diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/AggreFuncFactory.java
 
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/AggreFuncFactory.java
new file mode 100644
index 0000000..882993d
--- /dev/null
+++ 
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/AggreFuncFactory.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.aggregation;
+
+import org.apache.iotdb.db.exception.ProcessorException;
+import org.apache.iotdb.db.query.aggregation.impl.CountAggrFunc;
+import org.apache.iotdb.db.query.aggregation.impl.FirstAggrFunc;
+import org.apache.iotdb.db.query.aggregation.impl.LastAggrFunc;
+import org.apache.iotdb.db.query.aggregation.impl.MaxTimeAggrFunc;
+import org.apache.iotdb.db.query.aggregation.impl.MaxValueAggrFunc;
+import org.apache.iotdb.db.query.aggregation.impl.MeanAggrFunc;
+import org.apache.iotdb.db.query.aggregation.impl.MinTimeAggrFunc;
+import org.apache.iotdb.db.query.aggregation.impl.MinValueAggrFunc;
+import org.apache.iotdb.db.query.aggregation.impl.SumAggrFunc;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+/**
+ * Easy factory pattern to build AggregateFunction.
+ */
+public class AggreFuncFactory {
+
+  /**
+   * construct AggregateFunction using factory pattern.
+   * @param aggrFuncName function name.
+   * @param dataType data type.
+   * @return
+   * @throws ProcessorException
+   */
+  public static AggregateFunction getAggrFuncByName(String aggrFuncName, 
TSDataType dataType)
+      throws ProcessorException {
+    if (aggrFuncName == null) {
+      throw new ProcessorException("AggregateFunction Name must not be null");
+    }
+
+    switch (aggrFuncName.toLowerCase()) {
+      case AggregationConstant.MIN_TIME:
+        return new MinTimeAggrFunc();
+      case AggregationConstant.MAX_TIME:
+        return new MaxTimeAggrFunc();
+      case AggregationConstant.MIN_VALUE:
+        return new MinValueAggrFunc(dataType);
+      case AggregationConstant.MAX_VALUE:
+        return new MaxValueAggrFunc(dataType);
+      case AggregationConstant.COUNT:
+        return new CountAggrFunc();
+      case AggregationConstant.MEAN:
+        return new MeanAggrFunc();
+      case AggregationConstant.FIRST:
+        return new FirstAggrFunc(dataType);
+      case AggregationConstant.SUM:
+        return new SumAggrFunc();
+      case AggregationConstant.LAST:
+        return new LastAggrFunc(dataType);
+      default:
+        throw new ProcessorException(
+            "aggregate does not support " + aggrFuncName + " function.");
+    }
+  }
+}
diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateFunction.java
 
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateFunction.java
new file mode 100644
index 0000000..34d2d75
--- /dev/null
+++ 
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateFunction.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.aggregation;
+
+import java.io.IOException;
+import org.apache.iotdb.db.exception.ProcessorException;
+import org.apache.iotdb.db.query.reader.IPointReader;
+import org.apache.iotdb.db.query.reader.merge.EngineReaderByTimeStamp;
+import org.apache.iotdb.db.query.timegenerator.EngineTimeGenerator;
+import org.apache.iotdb.tsfile.file.header.PageHeader;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.utils.Binary;
+
+public abstract class AggregateFunction {
+
+  public String name;
+  public BatchData resultData;
+  public TSDataType dataType;
+  public boolean hasSetValue;
+
+  public AggregateFunction(String name, TSDataType dataType) {
+    this.name = name;
+    this.dataType = dataType;
+    this.hasSetValue = false;
+    resultData = new BatchData(dataType, true, true);
+  }
+
+  public abstract void init();
+
+  public abstract BatchData getResult();
+
+  /**
+   * <p>
+   * Calculate the aggregation using <code>PageHeader</code>.
+   * </p>
+   *
+   * @param pageHeader <code>PageHeader</code>
+   */
+  public abstract void calculateValueFromPageHeader(PageHeader pageHeader)
+      throws ProcessorException;
+
+  /**
+   * <p>
+   * Could not calculate using <method>calculateValueFromPageHeader</method> 
directly. Calculate the
+   * aggregation according to all decompressed data in this page.
+   * </p>
+   *
+   * @param dataInThisPage the data in the DataPage
+   * @param unsequenceReader unsequence data reader
+   * @throws IOException TsFile data read exception
+   * @throws ProcessorException wrong aggregation method parameter
+   */
+  public abstract void calculateValueFromPageData(BatchData dataInThisPage,
+      IPointReader unsequenceReader) throws IOException, ProcessorException;
+
+  public abstract void calculateValueFromUnsequenceReader(IPointReader 
unsequenceReader)
+      throws IOException, ProcessorException;
+
+
+  /**
+   * <p>
+   * This method is calculate the aggregation using the common timestamps of 
cross series filter.
+   * </p>
+   *
+   * @throws IOException TsFile data read error
+   * @throws ProcessorException wrong aggregation method parameter
+   */
+  public abstract boolean calcAggregationUsingTimestamps(EngineTimeGenerator 
timeGenerator,
+      EngineReaderByTimeStamp sequenceReader, EngineReaderByTimeStamp 
unsequenceReader)
+      throws IOException, ProcessorException;
+
+  /**
+   * <p>
+   * This method is calculate the group by function.
+   * </p>
+   */
+  public abstract void calcGroupByAggregation(long partitionStart, long 
partitionEnd,
+      long intervalStart, long intervalEnd,
+      BatchData data) throws ProcessorException;
+
+  /**
+   * Convert a value from string to its real data type and put into return 
data.
+   */
+  public void putValueFromStr(String valueStr) throws ProcessorException {
+    try {
+      switch (dataType) {
+        case INT32:
+          resultData.putInt(Integer.parseInt(valueStr));
+          break;
+        case INT64:
+          resultData.putLong(Long.parseLong(valueStr));
+          break;
+        case BOOLEAN:
+          resultData.putBoolean(Boolean.parseBoolean(valueStr));
+          break;
+        case TEXT:
+          resultData.putBinary(new Binary(valueStr));
+          break;
+        case DOUBLE:
+          resultData.putDouble(Double.parseDouble(valueStr));
+          break;
+        case FLOAT:
+          resultData.putFloat(Float.parseFloat(valueStr));
+          break;
+        default:
+          throw new ProcessorException("Unsupported type " + dataType);
+      }
+    } catch (Exception e) {
+      throw new ProcessorException(e.getMessage());
+    }
+  }
+}
diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/AggregationConstant.java
 
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/AggregationConstant.java
new file mode 100644
index 0000000..848d9c7
--- /dev/null
+++ 
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/AggregationConstant.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.aggregation;
+
+import org.apache.iotdb.tsfile.common.constant.StatisticConstant;
+
+public class AggregationConstant {
+
+  public static final String MIN_TIME = StatisticConstant.MIN_TIME;
+  public static final String MAX_TIME = StatisticConstant.MAX_TIME;
+
+  public static final String MAX_VALUE = StatisticConstant.MAX_VALUE;
+  public static final String MIN_VALUE = StatisticConstant.MIN_VALUE;
+
+  public static final String COUNT = StatisticConstant.COUNT;
+
+  public static final String FIRST = StatisticConstant.FIRST;
+  public static final String LAST = StatisticConstant.LAST;
+
+  public static final String MEAN = StatisticConstant.MEAN;
+  public static final String SUM = StatisticConstant.SUM;
+
+}
diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrFunc.java
 
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrFunc.java
new file mode 100644
index 0000000..b5cbc27
--- /dev/null
+++ 
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrFunc.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.aggregation.impl;
+
+import java.io.IOException;
+import org.apache.iotdb.db.exception.ProcessorException;
+import org.apache.iotdb.db.query.aggregation.AggregateFunction;
+import org.apache.iotdb.db.query.aggregation.AggregationConstant;
+import org.apache.iotdb.db.query.reader.IPointReader;
+import org.apache.iotdb.db.query.reader.merge.EngineReaderByTimeStamp;
+import org.apache.iotdb.db.query.timegenerator.EngineTimeGenerator;
+import org.apache.iotdb.tsfile.file.header.PageHeader;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+
+public class CountAggrFunc extends AggregateFunction {
+
+  public CountAggrFunc() {
+    super(AggregationConstant.COUNT, TSDataType.INT64);
+  }
+
+  @Override
+  public void init() {
+    if (resultData.length() == 0) {
+      resultData.putTime(0);
+      resultData.putLong(0);
+    }
+  }
+
+  @Override
+  public BatchData getResult() {
+    return resultData;
+  }
+
+  @Override
+  public void calculateValueFromPageHeader(PageHeader pageHeader) {
+    System.out.println("PageHeader>>>>>>>>>>>>" + pageHeader.getNumOfValues() 
+ " " + pageHeader
+        .getMinTimestamp()
+        + "," + pageHeader.getMaxTimestamp());
+    long preValue = resultData.getLong();
+    preValue += pageHeader.getNumOfValues();
+    resultData.setLong(0, preValue);
+
+  }
+
+  @Override
+  public void calculateValueFromPageData(BatchData dataInThisPage, 
IPointReader unsequenceReader)
+      throws IOException, ProcessorException {
+    while (dataInThisPage.hasNext() && unsequenceReader.hasNext()) {
+      if (dataInThisPage.currentTime() == 
unsequenceReader.current().getTimestamp()) {
+        dataInThisPage.next();
+        unsequenceReader.next();
+      } else if (dataInThisPage.currentTime() < 
unsequenceReader.current().getTimestamp()) {
+        dataInThisPage.next();
+      } else {
+        unsequenceReader.next();
+      }
+      long preValue = resultData.getLong();
+      preValue += 1;
+      resultData.setLong(0, preValue);
+    }
+
+    if (dataInThisPage.hasNext()) {
+      long preValue = resultData.getLong();
+      preValue += (resultData.length() - resultData.getCurIdx());
+      resultData.setLong(0, preValue);
+    }
+  }
+
+  @Override
+  public void calculateValueFromUnsequenceReader(IPointReader unsequenceReader)
+      throws IOException, ProcessorException {
+    int cnt = 0;
+    while (unsequenceReader.hasNext()) {
+      unsequenceReader.next();
+      cnt++;
+    }
+    long preValue = resultData.getLong();
+    preValue += cnt;
+    resultData.setLong(0, preValue);
+  }
+
+  @Override
+  public boolean calcAggregationUsingTimestamps(EngineTimeGenerator 
timeGenerator,
+      EngineReaderByTimeStamp sequenceReader, EngineReaderByTimeStamp 
unsequenceReader)
+      throws IOException, ProcessorException {
+    return false;
+  }
+
+  @Override
+  public void calcGroupByAggregation(long partitionStart, long partitionEnd, 
long intervalStart,
+      long intervalEnd, BatchData data) throws ProcessorException {
+
+  }
+}
diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstAggrFunc.java
 
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstAggrFunc.java
new file mode 100644
index 0000000..96823ff
--- /dev/null
+++ 
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstAggrFunc.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.aggregation.impl;
+
+import java.io.IOException;
+import org.apache.iotdb.db.exception.ProcessorException;
+import org.apache.iotdb.db.query.aggregation.AggregateFunction;
+import org.apache.iotdb.db.query.aggregation.AggregationConstant;
+import org.apache.iotdb.db.query.reader.IPointReader;
+import org.apache.iotdb.db.query.reader.merge.EngineReaderByTimeStamp;
+import org.apache.iotdb.db.query.timegenerator.EngineTimeGenerator;
+import org.apache.iotdb.tsfile.file.header.PageHeader;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+
+public class FirstAggrFunc extends AggregateFunction {
+
+  public FirstAggrFunc(TSDataType dataType) {
+    super(AggregationConstant.FIRST, dataType);
+  }
+
+  @Override
+  public void init() {
+
+  }
+
+  @Override
+  public BatchData getResult() {
+    return resultData;
+  }
+
+  @Override
+  public void calculateValueFromPageHeader(PageHeader pageHeader) throws 
ProcessorException {
+    if (resultData.length() != 0) {
+      return;
+    }
+
+    Object firstVal = pageHeader.getStatistics().getFirst();
+    if (firstVal == null) {
+      throw new ProcessorException("PageHeader contains no FIRST value");
+    }
+    resultData.setTime(0, 0);
+    resultData.putAnObject(firstVal);
+  }
+
+  @Override
+  public void calculateValueFromPageData(BatchData dataInThisPage, 
IPointReader unsequenceReader)
+      throws IOException, ProcessorException {
+    if (resultData.length() != 0) {
+      return;
+    }
+    if (dataInThisPage.hasNext() && unsequenceReader.hasNext()) {
+      if (dataInThisPage.currentTime() >= 
unsequenceReader.current().getTimestamp()) {
+        resultData.setTime(0, 0);
+        
resultData.putAnObject(unsequenceReader.current().getValue().getValue());
+        unsequenceReader.next();
+        return;
+      } else {
+        resultData.setTime(0, 0);
+        resultData.putAnObject(dataInThisPage.currentValue());
+        return;
+      }
+    }
+
+    if (dataInThisPage.hasNext()) {
+      resultData.setTime(0, 0);
+      resultData.putAnObject(dataInThisPage.currentValue());
+      return;
+    }
+  }
+
+  @Override
+  public void calculateValueFromUnsequenceReader(IPointReader unsequenceReader)
+      throws IOException, ProcessorException {
+    if (resultData.length() != 0) {
+      return;
+    }
+    if (unsequenceReader.hasNext()) {
+      resultData.setTime(0, 0);
+      resultData.putAnObject(unsequenceReader.current().getValue().getValue());
+      return;
+    }
+  }
+
+  @Override
+  public boolean calcAggregationUsingTimestamps(EngineTimeGenerator 
timeGenerator,
+      EngineReaderByTimeStamp sequenceReader, EngineReaderByTimeStamp 
unsequenceReader)
+      throws IOException, ProcessorException {
+    return false;
+  }
+
+  @Override
+  public void calcGroupByAggregation(long partitionStart, long partitionEnd, 
long intervalStart,
+      long intervalEnd, BatchData data) throws ProcessorException {
+
+  }
+}
diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastAggrFunc.java
 
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastAggrFunc.java
new file mode 100644
index 0000000..def4c6a
--- /dev/null
+++ 
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastAggrFunc.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.aggregation.impl;
+
+import java.io.IOException;
+import org.apache.iotdb.db.exception.ProcessorException;
+import org.apache.iotdb.db.query.aggregation.AggregateFunction;
+import org.apache.iotdb.db.query.aggregation.AggregationConstant;
+import org.apache.iotdb.db.query.reader.IPointReader;
+import org.apache.iotdb.db.query.reader.merge.EngineReaderByTimeStamp;
+import org.apache.iotdb.db.query.timegenerator.EngineTimeGenerator;
+import org.apache.iotdb.db.utils.TimeValuePair;
+import org.apache.iotdb.tsfile.file.header.PageHeader;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+
+public class LastAggrFunc extends AggregateFunction {
+
+  public LastAggrFunc(TSDataType dataType) {
+    super(AggregationConstant.LAST, dataType);
+  }
+
+  @Override
+  public void init() {
+
+  }
+
+  @Override
+  public BatchData getResult() {
+    if (resultData.length() != 0) {
+      resultData.setTime(0, 0);
+    }
+    return resultData;
+  }
+
+  @Override
+  public void calculateValueFromPageHeader(PageHeader pageHeader) throws 
ProcessorException {
+    Object lastVal = pageHeader.getStatistics().getLast();
+    if (resultData.length() == 0) {
+      resultData.putTime(pageHeader.getMaxTimestamp());
+      resultData.putAnObject(lastVal);
+    } else {
+      if (resultData.currentTime() < pageHeader.getMaxTimestamp()) {
+        resultData.setAnObject(0, (Comparable<?>) lastVal);
+      }
+    }
+  }
+
+  @Override
+  public void calculateValueFromPageData(BatchData dataInThisPage, 
IPointReader unsequenceReader)
+      throws IOException, ProcessorException {
+    long time = -1;
+    Object lastVal = null;
+    int maxIndex = dataInThisPage.length() - 1;
+    if (maxIndex < 0) {
+      return;
+    }
+    time = dataInThisPage.getTimeByIndex(maxIndex);
+    lastVal = dataInThisPage.getValueByIndex(maxIndex);
+    while (unsequenceReader.hasNext()) {
+      if (unsequenceReader.current().getTimestamp() < time) {
+        unsequenceReader.next();
+      } else if (unsequenceReader.current().getTimestamp() == time) {
+        lastVal = unsequenceReader.current().getValue().getValue();
+        unsequenceReader.next();
+      } else {
+        break;
+      }
+    }
+
+    if (resultData.length() == 0) {
+      if (time != -1) {
+        resultData.setTime(0, time);
+        resultData.setAnObject(0, (Comparable<?>) lastVal);
+      }
+    } else {
+      //has set value
+      if (time != -1 && time > resultData.currentTime()) {
+        resultData.setTime(0, time);
+        resultData.setAnObject(0, (Comparable<?>) lastVal);
+      }
+    }
+  }
+
+  @Override
+  public void calculateValueFromUnsequenceReader(IPointReader unsequenceReader)
+      throws IOException, ProcessorException {
+    TimeValuePair pair = null;
+    while (unsequenceReader.hasNext()) {
+      pair = unsequenceReader.next();
+    }
+    if (resultData.length() == 0) {
+      if (pair != null) {
+        resultData.setAnObject(0, (Comparable<?>) pair.getValue().getValue());
+        resultData.setTime(0, pair.getTimestamp());
+      }
+    } else {
+      if (pair != null && pair.getTimestamp() >= resultData.currentTime()) {
+        resultData.setAnObject(0, (Comparable<?>) pair.getValue().getValue());
+        resultData.setTime(0, pair.getTimestamp());
+      }
+    }
+
+  }
+
+  @Override
+  public boolean calcAggregationUsingTimestamps(EngineTimeGenerator 
timeGenerator,
+      EngineReaderByTimeStamp sequenceReader, EngineReaderByTimeStamp 
unsequenceReader)
+      throws IOException, ProcessorException {
+    return false;
+  }
+
+  @Override
+  public void calcGroupByAggregation(long partitionStart, long partitionEnd, 
long intervalStart,
+      long intervalEnd, BatchData data) throws ProcessorException {
+
+  }
+}
diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrFunc.java
 
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrFunc.java
new file mode 100644
index 0000000..8b4adaf
--- /dev/null
+++ 
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrFunc.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.aggregation.impl;
+
+import java.io.IOException;
+import org.apache.iotdb.db.exception.ProcessorException;
+import org.apache.iotdb.db.query.aggregation.AggregateFunction;
+import org.apache.iotdb.db.query.aggregation.AggregationConstant;
+import org.apache.iotdb.db.query.reader.IPointReader;
+import org.apache.iotdb.db.query.reader.merge.EngineReaderByTimeStamp;
+import org.apache.iotdb.db.query.timegenerator.EngineTimeGenerator;
+import org.apache.iotdb.db.utils.TimeValuePair;
+import org.apache.iotdb.tsfile.file.header.PageHeader;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+
+public class MaxTimeAggrFunc extends AggregateFunction {
+
+  public MaxTimeAggrFunc() {
+    super(AggregationConstant.MAX_TIME, TSDataType.INT64);
+  }
+
+  @Override
+  public void init() {
+
+  }
+
+  @Override
+  public BatchData getResult() {
+    return resultData;
+  }
+
+  @Override
+  public void calculateValueFromPageHeader(PageHeader pageHeader) throws 
ProcessorException {
+    long maxTimestamp = pageHeader.getMaxTimestamp();
+
+    //has not set value
+    if (resultData.length() == 0) {
+      resultData.putTime(0);
+      resultData.putLong(maxTimestamp);
+      return;
+    }
+
+    if (resultData.getLong() < maxTimestamp) {
+      resultData.setLong(0, maxTimestamp);
+    }
+  }
+
+  @Override
+  public void calculateValueFromPageData(BatchData dataInThisPage, 
IPointReader unsequenceReader)
+      throws IOException, ProcessorException {
+    long time = -1;
+    int maxIndex = dataInThisPage.length() - 1;
+    if (maxIndex < 0) {
+      return;
+    }
+    time = dataInThisPage.getTimeByIndex(maxIndex);
+    while (unsequenceReader.hasNext()) {
+      if (unsequenceReader.current().getTimestamp() <= time) {
+        unsequenceReader.next();
+      } else {
+        break;
+      }
+    }
+    if (resultData.length() == 0) {
+      if (time != -1) {
+        resultData.setTime(0, 0);
+        resultData.setAnObject(0, time);
+      }
+    } else {
+      //has set value
+      if (time != -1 && time > resultData.currentTime()) {
+        resultData.setAnObject(0, time);
+      }
+    }
+  }
+
+  @Override
+  public void calculateValueFromUnsequenceReader(IPointReader unsequenceReader)
+      throws IOException, ProcessorException {
+    TimeValuePair pair = null;
+    while (unsequenceReader.hasNext()) {
+      pair = unsequenceReader.next();
+    }
+    if (resultData.length() == 0) {
+      if (pair != null) {
+        resultData.setTime(0, 0);
+        resultData.setAnObject(0, pair.getTimestamp());
+      }
+    } else {
+      //has set value
+      if (pair != null && pair.getTimestamp() > resultData.currentTime()) {
+        resultData.setAnObject(0, pair.getTimestamp());
+      }
+    }
+  }
+
+  @Override
+  public boolean calcAggregationUsingTimestamps(EngineTimeGenerator 
timeGenerator,
+      EngineReaderByTimeStamp sequenceReader, EngineReaderByTimeStamp 
unsequenceReader)
+      throws IOException, ProcessorException {
+    return false;
+  }
+
+  @Override
+  public void calcGroupByAggregation(long partitionStart, long partitionEnd, 
long intervalStart,
+      long intervalEnd, BatchData data) throws ProcessorException {
+
+  }
+}
diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrFunc.java
 
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrFunc.java
new file mode 100644
index 0000000..ac3d901
--- /dev/null
+++ 
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrFunc.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.aggregation.impl;
+
+import java.io.IOException;
+import org.apache.iotdb.db.exception.ProcessorException;
+import org.apache.iotdb.db.query.aggregation.AggregateFunction;
+import org.apache.iotdb.db.query.aggregation.AggregationConstant;
+import org.apache.iotdb.db.query.reader.IPointReader;
+import org.apache.iotdb.db.query.reader.merge.EngineReaderByTimeStamp;
+import org.apache.iotdb.db.query.timegenerator.EngineTimeGenerator;
+import org.apache.iotdb.tsfile.file.header.PageHeader;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+
+public class MaxValueAggrFunc extends AggregateFunction {
+
+  public MaxValueAggrFunc(TSDataType dataType) {
+    super(AggregationConstant.MAX_VALUE, dataType);
+  }
+
+  @Override
+  public void init() {
+
+  }
+
+  @Override
+  public BatchData getResult() {
+    return resultData;
+  }
+
+  @Override
+  public void calculateValueFromPageHeader(PageHeader pageHeader) throws 
ProcessorException {
+    Comparable<Object> maxVal = (Comparable<Object>) 
pageHeader.getStatistics().getMax();
+    if (resultData.length() == 0) {
+      resultData.putTime(0);
+      resultData.setAnObject(0, maxVal);
+    } else {
+      if (maxVal.compareTo(resultData.currentValue()) > 0) {
+        resultData.setAnObject(0, maxVal);
+      }
+    }
+  }
+
+  @Override
+  public void calculateValueFromPageData(BatchData dataInThisPage, 
IPointReader unsequenceReader)
+      throws IOException, ProcessorException {
+    Comparable<Object> maxVal = null;
+    while (dataInThisPage.hasNext() && unsequenceReader.hasNext()) {
+      if (dataInThisPage.currentTime() < 
unsequenceReader.current().getTimestamp()) {
+        if (maxVal == null || maxVal.compareTo(dataInThisPage.currentValue()) 
< 0) {
+          maxVal = (Comparable<Object>) dataInThisPage.currentValue();
+        }
+        dataInThisPage.next();
+      } else if (dataInThisPage.currentTime() == 
unsequenceReader.current().getTimestamp()) {
+        if (maxVal == null
+            || 
maxVal.compareTo(unsequenceReader.current().getValue().getValue()) < 0) {
+          maxVal = (Comparable<Object>) 
unsequenceReader.current().getValue().getValue();
+        }
+        dataInThisPage.next();
+        unsequenceReader.next();
+      } else {
+        if (maxVal == null
+            || 
maxVal.compareTo(unsequenceReader.current().getValue().getValue()) < 0) {
+          maxVal = (Comparable<Object>) 
unsequenceReader.current().getValue().getValue();
+        }
+        unsequenceReader.next();
+      }
+    }
+
+    while (dataInThisPage.hasNext()) {
+      if (maxVal == null
+          || 
maxVal.compareTo(unsequenceReader.current().getValue().getValue()) < 0) {
+        maxVal = (Comparable<Object>) 
unsequenceReader.current().getValue().getValue();
+      }
+      dataInThisPage.next();
+    }
+    updateResult(maxVal);
+  }
+
+  @Override
+  public void calculateValueFromUnsequenceReader(IPointReader unsequenceReader)
+      throws IOException, ProcessorException {
+    Comparable<Object> maxVal = null;
+    while (unsequenceReader.hasNext()) {
+      if (maxVal == null
+          || 
maxVal.compareTo(unsequenceReader.current().getValue().getValue()) < 0) {
+        maxVal = (Comparable<Object>) 
unsequenceReader.current().getValue().getValue();
+      }
+      unsequenceReader.next();
+    }
+    updateResult(maxVal);
+  }
+
+  private void updateResult(Comparable<Object> maxVal) {
+    if (resultData.length() == 0) {
+      if (maxVal != null) {
+        resultData.putTime(0);
+        resultData.setAnObject(0, maxVal);
+      }
+    } else {
+      if (maxVal != null && maxVal.compareTo(resultData.currentValue()) > 0) {
+        resultData.setAnObject(0, maxVal);
+      }
+    }
+  }
+
+  @Override
+  public boolean calcAggregationUsingTimestamps(EngineTimeGenerator 
timeGenerator,
+      EngineReaderByTimeStamp sequenceReader, EngineReaderByTimeStamp 
unsequenceReader)
+      throws IOException, ProcessorException {
+    return false;
+  }
+
+  @Override
+  public void calcGroupByAggregation(long partitionStart, long partitionEnd, 
long intervalStart,
+      long intervalEnd, BatchData data) throws ProcessorException {
+
+  }
+}
diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MeanAggrFunc.java
 
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MeanAggrFunc.java
new file mode 100644
index 0000000..9e56fdc
--- /dev/null
+++ 
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MeanAggrFunc.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.query.aggregation.impl;
+
+import java.io.IOException;
+import org.apache.iotdb.db.exception.ProcessorException;
+import org.apache.iotdb.db.query.aggregation.AggregateFunction;
+import org.apache.iotdb.db.query.aggregation.AggregationConstant;
+import org.apache.iotdb.db.query.reader.IPointReader;
+import org.apache.iotdb.db.query.reader.merge.EngineReaderByTimeStamp;
+import org.apache.iotdb.db.query.timegenerator.EngineTimeGenerator;
+import org.apache.iotdb.db.utils.TimeValuePair;
+import org.apache.iotdb.tsfile.file.header.PageHeader;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+
+public class MeanAggrFunc extends AggregateFunction {
+  private double sum = 0.0;
+  private int cnt = 0;
+
+  public MeanAggrFunc() {
+    super(AggregationConstant.MEAN, TSDataType.DOUBLE);
+  }
+
+  @Override
+  public void init() {
+  }
+
+  @Override
+  public BatchData getResult() {
+    if (cnt > 0) {
+      resultData.putTime(0);
+      resultData.setDouble(0, sum / cnt);
+    }
+    return resultData;
+  }
+
+  @Override
+  public void calculateValueFromPageHeader(PageHeader pageHeader) throws 
ProcessorException {
+    sum += pageHeader.getStatistics().getSum();
+    cnt += pageHeader.getNumOfValues();
+  }
+
+  @Override
+  public void calculateValueFromPageData(BatchData dataInThisPage, 
IPointReader unsequenceReader)
+      throws IOException, ProcessorException {
+    TSDataType type = dataInThisPage.getDataType();
+    while (dataInThisPage.hasNext() && unsequenceReader.hasNext()) {
+      Object sumVal = null;
+      if (dataInThisPage.currentTime() < 
unsequenceReader.current().getTimestamp()) {
+        sumVal = dataInThisPage.currentValue();
+        dataInThisPage.next();
+      } else if (dataInThisPage.currentTime() == 
unsequenceReader.current().getTimestamp()) {
+        sumVal = unsequenceReader.current().getValue().getValue();
+        dataInThisPage.next();
+        unsequenceReader.next();
+      } else {
+        sumVal = unsequenceReader.current().getValue().getValue();
+        unsequenceReader.next();
+      }
+      updateMean(type, sumVal);
+    }
+
+    while (dataInThisPage.hasNext()) {
+      updateMean(type, dataInThisPage.currentValue());
+      dataInThisPage.next();
+    }
+  }
+
+  private void updateMean(TSDataType type, Object sumVal) throws 
ProcessorException {
+    switch (type) {
+      case INT32:
+        sum += (int) sumVal;
+        break;
+      case INT64:
+        sum += (long) sumVal;
+        break;
+      case FLOAT:
+        sum += (float) sumVal;
+        break;
+      case DOUBLE:
+        sum += (double) sumVal;
+        break;
+      case TEXT:
+      case BOOLEAN:
+      default:
+        throw new ProcessorException("Unsupported data type in aggregation 
MEAN : " + type);
+    }
+    cnt++;
+  }
+
+  @Override
+  public void calculateValueFromUnsequenceReader(IPointReader unsequenceReader)
+      throws IOException, ProcessorException {
+    TSDataType type = null;
+    if (unsequenceReader.hasNext()) {
+      type = unsequenceReader.current().getValue().getDataType();
+    }
+    while (unsequenceReader.hasNext()) {
+      TimeValuePair pair = unsequenceReader.next();
+      updateMean(type, pair.getValue().getValue());
+    }
+  }
+
+  @Override
+  public boolean calcAggregationUsingTimestamps(EngineTimeGenerator 
timeGenerator,
+      EngineReaderByTimeStamp sequenceReader, EngineReaderByTimeStamp 
unsequenceReader)
+      throws IOException, ProcessorException {
+    return false;
+  }
+
+  @Override
+  public void calcGroupByAggregation(long partitionStart, long partitionEnd, 
long intervalStart,
+      long intervalEnd, BatchData data) throws ProcessorException {
+
+  }
+}
diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrFunc.java
 
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrFunc.java
new file mode 100644
index 0000000..4dca740
--- /dev/null
+++ 
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrFunc.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.aggregation.impl;
+
+import java.io.IOException;
+import org.apache.iotdb.db.exception.ProcessorException;
+import org.apache.iotdb.db.query.aggregation.AggregateFunction;
+import org.apache.iotdb.db.query.aggregation.AggregationConstant;
+import org.apache.iotdb.db.query.reader.IPointReader;
+import org.apache.iotdb.db.query.reader.merge.EngineReaderByTimeStamp;
+import org.apache.iotdb.db.query.timegenerator.EngineTimeGenerator;
+import org.apache.iotdb.tsfile.file.header.PageHeader;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+
+public class MinTimeAggrFunc extends AggregateFunction {
+
+  public MinTimeAggrFunc() {
+    super(AggregationConstant.MIN_TIME, TSDataType.INT64);
+  }
+
+  @Override
+  public void init() {
+
+  }
+
+  @Override
+  public BatchData getResult() {
+    return resultData;
+  }
+
+  @Override
+  public void calculateValueFromPageHeader(PageHeader pageHeader) throws 
ProcessorException {
+    if (resultData.length() > 0) {
+      return;
+    }
+    long time = pageHeader.getMinTimestamp();
+    resultData.putTime(0);
+    resultData.putLong(time);
+  }
+
+  @Override
+  public void calculateValueFromPageData(BatchData dataInThisPage, 
IPointReader unsequenceReader)
+      throws IOException, ProcessorException {
+    if (resultData.length() > 0) {
+      return;
+    }
+
+    if (dataInThisPage.hasNext() && unsequenceReader.hasNext()) {
+      if (dataInThisPage.currentTime() < 
unsequenceReader.current().getTimestamp()) {
+        resultData.putTime(0);
+        resultData.putLong(dataInThisPage.currentTime());
+      } else {
+        resultData.putTime(0);
+        resultData.putLong(unsequenceReader.current().getTimestamp());
+      }
+      return;
+    }
+
+    if (dataInThisPage.hasNext()) {
+      resultData.putTime(0);
+      resultData.putLong(dataInThisPage.currentTime());
+    }
+  }
+
+  @Override
+  public void calculateValueFromUnsequenceReader(IPointReader unsequenceReader)
+      throws IOException, ProcessorException {
+    if (resultData.length() > 0) {
+      return;
+    }
+    if (unsequenceReader.hasNext()) {
+      resultData.putTime(0);
+      resultData.putLong(unsequenceReader.current().getTimestamp());
+    }
+  }
+
+  @Override
+  public boolean calcAggregationUsingTimestamps(EngineTimeGenerator 
timeGenerator,
+      EngineReaderByTimeStamp sequenceReader, EngineReaderByTimeStamp 
unsequenceReader)
+      throws IOException, ProcessorException {
+    return false;
+  }
+
+  @Override
+  public void calcGroupByAggregation(long partitionStart, long partitionEnd, 
long intervalStart,
+      long intervalEnd, BatchData data) throws ProcessorException {
+
+  }
+
+}
diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrFunc.java
 
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrFunc.java
new file mode 100644
index 0000000..e60ab45
--- /dev/null
+++ 
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrFunc.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.aggregation.impl;
+
+import java.io.IOException;
+import org.apache.iotdb.db.exception.ProcessorException;
+import org.apache.iotdb.db.query.aggregation.AggregateFunction;
+import org.apache.iotdb.db.query.aggregation.AggregationConstant;
+import org.apache.iotdb.db.query.reader.IPointReader;
+import org.apache.iotdb.db.query.reader.merge.EngineReaderByTimeStamp;
+import org.apache.iotdb.db.query.timegenerator.EngineTimeGenerator;
+import org.apache.iotdb.tsfile.file.header.PageHeader;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+
+public class MinValueAggrFunc extends AggregateFunction {
+
+  public MinValueAggrFunc(TSDataType dataType) {
+    super(AggregationConstant.MIN_VALUE, dataType);
+  }
+
+  @Override
+  public void init() {
+
+  }
+
+  @Override
+  public BatchData getResult() {
+    return resultData;
+  }
+
+  @Override
+  public void calculateValueFromPageHeader(PageHeader pageHeader) throws 
ProcessorException {
+    Comparable<Object> minVal = (Comparable<Object>) 
pageHeader.getStatistics().getMin();
+    if (resultData.length() == 0) {
+      resultData.putTime(0);
+      resultData.setAnObject(0, minVal);
+    } else {
+      if (minVal.compareTo(resultData.currentValue()) < 0) {
+        resultData.setAnObject(0, minVal);
+      }
+    }
+  }
+
+  @Override
+  public void calculateValueFromPageData(BatchData dataInThisPage, 
IPointReader unsequenceReader)
+      throws IOException, ProcessorException {
+    Comparable<Object> minVal = null;
+    while (dataInThisPage.hasNext() && unsequenceReader.hasNext()) {
+      if (dataInThisPage.currentTime() < 
unsequenceReader.current().getTimestamp()) {
+        if (minVal == null || minVal.compareTo(dataInThisPage.currentValue()) 
> 0) {
+          minVal = (Comparable<Object>) dataInThisPage.currentValue();
+        }
+        dataInThisPage.next();
+      } else if (dataInThisPage.currentTime() == 
unsequenceReader.current().getTimestamp()) {
+        if (minVal == null
+            || 
minVal.compareTo(unsequenceReader.current().getValue().getValue()) > 0) {
+          minVal = (Comparable<Object>) 
unsequenceReader.current().getValue().getValue();
+        }
+        dataInThisPage.next();
+        unsequenceReader.next();
+      } else {
+        if (minVal == null
+            || 
minVal.compareTo(unsequenceReader.current().getValue().getValue()) > 0) {
+          minVal = (Comparable<Object>) 
unsequenceReader.current().getValue().getValue();
+        }
+        unsequenceReader.next();
+      }
+    }
+
+    while (dataInThisPage.hasNext()) {
+      if (minVal == null
+          || 
minVal.compareTo(unsequenceReader.current().getValue().getValue()) > 0) {
+        minVal = (Comparable<Object>) 
unsequenceReader.current().getValue().getValue();
+      }
+      dataInThisPage.next();
+    }
+    updateResult(minVal);
+  }
+
+  @Override
+  public void calculateValueFromUnsequenceReader(IPointReader unsequenceReader)
+      throws IOException, ProcessorException {
+    Comparable<Object> minVal = null;
+    while (unsequenceReader.hasNext()) {
+      if (minVal == null
+          || 
minVal.compareTo(unsequenceReader.current().getValue().getValue()) > 0) {
+        minVal = (Comparable<Object>) 
unsequenceReader.current().getValue().getValue();
+      }
+      unsequenceReader.next();
+    }
+    updateResult(minVal);
+  }
+
+  private void updateResult(Comparable<Object> minVal) {
+    if (resultData.length() == 0) {
+      if (minVal != null) {
+        resultData.putTime(0);
+        resultData.setAnObject(0, minVal);
+      }
+    } else {
+      if (minVal != null && minVal.compareTo(resultData.currentValue()) < 0) {
+        resultData.setAnObject(0, minVal);
+      }
+    }
+  }
+
+  @Override
+  public boolean calcAggregationUsingTimestamps(EngineTimeGenerator 
timeGenerator,
+      EngineReaderByTimeStamp sequenceReader, EngineReaderByTimeStamp 
unsequenceReader)
+      throws IOException, ProcessorException {
+    return false;
+  }
+
+  @Override
+  public void calcGroupByAggregation(long partitionStart, long partitionEnd, 
long intervalStart,
+      long intervalEnd, BatchData data) throws ProcessorException {
+
+  }
+
+}
diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/SumAggrFunc.java
 
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/SumAggrFunc.java
new file mode 100644
index 0000000..642d2a7
--- /dev/null
+++ 
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/SumAggrFunc.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.aggregation.impl;
+
+import java.io.IOException;
+import org.apache.iotdb.db.exception.ProcessorException;
+import org.apache.iotdb.db.query.aggregation.AggregateFunction;
+import org.apache.iotdb.db.query.aggregation.AggregationConstant;
+import org.apache.iotdb.db.query.reader.IPointReader;
+import org.apache.iotdb.db.query.reader.merge.EngineReaderByTimeStamp;
+import org.apache.iotdb.db.query.timegenerator.EngineTimeGenerator;
+import org.apache.iotdb.db.utils.TimeValuePair;
+import org.apache.iotdb.tsfile.file.header.PageHeader;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+
+public class SumAggrFunc extends AggregateFunction {
+
+  private double sum = 0.0;
+
+  public SumAggrFunc() {
+    super(AggregationConstant.SUM, TSDataType.DOUBLE);
+  }
+
+  @Override
+  public void init() {
+
+  }
+
+  @Override
+  public BatchData getResult() {
+    resultData.setDouble(0, sum);
+    return resultData;
+  }
+
+  @Override
+  public void calculateValueFromPageHeader(PageHeader pageHeader) throws 
ProcessorException {
+    sum += pageHeader.getStatistics().getSum();
+  }
+
+  @Override
+  public void calculateValueFromPageData(BatchData dataInThisPage, 
IPointReader unsequenceReader)
+      throws IOException, ProcessorException {
+    TSDataType type = dataInThisPage.getDataType();
+    while (dataInThisPage.hasNext() && unsequenceReader.hasNext()) {
+      Object sumVal = null;
+      if (dataInThisPage.currentTime() < 
unsequenceReader.current().getTimestamp()) {
+        sumVal = dataInThisPage.currentValue();
+        dataInThisPage.next();
+      } else if (dataInThisPage.currentTime() == 
unsequenceReader.current().getTimestamp()) {
+        sumVal = unsequenceReader.current().getValue().getValue();
+        dataInThisPage.next();
+        unsequenceReader.next();
+      } else {
+        sumVal = unsequenceReader.current().getValue().getValue();
+        unsequenceReader.next();
+      }
+      updateSum(type, sumVal);
+    }
+
+    while (dataInThisPage.hasNext()) {
+      updateSum(type, dataInThisPage.currentValue());
+      dataInThisPage.next();
+    }
+  }
+
+  private void updateSum(TSDataType type, Object sumVal) throws 
ProcessorException {
+    switch (type) {
+      case INT32:
+        sum += (int) sumVal;
+        break;
+      case INT64:
+        sum += (long) sumVal;
+        break;
+      case FLOAT:
+        sum += (float) sumVal;
+        break;
+      case DOUBLE:
+        sum += (double) sumVal;
+        break;
+      case TEXT:
+      case BOOLEAN:
+      default:
+        throw new ProcessorException("Unsupported data type in aggregation 
MEAN : " + type);
+    }
+  }
+
+  @Override
+  public void calculateValueFromUnsequenceReader(IPointReader unsequenceReader)
+      throws IOException, ProcessorException {
+    TSDataType type = null;
+    if (unsequenceReader.hasNext()) {
+      type = unsequenceReader.current().getValue().getDataType();
+    }
+    while (unsequenceReader.hasNext()) {
+      TimeValuePair pair = unsequenceReader.next();
+      updateSum(type, pair.getValue().getValue());
+    }
+  }
+
+  @Override
+  public boolean calcAggregationUsingTimestamps(EngineTimeGenerator 
timeGenerator,
+      EngineReaderByTimeStamp sequenceReader, EngineReaderByTimeStamp 
unsequenceReader)
+      throws IOException, ProcessorException {
+    return false;
+  }
+
+  @Override
+  public void calcGroupByAggregation(long partitionStart, long partitionEnd, 
long intervalStart,
+      long intervalEnd, BatchData data) throws ProcessorException {
+
+  }
+}
diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/AggregateDataSet.java 
b/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/AggregateDataSet.java
new file mode 100644
index 0000000..00fe608
--- /dev/null
+++ 
b/iotdb/src/main/java/org/apache/iotdb/db/query/dataset/AggregateDataSet.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.dataset;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.PriorityQueue;
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.Field;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+
+public class AggregateDataSet extends QueryDataSet {
+  private List<BatchData> readers;
+
+  private PriorityQueue<Long> timeHeap;
+
+  private List<String> aggres;
+
+  /**
+   * constructor of EngineDataSetWithoutTimeGenerator.
+   *
+   * @param paths paths in List structure
+   * @param aggres aggregate function name
+   * @param dataTypes time series data type
+   * @param readers readers in List(IReader) structure
+   * @throws IOException IOException
+   */
+  public AggregateDataSet(List<Path> paths, List<String> aggres, 
List<TSDataType> dataTypes,
+      List<BatchData> readers)
+      throws IOException {
+    super(paths, dataTypes);
+    this.readers = readers;
+    initHeap();
+  }
+
+  private void initHeap(){
+    timeHeap = new PriorityQueue<>();
+
+    for (int i = 0; i < readers.size(); i++) {
+      BatchData reader = readers.get(i);
+      if (reader.hasNext()) {
+        timeHeap.add(reader.currentTime());
+      }
+    }
+  }
+
+  @Override
+  public boolean hasNext() {
+    return !timeHeap.isEmpty();
+  }
+
+  @Override
+  public RowRecord next() throws IOException {
+    long minTime = timeHeapGet();
+
+    RowRecord record = new RowRecord(minTime);
+
+    for (int i = 0; i < readers.size(); i++) {
+      BatchData reader = readers.get(i);
+      if (!reader.hasNext()) {
+        record.addField(new Field(null));
+      } else {
+        if (reader.currentTime() == minTime) {
+          record.addField(getField(reader, dataTypes.get(i)));
+          reader.next();
+          if (reader.hasNext()) {
+            timeHeap.add(reader.currentTime());
+          }
+        } else {
+          record.addField(new Field(null));
+        }
+      }
+    }
+
+    return record;
+  }
+
+  private Field getField(BatchData batchData, TSDataType dataType) {
+    Field field = new Field(dataType);
+    switch (dataType) {
+      case INT32:
+        field.setIntV(batchData.getInt());
+        break;
+      case INT64:
+        field.setLongV(batchData.getLong());
+        break;
+      case FLOAT:
+        field.setFloatV(batchData.getFloat());
+        break;
+      case DOUBLE:
+        field.setDoubleV(batchData.getDouble());
+        break;
+      case BOOLEAN:
+        field.setBoolV(batchData.getBoolean());
+        break;
+      case TEXT:
+        field.setBinaryV(batchData.getBinary());
+        break;
+      default:
+        throw new UnSupportedDataTypeException("UnSupported: " + dataType);
+    }
+    return field;
+  }
+
+  private Long timeHeapGet() {
+    Long t = timeHeap.peek();
+    while (!timeHeap.isEmpty()){
+      if(timeHeap.peek() == t){
+        timeHeap.poll();
+      }
+      else {
+        break;
+      }
+    }
+    return t;
+  }
+}
diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/AggregateEngineExecutor.java
 
b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/AggregateEngineExecutor.java
new file mode 100644
index 0000000..4cb269f
--- /dev/null
+++ 
b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/AggregateEngineExecutor.java
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.executor;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.exception.FileNodeManagerException;
+import org.apache.iotdb.db.exception.PathErrorException;
+import org.apache.iotdb.db.exception.ProcessorException;
+import org.apache.iotdb.db.metadata.MManager;
+import org.apache.iotdb.db.query.aggregation.AggreFuncFactory;
+import org.apache.iotdb.db.query.aggregation.impl.LastAggrFunc;
+import org.apache.iotdb.db.query.aggregation.impl.MaxTimeAggrFunc;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.QueryDataSourceManager;
+import org.apache.iotdb.db.query.control.QueryTokenManager;
+import org.apache.iotdb.db.query.dataset.AggregateDataSet;
+import org.apache.iotdb.db.query.factory.SeriesReaderFactory;
+import org.apache.iotdb.db.query.reader.IAggregateReader;
+import org.apache.iotdb.db.query.reader.IPointReader;
+import org.apache.iotdb.db.query.reader.merge.PriorityMergeReader;
+import org.apache.iotdb.db.query.reader.sequence.SequenceDataReader;
+import org.apache.iotdb.tsfile.file.header.PageHeader;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.expression.IExpression;
+import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+
+public class AggregateEngineExecutor {
+  private long jobId;
+  private List<Path> selectedSeries;
+  private List<String> aggres;
+  private IExpression expression;
+
+  public AggregateEngineExecutor(long jobId, List<Path> selectedSeries, 
List<String> aggres, IExpression expression) {
+    this.jobId = jobId;
+    this.selectedSeries = selectedSeries;
+    this.aggres = aggres;
+    this.expression = expression;
+  }
+
+  public AggregateDataSet executeWithOutTimeGenerator(QueryContext context)
+      throws FileNodeManagerException, IOException, PathErrorException, 
ProcessorException {
+    Filter timeFilter = null;
+    if(timeFilter!=null){
+      timeFilter = ((GlobalTimeExpression)expression).getFilter();
+    }
+    QueryTokenManager.getInstance().beginQueryOfGivenQueryPaths(jobId, 
selectedSeries);
+
+    List<IAggregateReader> readersOfSequenceData = new ArrayList<>();
+    List<IPointReader> readersOfUnSequenceData = new ArrayList<>();
+    List<AggregateFunction> aggregateFunctions = new ArrayList<>();
+    for(int i = 0; i < selectedSeries.size(); i++){
+      //construct AggregateFunction
+      TSDataType tsDataType = 
MManager.getInstance().getSeriesType(selectedSeries.get(i).getFullPath());
+      AggregateFunction function = 
AggreFuncFactory.getAggrFuncByName(aggres.get(i), tsDataType);
+      function.init();
+      aggregateFunctions.add(function);
+
+      QueryDataSource queryDataSource = 
QueryDataSourceManager.getQueryDataSource(jobId, selectedSeries.get(i), 
context);
+      // sequence reader for sealed tsfile, unsealed tsfile, memory
+      SequenceDataReader sequenceReader = new 
SequenceDataReader(queryDataSource.getSeqDataSource(), timeFilter, context);
+      // unseq reader for all chunk groups in unSeqFile, memory
+      PriorityMergeReader unSeqMergeReader = 
SeriesReaderFactory.getInstance().createUnSeqMergeReader(queryDataSource.getOverflowSeriesDataSource(),
 timeFilter);
+      readersOfSequenceData.add(sequenceReader);
+      readersOfUnSequenceData.add(unSeqMergeReader);
+    }
+
+    List<BatchData> batchDatas = new ArrayList<BatchData>();
+    //TODO use multi-thread
+    for(int i = 0; i < selectedSeries.size(); i++){
+      BatchData batchData = 
aggregateWithOutTimeGenerator(aggregateFunctions.get(i), 
readersOfSequenceData.get(i), readersOfUnSequenceData.get(i));
+      batchDatas.add(batchData);
+    }
+    return constructDataSet(batchDatas);
+  }
+
+  private BatchData aggregateWithOutTimeGenerator(AggregateFunction function, 
IAggregateReader sequenceReader, IPointReader unSequenceReader)
+      throws IOException, ProcessorException {
+    if (function instanceof MaxTimeAggrFunc || function instanceof 
LastAggrFunc){
+      //TODO Optimization
+      return function.getResult();
+    }
+
+    while (sequenceReader.hasNext()){
+      PageHeader pageHeader = sequenceReader.nextPageHeader();
+      //judge if overlap with unsequence data
+      if(canUseHeader(pageHeader, unSequenceReader)){
+        //cal by pageHeader
+        function.calculateValueFromPageHeader(pageHeader);
+        sequenceReader.skipPageData();
+      }
+      else {
+        //cal by pageData
+        function.calculateValueFromPageData(sequenceReader.nextBatch(), 
unSequenceReader);
+      }
+    }
+
+    //cal with unsequence data
+    if(unSequenceReader.hasNext()){
+      function.calculateValueFromUnsequenceReader(unSequenceReader);
+    }
+    return function.getResult();
+  }
+
+  private boolean canUseHeader(PageHeader pageHeader, IPointReader 
unSequenceReader)
+      throws IOException {
+    //if page data is memory data.
+    if(pageHeader == null){
+      return false;
+    }
+
+    long minTime = pageHeader.getMinTimestamp();
+    long maxTime = pageHeader.getMaxTimestamp();
+    while(unSequenceReader.hasNext() && 
unSequenceReader.current().getTimestamp() <= maxTime){
+      if(minTime <= unSequenceReader.current().getTimestamp()){
+        return false;
+      }
+      unSequenceReader.next();
+    }
+    return true;
+  }
+
+  public AggregateDataSet executeWithTimeGenerator(QueryContext context){
+    return null;
+  }
+
+  private AggregateDataSet constructDataSet(List<BatchData> batchDataList) 
throws IOException {
+    List<TSDataType> dataTypes = new ArrayList<>();
+    for(BatchData batchData : batchDataList){
+      dataTypes.add(batchData.getDataType());
+    }
+    return new AggregateDataSet(selectedSeries,aggres,dataTypes, 
batchDataList);
+  }
+
+
+}
diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java 
b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java
index c9327f4..b5a996f 100644
--- 
a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java
+++ 
b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java
@@ -16,18 +16,23 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.db.query.executor;
 
 import static 
org.apache.iotdb.tsfile.read.expression.ExpressionType.GLOBAL_TIME;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.iotdb.db.exception.FileNodeManagerException;
 import org.apache.iotdb.db.exception.PathErrorException;
+import org.apache.iotdb.db.exception.ProcessorException;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.control.OpenedFilePathsManager;
 import org.apache.iotdb.db.query.control.QueryTokenManager;
+import org.apache.iotdb.db.query.dataset.AggregateDataSet;
 import 
org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
+import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.read.expression.IExpression;
 import org.apache.iotdb.tsfile.read.expression.QueryExpression;
 import org.apache.iotdb.tsfile.read.expression.util.ExpressionOptimizer;
@@ -88,6 +93,36 @@ public class EngineQueryRouter {
     }
   }
 
+  /**
+   * execute aggregation query.
+   */
+  public AggregateDataSet aggregate(List<Path> selectedSeries, List<String> 
aggres,
+      IExpression expression)
+      throws QueryFilterOptimizationException, FileNodeManagerException, 
IOException, PathErrorException, ProcessorException {
+
+    long nextJobId = getNextJobId();
+    QueryTokenManager.getInstance().setJobIdForCurrentRequestThread(nextJobId);
+    
OpenedFilePathsManager.getInstance().setJobIdForCurrentRequestThread(nextJobId);
+
+    QueryContext context = new QueryContext();
+
+    if (expression != null) {
+      IExpression optimizedExpression = ExpressionOptimizer.getInstance()
+          .optimize(expression, selectedSeries);
+      AggregateEngineExecutor engineExecutor = new 
AggregateEngineExecutor(nextJobId,
+          selectedSeries, aggres, optimizedExpression);
+      if (optimizedExpression.getType() == GLOBAL_TIME) {
+        return engineExecutor.executeWithOutTimeGenerator(context);
+      } else {
+        return engineExecutor.executeWithTimeGenerator(context);
+      }
+    } else {
+      AggregateEngineExecutor engineExecutor = new 
AggregateEngineExecutor(nextJobId,
+          selectedSeries, aggres, expression);
+      return engineExecutor.executeWithOutTimeGenerator(context);
+    }
+  }
+
   private synchronized long getNextJobId() {
     return jobIdGenerator.incrementAndGet();
   }
diff --git 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java
index ffb37e2..e853440 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java
@@ -527,4 +527,62 @@ public class BatchData {
   public int length() {
     return this.timeLength;
   }
+
+  public int getCurIdx() {
+    return curIdx;
+  }
+
+  public long getTimeByIndex(int idx){
+    rangeCheckForTime(idx);
+    return this.timeRet.get(idx / timeCapacity)[idx % timeCapacity];
+  }
+
+  public long getLongByIndex(int idx){
+    rangeCheck(idx);
+    return this.longRet.get(idx / timeCapacity)[idx % timeCapacity];
+  }
+
+  public double getDoubleByIndex(int idx) {
+    rangeCheck(idx);
+    return this.doubleRet.get(idx / timeCapacity)[idx % timeCapacity];
+  }
+
+  public int getIntByIndex(int idx) {
+    rangeCheck(idx);
+    return this.intRet.get(idx / timeCapacity)[idx % timeCapacity];
+  }
+
+  public float getFloatByIndex(int idx) {
+    rangeCheck(idx);
+    return this.floatRet.get(idx / timeCapacity)[idx % timeCapacity];
+  }
+
+  public Binary getBinaryByIndex(int idx) {
+    rangeCheck(idx);
+    return binaryRet.get(idx / timeCapacity)[idx % timeCapacity];
+  }
+
+  public boolean getBooleanByIndex(int idx) {
+    rangeCheck(idx);
+    return booleanRet.get(idx / timeCapacity)[idx % timeCapacity];
+  }
+
+  public Object getValueByIndex(int idx) {
+    switch (dataType) {
+      case INT32:
+        return getIntByIndex(idx);
+      case INT64:
+        return getLongByIndex(idx);
+      case FLOAT:
+        return getFloatByIndex(idx);
+      case DOUBLE:
+        return getDoubleByIndex(idx);
+      case BOOLEAN:
+        return getBooleanByIndex(idx);
+      case TEXT:
+        return getBinaryByIndex(idx);
+      default:
+        return null;
+    }
+  }
 }

Reply via email to