This is an automated email from the ASF dual-hosted git repository.
suyue pushed a commit to branch aggregate
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/aggregate by this push:
new f056180 fill query
f056180 is described below
commit f0561807e9015702432ae5ec0d778666ab8048cd
Author: suyue <[email protected]>
AuthorDate: Fri Mar 22 19:40:20 2019 +0800
fill query
---
.../iotdb/db/qp/executor/OverflowQPExecutor.java | 27 ++-
.../iotdb/db/qp/executor/QueryProcessExecutor.java | 12 +
.../db/query/aggregation/impl/CountAggrFunc.java | 6 +-
.../iotdb/db/query/executor/EngineQueryRouter.java | 33 ++-
.../db/query/executor/FillEngineExecutor.java | 92 ++++++++
.../java/org/apache/iotdb/db/query/fill/IFill.java | 82 ++++++-
.../org/apache/iotdb/db/query/fill/LinearFill.java | 90 +++++++-
.../apache/iotdb/db/query/fill/PreviousFill.java | 34 ++-
.../iotdb/db/integration/IOTDBFillTestIT.java | 244 +++++++++++++++++++++
.../iotdb/db/integration/IOTDBGroupByTestIT.java | 7 -
.../apache/iotdb/db/qp/utils/MemIntQpExecutor.java | 14 +-
11 files changed, 590 insertions(+), 51 deletions(-)
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java
b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java
index 6eb69bb..9e7a5ae 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java
@@ -52,6 +52,7 @@ import org.apache.iotdb.db.qp.physical.sys.LoadDataPlan;
import org.apache.iotdb.db.qp.physical.sys.MetadataPlan;
import org.apache.iotdb.db.qp.physical.sys.PropertyPlan;
import org.apache.iotdb.db.query.executor.EngineQueryRouter;
+import org.apache.iotdb.db.query.fill.IFill;
import org.apache.iotdb.db.utils.AuthUtils;
import org.apache.iotdb.db.utils.LoadDataUtils;
import
org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
@@ -179,20 +180,22 @@ public class OverflowQPExecutor extends
QueryProcessExecutor {
@Override
public QueryDataSet aggregate(List<Path> paths, List<String> aggres,
IExpression expression)
- throws ProcessorException, FileNodeManagerException,
QueryFilterOptimizationException, PathErrorException, IOException {
+ throws ProcessorException, FileNodeManagerException,
QueryFilterOptimizationException,
+ PathErrorException, IOException {
return new EngineQueryRouter().aggregate(paths, aggres, expression);
}
- // @Override
- // public QueryDataSet fill(List<Path> fillPaths, long queryTime,
Map<TSDataType, IFill> fillTypes) throws
- // ProcessorException, IOException, PathErrorException {
- // return queryEngine.fill(fillPaths, queryTime, fillTypes);
- // }
+ /**
+ * Runs a fill query by delegating to a newly created EngineQueryRouter.
+ *
+ * @param fillPaths series paths, passed through to the router unchanged
+ * @param queryTime query timestamp, passed through to the router unchanged
+ * @param fillTypes fill strategy per data type, passed through to the router unchanged
+ * @return the data set returned by EngineQueryRouter#fill
+ */
+ @Override
+ public QueryDataSet fill(List<Path> fillPaths, long queryTime,
Map<TSDataType, IFill> fillTypes)
+ throws ProcessorException, IOException, PathErrorException,
FileNodeManagerException {
+ return new EngineQueryRouter().fill(fillPaths, queryTime, fillTypes);
+ }
@Override
- public QueryDataSet groupBy(List<Path> paths, List<String> aggres,
IExpression expression, long unit,
- long origin, List<Pair<Long, Long>> intervals)
- throws ProcessorException, FileNodeManagerException,
QueryFilterOptimizationException, PathErrorException, IOException {
+ public QueryDataSet groupBy(List<Path> paths, List<String> aggres,
IExpression expression,
+ long unit, long origin, List<Pair<Long, Long>> intervals)
+ throws ProcessorException, FileNodeManagerException,
QueryFilterOptimizationException,
+ PathErrorException, IOException {
return new EngineQueryRouter().groupBy(paths, aggres, expression, unit,
origin, intervals);
}
@@ -504,7 +507,8 @@ public class OverflowQPExecutor extends
QueryProcessExecutor {
// optimize the speed of adding timeseries
String fileNodePath = mManager.getFileNameByPath(path.getFullPath());
// the two map is stored in the storage group node
- Map<String, MeasurementSchema> schemaMap =
mManager.getSchemaMapForOneFileNode(fileNodePath);
+ Map<String, MeasurementSchema> schemaMap = mManager
+ .getSchemaMapForOneFileNode(fileNodePath);
Map<String, Integer> numSchemaMap =
mManager.getNumSchemaMapForOneFileNode(fileNodePath);
String lastNode = path.getMeasurement();
boolean isNewMeasurement = true;
@@ -629,8 +633,7 @@ public class OverflowQPExecutor extends
QueryProcessExecutor {
/**
* Delete all data of time series in pathList.
*
- * @param pathList
- * deleted paths
+ * @param pathList deleted paths
*/
private void deleteDataOfTimeSeries(List<String> pathList) throws
ProcessorException {
for (String p : pathList) {
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java
b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java
index 7d08808..ea2def9 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import org.apache.iotdb.db.exception.FileNodeManagerException;
import org.apache.iotdb.db.exception.PathErrorException;
@@ -29,9 +30,11 @@ import org.apache.iotdb.db.exception.ProcessorException;
import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.crud.AggregationPlan;
+import org.apache.iotdb.db.qp.physical.crud.FillQueryPlan;
import org.apache.iotdb.db.qp.physical.crud.GroupByPlan;
import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
import org.apache.iotdb.db.query.executor.EngineQueryRouter;
+import org.apache.iotdb.db.query.fill.IFill;
import
org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.Path;
@@ -66,6 +69,11 @@ public abstract class QueryProcessExecutor {
return aggregate(plan.getPaths(), plan.getAggregations(),
((AggregationPlan) plan).getExpression());
}
+
+ if (plan instanceof FillQueryPlan) {
+ FillQueryPlan fillQueryPlan = (FillQueryPlan) plan;
+ return fill(plan.getPaths(), fillQueryPlan.getQueryTime(),
fillQueryPlan.getFillType());
+ }
return queryRouter.query(queryExpression);
}
@@ -97,6 +105,10 @@ public abstract class QueryProcessExecutor {
throws ProcessorException, IOException, PathErrorException,
FileNodeManagerException,
QueryFilterOptimizationException;
+ /**
+ * Execute a fill query. Invoked when the physical plan is a FillQueryPlan, with the plan's
+ * paths, query time and fill-type map.
+ *
+ * @param fillPaths paths of the series to fill
+ * @param queryTime query timestamp taken from the plan
+ * @param fillTypes fill strategy registered per data type
+ * @return the query result data set
+ */
+ public abstract QueryDataSet fill(List<Path> fillPaths, long queryTime,
Map<TSDataType,
+ IFill> fillTypes)
+ throws ProcessorException, IOException, PathErrorException,
FileNodeManagerException;
+
/**
* executeWithGlobalTimeFilter update command and return whether the
operator is successful.
*
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrFunc.java
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrFunc.java
index 1964875..301ce9a 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrFunc.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrFunc.java
@@ -56,8 +56,10 @@ public class CountAggrFunc extends AggregateFunction {
@Override
public void calculateValueFromPageHeader(PageHeader pageHeader) {
- LOGGER.debug("PageHeader>>>>>>>>>>>>num of rows:{}, minTimeStamp:{},
maxTimeStamp{}",
- pageHeader.getNumOfValues(), pageHeader.getMinTimestamp(),
pageHeader.getMaxTimestamp());
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("PageHeader>>>>>>>>>>>>num of rows:{}, minTimeStamp:{},
maxTimeStamp{}",
+ pageHeader.getNumOfValues(), pageHeader.getMinTimestamp(),
pageHeader.getMaxTimestamp());
+ }
long preValue = resultData.getLongRet();
preValue += pageHeader.getNumOfValues();
resultData.setLongRet(preValue);
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java
b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java
index d7f6cf0..176cac0 100644
---
a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java
+++
b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java
@@ -24,6 +24,7 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.iotdb.db.exception.FileNodeManagerException;
import org.apache.iotdb.db.exception.PathErrorException;
@@ -31,7 +32,9 @@ import org.apache.iotdb.db.exception.ProcessorException;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.OpenedFilePathsManager;
import org.apache.iotdb.db.query.control.QueryTokenManager;
+import org.apache.iotdb.db.query.fill.IFill;
import
org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.expression.ExpressionType;
import org.apache.iotdb.tsfile.read.expression.IExpression;
@@ -129,19 +132,14 @@ public class EngineQueryRouter {
/**
* execute groupBy query.
+ *
* @param selectedSeries select path list
* @param aggres aggregation name list
* @param expression filter expression
* @param unit time granularity for interval partitioning, unit is ms.
- * @param origin the datum time point for interval division is divided into
a time interval
- * for each TimeUnit time from this point forward and backward.
+ * @param origin the datum time point for interval division is divided into
a time interval for
+ * each TimeUnit time from this point forward and backward.
* @param intervals time intervals, closed interval.
- * @return
- * @throws ProcessorException
- * @throws QueryFilterOptimizationException
- * @throws FileNodeManagerException
- * @throws PathErrorException
- * @throws IOException
*/
public QueryDataSet groupBy(List<Path> selectedSeries, List<String> aggres,
IExpression expression, long unit, long origin, List<Pair<Long, Long>>
intervals)
@@ -204,6 +202,25 @@ public class EngineQueryRouter {
}
/**
+ * execute fill query: obtains the next job id, sets it for the current request thread on
+ * QueryTokenManager and OpenedFilePathsManager, then runs a FillEngineExecutor with a new
+ * QueryContext.
+ *
+ * @param fillPaths select path list
+ * @param queryTime timestamp
+ * @param fillType type IFill map
+ * @return the data set returned by FillEngineExecutor#execute
+ */
+ public QueryDataSet fill(List<Path> fillPaths, long queryTime,
Map<TSDataType, IFill> fillType)
+ throws FileNodeManagerException, PathErrorException, IOException,
ProcessorException {
+ long nextJobId = getNextJobId();
+ QueryTokenManager.getInstance().setJobIdForCurrentRequestThread(nextJobId);
+
OpenedFilePathsManager.getInstance().setJobIdForCurrentRequestThread(nextJobId);
+
+ QueryContext context = new QueryContext();
+ FillEngineExecutor fillEngineExecutor = new FillEngineExecutor(nextJobId,
fillPaths, queryTime,
+ fillType);
+ return fillEngineExecutor.execute(context);
+ }
+
+ /**
* sort intervals by start time and merge overlapping intervals.
*
* @param intervals time interval
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/FillEngineExecutor.java
b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/FillEngineExecutor.java
new file mode 100644
index 0000000..4907aad
--- /dev/null
+++
b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/FillEngineExecutor.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.executor;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.exception.FileNodeManagerException;
+import org.apache.iotdb.db.exception.PathErrorException;
+import org.apache.iotdb.db.exception.ProcessorException;
+import org.apache.iotdb.db.metadata.MManager;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.QueryDataSourceManager;
+import org.apache.iotdb.db.query.control.QueryTokenManager;
+import org.apache.iotdb.db.query.dataset.EngineDataSetWithoutTimeGenerator;
+import org.apache.iotdb.db.query.fill.IFill;
+import org.apache.iotdb.db.query.fill.PreviousFill;
+import org.apache.iotdb.db.query.reader.IPointReader;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+
+/**
+ * Runs a fill query: for every selected series it picks a fill strategy, lets that strategy
+ * construct its readers, and finally gathers each strategy's result reader into one data set.
+ */
+public class FillEngineExecutor {
+
+  private final long jobId;
+  private final List<Path> selectedSeries;
+  private final long queryTime;
+  private final Map<TSDataType, IFill> typeIFillMap;
+
+  public FillEngineExecutor(long jobId, List<Path> selectedSeries, long queryTime,
+      Map<TSDataType, IFill> typeIFillMap) {
+    this.jobId = jobId;
+    this.selectedSeries = selectedSeries;
+    this.queryTime = queryTime;
+    this.typeIFillMap = typeIFillMap;
+  }
+
+  /**
+   * execute fill.
+   *
+   * @param context query context
+   * @return a data set built from the selected series, their data types and one fill-result
+   *     reader per series
+   */
+  public QueryDataSet execute(QueryContext context)
+      throws FileNodeManagerException, PathErrorException, IOException, ProcessorException {
+    QueryTokenManager.getInstance().beginQueryOfGivenQueryPaths(jobId, selectedSeries);
+
+    List<IFill> preparedFills = new ArrayList<>();
+    List<TSDataType> seriesTypes = new ArrayList<>();
+    for (Path seriesPath : selectedSeries) {
+      QueryDataSource dataSource =
+          QueryDataSourceManager.getQueryDataSource(jobId, seriesPath, context);
+      TSDataType seriesType = MManager.getInstance().getSeriesType(seriesPath.getFullPath());
+      seriesTypes.add(seriesType);
+
+      IFill seriesFill = newFillFor(seriesPath, seriesType);
+      seriesFill.setDataType(seriesType);
+      seriesFill.setQueryTime(queryTime);
+      seriesFill.constructReaders(dataSource, context);
+      preparedFills.add(seriesFill);
+    }
+
+    // Results are fetched only after every series has had its readers constructed.
+    List<IPointReader> resultReaders = new ArrayList<>();
+    for (IFill seriesFill : preparedFills) {
+      resultReaders.add(seriesFill.getFillResult());
+    }
+
+    return new EngineDataSetWithoutTimeGenerator(selectedSeries, seriesTypes, resultReaders);
+  }
+
+  /**
+   * Picks the fill for one series: a copy of the fill registered for its data type, or a
+   * PreviousFill with a before-range of 0 when no fill is registered for that type.
+   */
+  private IFill newFillFor(Path seriesPath, TSDataType seriesType) {
+    if (typeIFillMap.containsKey(seriesType)) {
+      return typeIFillMap.get(seriesType).copy(seriesPath);
+    }
+    return new PreviousFill(seriesType, queryTime, 0);
+  }
+
+}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/fill/IFill.java
b/iotdb/src/main/java/org/apache/iotdb/db/query/fill/IFill.java
index 7ddce7a..be77c94 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/fill/IFill.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/fill/IFill.java
@@ -18,16 +18,27 @@
*/
package org.apache.iotdb.db.query.fill;
-import org.apache.iotdb.db.exception.ProcessorException;
+import java.io.IOException;
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.factory.SeriesReaderFactory;
+import org.apache.iotdb.db.query.reader.AllDataReader;
+import org.apache.iotdb.db.query.reader.IPointReader;
+import org.apache.iotdb.db.query.reader.merge.PriorityMergeReader;
+import org.apache.iotdb.db.query.reader.sequence.SequenceDataReader;
+import org.apache.iotdb.db.utils.TimeValuePair;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.filter.TimeFilter;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
public abstract class IFill {
long queryTime;
TSDataType dataType;
+ IPointReader allDataReader;
+
public IFill(TSDataType dataType, long queryTime) {
this.dataType = dataType;
this.queryTime = queryTime;
@@ -38,7 +49,28 @@ public abstract class IFill {
public abstract IFill copy(Path path);
- public abstract BatchData getFillResult() throws ProcessorException;
+ public abstract void constructReaders(QueryDataSource queryDataSource,
QueryContext context)
+ throws IOException;
+
+  /**
+   * Builds {@code allDataReader} from the sequence data and the overflow (unsequence) data of
+   * the given data source. Both underlying readers receive the filter returned by
+   * {@code constructFilter(beforeRange)}.
+   *
+   * @param queryDataSource source providing the sequence and overflow series data
+   * @param context query context handed to the sequence reader
+   * @param beforeRange range passed to {@code constructFilter} to derive the time filter
+   * @throws IOException if creating one of the underlying readers fails
+   */
+  void constructReaders(QueryDataSource queryDataSource, QueryContext context, long beforeRange)
+      throws IOException {
+    Filter timeFilter = constructFilter(beforeRange);
+    // sequence reader for sealed tsfile, unsealed tsfile, memory
+    SequenceDataReader sequenceReader = new SequenceDataReader(
+        queryDataSource.getSeqDataSource(), timeFilter, context, false);
+
+    // unseq reader for all chunk groups in unSeqFile, memory
+    PriorityMergeReader unSeqMergeReader = SeriesReaderFactory.getInstance()
+        .createUnSeqMergeReader(queryDataSource.getOverflowSeriesDataSource(), timeFilter);
+
+    // A `new` expression never evaluates to null, so the former `sequenceReader == null`
+    // branch was unreachable; the combined reader is always the one in use.
+    allDataReader = new AllDataReader(sequenceReader, unSeqMergeReader);
+  }
+
+ public abstract IPointReader getFillResult() throws IOException;
public TSDataType getDataType() {
return this.dataType;
@@ -48,11 +80,47 @@ public abstract class IFill {
this.dataType = dataType;
}
- public long getQueryTime() {
- return this.queryTime;
- }
-
public void setQueryTime(long queryTime) {
this.queryTime = queryTime;
}
+
+  /**
+   * Derives the time filter used when constructing the readers.
+   *
+   * @param beforeRange fill range before {@code queryTime}; -1 marks a range that was not set
+   * @return {@code null} when the range is not set, otherwise
+   *     {@code TimeFilter.gtEq(queryTime - beforeRange)}
+   */
+  private Filter constructFilter(long beforeRange) {
+    boolean rangeNotSet = beforeRange == -1;
+    return rangeNotSet ? null : TimeFilter.gtEq(queryTime - beforeRange);
+  }
+
+  /**
+   * Point reader over a single in-memory {@link TimeValuePair}. A reader built from
+   * {@code null} reports no data from the start; otherwise {@code hasNext()} is true until the
+   * first call to {@code next()}.
+   *
+   * <p>Declared static because it uses no state of the enclosing fill, so it does not need to
+   * keep a reference to it.
+   */
+  static class TimeValuePairPointReader implements IPointReader {
+
+    private final TimeValuePair pair;
+    private boolean isUsed;
+
+    public TimeValuePairPointReader(TimeValuePair pair) {
+      this.pair = pair;
+      // a null pair means there is nothing to hand out
+      this.isUsed = (pair == null);
+    }
+
+    @Override
+    public boolean hasNext() throws IOException {
+      return !isUsed;
+    }
+
+    @Override
+    public TimeValuePair next() throws IOException {
+      isUsed = true;
+      return pair;
+    }
+
+    @Override
+    public TimeValuePair current() throws IOException {
+      return pair;
+    }
+
+    @Override
+    public void close() throws IOException {
+      // nothing to release: the reader only holds an in-memory pair
+    }
+  }
}
\ No newline at end of file
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/fill/LinearFill.java
b/iotdb/src/main/java/org/apache/iotdb/db/query/fill/LinearFill.java
index 764f7c7..cab3f36 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/fill/LinearFill.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/fill/LinearFill.java
@@ -18,16 +18,20 @@
*/
package org.apache.iotdb.db.query.fill;
-import org.apache.iotdb.db.exception.ProcessorException;
+import java.io.IOException;
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.exception.UnSupportedFillTypeException;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.reader.IPointReader;
+import org.apache.iotdb.db.utils.TimeValuePair;
+import org.apache.iotdb.db.utils.TsPrimitiveType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.common.Path;
public class LinearFill extends IFill {
private long beforeRange;
private long afterRange;
- private BatchData result;
public LinearFill(long beforeRange, long afterRange) {
this.beforeRange = beforeRange;
@@ -42,7 +46,6 @@ public class LinearFill extends IFill {
super(dataType, queryTime);
this.beforeRange = beforeRange;
this.afterRange = afterRange;
- result = new BatchData(dataType, true, true);
}
public long getBeforeRange() {
@@ -67,7 +70,82 @@ public class LinearFill extends IFill {
}
@Override
- public BatchData getFillResult() throws ProcessorException {
- return result;
+ public void constructReaders(QueryDataSource queryDataSource, QueryContext
context)
+ throws IOException {
+ super.constructReaders(queryDataSource, context, beforeRange);
+ }
+
+  /**
+   * Computes the linear fill at {@code queryTime} from the points of {@code allDataReader}.
+   *
+   * <p>Returns a reader over the point at {@code queryTime} if one exists. Otherwise the last
+   * point before and the first point after {@code queryTime} are interpolated. The result is
+   * empty when either neighbour is missing or the later one lies beyond
+   * {@code queryTime + afterRange}.
+   *
+   * @return a reader yielding at most one point
+   * @throws IOException if reading from the underlying reader fails
+   */
+  @Override
+  public IPointReader getFillResult() throws IOException {
+    TimeValuePair beforePair = null;
+    TimeValuePair afterPair = null;
+    while (allDataReader.hasNext()) {
+      TimeValuePair pair = allDataReader.next();
+      if (pair.getTimestamp() <= queryTime) {
+        beforePair = pair;
+      } else {
+        // Only a point strictly after queryTime counts as the right-hand neighbour. Tracking
+        // it separately keeps it from aliasing beforePair when the reader runs out, which
+        // would otherwise interpolate over a zero-length interval.
+        afterPair = pair;
+        break;
+      }
+    }
+
+    // No earlier point at all, or an exact hit: nothing to interpolate.
+    if (beforePair == null || beforePair.getTimestamp() == queryTime) {
+      return new TimeValuePairPointReader(beforePair);
+    }
+
+    // NOTE(review): if afterRange can be -1 ("not set") like beforeRange, this comparison
+    // rejects every later point - confirm how callers initialise afterRange.
+    if (afterPair == null || afterPair.getTimestamp() > queryTime + afterRange) {
+      return new TimeValuePairPointReader(null);
+    }
+    return new TimeValuePairPointReader(average(beforePair, afterPair));
+  }
+
+  /**
+   * Linearly interpolates the value at {@code queryTime} between two points (despite the
+   * name, this is not an arithmetic mean). The interpolated value and {@code queryTime} are
+   * written into {@code beforePair}, which is then returned.
+   *
+   * @param beforePair the earlier point; modified in place
+   * @param afterPair the later point
+   * @return {@code beforePair}, now carrying the interpolated value at {@code queryTime}
+   * @throws UnSupportedFillTypeException if the data type is not INT32, INT64, FLOAT or DOUBLE
+   */
+  private TimeValuePair average(TimeValuePair beforePair, TimeValuePair afterPair) {
+    double totalTimeLength = afterPair.getTimestamp() - beforePair.getTimestamp();
+    double beforeTimeLength = queryTime - beforePair.getTimestamp();
+    switch (dataType) {
+      case INT32:
+        int startIntValue = beforePair.getValue().getInt();
+        int endIntValue = afterPair.getValue().getInt();
+        // Widen before subtracting: the difference of two ints can overflow int.
+        int fillIntValue = startIntValue
+            + (int) (((double) endIntValue - startIntValue) / totalTimeLength * beforeTimeLength);
+        beforePair.setValue(TsPrimitiveType.getByType(TSDataType.INT32, fillIntValue));
+        break;
+      case INT64:
+        long startLongValue = beforePair.getValue().getLong();
+        long endLongValue = afterPair.getValue().getLong();
+        // NOTE(review): the long difference is taken before widening and can overflow for
+        // extreme values; left unchanged so results for ordinary values stay bit-identical.
+        long fillLongValue = startLongValue
+            + (long) ((double) (endLongValue - startLongValue) / totalTimeLength
+            * beforeTimeLength);
+        beforePair.setValue(TsPrimitiveType.getByType(TSDataType.INT64, fillLongValue));
+        break;
+      case FLOAT:
+        float startFloatValue = beforePair.getValue().getFloat();
+        float endFloatValue = afterPair.getValue().getFloat();
+        float fillFloatValue = startFloatValue
+            + (float) ((endFloatValue - startFloatValue) / totalTimeLength * beforeTimeLength);
+        beforePair.setValue(TsPrimitiveType.getByType(TSDataType.FLOAT, fillFloatValue));
+        break;
+      case DOUBLE:
+        double startDoubleValue = beforePair.getValue().getDouble();
+        double endDoubleValue = afterPair.getValue().getDouble();
+        double fillDoubleValue = startDoubleValue
+            + (endDoubleValue - startDoubleValue) / totalTimeLength * beforeTimeLength;
+        beforePair.setValue(TsPrimitiveType.getByType(TSDataType.DOUBLE, fillDoubleValue));
+        break;
+      default:
+        throw new UnSupportedFillTypeException("Unsupported linear fill data type : " + dataType);
+
+    }
+    beforePair.setTimestamp(queryTime);
+    return beforePair;
   }
}
\ No newline at end of file
diff --git
a/iotdb/src/main/java/org/apache/iotdb/db/query/fill/PreviousFill.java
b/iotdb/src/main/java/org/apache/iotdb/db/query/fill/PreviousFill.java
index 1c130b9..19da79d 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/fill/PreviousFill.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/fill/PreviousFill.java
@@ -18,21 +18,21 @@
*/
package org.apache.iotdb.db.query.fill;
-import org.apache.iotdb.db.exception.ProcessorException;
+import java.io.IOException;
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.reader.IPointReader;
+import org.apache.iotdb.db.utils.TimeValuePair;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.common.Path;
public class PreviousFill extends IFill {
private long beforeRange;
- private BatchData result;
-
public PreviousFill(TSDataType dataType, long queryTime, long beforeRange) {
super(dataType, queryTime);
this.beforeRange = beforeRange;
- result = new BatchData(dataType, true, true);
}
public PreviousFill(long beforeRange) {
@@ -44,12 +44,32 @@ public class PreviousFill extends IFill {
return new PreviousFill(dataType, queryTime, beforeRange);
}
+ /**
+ * Builds the readers through the three-argument IFill#constructReaders, passing this fill's
+ * beforeRange.
+ */
+ @Override
+ public void constructReaders(QueryDataSource queryDataSource, QueryContext
context)
+ throws IOException {
+ super.constructReaders(queryDataSource, context, beforeRange);
+ }
+
public long getBeforeRange() {
return beforeRange;
}
@Override
- public BatchData getFillResult() throws ProcessorException {
- return result;
+  public IPointReader getFillResult() throws IOException {
+    // Walk forward through the reader, remembering the last point not later than queryTime;
+    // stop at the first point that lies after it.
+    TimeValuePair latestNotAfter = null;
+    while (allDataReader.hasNext()) {
+      TimeValuePair candidate = allDataReader.next();
+      if (candidate.getTimestamp() > queryTime) {
+        break;
+      }
+      latestNotAfter = candidate;
+    }
+
+    if (latestNotAfter == null) {
+      // no usable point: hand back an empty reader
+      return new TimeValuePairPointReader(null);
+    }
+    // the previous value is reported at the queried timestamp
+    latestNotAfter.setTimestamp(queryTime);
+    return new TimeValuePairPointReader(latestNotAfter);
   }
}
diff --git
a/iotdb/src/test/java/org/apache/iotdb/db/integration/IOTDBFillTestIT.java
b/iotdb/src/test/java/org/apache/iotdb/db/integration/IOTDBFillTestIT.java
new file mode 100644
index 0000000..7479f6e
--- /dev/null
+++ b/iotdb/src/test/java/org/apache/iotdb/db/integration/IOTDBFillTestIT.java
@@ -0,0 +1,244 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.integration;
+import static org.junit.Assert.fail;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import org.apache.iotdb.db.service.IoTDB;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.jdbc.Config;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class IOTDBFillTestIT {
+ private static IoTDB daemon;
+
+ private static String[] dataSet1 = new String[]{
+ "SET STORAGE GROUP TO root.ln.wf01.wt01",
+ "CREATE TIMESERIES root.ln.wf01.wt01.status WITH DATATYPE=BOOLEAN,
ENCODING=PLAIN",
+ "CREATE TIMESERIES root.ln.wf01.wt01.temperature WITH DATATYPE=DOUBLE,
ENCODING=PLAIN",
+ "CREATE TIMESERIES root.ln.wf01.wt01.hardware WITH DATATYPE=INT32,
ENCODING=PLAIN",
+ "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware)
values(1, 1.1, false, 11)",
+ "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware)
values(2, 2.2, true, 22)",
+ "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware)
values(3, 3.3, false, 33 )",
+ "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware)
values(4, 4.4, false, 44)",
+ "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware)
values(5, 5.5, false, 55)",
+ "flush",
+ "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware)
values(100, 100.1, false, 110)",
+ "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware)
values(150, 200.2, true, 220)",
+ "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware)
values(200, 300.3, false, 330 )",
+ "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware)
values(250, 400.4, false, 440)",
+ "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware)
values(300, 500.5, false, 550)",
+ "flush",
+ "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware)
values(10, 10.1, false, 110)",
+ "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware)
values(20, 20.2, true, 220)",
+ "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware)
values(30, 30.3, false, 330 )",
+ "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware)
values(40, 40.4, false, 440)",
+ "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware)
values(50, 50.5, false, 550)",
+ "flush",
+ "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware)
values(500, 100.1, false, 110)",
+ "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware)
values(510, 200.2, true, 220)",
+ "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware)
values(520, 300.3, false, 330 )",
+ "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware)
values(530, 400.4, false, 440)",
+ "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware)
values(540, 500.5, false, 550)",
+ "flush",
+ "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware)
values(580, 100.1, false, 110)",
+ "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware)
values(590, 200.2, true, 220)",
+ "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware)
values(600, 300.3, false, 330 )",
+ "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware)
values(610, 400.4, false, 440)",
+ "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware)
values(620, 500.5, false, 550)",
+ };
+
+ private static final String TIMESTAMP_STR = "Time";
+
+ @Before
+ public void setUp() throws Exception {
+ EnvironmentUtils.closeStatMonitor();
+ EnvironmentUtils.closeMemControl();
+ daemon = IoTDB.getInstance();
+ daemon.active();
+ EnvironmentUtils.envSetUp();
+ Class.forName(Config.JDBC_DRIVER_NAME);
+ prepareData();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ daemon.stop();
+ EnvironmentUtils.cleanEnv();
+ }
+
+  /**
+   * Runs three fill queries that request linear fill for the int32 and double types and
+   * previous fill for the boolean type, and compares every returned row with
+   * {@code retArray1}.
+   *
+   * <p>Rows are matched against the expected values in the order in which the queries are
+   * executed. The total number of rows is verified at the end, so an empty or truncated
+   * result set cannot pass unnoticed.
+   *
+   * @throws SQLException if connecting to the server or executing a query fails
+   */
+  @Test
+  public void LinearFillTest() throws SQLException {
+    String[] retArray1 = new String[]{
+        "3,3.3,false,33",
+        "70,70.34,false,374",
+        "70,null,null,null"
+    };
+    String[] queries = new String[]{
+        "select temperature,status, hardware from root.ln.wf01.wt01 where time = 3 "
+            + "Fill(int32[linear, 5ms, 5ms], double[linear, 5ms, 5ms], "
+            + "boolean[previous, 5ms])",
+        "select temperature,status, hardware from root.ln.wf01.wt01 where time = 70 "
+            + "Fill(int32[linear, 500ms, 500ms], double[linear, 500ms, 500ms], "
+            + "boolean[previous, 500ms])",
+        "select temperature,status, hardware from root.ln.wf01.wt01 where time = 70 "
+            + "Fill(int32[linear, 25ms, 25ms], double[linear, 25ms, 25ms], "
+            + "boolean[previous, 5ms])"
+    };
+
+    // try-with-resources closes the connection, statements and result sets even when an
+    // assertion fails; exceptions propagate so the test runner reports the full stack trace.
+    try (Connection connection = DriverManager
+        .getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root")) {
+      int cnt = 0;
+      for (String sql : queries) {
+        // A fresh statement per query, as each query is executed independently.
+        try (Statement statement = connection.createStatement()) {
+          Assert.assertTrue(statement.execute(sql));
+          try (ResultSet resultSet = statement.getResultSet()) {
+            while (resultSet.next()) {
+              String ans = resultSet.getString(TIMESTAMP_STR)
+                  + "," + resultSet.getString("root.ln.wf01.wt01.temperature")
+                  + "," + resultSet.getString("root.ln.wf01.wt01.status")
+                  + "," + resultSet.getString("root.ln.wf01.wt01.hardware");
+              Assert.assertEquals(retArray1[cnt], ans);
+              cnt++;
+            }
+          }
+        }
+      }
+      // Every expected row must have been produced by the queries above.
+      Assert.assertEquals(retArray1.length, cnt);
+    }
+  }
+
+  /**
+   * Runs three fill queries that request previous fill for the int32, double and boolean
+   * types, and compares every returned row with {@code retArray1}.
+   *
+   * <p>Rows are matched against the expected values in the order in which the queries are
+   * executed. The total number of rows is verified at the end, so an empty or truncated
+   * result set cannot pass unnoticed.
+   *
+   * @throws SQLException if connecting to the server or executing a query fails
+   */
+  @Test
+  public void PreviousFillTest() throws SQLException {
+    String[] retArray1 = new String[]{
+        "3,3.3,false,33",
+        "70,50.5,false,550",
+        "70,null,null,null"
+    };
+    String[] queries = new String[]{
+        "select temperature,status, hardware from root.ln.wf01.wt01 where time = 3 "
+            + "Fill(int32[previous, 5ms], double[previous, 5ms], "
+            + "boolean[previous, 5ms])",
+        "select temperature,status, hardware from root.ln.wf01.wt01 where time = 70 "
+            + "Fill(int32[previous, 500ms], double[previous, 500ms], "
+            + "boolean[previous, 500ms])",
+        "select temperature,status, hardware from root.ln.wf01.wt01 where time = 70 "
+            + "Fill(int32[previous, 15ms], double[previous, 15ms], "
+            + "boolean[previous, 5ms])"
+    };
+
+    // try-with-resources closes the connection, statements and result sets even when an
+    // assertion fails; exceptions propagate so the test runner reports the full stack trace.
+    try (Connection connection = DriverManager
+        .getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root")) {
+      int cnt = 0;
+      for (String sql : queries) {
+        // A fresh statement per query, as each query is executed independently.
+        try (Statement statement = connection.createStatement()) {
+          Assert.assertTrue(statement.execute(sql));
+          try (ResultSet resultSet = statement.getResultSet()) {
+            while (resultSet.next()) {
+              String ans = resultSet.getString(TIMESTAMP_STR)
+                  + "," + resultSet.getString("root.ln.wf01.wt01.temperature")
+                  + "," + resultSet.getString("root.ln.wf01.wt01.status")
+                  + "," + resultSet.getString("root.ln.wf01.wt01.hardware");
+              Assert.assertEquals(retArray1[cnt], ans);
+              cnt++;
+            }
+          }
+        }
+      }
+      // Every expected row must have been produced by the queries above.
+      Assert.assertEquals(retArray1.length, cnt);
+    }
+  }
+
+  /**
+   * Executes every SQL statement in {@code dataSet1} against the local server.
+   *
+   * <p>Failures are propagated instead of being printed and ignored, so that a broken setup
+   * is reported directly rather than surfacing later as confusing assertion failures.
+   *
+   * @throws SQLException if the connection cannot be opened or a statement fails
+   */
+  private void prepareData() throws SQLException {
+    // try-with-resources closes the statement and the connection on every exit path.
+    try (Connection connection = DriverManager
+        .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+      for (String sql : dataSet1) {
+        statement.execute(sql);
+      }
+    }
+  }
+}
diff --git
a/iotdb/src/test/java/org/apache/iotdb/db/integration/IOTDBGroupByTestIT.java
b/iotdb/src/test/java/org/apache/iotdb/db/integration/IOTDBGroupByTestIT.java
index bf2f755..3c0224c 100644
---
a/iotdb/src/test/java/org/apache/iotdb/db/integration/IOTDBGroupByTestIT.java
+++
b/iotdb/src/test/java/org/apache/iotdb/db/integration/IOTDBGroupByTestIT.java
@@ -82,14 +82,7 @@ public class IOTDBGroupByTestIT {
"INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware)
values(620, 500.5, false, 550)",
};
- private String insertTemplate1 = "INSERT INTO
root.vehicle.d0(timestamp,s0,s1,s2,s3"
- + ") VALUES(%d,%d,%d,%f,%s)";
-
private static final String TIMESTAMP_STR = "Time";
- private final String d0s0 = "root.vehicle.d0.s0";
- private final String d0s1 = "root.vehicle.d0.s1";
- private final String d0s2 = "root.vehicle.d0.s2";
- private final String d0s3 = "root.vehicle.d0.s3";
@Before
public void setUp() throws Exception {
diff --git
a/iotdb/src/test/java/org/apache/iotdb/db/qp/utils/MemIntQpExecutor.java
b/iotdb/src/test/java/org/apache/iotdb/db/qp/utils/MemIntQpExecutor.java
index 469b3cc..07983b3 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/qp/utils/MemIntQpExecutor.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/qp/utils/MemIntQpExecutor.java
@@ -36,6 +36,7 @@ import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.qp.physical.crud.UpdatePlan;
+import org.apache.iotdb.db.query.fill.IFill;
import
org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.Path;
@@ -107,14 +108,23 @@ public class MemIntQpExecutor extends
QueryProcessExecutor {
}
@Override
- public QueryDataSet aggregate(List<Path> paths, List<String> aggres,
IExpression expression) throws ProcessorException, IOException,
PathErrorException, FileNodeManagerException, QueryFilterOptimizationException {
+ public QueryDataSet aggregate(List<Path> paths, List<String> aggres,
IExpression expression)
+ throws ProcessorException, IOException, PathErrorException,
FileNodeManagerException,
+ QueryFilterOptimizationException {
return null;
}
@Override
public QueryDataSet groupBy(List<Path> paths, List<String> aggres,
IExpression expression,
long unit, long origin, List<Pair<Long, Long>> intervals)
- throws ProcessorException, IOException, PathErrorException,
FileNodeManagerException, QueryFilterOptimizationException {
+ throws ProcessorException, IOException, PathErrorException,
FileNodeManagerException,
+ QueryFilterOptimizationException {
+ return null;
+ }
+
+ @Override
+ public QueryDataSet fill(List<Path> fillPaths, long queryTime,
Map<TSDataType, IFill> fillTypes)
+ throws ProcessorException, IOException, PathErrorException,
FileNodeManagerException {
return null;
}