This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch rc/1.2.0
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rc/1.2.0 by this push:
     new f98b8ca7825 [To rc/1.2.0] Add executeGroupByQueryIntervalQuery rpc 
interface (#10573)
f98b8ca7825 is described below

commit f98b8ca78251bcbde93a49b31a4b6cf27e1dca0f
Author: Beyyes <[email protected]>
AuthorDate: Tue Jul 18 20:03:54 2023 +0800

    [To rc/1.2.0] Add executeGroupByQueryIntervalQuery rpc interface (#10573)
---
 .../thrift/src/main/thrift/client.thrift           |  17 ++
 pom.xml                                            |   2 +-
 .../fragment/FragmentInstanceContext.java          |   2 +-
 .../fragment/FragmentInstanceManager.java          |   4 +-
 .../service/thrift/impl/ClientRPCServiceImpl.java  | 224 ++++++++++++++++++++-
 .../apache/iotdb/db/utils/QueryDataSetUtils.java   | 191 ++++++++++++++++++
 .../iotdb/tsfile/read/filter/TimeFilter.java       |  89 ++++++++
 .../tsfile/read/filter/factory/FilterFactory.java  |   4 +
 .../read/filter/factory/FilterSerializeId.java     |   3 +-
 9 files changed, 528 insertions(+), 8 deletions(-)

diff --git a/iotdb-protocol/thrift/src/main/thrift/client.thrift 
b/iotdb-protocol/thrift/src/main/thrift/client.thrift
index e111c8d2a35..323b955f1da 100644
--- a/iotdb-protocol/thrift/src/main/thrift/client.thrift
+++ b/iotdb-protocol/thrift/src/main/thrift/client.thrift
@@ -352,6 +352,21 @@ struct TSAggregationQueryReq {
   11: optional bool legalPathNodes
 }
 
+struct TSGroupByQueryIntervalReq {
+  1: required i64 sessionId
+  2: required i64 statementId
+  3: required string device
+  4: required string measurement
+  5: required i32 dataType
+  6: required common.TAggregationType aggregationType
+  7: optional string database
+  8: optional i64 startTime
+  9: optional i64 endTime
+  10: optional i64 interval
+  11: optional i32 fetchSize
+  12: optional i64 timeout
+}
+
 struct TSCreateMultiTimeseriesReq {
   1: required i64 sessionId
   2: required list<string> paths
@@ -500,6 +515,8 @@ service IClientRPCService {
 
   TSExecuteStatementResp executeAggregationQueryV2(1:TSAggregationQueryReq 
req);
 
+  TSExecuteStatementResp 
executeGroupByQueryIntervalQuery(1:TSGroupByQueryIntervalReq req);
+
   TSFetchResultsResp fetchResultsV2(1:TSFetchResultsReq req);
 
   TSOpenSessionResp openSession(1:TSOpenSessionReq req);
diff --git a/pom.xml b/pom.xml
index 014a68b4d7d..a192948f96e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -111,7 +111,7 @@
         <module>iotdb-client/compile-tools</module>
         <module>iotdb-client/client-cpp</module>
         <module>metrics</module>
-        <!--        <module>integration-test</module>-->
+<!--        <module>integration-test</module>-->
         <module>consensus</module>
         <module>library-udf</module>
         <module>iotdb-api/udf-api</module>
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
index 6434e8faa06..37120a4c102 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
@@ -381,7 +381,7 @@ public class FragmentInstanceContext extends QueryContext {
    * All file paths used by this fragment instance must be cleared and thus 
the usage reference must
    * be decreased.
    */
-  protected synchronized void releaseResource() {
+  public synchronized void releaseResource() {
     // For schema related query FI, closedFilePaths and unClosedFilePaths will 
be null
     if (closedFilePaths != null) {
       for (TsFileResource tsFile : closedFilePaths) {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
index 74e2985207d..997f42e2bd2 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
@@ -64,12 +64,12 @@ public class FragmentInstanceManager {
   private final IDriverScheduler scheduler = DriverScheduler.getInstance();
 
   private final ScheduledExecutorService instanceManagementExecutor;
-  private final ExecutorService instanceNotificationExecutor;
+  public final ExecutorService instanceNotificationExecutor;
 
   private final Duration infoCacheTime;
 
   // record failed instances count
-  private final CounterStat failedInstances = new CounterStat();
+  public final CounterStat failedInstances = new CounterStat();
 
   private final ExecutorService intoOperationExecutor;
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
 
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
index 8a1f9e323d9..cc931b862bc 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
@@ -18,11 +18,20 @@
  */
 package org.apache.iotdb.db.service.thrift.impl;
 
+import org.apache.iotdb.common.rpc.thrift.TAggregationType;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.consensus.DataRegionId;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.exception.IoTDBException;
+import org.apache.iotdb.commons.partition.DataPartition;
+import org.apache.iotdb.commons.partition.DataPartitionQueryParam;
+import org.apache.iotdb.commons.path.AlignedPath;
+import org.apache.iotdb.commons.path.MeasurementPath;
+import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.service.metric.MetricService;
 import org.apache.iotdb.commons.service.metric.enums.Metric;
 import org.apache.iotdb.commons.service.metric.enums.Tag;
@@ -32,8 +41,25 @@ import org.apache.iotdb.db.auth.AuthorityChecker;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.conf.OperationType;
+import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.engine.storagegroup.DataRegion;
 import org.apache.iotdb.db.metadata.template.TemplateQueryType;
+import org.apache.iotdb.db.mpp.aggregation.AccumulatorFactory;
+import org.apache.iotdb.db.mpp.aggregation.Aggregator;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.common.PlanFragmentId;
+import org.apache.iotdb.db.mpp.common.QueryId;
+import org.apache.iotdb.db.mpp.common.SessionInfo;
+import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
 import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
+import org.apache.iotdb.db.mpp.execution.driver.DriverContext;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceManager;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
+import 
org.apache.iotdb.db.mpp.execution.operator.source.AbstractSeriesAggregationScanOperator;
+import 
org.apache.iotdb.db.mpp.execution.operator.source.AlignedSeriesAggregationScanOperator;
+import 
org.apache.iotdb.db.mpp.execution.operator.source.SeriesAggregationScanOperator;
+import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
 import org.apache.iotdb.db.mpp.plan.Coordinator;
 import org.apache.iotdb.db.mpp.plan.analyze.ClusterPartitionFetcher;
 import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
@@ -43,8 +69,14 @@ import 
org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
 import org.apache.iotdb.db.mpp.plan.execution.IQueryExecution;
 import org.apache.iotdb.db.mpp.plan.parser.ASTVisitor;
 import org.apache.iotdb.db.mpp.plan.parser.StatementGenerator;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
+import 
org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.SeriesScanOptions;
 import org.apache.iotdb.db.mpp.plan.statement.Statement;
 import org.apache.iotdb.db.mpp.plan.statement.StatementType;
+import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
 import org.apache.iotdb.db.mpp.plan.statement.crud.DeleteDataStatement;
 import org.apache.iotdb.db.mpp.plan.statement.crud.InsertMultiTabletsStatement;
 import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowStatement;
@@ -71,6 +103,7 @@ import org.apache.iotdb.db.quotas.OperationQuota;
 import org.apache.iotdb.db.service.basic.BasicOpenSessionResp;
 import org.apache.iotdb.db.utils.QueryDataSetUtils;
 import org.apache.iotdb.db.utils.SetThreadName;
+import org.apache.iotdb.db.utils.TimePartitionUtils;
 import org.apache.iotdb.metrics.utils.MetricLevel;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
@@ -99,6 +132,7 @@ import 
org.apache.iotdb.service.rpc.thrift.TSFetchMetadataResp;
 import org.apache.iotdb.service.rpc.thrift.TSFetchResultsReq;
 import org.apache.iotdb.service.rpc.thrift.TSFetchResultsResp;
 import org.apache.iotdb.service.rpc.thrift.TSGetTimeZoneResp;
+import org.apache.iotdb.service.rpc.thrift.TSGroupByQueryIntervalReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsOfOneDeviceReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReq;
@@ -121,10 +155,18 @@ import 
org.apache.iotdb.service.rpc.thrift.TSSetTimeZoneReq;
 import org.apache.iotdb.service.rpc.thrift.TSUnsetSchemaTemplateReq;
 import org.apache.iotdb.service.rpc.thrift.TSyncIdentityInfo;
 import org.apache.iotdb.service.rpc.thrift.TSyncTransportMetaInfo;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.filter.TimeFilter;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
+import io.airlift.units.Duration;
+import io.jsonwebtoken.lang.Strings;
+import org.apache.commons.lang.StringUtils;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -133,14 +175,20 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.time.ZoneId;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 
+import static 
org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
+import static 
org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
 import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onIoTDBException;
 import static 
org.apache.iotdb.db.utils.ErrorHandlingUtils.onNPEOrUnexpectedException;
 import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onQueryException;
+import static 
org.apache.iotdb.db.utils.QueryDataSetUtils.convertTsBlockByFetchSize;
+import static 
org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
 
 public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
 
@@ -177,12 +225,13 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
 
   private static final SelectResult OLD_SELECT_RESULT =
       (resp, queryExecution, fetchSize) -> {
-        Pair<TSQueryDataSet, Boolean> pair =
-            QueryDataSetUtils.convertTsBlockByFetchSize(queryExecution, 
fetchSize);
+        Pair<TSQueryDataSet, Boolean> pair = 
convertTsBlockByFetchSize(queryExecution, fetchSize);
         resp.setQueryDataSet(pair.left);
         return pair.right;
       };
 
+  public static Duration DEFAULT_TIME_SLICE = new Duration(60_000, 
TimeUnit.MILLISECONDS);
+
   public ClientRPCServiceImpl() {
     partitionFetcher = ClusterPartitionFetcher.getInstance();
     schemaFetcher = ClusterSchemaFetcher.getInstance();
@@ -555,6 +604,175 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
     }
   }
 
+  @Override
+  public TSExecuteStatementResp 
executeGroupByQueryIntervalQuery(TSGroupByQueryIntervalReq req)
+      throws TException {
+
+    try {
+      IClientSession clientSession = 
SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
+
+      String database = req.getDatabase();
+      if (StringUtils.isEmpty(database)) {
+        String[] splits = Strings.split(req.getDevice(), "\\.");
+        database = String.format("%s.%s", splits[0], splits[1]);
+      }
+      String deviceId = req.getDevice();
+      String measurementId = req.getMeasurement();
+      TSDataType dataType = TSDataType.getTsDataType((byte) req.getDataType());
+
+      // only one database, one device, one time interval
+      Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap = new 
HashMap<>();
+      TTimePartitionSlot timePartitionSlot =
+          TimePartitionUtils.getTimePartition(req.getStartTime());
+      DataPartitionQueryParam queryParam =
+          new DataPartitionQueryParam(
+              deviceId, Collections.singletonList(timePartitionSlot), false, 
false);
+      sgNameToQueryParamsMap.put(database, 
Collections.singletonList(queryParam));
+      DataPartition dataPartition = 
partitionFetcher.getDataPartition(sgNameToQueryParamsMap);
+      List<DataRegion> dataRegionList = new ArrayList<>();
+      List<TRegionReplicaSet> replicaSets =
+          dataPartition.getDataRegionReplicaSet(
+              deviceId, Collections.singletonList(timePartitionSlot));
+      for (TRegionReplicaSet region : replicaSets) {
+        dataRegionList.add(
+            StorageEngine.getInstance()
+                .getDataRegion(new 
DataRegionId(region.getRegionId().getId())));
+      }
+
+      List<TsBlock> blockResult =
+          executeGroupByQueryInternal(
+              SESSION_MANAGER.getSessionInfo(clientSession),
+              deviceId,
+              measurementId,
+              dataType,
+              true,
+              req.getStartTime(),
+              req.getEndTime(),
+              req.getInterval(),
+              req.getAggregationType(),
+              dataRegionList);
+
+      String outputColumnName = req.getAggregationType().name();
+      List<ColumnHeader> columnHeaders =
+          Collections.singletonList(new ColumnHeader(outputColumnName, 
dataType));
+      DatasetHeader header = new DatasetHeader(columnHeaders, false);
+      
header.setColumnToTsBlockIndexMap(Collections.singletonList(outputColumnName));
+
+      TSExecuteStatementResp resp = createResponse(header, 1);
+      TSQueryDataSet queryDataSet = convertTsBlockByFetchSize(blockResult);
+      resp.setQueryDataSet(queryDataSet);
+
+      return resp;
+    } catch (Exception e) {
+      return RpcUtils.getTSExecuteStatementResp(
+          onQueryException(e, "\"" + req + "\". " + 
OperationType.EXECUTE_AGG_QUERY));
+    } finally {
+      SESSION_MANAGER.updateIdleTime();
+    }
+  }
+
+  private List<TsBlock> executeGroupByQueryInternal(
+      SessionInfo sessionInfo,
+      String device,
+      String measurement,
+      TSDataType dataType,
+      boolean isAligned,
+      long startTime,
+      long endTme,
+      long interval,
+      TAggregationType aggregationType,
+      List<DataRegion> dataRegionList)
+      throws IllegalPathException {
+
+    int dataRegionSize = dataRegionList.size();
+    if (dataRegionSize != 1) {
+      throw new IllegalArgumentException(
+          "dataRegionList.size() should only be 1 now,  current size is " + 
dataRegionSize);
+    }
+
+    Filter timeFilter = new TimeFilter.TimeGtEqAndLt(startTime, endTme);
+
+    QueryId queryId = new QueryId("stub_query");
+    FragmentInstanceId instanceId =
+        new FragmentInstanceId(new PlanFragmentId(queryId, 0), 
"stub-instance");
+    FragmentInstanceStateMachine stateMachine =
+        new FragmentInstanceStateMachine(
+            instanceId, 
FragmentInstanceManager.getInstance().instanceNotificationExecutor);
+    FragmentInstanceContext fragmentInstanceContext =
+        createFragmentInstanceContext(
+            instanceId, stateMachine, sessionInfo, dataRegionList.get(0), 
timeFilter);
+    DriverContext driverContext = new DriverContext(fragmentInstanceContext, 
0);
+    PlanNodeId planNodeId = new PlanNodeId("1");
+    driverContext.addOperatorContext(1, planNodeId, 
SeriesScanOperator.class.getSimpleName());
+    driverContext
+        .getOperatorContexts()
+        .forEach(operatorContext -> 
operatorContext.setMaxRunTime(DEFAULT_TIME_SLICE));
+
+    SeriesScanOptions.Builder scanOptionsBuilder = new 
SeriesScanOptions.Builder();
+    scanOptionsBuilder.withAllSensors(Collections.singleton(measurement));
+    scanOptionsBuilder.withGlobalTimeFilter(timeFilter);
+
+    Aggregator aggregator =
+        new Aggregator(
+            AccumulatorFactory.createAccumulator(aggregationType, dataType, 
null, null, true),
+            AggregationStep.SINGLE,
+            Collections.singletonList(new InputLocation[] {new 
InputLocation(0, 0)}));
+
+    GroupByTimeParameter groupByTimeParameter =
+        new GroupByTimeParameter(startTime, endTme, interval, interval, true);
+
+    IMeasurementSchema measurementSchema = new MeasurementSchema(measurement, 
dataType);
+    AbstractSeriesAggregationScanOperator operator;
+    PartialPath path;
+    if (isAligned) {
+      path =
+          new AlignedPath(
+              device,
+              Collections.singletonList(measurement),
+              Collections.singletonList(measurementSchema));
+      operator =
+          new AlignedSeriesAggregationScanOperator(
+              planNodeId,
+              (AlignedPath) path,
+              Ordering.ASC,
+              scanOptionsBuilder.build(),
+              driverContext.getOperatorContexts().get(0),
+              Collections.singletonList(aggregator),
+              initTimeRangeIterator(groupByTimeParameter, true, true),
+              groupByTimeParameter,
+              DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
+    } else {
+      path = new MeasurementPath(device, measurement, measurementSchema);
+      operator =
+          new SeriesAggregationScanOperator(
+              planNodeId,
+              path,
+              Ordering.ASC,
+              scanOptionsBuilder.build(),
+              driverContext.getOperatorContexts().get(0),
+              Collections.singletonList(aggregator),
+              initTimeRangeIterator(groupByTimeParameter, true, true),
+              groupByTimeParameter,
+              DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
+    }
+
+    try {
+      List<TsBlock> result = new ArrayList<>();
+      fragmentInstanceContext.setSourcePaths(Collections.singletonList(path));
+      
operator.initQueryDataSource(fragmentInstanceContext.getSharedQueryDataSource());
+
+      while (operator.hasNext()) {
+        result.add(operator.next());
+      }
+
+      return result;
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    } finally {
+      fragmentInstanceContext.releaseResource();
+    }
+  }
+
   @Override
   public TSExecuteStatementResp executeQueryStatementV2(TSExecuteStatementReq 
req) {
     return executeStatementV2(req);
@@ -1130,7 +1348,7 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
 
       try (SetThreadName queryName = new 
SetThreadName(queryExecution.getQueryId())) {
         Pair<TSQueryDataSet, Boolean> pair =
-            QueryDataSetUtils.convertTsBlockByFetchSize(queryExecution, 
req.fetchSize);
+            convertTsBlockByFetchSize(queryExecution, req.fetchSize);
         TSQueryDataSet result = pair.left;
         finished = pair.right;
         boolean hasResultSet = result.bufferForTime().limit() != 0;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java 
b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
index 6baee727383..9aa5c0b8481 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
@@ -244,6 +244,197 @@ public class QueryDataSetUtils {
     return new Pair<>(tsQueryDataSet, finished);
   }
 
+  public static TSQueryDataSet convertTsBlockByFetchSize(List<TsBlock> 
tsBlocks)
+      throws IOException {
+    TSQueryDataSet tsQueryDataSet = new TSQueryDataSet();
+
+    // one time column and each value column has an actual value buffer and a 
bitmap value to
+    // indicate whether it is a null
+    int columnNum = 1;
+    int columnNumWithTime = columnNum * 2 + 1;
+    DataOutputStream[] dataOutputStreams = new 
DataOutputStream[columnNumWithTime];
+    ByteArrayOutputStream[] byteArrayOutputStreams = new 
ByteArrayOutputStream[columnNumWithTime];
+    for (int i = 0; i < columnNumWithTime; i++) {
+      byteArrayOutputStreams[i] = new ByteArrayOutputStream();
+      dataOutputStreams[i] = new DataOutputStream(byteArrayOutputStreams[i]);
+    }
+
+    int rowCount = 0;
+    int[] valueOccupation = new int[columnNum];
+
+    // used to record a bitmap for every 8 points
+    int[] bitmaps = new int[columnNum];
+    for (TsBlock tsBlock : tsBlocks) {
+      if (tsBlock.isEmpty()) {
+        continue;
+      }
+
+      int currentCount = tsBlock.getPositionCount();
+      // serialize time column
+      for (int i = 0; i < currentCount; i++) {
+        // use columnOutput to write byte array
+        dataOutputStreams[0].writeLong(tsBlock.getTimeByIndex(i));
+      }
+
+      // serialize each value column and its bitmap
+      for (int k = 0; k < columnNum; k++) {
+        // get DataOutputStream for current value column and its bitmap
+        DataOutputStream dataOutputStream = dataOutputStreams[2 * k + 1];
+        DataOutputStream dataBitmapOutputStream = dataOutputStreams[2 * (k + 
1)];
+
+        Column column = tsBlock.getColumn(k);
+        TSDataType type = column.getDataType();
+        switch (type) {
+          case INT32:
+            for (int i = 0; i < currentCount; i++) {
+              rowCount++;
+              if (column.isNull(i)) {
+                bitmaps[k] = bitmaps[k] << 1;
+              } else {
+                bitmaps[k] = (bitmaps[k] << 1) | FLAG;
+                dataOutputStream.writeInt(column.getInt(i));
+                valueOccupation[k] += 4;
+              }
+              if (rowCount != 0 && rowCount % 8 == 0) {
+                dataBitmapOutputStream.writeByte(bitmaps[k]);
+                // we should clear the bitmap every 8 points
+                bitmaps[k] = 0;
+              }
+            }
+            break;
+          case INT64:
+            for (int i = 0; i < currentCount; i++) {
+              rowCount++;
+              if (column.isNull(i)) {
+                bitmaps[k] = bitmaps[k] << 1;
+              } else {
+                bitmaps[k] = (bitmaps[k] << 1) | FLAG;
+                dataOutputStream.writeLong(column.getLong(i));
+                valueOccupation[k] += 8;
+              }
+              if (rowCount != 0 && rowCount % 8 == 0) {
+                dataBitmapOutputStream.writeByte(bitmaps[k]);
+                // we should clear the bitmap every 8 points
+                bitmaps[k] = 0;
+              }
+            }
+            break;
+          case FLOAT:
+            for (int i = 0; i < currentCount; i++) {
+              rowCount++;
+              if (column.isNull(i)) {
+                bitmaps[k] = bitmaps[k] << 1;
+              } else {
+                bitmaps[k] = (bitmaps[k] << 1) | FLAG;
+                dataOutputStream.writeFloat(column.getFloat(i));
+                valueOccupation[k] += 4;
+              }
+              if (rowCount != 0 && rowCount % 8 == 0) {
+                dataBitmapOutputStream.writeByte(bitmaps[k]);
+                // we should clear the bitmap every 8 points
+                bitmaps[k] = 0;
+              }
+            }
+            break;
+          case DOUBLE:
+            for (int i = 0; i < currentCount; i++) {
+              rowCount++;
+              if (column.isNull(i)) {
+                bitmaps[k] = bitmaps[k] << 1;
+              } else {
+                bitmaps[k] = (bitmaps[k] << 1) | FLAG;
+                dataOutputStream.writeDouble(column.getDouble(i));
+                valueOccupation[k] += 8;
+              }
+              if (rowCount != 0 && rowCount % 8 == 0) {
+                dataBitmapOutputStream.writeByte(bitmaps[k]);
+                // we should clear the bitmap every 8 points
+                bitmaps[k] = 0;
+              }
+            }
+            break;
+          case BOOLEAN:
+            for (int i = 0; i < currentCount; i++) {
+              rowCount++;
+              if (column.isNull(i)) {
+                bitmaps[k] = bitmaps[k] << 1;
+              } else {
+                bitmaps[k] = (bitmaps[k] << 1) | FLAG;
+                dataOutputStream.writeBoolean(column.getBoolean(i));
+                valueOccupation[k] += 1;
+              }
+              if (rowCount != 0 && rowCount % 8 == 0) {
+                dataBitmapOutputStream.writeByte(bitmaps[k]);
+                // we should clear the bitmap every 8 points
+                bitmaps[k] = 0;
+              }
+            }
+            break;
+          case TEXT:
+            for (int i = 0; i < currentCount; i++) {
+              rowCount++;
+              if (column.isNull(i)) {
+                bitmaps[k] = bitmaps[k] << 1;
+              } else {
+                bitmaps[k] = (bitmaps[k] << 1) | FLAG;
+                Binary binary = column.getBinary(i);
+                dataOutputStream.writeInt(binary.getLength());
+                dataOutputStream.write(binary.getValues());
+                valueOccupation[k] = valueOccupation[k] + 4 + 
binary.getLength();
+              }
+              if (rowCount != 0 && rowCount % 8 == 0) {
+                dataBitmapOutputStream.writeByte(bitmaps[k]);
+                // we should clear the bitmap every 8 points
+                bitmaps[k] = 0;
+              }
+            }
+            break;
+          default:
+            throw new UnSupportedDataTypeException(
+                String.format("Data type %s is not supported.", type));
+        }
+        if (k != columnNum - 1) {
+          rowCount -= currentCount;
+        }
+      }
+    }
+    // feed the remaining bitmap
+    int remaining = rowCount % 8;
+    for (int k = 0; k < columnNum; k++) {
+      if (remaining != 0) {
+        DataOutputStream dataBitmapOutputStream = dataOutputStreams[2 * (k + 
1)];
+        dataBitmapOutputStream.writeByte(bitmaps[k] << (8 - remaining));
+      }
+    }
+
+    // calculate the time buffer size
+    int timeOccupation = rowCount * 8;
+    ByteBuffer timeBuffer = ByteBuffer.allocate(timeOccupation);
+    timeBuffer.put(byteArrayOutputStreams[0].toByteArray());
+    timeBuffer.flip();
+    tsQueryDataSet.setTime(timeBuffer);
+
+    // calculate the bitmap buffer size
+    int bitmapOccupation = (rowCount + 7) / 8;
+
+    List<ByteBuffer> bitmapList = new LinkedList<>();
+    List<ByteBuffer> valueList = new LinkedList<>();
+    for (int i = 1; i < byteArrayOutputStreams.length; i += 2) {
+      ByteBuffer valueBuffer = ByteBuffer.allocate(valueOccupation[(i - 1) / 
2]);
+      valueBuffer.put(byteArrayOutputStreams[i].toByteArray());
+      valueBuffer.flip();
+      valueList.add(valueBuffer);
+
+      ByteBuffer bitmapBuffer = ByteBuffer.allocate(bitmapOccupation);
+      bitmapBuffer.put(byteArrayOutputStreams[i + 1].toByteArray());
+      bitmapBuffer.flip();
+      bitmapList.add(bitmapBuffer);
+    }
+    tsQueryDataSet.setBitmapList(bitmapList);
+    tsQueryDataSet.setValueList(valueList);
+    return tsQueryDataSet;
+  }
+
   /** pair.left is serialized TsBlock pair.right indicates if the query 
finished */
   // To fetch required amounts of data and combine them through List
   public static Pair<List<ByteBuffer>, Boolean> convertQueryResultByFetchSize(
diff --git 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/TimeFilter.java 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/TimeFilter.java
index 0cbd94dc0a6..e1a028b3687 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/TimeFilter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/TimeFilter.java
@@ -18,8 +18,10 @@
  */
 package org.apache.iotdb.tsfile.read.filter;
 
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.TimeRange;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.read.filter.factory.FilterSerializeId;
 import org.apache.iotdb.tsfile.read.filter.factory.FilterType;
 import org.apache.iotdb.tsfile.read.filter.operator.Between;
 import org.apache.iotdb.tsfile.read.filter.operator.Eq;
@@ -29,7 +31,11 @@ import org.apache.iotdb.tsfile.read.filter.operator.In;
 import org.apache.iotdb.tsfile.read.filter.operator.Lt;
 import org.apache.iotdb.tsfile.read.filter.operator.LtEq;
 import org.apache.iotdb.tsfile.read.filter.operator.NotEq;
+import org.apache.iotdb.tsfile.read.filter.operator.OrFilter;
 
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -227,4 +233,87 @@ public class TimeFilter {
   public static Filter defaultTimeFilter(boolean ascending) {
     return ascending ? TimeFilter.gtEq(Long.MIN_VALUE) : 
TimeFilter.ltEq(Long.MAX_VALUE);
   }
+
+  public static class TimeGtEqAndLt implements Filter {
+
+    private long startTime;
+
+    private long endTime;
+
+    public TimeGtEqAndLt() {}
+
+    public TimeGtEqAndLt(long startTime, long endTime) {
+      this.startTime = startTime;
+      this.endTime = endTime;
+    }
+
+    @Override
+    public boolean satisfy(Statistics statistics) {
+      return !(statistics.getEndTime() < startTime || 
statistics.getStartTime() >= endTime);
+    }
+
+    @Override
+    public boolean allSatisfy(Statistics statistics) {
+      return startTime <= statistics.getStartTime() && statistics.getEndTime() 
< endTime;
+    }
+
+    @Override
+    public boolean satisfy(long time, Object value) {
+      return startTime <= time && time < endTime;
+    }
+
+    @Override
+    public boolean satisfyStartEndTime(long startTime, long endTime) {
+      return !(endTime < this.startTime || startTime >= this.endTime);
+    }
+
+    @Override
+    public boolean containStartEndTime(long startTime, long endTime) {
+      return this.startTime <= startTime && endTime < this.endTime;
+    }
+
+    @Override
+    public Filter copy() {
+      return new TimeGtEqAndLt(startTime, endTime);
+    }
+
+    @Override
+    public String toString() {
+      return "TimeGtEqAndLt{" + "startTime=" + startTime + ", endTime=" + 
endTime + '}';
+    }
+
+    @Override
+    public void serialize(DataOutputStream outputStream) {
+      try {
+        outputStream.write(getSerializeId().ordinal());
+        outputStream.writeLong(startTime);
+        outputStream.writeLong(endTime);
+      } catch (IOException ignored) {
+        // ignored
+      }
+    }
+
+    @Override
+    public void deserialize(ByteBuffer buffer) {
+      startTime = buffer.getLong();
+      endTime = buffer.getLong();
+    }
+
+    @Override
+    public FilterSerializeId getSerializeId() {
+      return FilterSerializeId.TIME_GTEQ_AND_LT;
+    }
+
+    @Override
+    public List<TimeRange> getTimeRanges() {
+      return startTime >= endTime
+          ? Collections.emptyList()
+          : Collections.singletonList(new TimeRange(startTime, endTime - 1));
+    }
+
+    @Override
+    public Filter reverse() {
+      return new OrFilter(new TimeLt(startTime), new TimeGtEq(endTime));
+    }
+  }
 }
diff --git 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/factory/FilterFactory.java
 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/factory/FilterFactory.java
index 6d647b96c55..c3e1e9b3b82 100644
--- 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/factory/FilterFactory.java
+++ 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/factory/FilterFactory.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.tsfile.read.filter.factory;
 
 import org.apache.iotdb.tsfile.read.filter.GroupByFilter;
 import org.apache.iotdb.tsfile.read.filter.GroupByMonthFilter;
+import org.apache.iotdb.tsfile.read.filter.TimeFilter;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.read.filter.operator.AndFilter;
 import org.apache.iotdb.tsfile.read.filter.operator.Between;
@@ -101,6 +102,9 @@ public class FilterFactory {
       case BETWEEN:
         filter = new Between<>();
         break;
+      case TIME_GTEQ_AND_LT:
+        filter = new TimeFilter.TimeGtEqAndLt();
+        break;
       default:
         throw new UnsupportedOperationException("Unknown filter type " + id);
     }
diff --git 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/factory/FilterSerializeId.java
 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/factory/FilterSerializeId.java
index bb3c7e10608..912bc9f3360 100644
--- 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/factory/FilterSerializeId.java
+++ 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/factory/FilterSerializeId.java
@@ -34,5 +34,6 @@ public enum FilterSerializeId {
   IN,
   REGEXP,
   LIKE,
-  BETWEEN
+  BETWEEN,
+  TIME_GTEQ_AND_LT
 }

Reply via email to