This is an automated email from the ASF dual-hosted git repository.

suyue pushed a commit to branch aggregate
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/aggregate by this push:
     new f056180  fill query
f056180 is described below

commit f0561807e9015702432ae5ec0d778666ab8048cd
Author: suyue <[email protected]>
AuthorDate: Fri Mar 22 19:40:20 2019 +0800

    fill query
---
 .../iotdb/db/qp/executor/OverflowQPExecutor.java   |  27 ++-
 .../iotdb/db/qp/executor/QueryProcessExecutor.java |  12 +
 .../db/query/aggregation/impl/CountAggrFunc.java   |   6 +-
 .../iotdb/db/query/executor/EngineQueryRouter.java |  33 ++-
 .../db/query/executor/FillEngineExecutor.java      |  92 ++++++++
 .../java/org/apache/iotdb/db/query/fill/IFill.java |  82 ++++++-
 .../org/apache/iotdb/db/query/fill/LinearFill.java |  90 +++++++-
 .../apache/iotdb/db/query/fill/PreviousFill.java   |  34 ++-
 .../iotdb/db/integration/IOTDBFillTestIT.java      | 244 +++++++++++++++++++++
 .../iotdb/db/integration/IOTDBGroupByTestIT.java   |   7 -
 .../apache/iotdb/db/qp/utils/MemIntQpExecutor.java |  14 +-
 11 files changed, 590 insertions(+), 51 deletions(-)

diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java
index 6eb69bb..9e7a5ae 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java
@@ -52,6 +52,7 @@ import org.apache.iotdb.db.qp.physical.sys.LoadDataPlan;
 import org.apache.iotdb.db.qp.physical.sys.MetadataPlan;
 import org.apache.iotdb.db.qp.physical.sys.PropertyPlan;
 import org.apache.iotdb.db.query.executor.EngineQueryRouter;
+import org.apache.iotdb.db.query.fill.IFill;
 import org.apache.iotdb.db.utils.AuthUtils;
 import org.apache.iotdb.db.utils.LoadDataUtils;
 import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
