This is an automated email from the ASF dual-hosted git repository.
caogaofei pushed a commit to branch rc/1.2.0
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rc/1.2.0 by this push:
new f98b8ca7825 [To rc/1.2.0] Add executeGroupByQueryIntervalQuery rpc
interface (#10573)
f98b8ca7825 is described below
commit f98b8ca78251bcbde93a49b31a4b6cf27e1dca0f
Author: Beyyes <[email protected]>
AuthorDate: Tue Jul 18 20:03:54 2023 +0800
[To rc/1.2.0] Add executeGroupByQueryIntervalQuery rpc interface (#10573)
---
.../thrift/src/main/thrift/client.thrift | 17 ++
pom.xml | 2 +-
.../fragment/FragmentInstanceContext.java | 2 +-
.../fragment/FragmentInstanceManager.java | 4 +-
.../service/thrift/impl/ClientRPCServiceImpl.java | 224 ++++++++++++++++++++-
.../apache/iotdb/db/utils/QueryDataSetUtils.java | 191 ++++++++++++++++++
.../iotdb/tsfile/read/filter/TimeFilter.java | 89 ++++++++
.../tsfile/read/filter/factory/FilterFactory.java | 4 +
.../read/filter/factory/FilterSerializeId.java | 3 +-
9 files changed, 528 insertions(+), 8 deletions(-)
diff --git a/iotdb-protocol/thrift/src/main/thrift/client.thrift
b/iotdb-protocol/thrift/src/main/thrift/client.thrift
index e111c8d2a35..323b955f1da 100644
--- a/iotdb-protocol/thrift/src/main/thrift/client.thrift
+++ b/iotdb-protocol/thrift/src/main/thrift/client.thrift
@@ -352,6 +352,21 @@ struct TSAggregationQueryReq {
11: optional bool legalPathNodes
}
+// Request of executeGroupByQueryIntervalQuery: one aggregation over one measurement of one
+// device, grouped into fixed-size time windows.
+struct TSGroupByQueryIntervalReq {
+ // sessionId / statementId: not read by the server handler added together with this struct
+ // (it resolves the session from the current connection instead).
+ 1: required i64 sessionId
+ 2: required i64 statementId
+ // Full device path; also the source of the database name when `database` is empty.
+ 3: required string device
+ 4: required string measurement
+ // Numeric code of the series data type; the server narrows it to a byte before decoding it.
+ 5: required i32 dataType
+ 6: required common.TAggregationType aggregationType
+ // Optional; when empty the server derives "<node0>.<node1>" from `device`.
+ 7: optional string database
+ // Queried range is [startTime, endTime). NOTE(review): the server reads startTime, endTime and
+ // interval unconditionally, so leaving them unset presumably yields 0 -- confirm.
+ 8: optional i64 startTime
+ 9: optional i64 endTime
+ // Passed by the server as both the 3rd and 4th GroupByTimeParameter argument.
+ 10: optional i64 interval
+ // fetchSize / timeout: not read by the server handler added together with this struct.
+ 11: optional i32 fetchSize
+ 12: optional i64 timeout
+}
+
struct TSCreateMultiTimeseriesReq {
1: required i64 sessionId
2: required list<string> paths
@@ -500,6 +515,8 @@ service IClientRPCService {
TSExecuteStatementResp executeAggregationQueryV2(1:TSAggregationQueryReq
req);
+ // Single-device, single-measurement aggregation grouped by a fixed time interval; the whole
+ // result is returned in the query data set of the response.
+ TSExecuteStatementResp
executeGroupByQueryIntervalQuery(1:TSGroupByQueryIntervalReq req);
+
TSFetchResultsResp fetchResultsV2(1:TSFetchResultsReq req);
TSOpenSessionResp openSession(1:TSOpenSessionReq req);
diff --git a/pom.xml b/pom.xml
index 014a68b4d7d..a192948f96e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -111,7 +111,7 @@
<module>iotdb-client/compile-tools</module>
<module>iotdb-client/client-cpp</module>
<module>metrics</module>
- <!-- <module>integration-test</module>-->
+<!-- <module>integration-test</module>-->
<module>consensus</module>
<module>library-udf</module>
<module>iotdb-api/udf-api</module>
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
index 6434e8faa06..37120a4c102 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
@@ -381,7 +381,7 @@ public class FragmentInstanceContext extends QueryContext {
* All file paths used by this fragment instance must be cleared and thus
the usage reference must
* be decreased.
*/
- protected synchronized void releaseResource() {
+ public synchronized void releaseResource() {
// For schema related query FI, closedFilePaths and unClosedFilePaths will
be null
if (closedFilePaths != null) {
for (TsFileResource tsFile : closedFilePaths) {
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
index 74e2985207d..997f42e2bd2 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
@@ -64,12 +64,12 @@ public class FragmentInstanceManager {
private final IDriverScheduler scheduler = DriverScheduler.getInstance();
private final ScheduledExecutorService instanceManagementExecutor;
- private final ExecutorService instanceNotificationExecutor;
+ public final ExecutorService instanceNotificationExecutor;
private final Duration infoCacheTime;
// record failed instances count
- private final CounterStat failedInstances = new CounterStat();
+ public final CounterStat failedInstances = new CounterStat();
private final ExecutorService intoOperationExecutor;
diff --git
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
index 8a1f9e323d9..cc931b862bc 100644
---
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
+++
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
@@ -18,11 +18,20 @@
*/
package org.apache.iotdb.db.service.thrift.impl;
+import org.apache.iotdb.common.rpc.thrift.TAggregationType;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.IoTDBException;
+import org.apache.iotdb.commons.partition.DataPartition;
+import org.apache.iotdb.commons.partition.DataPartitionQueryParam;
+import org.apache.iotdb.commons.path.AlignedPath;
+import org.apache.iotdb.commons.path.MeasurementPath;
+import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.service.metric.MetricService;
import org.apache.iotdb.commons.service.metric.enums.Metric;
import org.apache.iotdb.commons.service.metric.enums.Tag;
@@ -32,8 +41,25 @@ import org.apache.iotdb.db.auth.AuthorityChecker;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.OperationType;
+import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.engine.storagegroup.DataRegion;
import org.apache.iotdb.db.metadata.template.TemplateQueryType;
+import org.apache.iotdb.db.mpp.aggregation.AccumulatorFactory;
+import org.apache.iotdb.db.mpp.aggregation.Aggregator;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.common.PlanFragmentId;
+import org.apache.iotdb.db.mpp.common.QueryId;
+import org.apache.iotdb.db.mpp.common.SessionInfo;
+import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
+import org.apache.iotdb.db.mpp.execution.driver.DriverContext;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceManager;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
+import
org.apache.iotdb.db.mpp.execution.operator.source.AbstractSeriesAggregationScanOperator;
+import
org.apache.iotdb.db.mpp.execution.operator.source.AlignedSeriesAggregationScanOperator;
+import
org.apache.iotdb.db.mpp.execution.operator.source.SeriesAggregationScanOperator;
+import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
import org.apache.iotdb.db.mpp.plan.Coordinator;
import org.apache.iotdb.db.mpp.plan.analyze.ClusterPartitionFetcher;
import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
@@ -43,8 +69,14 @@ import
org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
import org.apache.iotdb.db.mpp.plan.execution.IQueryExecution;
import org.apache.iotdb.db.mpp.plan.parser.ASTVisitor;
import org.apache.iotdb.db.mpp.plan.parser.StatementGenerator;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
+import
org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.SeriesScanOptions;
import org.apache.iotdb.db.mpp.plan.statement.Statement;
import org.apache.iotdb.db.mpp.plan.statement.StatementType;
+import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
import org.apache.iotdb.db.mpp.plan.statement.crud.DeleteDataStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertMultiTabletsStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowStatement;
@@ -71,6 +103,7 @@ import org.apache.iotdb.db.quotas.OperationQuota;
import org.apache.iotdb.db.service.basic.BasicOpenSessionResp;
import org.apache.iotdb.db.utils.QueryDataSetUtils;
import org.apache.iotdb.db.utils.SetThreadName;
+import org.apache.iotdb.db.utils.TimePartitionUtils;
import org.apache.iotdb.metrics.utils.MetricLevel;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -99,6 +132,7 @@ import
org.apache.iotdb.service.rpc.thrift.TSFetchMetadataResp;
import org.apache.iotdb.service.rpc.thrift.TSFetchResultsReq;
import org.apache.iotdb.service.rpc.thrift.TSFetchResultsResp;
import org.apache.iotdb.service.rpc.thrift.TSGetTimeZoneResp;
+import org.apache.iotdb.service.rpc.thrift.TSGroupByQueryIntervalReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsOfOneDeviceReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReq;
@@ -121,10 +155,18 @@ import
org.apache.iotdb.service.rpc.thrift.TSSetTimeZoneReq;
import org.apache.iotdb.service.rpc.thrift.TSUnsetSchemaTemplateReq;
import org.apache.iotdb.service.rpc.thrift.TSyncIdentityInfo;
import org.apache.iotdb.service.rpc.thrift.TSyncTransportMetaInfo;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.filter.TimeFilter;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import io.airlift.units.Duration;
+import io.jsonwebtoken.lang.Strings;
+import org.apache.commons.lang.StringUtils;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -133,14 +175,20 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.ZoneId;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
+import static
org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
+import static
org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onIoTDBException;
import static
org.apache.iotdb.db.utils.ErrorHandlingUtils.onNPEOrUnexpectedException;
import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onQueryException;
+import static
org.apache.iotdb.db.utils.QueryDataSetUtils.convertTsBlockByFetchSize;
+import static
org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
@@ -177,12 +225,13 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
private static final SelectResult OLD_SELECT_RESULT =
(resp, queryExecution, fetchSize) -> {
- Pair<TSQueryDataSet, Boolean> pair =
- QueryDataSetUtils.convertTsBlockByFetchSize(queryExecution,
fetchSize);
+ Pair<TSQueryDataSet, Boolean> pair =
convertTsBlockByFetchSize(queryExecution, fetchSize);
resp.setQueryDataSet(pair.left);
return pair.right;
};
+  /**
+   * Maximum run time applied to every operator context created by {@code
+   * executeGroupByQueryInternal}. Declared {@code final} so that this shared constant cannot be
+   * reassigned by other classes.
+   */
+  public static final Duration DEFAULT_TIME_SLICE = new Duration(60_000, TimeUnit.MILLISECONDS);
+
public ClientRPCServiceImpl() {
partitionFetcher = ClusterPartitionFetcher.getInstance();
schemaFetcher = ClusterSchemaFetcher.getInstance();
@@ -555,6 +604,175 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
}
}
+ @Override
+ public TSExecuteStatementResp
executeGroupByQueryIntervalQuery(TSGroupByQueryIntervalReq req)
+ throws TException {
+
+ try {
+ IClientSession clientSession =
SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
+
+ String database = req.getDatabase();
+ if (StringUtils.isEmpty(database)) {
+ String[] splits = Strings.split(req.getDevice(), "\\.");
+ database = String.format("%s.%s", splits[0], splits[1]);
+ }
+ String deviceId = req.getDevice();
+ String measurementId = req.getMeasurement();
+ TSDataType dataType = TSDataType.getTsDataType((byte) req.getDataType());
+
+ // only one database, one device, one time interval
+ Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap = new
HashMap<>();
+ TTimePartitionSlot timePartitionSlot =
+ TimePartitionUtils.getTimePartition(req.getStartTime());
+ DataPartitionQueryParam queryParam =
+ new DataPartitionQueryParam(
+ deviceId, Collections.singletonList(timePartitionSlot), false,
false);
+ sgNameToQueryParamsMap.put(database,
Collections.singletonList(queryParam));
+ DataPartition dataPartition =
partitionFetcher.getDataPartition(sgNameToQueryParamsMap);
+ List<DataRegion> dataRegionList = new ArrayList<>();
+ List<TRegionReplicaSet> replicaSets =
+ dataPartition.getDataRegionReplicaSet(
+ deviceId, Collections.singletonList(timePartitionSlot));
+ for (TRegionReplicaSet region : replicaSets) {
+ dataRegionList.add(
+ StorageEngine.getInstance()
+ .getDataRegion(new
DataRegionId(region.getRegionId().getId())));
+ }
+
+ List<TsBlock> blockResult =
+ executeGroupByQueryInternal(
+ SESSION_MANAGER.getSessionInfo(clientSession),
+ deviceId,
+ measurementId,
+ dataType,
+ true,
+ req.getStartTime(),
+ req.getEndTime(),
+ req.getInterval(),
+ req.getAggregationType(),
+ dataRegionList);
+
+ String outputColumnName = req.getAggregationType().name();
+ List<ColumnHeader> columnHeaders =
+ Collections.singletonList(new ColumnHeader(outputColumnName,
dataType));
+ DatasetHeader header = new DatasetHeader(columnHeaders, false);
+
header.setColumnToTsBlockIndexMap(Collections.singletonList(outputColumnName));
+
+ TSExecuteStatementResp resp = createResponse(header, 1);
+ TSQueryDataSet queryDataSet = convertTsBlockByFetchSize(blockResult);
+ resp.setQueryDataSet(queryDataSet);
+
+ return resp;
+ } catch (Exception e) {
+ return RpcUtils.getTSExecuteStatementResp(
+ onQueryException(e, "\"" + req + "\". " +
OperationType.EXECUTE_AGG_QUERY));
+ } finally {
+ SESSION_MANAGER.updateIdleTime();
+ }
+ }
+
+  /**
+   * Builds a stand-alone aggregation scan operator for one series on one data region, drains it
+   * and returns every produced block.
+   *
+   * @param sessionInfo session the fragment instance context is created for
+   * @param device full device path
+   * @param measurement measurement name under {@code device}
+   * @param dataType data type of the series
+   * @param isAligned {@code true} to scan through {@link AlignedSeriesAggregationScanOperator},
+   *     {@code false} to scan through {@link SeriesAggregationScanOperator}
+   * @param startTime inclusive lower bound of the queried time range
+   * @param endTime exclusive upper bound of the queried time range
+   * @param interval value passed as both the 3rd and 4th {@link GroupByTimeParameter} argument
+   * @param aggregationType aggregation to compute
+   * @param dataRegionList regions to read from; must contain exactly one element
+   * @return all blocks produced by the operator, in production order
+   * @throws IllegalPathException declared for the path construction
+   * @throws IllegalArgumentException if {@code dataRegionList} does not hold exactly one region
+   * @throws RuntimeException wrapping any exception raised while initializing or draining the
+   *     operator
+   */
+  private List<TsBlock> executeGroupByQueryInternal(
+      SessionInfo sessionInfo,
+      String device,
+      String measurement,
+      TSDataType dataType,
+      boolean isAligned,
+      long startTime,
+      long endTime,
+      long interval,
+      TAggregationType aggregationType,
+      List<DataRegion> dataRegionList)
+      throws IllegalPathException {
+
+    int dataRegionSize = dataRegionList.size();
+    if (dataRegionSize != 1) {
+      throw new IllegalArgumentException(
+          "dataRegionList.size() should only be 1 now, current size is " + dataRegionSize);
+    }
+
+    Filter timeFilter = new TimeFilter.TimeGtEqAndLt(startTime, endTime);
+
+    // NOTE(review): every invocation reuses the same query / instance identifiers -- confirm
+    // that nothing keyed by these ids is shared between concurrent calls.
+    QueryId queryId = new QueryId("stub_query");
+    FragmentInstanceId instanceId =
+        new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+    FragmentInstanceStateMachine stateMachine =
+        new FragmentInstanceStateMachine(
+            instanceId, FragmentInstanceManager.getInstance().instanceNotificationExecutor);
+    FragmentInstanceContext fragmentInstanceContext =
+        createFragmentInstanceContext(
+            instanceId, stateMachine, sessionInfo, dataRegionList.get(0), timeFilter);
+    DriverContext driverContext = new DriverContext(fragmentInstanceContext, 0);
+    PlanNodeId planNodeId = new PlanNodeId("1");
+    // NOTE(review): the context is labelled with SeriesScanOperator's name although an
+    // aggregation scan operator is attached to it below -- confirm whether the label matters.
+    driverContext.addOperatorContext(1, planNodeId, SeriesScanOperator.class.getSimpleName());
+    driverContext
+        .getOperatorContexts()
+        .forEach(operatorContext -> operatorContext.setMaxRunTime(DEFAULT_TIME_SLICE));
+
+    SeriesScanOptions.Builder scanOptionsBuilder = new SeriesScanOptions.Builder();
+    scanOptionsBuilder.withAllSensors(Collections.singleton(measurement));
+    scanOptionsBuilder.withGlobalTimeFilter(timeFilter);
+
+    Aggregator aggregator =
+        new Aggregator(
+            AccumulatorFactory.createAccumulator(aggregationType, dataType, null, null, true),
+            AggregationStep.SINGLE,
+            Collections.singletonList(new InputLocation[] {new InputLocation(0, 0)}));
+
+    GroupByTimeParameter groupByTimeParameter =
+        new GroupByTimeParameter(startTime, endTime, interval, interval, true);
+
+    IMeasurementSchema measurementSchema = new MeasurementSchema(measurement, dataType);
+    AbstractSeriesAggregationScanOperator operator;
+    PartialPath path;
+    if (isAligned) {
+      path =
+          new AlignedPath(
+              device,
+              Collections.singletonList(measurement),
+              Collections.singletonList(measurementSchema));
+      operator =
+          new AlignedSeriesAggregationScanOperator(
+              planNodeId,
+              (AlignedPath) path,
+              Ordering.ASC,
+              scanOptionsBuilder.build(),
+              driverContext.getOperatorContexts().get(0),
+              Collections.singletonList(aggregator),
+              initTimeRangeIterator(groupByTimeParameter, true, true),
+              groupByTimeParameter,
+              DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
+    } else {
+      path = new MeasurementPath(device, measurement, measurementSchema);
+      operator =
+          new SeriesAggregationScanOperator(
+              planNodeId,
+              path,
+              Ordering.ASC,
+              scanOptionsBuilder.build(),
+              driverContext.getOperatorContexts().get(0),
+              Collections.singletonList(aggregator),
+              initTimeRangeIterator(groupByTimeParameter, true, true),
+              groupByTimeParameter,
+              DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
+    }
+
+    try {
+      List<TsBlock> result = new ArrayList<>();
+      fragmentInstanceContext.setSourcePaths(Collections.singletonList(path));
+      operator.initQueryDataSource(fragmentInstanceContext.getSharedQueryDataSource());
+
+      while (operator.hasNext()) {
+        result.add(operator.next());
+      }
+
+      return result;
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    } finally {
+      // always give back the resources held by the context, on success and on failure
+      fragmentInstanceContext.releaseResource();
+    }
+  }
+
@Override
public TSExecuteStatementResp executeQueryStatementV2(TSExecuteStatementReq
req) {
return executeStatementV2(req);
@@ -1130,7 +1348,7 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
try (SetThreadName queryName = new
SetThreadName(queryExecution.getQueryId())) {
Pair<TSQueryDataSet, Boolean> pair =
- QueryDataSetUtils.convertTsBlockByFetchSize(queryExecution,
req.fetchSize);
+ convertTsBlockByFetchSize(queryExecution, req.fetchSize);
TSQueryDataSet result = pair.left;
finished = pair.right;
boolean hasResultSet = result.bufferForTime().limit() != 0;
diff --git
a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
index 6baee727383..9aa5c0b8481 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
@@ -244,6 +244,197 @@ public class QueryDataSetUtils {
return new Pair<>(tsQueryDataSet, finished);
}
+ public static TSQueryDataSet convertTsBlockByFetchSize(List<TsBlock>
tsBlocks)
+ throws IOException {
+ TSQueryDataSet tsQueryDataSet = new TSQueryDataSet();
+
+ // one time column and each value column has an actual value buffer and a
bitmap value to
+ // indicate whether it is a null
+ int columnNum = 1;
+ int columnNumWithTime = columnNum * 2 + 1;
+ DataOutputStream[] dataOutputStreams = new
DataOutputStream[columnNumWithTime];
+ ByteArrayOutputStream[] byteArrayOutputStreams = new
ByteArrayOutputStream[columnNumWithTime];
+ for (int i = 0; i < columnNumWithTime; i++) {
+ byteArrayOutputStreams[i] = new ByteArrayOutputStream();
+ dataOutputStreams[i] = new DataOutputStream(byteArrayOutputStreams[i]);
+ }
+
+ int rowCount = 0;
+ int[] valueOccupation = new int[columnNum];
+
+ // used to record a bitmap for every 8 points
+ int[] bitmaps = new int[columnNum];
+ for (TsBlock tsBlock : tsBlocks) {
+ if (tsBlock.isEmpty()) {
+ continue;
+ }
+
+ int currentCount = tsBlock.getPositionCount();
+ // serialize time column
+ for (int i = 0; i < currentCount; i++) {
+ // use columnOutput to write byte array
+ dataOutputStreams[0].writeLong(tsBlock.getTimeByIndex(i));
+ }
+
+ // serialize each value column and its bitmap
+ for (int k = 0; k < columnNum; k++) {
+ // get DataOutputStream for current value column and its bitmap
+ DataOutputStream dataOutputStream = dataOutputStreams[2 * k + 1];
+ DataOutputStream dataBitmapOutputStream = dataOutputStreams[2 * (k +
1)];
+
+ Column column = tsBlock.getColumn(k);
+ TSDataType type = column.getDataType();
+ switch (type) {
+ case INT32:
+ for (int i = 0; i < currentCount; i++) {
+ rowCount++;
+ if (column.isNull(i)) {
+ bitmaps[k] = bitmaps[k] << 1;
+ } else {
+ bitmaps[k] = (bitmaps[k] << 1) | FLAG;
+ dataOutputStream.writeInt(column.getInt(i));
+ valueOccupation[k] += 4;
+ }
+ if (rowCount != 0 && rowCount % 8 == 0) {
+ dataBitmapOutputStream.writeByte(bitmaps[k]);
+ // we should clear the bitmap every 8 points
+ bitmaps[k] = 0;
+ }
+ }
+ break;
+ case INT64:
+ for (int i = 0; i < currentCount; i++) {
+ rowCount++;
+ if (column.isNull(i)) {
+ bitmaps[k] = bitmaps[k] << 1;
+ } else {
+ bitmaps[k] = (bitmaps[k] << 1) | FLAG;
+ dataOutputStream.writeLong(column.getLong(i));
+ valueOccupation[k] += 8;
+ }
+ if (rowCount != 0 && rowCount % 8 == 0) {
+ dataBitmapOutputStream.writeByte(bitmaps[k]);
+ // we should clear the bitmap every 8 points
+ bitmaps[k] = 0;
+ }
+ }
+ break;
+ case FLOAT:
+ for (int i = 0; i < currentCount; i++) {
+ rowCount++;
+ if (column.isNull(i)) {
+ bitmaps[k] = bitmaps[k] << 1;
+ } else {
+ bitmaps[k] = (bitmaps[k] << 1) | FLAG;
+ dataOutputStream.writeFloat(column.getFloat(i));
+ valueOccupation[k] += 4;
+ }
+ if (rowCount != 0 && rowCount % 8 == 0) {
+ dataBitmapOutputStream.writeByte(bitmaps[k]);
+ // we should clear the bitmap every 8 points
+ bitmaps[k] = 0;
+ }
+ }
+ break;
+ case DOUBLE:
+ for (int i = 0; i < currentCount; i++) {
+ rowCount++;
+ if (column.isNull(i)) {
+ bitmaps[k] = bitmaps[k] << 1;
+ } else {
+ bitmaps[k] = (bitmaps[k] << 1) | FLAG;
+ dataOutputStream.writeDouble(column.getDouble(i));
+ valueOccupation[k] += 8;
+ }
+ if (rowCount != 0 && rowCount % 8 == 0) {
+ dataBitmapOutputStream.writeByte(bitmaps[k]);
+ // we should clear the bitmap every 8 points
+ bitmaps[k] = 0;
+ }
+ }
+ break;
+ case BOOLEAN:
+ for (int i = 0; i < currentCount; i++) {
+ rowCount++;
+ if (column.isNull(i)) {
+ bitmaps[k] = bitmaps[k] << 1;
+ } else {
+ bitmaps[k] = (bitmaps[k] << 1) | FLAG;
+ dataOutputStream.writeBoolean(column.getBoolean(i));
+ valueOccupation[k] += 1;
+ }
+ if (rowCount != 0 && rowCount % 8 == 0) {
+ dataBitmapOutputStream.writeByte(bitmaps[k]);
+ // we should clear the bitmap every 8 points
+ bitmaps[k] = 0;
+ }
+ }
+ break;
+ case TEXT:
+ for (int i = 0; i < currentCount; i++) {
+ rowCount++;
+ if (column.isNull(i)) {
+ bitmaps[k] = bitmaps[k] << 1;
+ } else {
+ bitmaps[k] = (bitmaps[k] << 1) | FLAG;
+ Binary binary = column.getBinary(i);
+ dataOutputStream.writeInt(binary.getLength());
+ dataOutputStream.write(binary.getValues());
+ valueOccupation[k] = valueOccupation[k] + 4 +
binary.getLength();
+ }
+ if (rowCount != 0 && rowCount % 8 == 0) {
+ dataBitmapOutputStream.writeByte(bitmaps[k]);
+ // we should clear the bitmap every 8 points
+ bitmaps[k] = 0;
+ }
+ }
+ break;
+ default:
+ throw new UnSupportedDataTypeException(
+ String.format("Data type %s is not supported.", type));
+ }
+ if (k != columnNum - 1) {
+ rowCount -= currentCount;
+ }
+ }
+ }
+ // feed the remaining bitmap
+ int remaining = rowCount % 8;
+ for (int k = 0; k < columnNum; k++) {
+ if (remaining != 0) {
+ DataOutputStream dataBitmapOutputStream = dataOutputStreams[2 * (k +
1)];
+ dataBitmapOutputStream.writeByte(bitmaps[k] << (8 - remaining));
+ }
+ }
+
+ // calculate the time buffer size
+ int timeOccupation = rowCount * 8;
+ ByteBuffer timeBuffer = ByteBuffer.allocate(timeOccupation);
+ timeBuffer.put(byteArrayOutputStreams[0].toByteArray());
+ timeBuffer.flip();
+ tsQueryDataSet.setTime(timeBuffer);
+
+ // calculate the bitmap buffer size
+ int bitmapOccupation = (rowCount + 7) / 8;
+
+ List<ByteBuffer> bitmapList = new LinkedList<>();
+ List<ByteBuffer> valueList = new LinkedList<>();
+ for (int i = 1; i < byteArrayOutputStreams.length; i += 2) {
+ ByteBuffer valueBuffer = ByteBuffer.allocate(valueOccupation[(i - 1) /
2]);
+ valueBuffer.put(byteArrayOutputStreams[i].toByteArray());
+ valueBuffer.flip();
+ valueList.add(valueBuffer);
+
+ ByteBuffer bitmapBuffer = ByteBuffer.allocate(bitmapOccupation);
+ bitmapBuffer.put(byteArrayOutputStreams[i + 1].toByteArray());
+ bitmapBuffer.flip();
+ bitmapList.add(bitmapBuffer);
+ }
+ tsQueryDataSet.setBitmapList(bitmapList);
+ tsQueryDataSet.setValueList(valueList);
+ return tsQueryDataSet;
+ }
+
/** pair.left is serialized TsBlock pair.right indicates if the query
finished */
// To fetch required amounts of data and combine them through List
public static Pair<List<ByteBuffer>, Boolean> convertQueryResultByFetchSize(
diff --git
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/TimeFilter.java
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/TimeFilter.java
index 0cbd94dc0a6..e1a028b3687 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/TimeFilter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/TimeFilter.java
@@ -18,8 +18,10 @@
*/
package org.apache.iotdb.tsfile.read.filter;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.common.TimeRange;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.read.filter.factory.FilterSerializeId;
import org.apache.iotdb.tsfile.read.filter.factory.FilterType;
import org.apache.iotdb.tsfile.read.filter.operator.Between;
import org.apache.iotdb.tsfile.read.filter.operator.Eq;
@@ -29,7 +31,11 @@ import org.apache.iotdb.tsfile.read.filter.operator.In;
import org.apache.iotdb.tsfile.read.filter.operator.Lt;
import org.apache.iotdb.tsfile.read.filter.operator.LtEq;
import org.apache.iotdb.tsfile.read.filter.operator.NotEq;
+import org.apache.iotdb.tsfile.read.filter.operator.OrFilter;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -227,4 +233,87 @@ public class TimeFilter {
public static Filter defaultTimeFilter(boolean ascending) {
return ascending ? TimeFilter.gtEq(Long.MIN_VALUE) :
TimeFilter.ltEq(Long.MAX_VALUE);
}
+
+ public static class TimeGtEqAndLt implements Filter {
+
+ private long startTime;
+
+ private long endTime;
+
+ public TimeGtEqAndLt() {}
+
+ public TimeGtEqAndLt(long startTime, long endTime) {
+ this.startTime = startTime;
+ this.endTime = endTime;
+ }
+
+ @Override
+ public boolean satisfy(Statistics statistics) {
+ return !(statistics.getEndTime() < startTime ||
statistics.getStartTime() >= endTime);
+ }
+
+ @Override
+ public boolean allSatisfy(Statistics statistics) {
+ return startTime <= statistics.getStartTime() && statistics.getEndTime()
< endTime;
+ }
+
+ @Override
+ public boolean satisfy(long time, Object value) {
+ return startTime <= time && time < endTime;
+ }
+
+ @Override
+ public boolean satisfyStartEndTime(long startTime, long endTime) {
+ return !(endTime < this.startTime || startTime >= this.endTime);
+ }
+
+ @Override
+ public boolean containStartEndTime(long startTime, long endTime) {
+ return this.startTime <= startTime && endTime < this.endTime;
+ }
+
+ @Override
+ public Filter copy() {
+ return new TimeGtEqAndLt(startTime, endTime);
+ }
+
+ @Override
+ public String toString() {
+ return "TimeGtEqAndLt{" + "startTime=" + startTime + ", endTime=" +
endTime + '}';
+ }
+
+ @Override
+ public void serialize(DataOutputStream outputStream) {
+ try {
+ outputStream.write(getSerializeId().ordinal());
+ outputStream.writeLong(startTime);
+ outputStream.writeLong(endTime);
+ } catch (IOException ignored) {
+ // ignored
+ }
+ }
+
+ @Override
+ public void deserialize(ByteBuffer buffer) {
+ startTime = buffer.getLong();
+ endTime = buffer.getLong();
+ }
+
+ @Override
+ public FilterSerializeId getSerializeId() {
+ return FilterSerializeId.TIME_GTEQ_AND_LT;
+ }
+
+ @Override
+ public List<TimeRange> getTimeRanges() {
+ return startTime >= endTime
+ ? Collections.emptyList()
+ : Collections.singletonList(new TimeRange(startTime, endTime - 1));
+ }
+
+ @Override
+ public Filter reverse() {
+ return new OrFilter(new TimeLt(startTime), new TimeGtEq(endTime));
+ }
+ }
}
diff --git
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/factory/FilterFactory.java
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/factory/FilterFactory.java
index 6d647b96c55..c3e1e9b3b82 100644
---
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/factory/FilterFactory.java
+++
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/factory/FilterFactory.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.tsfile.read.filter.factory;
import org.apache.iotdb.tsfile.read.filter.GroupByFilter;
import org.apache.iotdb.tsfile.read.filter.GroupByMonthFilter;
+import org.apache.iotdb.tsfile.read.filter.TimeFilter;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.filter.operator.AndFilter;
import org.apache.iotdb.tsfile.read.filter.operator.Between;
@@ -101,6 +102,9 @@ public class FilterFactory {
case BETWEEN:
filter = new Between<>();
break;
+ case TIME_GTEQ_AND_LT:
+ filter = new TimeFilter.TimeGtEqAndLt();
+ break;
default:
throw new UnsupportedOperationException("Unknown filter type " + id);
}
diff --git
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/factory/FilterSerializeId.java
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/factory/FilterSerializeId.java
index bb3c7e10608..912bc9f3360 100644
---
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/factory/FilterSerializeId.java
+++
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/factory/FilterSerializeId.java
@@ -34,5 +34,6 @@ public enum FilterSerializeId {
IN,
REGEXP,
LIKE,
- BETWEEN
+ BETWEEN,
+ TIME_GTEQ_AND_LT
}