@@ -179,20 +180,22 @@ public class OverflowQPExecutor extends QueryProcessExecutor {
 
   @Override
   public QueryDataSet aggregate(List<Path> paths, List<String> aggres, IExpression expression)
-          throws ProcessorException, FileNodeManagerException, QueryFilterOptimizationException, PathErrorException, IOException {
+      throws ProcessorException, FileNodeManagerException, QueryFilterOptimizationException,
+      PathErrorException, IOException {
     return new EngineQueryRouter().aggregate(paths, aggres, expression);
   }
 
-  // @Override
-  // public QueryDataSet fill(List<Path> fillPaths, long queryTime, Map<TSDataType, IFill> fillTypes) throws
-  // ProcessorException, IOException, PathErrorException {
-  // return queryEngine.fill(fillPaths, queryTime, fillTypes);
-  // }
+  @Override
+  public QueryDataSet fill(List<Path> fillPaths, long queryTime, Map<TSDataType, IFill> fillTypes)
+      throws ProcessorException, IOException, PathErrorException, FileNodeManagerException {
+    return new EngineQueryRouter().fill(fillPaths, queryTime, fillTypes);
+  }
 
   @Override
-  public QueryDataSet groupBy(List<Path> paths, List<String> aggres, IExpression expression, long unit,
-      long origin, List<Pair<Long, Long>> intervals)
-      throws ProcessorException, FileNodeManagerException, QueryFilterOptimizationException, PathErrorException, IOException {
+  public QueryDataSet groupBy(List<Path> paths, List<String> aggres, IExpression expression,
+      long unit, long origin, List<Pair<Long, Long>> intervals)
+      throws ProcessorException, FileNodeManagerException, QueryFilterOptimizationException,
+      PathErrorException, IOException {
     return new EngineQueryRouter().groupBy(paths, aggres, expression, unit, origin, intervals);
   }
 
@@ -504,7 +507,8 @@ public class OverflowQPExecutor extends QueryProcessExecutor {
           // optimize the speed of adding timeseries
           String fileNodePath = mManager.getFileNameByPath(path.getFullPath());
           // the two map is stored in the storage group node
-          Map<String, MeasurementSchema> schemaMap = mManager.getSchemaMapForOneFileNode(fileNodePath);
+          Map<String, MeasurementSchema> schemaMap = mManager
+              .getSchemaMapForOneFileNode(fileNodePath);
           Map<String, Integer> numSchemaMap = mManager.getNumSchemaMapForOneFileNode(fileNodePath);
           String lastNode = path.getMeasurement();
           boolean isNewMeasurement = true;
@@ -629,8 +633,7 @@ public class OverflowQPExecutor extends QueryProcessExecutor {
   /**
    * Delete all data of time series in pathList.
    *
-   * @param pathList
-   *            deleted paths
+   * @param pathList deleted paths
    */
   private void deleteDataOfTimeSeries(List<String> pathList) throws ProcessorException {
     for (String p : pathList) {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java
index 7d08808..ea2def9 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import org.apache.iotdb.db.exception.FileNodeManagerException;
 import org.apache.iotdb.db.exception.PathErrorException;
@@ -29,9 +30,11 @@ import org.apache.iotdb.db.exception.ProcessorException;
 import org.apache.iotdb.db.metadata.MManager;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.qp.physical.crud.AggregationPlan;
+import org.apache.iotdb.db.qp.physical.crud.FillQueryPlan;
 import org.apache.iotdb.db.qp.physical.crud.GroupByPlan;
 import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
 import org.apache.iotdb.db.query.executor.EngineQueryRouter;
+import org.apache.iotdb.db.query.fill.IFill;
 import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.Path;
@@ -66,6 +69,11 @@ public abstract class QueryProcessExecutor {
       return aggregate(plan.getPaths(), plan.getAggregations(),
           ((AggregationPlan) plan).getExpression());
     }
+
+    if (plan instanceof FillQueryPlan) {
+      FillQueryPlan fillQueryPlan = (FillQueryPlan) plan;
+      return fill(plan.getPaths(), fillQueryPlan.getQueryTime(), fillQueryPlan.getFillType());
+    }
     return queryRouter.query(queryExpression);
   }
 
@@ -97,6 +105,10 @@ public abstract class QueryProcessExecutor {
       throws ProcessorException, IOException, PathErrorException, FileNodeManagerException,
       QueryFilterOptimizationException;
 
+  public abstract QueryDataSet fill(List<Path> fillPaths, long queryTime, Map<TSDataType,
+      IFill> fillTypes)
+      throws ProcessorException, IOException, PathErrorException, FileNodeManagerException;
+
   /**
    * executeWithGlobalTimeFilter update command and return whether the operator is successful.
    *
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrFunc.java b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrFunc.java
index 1964875..301ce9a 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrFunc.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrFunc.java
@@ -56,8 +56,10 @@ public class CountAggrFunc extends AggregateFunction {
 
   @Override
   public void calculateValueFromPageHeader(PageHeader pageHeader) {
-    LOGGER.debug("PageHeader>>>>>>>>>>>>num of rows:{}, minTimeStamp:{}, maxTimeStamp{}",
-        pageHeader.getNumOfValues(), pageHeader.getMinTimestamp(), pageHeader.getMaxTimestamp());
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug("PageHeader>>>>>>>>>>>>num of rows:{}, minTimeStamp:{}, maxTimeStamp{}",
+          pageHeader.getNumOfValues(), pageHeader.getMinTimestamp(), pageHeader.getMaxTimestamp());
+    }
     long preValue = resultData.getLongRet();
     preValue += pageHeader.getNumOfValues();
     resultData.setLongRet(preValue);
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java
index d7f6cf0..176cac0 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java
@@ -24,6 +24,7 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.iotdb.db.exception.FileNodeManagerException;
 import org.apache.iotdb.db.exception.PathErrorException;
@@ -31,7 +32,9 @@ import org.apache.iotdb.db.exception.ProcessorException;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.control.OpenedFilePathsManager;
 import org.apache.iotdb.db.query.control.QueryTokenManager;
+import org.apache.iotdb.db.query.fill.IFill;
 import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.read.expression.ExpressionType;
 import org.apache.iotdb.tsfile.read.expression.IExpression;
@@ -129,19 +132,14 @@ public class EngineQueryRouter {
 
   /**
    * execute groupBy query.
+   *
    * @param selectedSeries select path list
    * @param aggres aggregation name list
    * @param expression filter expression
    * @param unit time granularity for interval partitioning, unit is ms.
-   * @param origin the datum time point for interval division is divided into a time interval
-   *        for each TimeUnit time from this point forward and backward.
+   * @param origin the datum time point for interval division is divided into a time interval for
+   *        each TimeUnit time from this point forward and backward.
    * @param intervals time intervals, closed interval.
-   * @return
-   * @throws ProcessorException
-   * @throws QueryFilterOptimizationException
-   * @throws FileNodeManagerException
-   * @throws PathErrorException
-   * @throws IOException
    */
   public QueryDataSet groupBy(List<Path> selectedSeries, List<String> aggres,
       IExpression expression, long unit, long origin, List<Pair<Long, Long>> intervals)
@@ -204,6 +202,25 @@ public class EngineQueryRouter {
   }
 
   /**
+   * execute fill query.
+   * @param fillPaths select path list
+   * @param queryTime timestamp
+   * @param fillType type IFill map
+   * @return
+   */
+  public QueryDataSet fill(List<Path> fillPaths, long queryTime, Map<TSDataType, IFill> fillType)
+      throws FileNodeManagerException, PathErrorException, IOException, ProcessorException {
+    long nextJobId = getNextJobId();
+    QueryTokenManager.getInstance().setJobIdForCurrentRequestThread(nextJobId);
+    OpenedFilePathsManager.getInstance().setJobIdForCurrentRequestThread(nextJobId);
+
+    QueryContext context = new QueryContext();
+    FillEngineExecutor fillEngineExecutor = new FillEngineExecutor(nextJobId, fillPaths, queryTime,
+        fillType);
+    return fillEngineExecutor.execute(context);
+  }
+
+  /**
    * sort intervals by start time and merge overlapping intervals.
    *
    * @param intervals time interval
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/FillEngineExecutor.java b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/FillEngineExecutor.java
new file mode 100644
index 0000000..4907aad
--- /dev/null
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/FillEngineExecutor.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.executor;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.exception.FileNodeManagerException;
+import org.apache.iotdb.db.exception.PathErrorException;
+import org.apache.iotdb.db.exception.ProcessorException;
+import org.apache.iotdb.db.metadata.MManager;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.QueryDataSourceManager;
+import org.apache.iotdb.db.query.control.QueryTokenManager;
+import org.apache.iotdb.db.query.dataset.EngineDataSetWithoutTimeGenerator;
+import org.apache.iotdb.db.query.fill.IFill;
+import org.apache.iotdb.db.query.fill.PreviousFill;
+import org.apache.iotdb.db.query.reader.IPointReader;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+
+public class FillEngineExecutor {
+
+  private long jobId;
+  private List<Path> selectedSeries;
+  private long queryTime;
+  private Map<TSDataType, IFill> typeIFillMap;
+
+  public FillEngineExecutor(long jobId, List<Path> selectedSeries, long queryTime,
+      Map<TSDataType, IFill> typeIFillMap) {
+    this.jobId = jobId;
+    this.selectedSeries = selectedSeries;
+    this.queryTime = queryTime;
+    this.typeIFillMap = typeIFillMap;
+  }
+
+  /**
+   * execute fill.
+   * @param context query context
+   */
+  public QueryDataSet execute(QueryContext context)
+      throws FileNodeManagerException, PathErrorException, IOException, ProcessorException {
+    QueryTokenManager.getInstance().beginQueryOfGivenQueryPaths(jobId, selectedSeries);
+
+    List<IFill> fillList = new ArrayList<>();
+    List<TSDataType> dataTypeList = new ArrayList<>();
+    for (Path path : selectedSeries) {
+      QueryDataSource queryDataSource = QueryDataSourceManager
+          .getQueryDataSource(jobId, path, context);
+      TSDataType dataType = MManager.getInstance().getSeriesType(path.getFullPath());
+      dataTypeList.add(dataType);
+      IFill fill = null;
+      if (!typeIFillMap.containsKey(dataType)) {
+        fill = new PreviousFill(dataType, queryTime, 0);
+      } else {
+        fill = typeIFillMap.get(dataType).copy(path);
+      }
+      fill.setDataType(dataType);
+      fill.setQueryTime(queryTime);
+      fill.constructReaders(queryDataSource, context);
+      fillList.add(fill);
+    }
+
+    List<IPointReader> readers = new ArrayList<>();
+    for (IFill fill : fillList) {
+      readers.add(fill.getFillResult());
+    }
+
+    return new EngineDataSetWithoutTimeGenerator(selectedSeries, dataTypeList, readers);
+  }
+
+}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/fill/IFill.java b/iotdb/src/main/java/org/apache/iotdb/db/query/fill/IFill.java
index 7ddce7a..be77c94 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/fill/IFill.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/fill/IFill.java
@@ -18,16 +18,27 @@
  */
 package org.apache.iotdb.db.query.fill;
 
-import org.apache.iotdb.db.exception.ProcessorException;
+import java.io.IOException;
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.factory.SeriesReaderFactory;
+import org.apache.iotdb.db.query.reader.AllDataReader;
+import org.apache.iotdb.db.query.reader.IPointReader;
+import org.apache.iotdb.db.query.reader.merge.PriorityMergeReader;
+import org.apache.iotdb.db.query.reader.sequence.SequenceDataReader;
+import org.apache.iotdb.db.utils.TimeValuePair;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.read.common.BatchData;
 import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.filter.TimeFilter;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 
 public abstract class IFill {
 
   long queryTime;
   TSDataType dataType;
 
+  IPointReader allDataReader;
+
   public IFill(TSDataType dataType, long queryTime) {
     this.dataType = dataType;
     this.queryTime = queryTime;
@@ -38,7 +49,28 @@ public abstract class IFill {
 
   public abstract IFill copy(Path path);
 
-  public abstract BatchData getFillResult() throws ProcessorException;
+  public abstract void constructReaders(QueryDataSource queryDataSource, QueryContext context)
+      throws IOException;
+
+  void constructReaders(QueryDataSource queryDataSource, QueryContext context, long beforeRange)
+      throws IOException {
+    Filter timeFilter = constructFilter(beforeRange);
+    // sequence reader for sealed tsfile, unsealed tsfile, memory
+    SequenceDataReader sequenceReader = new SequenceDataReader(queryDataSource.getSeqDataSource(),
+        timeFilter, context, false);
+
+    // unseq reader for all chunk groups in unSeqFile, memory
+    PriorityMergeReader unSeqMergeReader = SeriesReaderFactory.getInstance()
+        .createUnSeqMergeReader(queryDataSource.getOverflowSeriesDataSource(), timeFilter);
+
+    if (sequenceReader == null) {
+      allDataReader = unSeqMergeReader;
+    } else {
+      allDataReader = new AllDataReader(sequenceReader, unSeqMergeReader);
+    }
+  }
+
+  public abstract IPointReader getFillResult() throws IOException;
 
   public TSDataType getDataType() {
     return this.dataType;
@@ -48,11 +80,47 @@ public abstract class IFill {
     this.dataType = dataType;
   }
 
-  public long getQueryTime() {
-    return this.queryTime;
-  }
-
   public void setQueryTime(long queryTime) {
     this.queryTime = queryTime;
   }
+
+  private Filter constructFilter(long beforeRange) {
+    //If the fill time range is not set, beforeRange will be set to -1.
+    if (beforeRange == -1) {
+      return null;
+    }
+    return TimeFilter.gtEq(queryTime - beforeRange);
+  }
+
+  class TimeValuePairPointReader implements IPointReader {
+
+    private boolean isUsed;
+    private TimeValuePair pair;
+
+    public TimeValuePairPointReader(TimeValuePair pair) {
+      this.pair = pair;
+      this.isUsed = (pair == null);
+    }
+
+    @Override
+    public boolean hasNext() throws IOException {
+      return !isUsed;
+    }
+
+    @Override
+    public TimeValuePair next() throws IOException {
+      isUsed = true;
+      return pair;
+    }
+
+    @Override
+    public TimeValuePair current() throws IOException {
+      return pair;
+    }
+
+    @Override
+    public void close() throws IOException {
+
+    }
+  }
 }
\ No newline at end of file
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/fill/LinearFill.java b/iotdb/src/main/java/org/apache/iotdb/db/query/fill/LinearFill.java
index 764f7c7..cab3f36 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/fill/LinearFill.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/fill/LinearFill.java
@@ -18,16 +18,20 @@
  */
 package org.apache.iotdb.db.query.fill;
 
-import org.apache.iotdb.db.exception.ProcessorException;
+import java.io.IOException;
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.exception.UnSupportedFillTypeException;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.reader.IPointReader;
+import org.apache.iotdb.db.utils.TimeValuePair;
+import org.apache.iotdb.db.utils.TsPrimitiveType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.read.common.BatchData;
 import org.apache.iotdb.tsfile.read.common.Path;
 
 public class LinearFill extends IFill {
 
   private long beforeRange;
   private long afterRange;
-  private BatchData result;
 
   public LinearFill(long beforeRange, long afterRange) {
     this.beforeRange = beforeRange;
@@ -42,7 +46,6 @@ public class LinearFill extends IFill {
     super(dataType, queryTime);
     this.beforeRange = beforeRange;
     this.afterRange = afterRange;
-    result = new BatchData(dataType, true, true);
   }
 
   public long getBeforeRange() {
@@ -67,7 +70,82 @@ public class LinearFill extends IFill {
   }
 
   @Override
-  public BatchData getFillResult() throws ProcessorException {
-    return result;
+  public void constructReaders(QueryDataSource queryDataSource, QueryContext context)
+      throws IOException {
+    super.constructReaders(queryDataSource, context, beforeRange);
+  }
+
+  @Override
+  public IPointReader getFillResult() throws IOException {
+    TimeValuePair beforePair = null;
+    TimeValuePair afterPair = null;
+    while (allDataReader.hasNext()) {
+      afterPair = allDataReader.next();
+      if (afterPair.getTimestamp() <= queryTime) {
+        beforePair = afterPair;
+      } else {
+        break;
+      }
+    }
+
+    if (beforePair == null || beforePair.getTimestamp() == queryTime) {
+      return new TimeValuePairPointReader(beforePair);
+    }
+
+    if (afterPair == null && !allDataReader.hasNext()) {
+      return new TimeValuePairPointReader(null);
+    }
+    if (afterPair == null) {
+      afterPair = allDataReader.next();
+    }
+    if (afterPair.getTimestamp() > queryTime + afterRange) {
+      return new TimeValuePairPointReader(null);
+    }
+    return new TimeValuePairPointReader(average(beforePair, afterPair));
+  }
+
+  //returns the average of two points
+  private TimeValuePair average(TimeValuePair beforePair, TimeValuePair afterPair) {
+    double totalTimeLength = afterPair.getTimestamp() - beforePair.getTimestamp();
+    double beforeTimeLength = queryTime - beforePair.getTimestamp();
+    switch (dataType) {
+      case INT32:
+        int startIntValue = beforePair.getValue().getInt();
+        int endIntValue = afterPair.getValue().getInt();
+        int fillIntValue =
+            startIntValue + (int) ((double) (endIntValue - startIntValue) / totalTimeLength
+                * beforeTimeLength);
+        beforePair.setValue(TsPrimitiveType.getByType(TSDataType.INT32, fillIntValue));
+        break;
+      case INT64:
+        long startLongValue = beforePair.getValue().getLong();
+        long endLongValue = afterPair.getValue().getLong();
+        long fillLongValue =
+            startLongValue + (long) ((double) (endLongValue - startLongValue) / totalTimeLength
+                * beforeTimeLength);
+        beforePair.setValue(TsPrimitiveType.getByType(TSDataType.INT64, fillLongValue));
+        break;
+      case FLOAT:
+        float startFloatValue = beforePair.getValue().getFloat();
+        float endFloatValue = afterPair.getValue().getFloat();
+        float fillFloatValue =
+            startFloatValue + (float) ((endFloatValue - startFloatValue) / totalTimeLength
+                * beforeTimeLength);
+        beforePair.setValue(TsPrimitiveType.getByType(TSDataType.FLOAT, fillFloatValue));
+        break;
+      case DOUBLE:
+        double startDoubleValue = beforePair.getValue().getDouble();
+        double endDoubleValue = afterPair.getValue().getDouble();
+        double fillDoubleValue =
+            startDoubleValue + (double) ((endDoubleValue - startDoubleValue) / totalTimeLength
+                * beforeTimeLength);
+        beforePair.setValue(TsPrimitiveType.getByType(TSDataType.DOUBLE, fillDoubleValue));
+        break;
+      default:
+        throw new UnSupportedFillTypeException("Unsupported linear fill data type : " + dataType);
+
+    }
+    beforePair.setTimestamp(queryTime);
+    return beforePair;
   }
 }
\ No newline at end of file
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/fill/PreviousFill.java b/iotdb/src/main/java/org/apache/iotdb/db/query/fill/PreviousFill.java
index 1c130b9..19da79d 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/fill/PreviousFill.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/fill/PreviousFill.java
@@ -18,21 +18,21 @@
  */
 package org.apache.iotdb.db.query.fill;
 
-import org.apache.iotdb.db.exception.ProcessorException;
+import java.io.IOException;
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.reader.IPointReader;
+import org.apache.iotdb.db.utils.TimeValuePair;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.read.common.BatchData;
 import org.apache.iotdb.tsfile.read.common.Path;
 
 public class PreviousFill extends IFill {
 
   private long beforeRange;
 
-  private BatchData result;
-
   public PreviousFill(TSDataType dataType, long queryTime, long beforeRange) {
     super(dataType, queryTime);
     this.beforeRange = beforeRange;
-    result = new BatchData(dataType, true, true);
   }
 
   public PreviousFill(long beforeRange) {
@@ -44,12 +44,32 @@ public class PreviousFill extends IFill {
     return new PreviousFill(dataType, queryTime, beforeRange);
   }
 
+  @Override
+  public void constructReaders(QueryDataSource queryDataSource, QueryContext context)
+      throws IOException {
+    super.constructReaders(queryDataSource, context, beforeRange);
+  }
+
   public long getBeforeRange() {
     return beforeRange;
   }
 
   @Override
-  public BatchData getFillResult() throws ProcessorException {
-    return result;
+  public IPointReader getFillResult() throws IOException {
+    TimeValuePair beforePair = null;
+    TimeValuePair cachedPair = null;
+    while (allDataReader.hasNext()) {
+      cachedPair = allDataReader.next();
+      if (cachedPair.getTimestamp() <= queryTime) {
+        beforePair = cachedPair;
+      } else {
+        break;
+      }
+    }
+
+    if (beforePair != null) {
+      beforePair.setTimestamp(queryTime);
+    }
+    return new TimeValuePairPointReader(beforePair);
   }
 }
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/integration/IOTDBFillTestIT.java b/iotdb/src/test/java/org/apache/iotdb/db/integration/IOTDBFillTestIT.java
new file mode 100644
index 0000000..7479f6e
--- /dev/null
+++ b/iotdb/src/test/java/org/apache/iotdb/db/integration/IOTDBFillTestIT.java
@@ -0,0 +1,244 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.integration;
+import static org.junit.Assert.fail;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import org.apache.iotdb.db.service.IoTDB;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.jdbc.Config;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class IOTDBFillTestIT {
+  private static IoTDB daemon;
+
+  private static String[] dataSet1 = new String[]{
+      "SET STORAGE GROUP TO root.ln.wf01.wt01",
+      "CREATE TIMESERIES root.ln.wf01.wt01.status WITH DATATYPE=BOOLEAN, ENCODING=PLAIN",
+      "CREATE TIMESERIES root.ln.wf01.wt01.temperature WITH DATATYPE=DOUBLE, ENCODING=PLAIN",
+      "CREATE TIMESERIES root.ln.wf01.wt01.hardware WITH DATATYPE=INT32, ENCODING=PLAIN",
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) values(1, 1.1, false, 11)",
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) values(2, 2.2, true, 22)",
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) values(3, 3.3, false, 33 )",
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) values(4, 4.4, false, 44)",
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) values(5, 5.5, false, 55)",
+      "flush",
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) values(100, 100.1, false, 110)",
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) values(150, 200.2, true, 220)",
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) values(200, 300.3, false, 330 )",
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) values(250, 400.4, false, 440)",
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) values(300, 500.5, false, 550)",
+      "flush",
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) values(10, 10.1, false, 110)",
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) values(20, 20.2, true, 220)",
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) values(30, 30.3, false, 330 )",
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) values(40, 40.4, false, 440)",
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) values(50, 50.5, false, 550)",
+      "flush",
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) values(500, 100.1, false, 110)",
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) values(510, 200.2, true, 220)",
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) values(520, 300.3, false, 330 )",
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) values(530, 400.4, false, 440)",
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) values(540, 500.5, false, 550)",
+      "flush",
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) values(580, 100.1, false, 110)",
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) values(590, 200.2, true, 220)",
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) values(600, 300.3, false, 330 )",
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) values(610, 400.4, false, 440)",
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) values(620, 500.5, false, 550)",
+  };
+
+  private static final String TIMESTAMP_STR = "Time";
+
+  @Before
+  public void setUp() throws Exception {
+    EnvironmentUtils.closeStatMonitor();
+    EnvironmentUtils.closeMemControl();
+    daemon = IoTDB.getInstance();
+    daemon.active();
+    EnvironmentUtils.envSetUp();
+    Class.forName(Config.JDBC_DRIVER_NAME);
+    prepareData();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    daemon.stop();
+    EnvironmentUtils.cleanEnv();
+  }
+
+  @Test
+  public void LinearFillTest() throws SQLException {
+    String[] retArray1 = new String[]{
+        "3,3.3,false,33",
+        "70,70.34,false,374",
+        "70,null,null,null"
+    };
+    Connection connection = null;
+    try {
+      connection = DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
+      Statement statement = connection.createStatement();
+      boolean hasResultSet = statement.execute("select temperature,status, hardware from root.ln.wf01.wt01 where time = 3 "
+          + "Fill(int32[linear, 5ms, 5ms], double[linear, 5ms, 5ms], boolean[previous, 5ms])");
+
+      Assert.assertTrue(hasResultSet);
+      ResultSet resultSet = statement.getResultSet();
+      int cnt = 0;
+      while (resultSet.next()) {
+        String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString("root.ln.wf01.wt01.temperature")
+            + "," + resultSet.getString("root.ln.wf01.wt01.status")+ "," + resultSet.getString("root.ln.wf01.wt01.hardware");
+        Assert.assertEquals(retArray1[cnt], ans);
+        cnt++;
+      }
+      statement.close();
+
+      statement = connection.createStatement();
+      hasResultSet = statement.execute("select temperature,status, hardware from root.ln.wf01.wt01 where time = 70 "
+          + "Fill(int32[linear, 500ms, 500ms], double[linear, 500ms, 500ms], boolean[previous, 500ms])");
+
+      Assert.assertTrue(hasResultSet);
+      resultSet = statement.getResultSet();
+      while (resultSet.next()) {
+        String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString("root.ln.wf01.wt01.temperature")
+            + "," + resultSet.getString("root.ln.wf01.wt01.status")+ "," + resultSet.getString("root.ln.wf01.wt01.hardware");
+        Assert.assertEquals(retArray1[cnt], ans);
+        cnt++;
+        System.out.println(ans);
+      }
+      statement.close();
+
+      statement = connection.createStatement();
+      hasResultSet = statement.execute("select temperature,status, hardware from root.ln.wf01.wt01 where time = 70 "
+          + "Fill(int32[linear, 25ms, 25ms], double[linear, 25ms, 25ms], boolean[previous, 5ms])");
+
+      Assert.assertTrue(hasResultSet);
+      resultSet = statement.getResultSet();
+      while (resultSet.next()) {
+        String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString("root.ln.wf01.wt01.temperature")
+            + "," + resultSet.getString("root.ln.wf01.wt01.status")+ "," + resultSet.getString("root.ln.wf01.wt01.hardware");
+        Assert.assertEquals(retArray1[cnt], ans);
+        cnt++;
+        System.out.println(ans);
+      }
+      statement.close();
+
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    } finally {
+      if (connection != null) {
+        connection.close();
+      }
+    }
+  }
+
+  @Test
+  public void PreviousFillTest() throws SQLException {
+    String[] retArray1 = new String[]{
+        "3,3.3,false,33",
+        "70,50.5,false,550",
+        "70,null,null,null"
+    };
+    Connection connection = null;
+    try {
+      connection = DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
+      Statement statement = connection.createStatement();
+      boolean hasResultSet = statement.execute("select temperature,status, hardware from root.ln.wf01.wt01 where time = 3 "
+          + "Fill(int32[previous, 5ms], double[previous, 5ms], boolean[previous, 5ms])");
+
+      Assert.assertTrue(hasResultSet);
+      ResultSet resultSet = statement.getResultSet();
+      int cnt = 0;
+      while (resultSet.next()) {
+        String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString("root.ln.wf01.wt01.temperature")
+            + "," + resultSet.getString("root.ln.wf01.wt01.status")+ "," + resultSet.getString("root.ln.wf01.wt01.hardware");
+        Assert.assertEquals(retArray1[cnt], ans);
+        cnt++;
+      }
+      statement.close();
+
+      statement = connection.createStatement();
+      hasResultSet = statement.execute("select temperature,status, hardware from root.ln.wf01.wt01 where time = 70 "
+          + "Fill(int32[previous, 500ms], double[previous, 500ms], boolean[previous, 500ms])");
+
+      Assert.assertTrue(hasResultSet);
+      resultSet = statement.getResultSet();
+      while (resultSet.next()) {
+        String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString("root.ln.wf01.wt01.temperature")
+            + "," + resultSet.getString("root.ln.wf01.wt01.status")+ "," + resultSet.getString("root.ln.wf01.wt01.hardware");
+        Assert.assertEquals(retArray1[cnt], ans);
+        cnt++;
+        System.out.println(ans);
+      }
+      statement.close();
+
+      statement = connection.createStatement();
+      hasResultSet = statement.execute("select temperature,status, hardware from root.ln.wf01.wt01 where time = 70 "
+          + "Fill(int32[previous, 15ms], double[previous, 15ms], boolean[previous, 5ms])");
+
+      Assert.assertTrue(hasResultSet);
+      resultSet = statement.getResultSet();
+      while (resultSet.next()) {
+        String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString("root.ln.wf01.wt01.temperature")
+            + "," + resultSet.getString("root.ln.wf01.wt01.status")+ "," + resultSet.getString("root.ln.wf01.wt01.hardware");
+        Assert.assertEquals(retArray1[cnt], ans);
+        cnt++;
+        System.out.println(ans);
+      }
+      statement.close();
+
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    } finally {
+      if (connection != null) {
+        connection.close();
+      }
+    }
+  }
+
+  private void prepareData() throws SQLException {
+    Connection connection = null;
+    try {
+      connection = DriverManager
+          .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root",
+              "root");
+      Statement statement = connection.createStatement();
+
+      for (String sql : dataSet1) {
+        statement.execute(sql);
+      }
+      statement.close();
+
+    } catch (Exception e) {
+      e.printStackTrace();
+    } finally {
+      if (connection != null) {
+        connection.close();
+      }
+    }
+  }
+}
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/integration/IOTDBGroupByTestIT.java b/iotdb/src/test/java/org/apache/iotdb/db/integration/IOTDBGroupByTestIT.java
index bf2f755..3c0224c 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/integration/IOTDBGroupByTestIT.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/integration/IOTDBGroupByTestIT.java
@@ -82,14 +82,7 @@ public class IOTDBGroupByTestIT {
       "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) values(620, 500.5, false, 550)",
   };
 
-  private String insertTemplate1 = "INSERT INTO root.vehicle.d0(timestamp,s0,s1,s2,s3"
-      + ") VALUES(%d,%d,%d,%f,%s)";
-
   private static final String TIMESTAMP_STR = "Time";
-  private final String d0s0 = "root.vehicle.d0.s0";
-  private final String d0s1 = "root.vehicle.d0.s1";
-  private final String d0s2 = "root.vehicle.d0.s2";
-  private final String d0s3 = "root.vehicle.d0.s3";
 
   @Before
   public void setUp() throws Exception {
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/qp/utils/MemIntQpExecutor.java b/iotdb/src/test/java/org/apache/iotdb/db/qp/utils/MemIntQpExecutor.java
index 469b3cc..07983b3 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/qp/utils/MemIntQpExecutor.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/qp/utils/MemIntQpExecutor.java
@@ -36,6 +36,7 @@ import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
 import org.apache.iotdb.db.qp.physical.crud.UpdatePlan;
+import org.apache.iotdb.db.query.fill.IFill;
 import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.Path;
@@ -107,14 +108,23 @@ public class MemIntQpExecutor extends QueryProcessExecutor {
   }
 
   @Override
-  public QueryDataSet aggregate(List<Path> paths, List<String> aggres, IExpression expression) throws ProcessorException, IOException, PathErrorException, FileNodeManagerException, QueryFilterOptimizationException {
+  public QueryDataSet aggregate(List<Path> paths, List<String> aggres, IExpression expression)
+      throws ProcessorException, IOException, PathErrorException, FileNodeManagerException,
+      QueryFilterOptimizationException {
     return null;
   }
 
   @Override
   public QueryDataSet groupBy(List<Path> paths, List<String> aggres, IExpression expression,
       long unit, long origin, List<Pair<Long, Long>> intervals)
-      throws ProcessorException, IOException, PathErrorException, FileNodeManagerException, QueryFilterOptimizationException {
+      throws ProcessorException, IOException, PathErrorException, FileNodeManagerException,
+      QueryFilterOptimizationException {
+    return null;
+  }
+
+  @Override
+  public QueryDataSet fill(List<Path> fillPaths, long queryTime, Map<TSDataType, IFill> fillTypes)
+      throws ProcessorException, IOException, PathErrorException, FileNodeManagerException {
     return null;
   }
 

Reply via email to