This is an automated email from the ASF dual-hosted git repository.
xiangweiwei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 9af8185 [IOTDB-2208] Reconstruct the process of generating resultset
header of query - Part2 (#4642)
9af8185 is described below
commit 9af818539d7ac57d35995af586665b9895a3bb8a
Author: Xiangwei Wei <[email protected]>
AuthorDate: Tue Dec 28 10:22:09 2021 +0800
[IOTDB-2208] Reconstruct the process of generating resultset header of
query - Part2 (#4642)
---
.../iotdb/db/qp/physical/crud/AggregationPlan.java | 13 ++-
.../db/qp/physical/crud/AlignByDevicePlan.java | 19 ++--
.../iotdb/db/qp/physical/crud/GroupByTimePlan.java | 12 +++
.../apache/iotdb/db/qp/physical/crud/UDAFPlan.java | 13 +++
.../apache/iotdb/db/qp/physical/crud/UDTFPlan.java | 9 +-
.../db/service/thrift/impl/TSServiceImpl.java | 104 +++++++++++++++++----
6 files changed, 139 insertions(+), 31 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AggregationPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AggregationPlan.java
index fbd6605..6083605 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AggregationPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AggregationPlan.java
@@ -32,7 +32,11 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.thrift.TException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
public class AggregationPlan extends RawDataQueryPlan {
@@ -56,11 +60,11 @@ public class AggregationPlan extends RawDataQueryPlan {
@Override
public TSExecuteStatementResp getTSExecuteStatementResp(boolean isJdbcQuery)
throws TException, MetadataException {
+ TSExecuteStatementResp resp =
RpcUtils.getTSExecuteStatementResp(TSStatusCode.SUCCESS_STATUS);
if (isGroupByLevel()) {
List<String> respColumns = new ArrayList<>();
List<String> columnsTypes = new ArrayList<>();
- TSExecuteStatementResp resp =
RpcUtils.getTSExecuteStatementResp(TSStatusCode.SUCCESS_STATUS);
for (Map.Entry<String, AggregateResult> groupPathResult :
getGroupPathsResultMap().entrySet()) {
respColumns.add(groupPathResult.getKey());
@@ -68,10 +72,11 @@ public class AggregationPlan extends RawDataQueryPlan {
}
resp.setColumns(respColumns);
resp.setDataTypeList(columnsTypes);
- return resp;
} else {
- return super.getTSExecuteStatementResp(isJdbcQuery);
+ resp = super.getTSExecuteStatementResp(isJdbcQuery);
}
+ resp.setIgnoreTimeStamp(true);
+ return resp;
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AlignByDevicePlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AlignByDevicePlan.java
index e296c96..37b3a1f 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AlignByDevicePlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AlignByDevicePlan.java
@@ -29,7 +29,11 @@ import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.expression.IExpression;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
public class AlignByDevicePlan extends QueryPlan {
@@ -62,19 +66,17 @@ public class AlignByDevicePlan extends QueryPlan {
@Override
public TSExecuteStatementResp getTSExecuteStatementResp(boolean isJdbcQuery)
{
+ TSExecuteStatementResp resp =
RpcUtils.getTSExecuteStatementResp(TSStatusCode.SUCCESS_STATUS);
+
List<String> respColumns = new ArrayList<>();
List<String> columnsTypes = new ArrayList<>();
- TSExecuteStatementResp resp =
RpcUtils.getTSExecuteStatementResp(TSStatusCode.SUCCESS_STATUS);
-
- // set columns in TSExecuteStatementResp.
+ // the DEVICE column of ALIGN_BY_DEVICE result
respColumns.add(SQLConstant.ALIGNBY_DEVICE_COLUMN_NAME);
+ columnsTypes.add(TSDataType.TEXT.toString());
// get column types and do deduplication
- // the DEVICE column of ALIGN_BY_DEVICE result
- columnsTypes.add(TSDataType.TEXT.toString());
List<TSDataType> deduplicatedColumnsType = new ArrayList<>();
- // the DEVICE column of ALIGN_BY_DEVICE result
deduplicatedColumnsType.add(TSDataType.TEXT);
Set<String> deduplicatedMeasurements = new LinkedHashSet<>();
@@ -112,6 +114,9 @@ public class AlignByDevicePlan extends QueryPlan {
setPaths(null);
resp.setColumns(respColumns);
resp.setDataTypeList(columnsTypes);
+ if (getOperatorType() == OperatorType.AGGREGATION) {
+ resp.setIgnoreTimeStamp(true);
+ }
return resp;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/GroupByTimePlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/GroupByTimePlan.java
index cd6edc5..dd61488 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/GroupByTimePlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/GroupByTimePlan.java
@@ -18,7 +18,11 @@
*/
package org.apache.iotdb.db.qp.physical.crud;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.qp.logical.Operator;
+import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
+
+import org.apache.thrift.TException;
public class GroupByTimePlan extends AggregationPlan {
@@ -41,6 +45,14 @@ public class GroupByTimePlan extends AggregationPlan {
setOperatorType(Operator.OperatorType.GROUP_BY_TIME);
}
+ @Override
+ public TSExecuteStatementResp getTSExecuteStatementResp(boolean isJdbcQuery)
+ throws TException, MetadataException {
+ TSExecuteStatementResp resp = super.getTSExecuteStatementResp(isJdbcQuery);
+ resp.setIgnoreTimeStamp(false);
+ return resp;
+ }
+
public long getStartTime() {
return startTime;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/UDAFPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/UDAFPlan.java
index 4f69ea6..761476b 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/UDAFPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/UDAFPlan.java
@@ -22,6 +22,9 @@ import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
import org.apache.iotdb.db.qp.strategy.PhysicalGenerator;
import org.apache.iotdb.db.query.expression.Expression;
+import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
+
+import org.apache.thrift.TException;
import java.time.ZoneId;
import java.util.HashSet;
@@ -39,6 +42,16 @@ public class UDAFPlan extends UDTFPlan {
setOperatorType(OperatorType.UDAF);
}
+ @Override
+ public TSExecuteStatementResp getTSExecuteStatementResp(boolean isJdbcQuery)
+ throws TException, MetadataException {
+ TSExecuteStatementResp resp = super.getTSExecuteStatementResp(isJdbcQuery);
+ if (getInnerAggregationPlan().getOperatorType() ==
OperatorType.AGGREGATION) {
+ resp.setIgnoreTimeStamp(true);
+ }
+ return resp;
+ }
+
public void setExpressionToInnerResultIndexMap(
Map<Expression, Integer> expressionToInnerResultIndexMap) {
expressionToInnerResultIndexMap.forEach((k, v) ->
pathNameToReaderIndex.put(k.toString(), v));
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/UDTFPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/UDTFPlan.java
index 999465d..37c46e3 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/UDTFPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/UDTFPlan.java
@@ -31,7 +31,14 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.Pair;
import java.time.ZoneId;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
public class UDTFPlan extends RawDataQueryPlan implements UDFPlan {
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
index 87cf042..1762ad6 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
@@ -37,8 +37,30 @@ import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.metadata.template.TemplateQueryType;
import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-import org.apache.iotdb.db.qp.physical.crud.*;
-import org.apache.iotdb.db.qp.physical.sys.*;
+import org.apache.iotdb.db.qp.physical.crud.AlignByDevicePlan;
+import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowsOfOneDevicePlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowsPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
+import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
+import org.apache.iotdb.db.qp.physical.crud.SelectIntoPlan;
+import org.apache.iotdb.db.qp.physical.crud.UDFPlan;
+import org.apache.iotdb.db.qp.physical.sys.AppendTemplatePlan;
+import org.apache.iotdb.db.qp.physical.sys.AuthorPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateMultiTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateTemplatePlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.DeleteStorageGroupPlan;
+import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.PruneTemplatePlan;
+import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
+import org.apache.iotdb.db.qp.physical.sys.SetTemplatePlan;
+import org.apache.iotdb.db.qp.physical.sys.ShowPlan;
+import org.apache.iotdb.db.qp.physical.sys.ShowQueryProcesslistPlan;
+import org.apache.iotdb.db.qp.physical.sys.UnsetTemplatePlan;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.tracing.TracingConstant;
import org.apache.iotdb.db.query.dataset.AlignByDeviceDataSet;
@@ -56,7 +78,47 @@ import org.apache.iotdb.db.utils.QueryDataSetUtils;
import org.apache.iotdb.rpc.RedirectException;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.iotdb.service.rpc.thrift.*;
+import org.apache.iotdb.service.rpc.thrift.EndPoint;
+import org.apache.iotdb.service.rpc.thrift.ServerProperties;
+import org.apache.iotdb.service.rpc.thrift.TSAppendSchemaTemplateReq;
+import org.apache.iotdb.service.rpc.thrift.TSCancelOperationReq;
+import org.apache.iotdb.service.rpc.thrift.TSCloseOperationReq;
+import org.apache.iotdb.service.rpc.thrift.TSCloseSessionReq;
+import org.apache.iotdb.service.rpc.thrift.TSCreateAlignedTimeseriesReq;
+import org.apache.iotdb.service.rpc.thrift.TSCreateMultiTimeseriesReq;
+import org.apache.iotdb.service.rpc.thrift.TSCreateSchemaTemplateReq;
+import org.apache.iotdb.service.rpc.thrift.TSCreateTimeseriesReq;
+import org.apache.iotdb.service.rpc.thrift.TSDeleteDataReq;
+import org.apache.iotdb.service.rpc.thrift.TSExecuteBatchStatementReq;
+import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementReq;
+import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
+import org.apache.iotdb.service.rpc.thrift.TSFetchMetadataReq;
+import org.apache.iotdb.service.rpc.thrift.TSFetchMetadataResp;
+import org.apache.iotdb.service.rpc.thrift.TSFetchResultsReq;
+import org.apache.iotdb.service.rpc.thrift.TSFetchResultsResp;
+import org.apache.iotdb.service.rpc.thrift.TSGetTimeZoneResp;
+import org.apache.iotdb.service.rpc.thrift.TSIService;
+import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq;
+import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsOfOneDeviceReq;
+import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReq;
+import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordReq;
+import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordsReq;
+import org.apache.iotdb.service.rpc.thrift.TSInsertTabletReq;
+import org.apache.iotdb.service.rpc.thrift.TSInsertTabletsReq;
+import org.apache.iotdb.service.rpc.thrift.TSLastDataQueryReq;
+import org.apache.iotdb.service.rpc.thrift.TSOpenSessionReq;
+import org.apache.iotdb.service.rpc.thrift.TSOpenSessionResp;
+import org.apache.iotdb.service.rpc.thrift.TSPruneSchemaTemplateReq;
+import org.apache.iotdb.service.rpc.thrift.TSQueryDataSet;
+import org.apache.iotdb.service.rpc.thrift.TSQueryNonAlignDataSet;
+import org.apache.iotdb.service.rpc.thrift.TSQueryTemplateReq;
+import org.apache.iotdb.service.rpc.thrift.TSQueryTemplateResp;
+import org.apache.iotdb.service.rpc.thrift.TSRawDataQueryReq;
+import org.apache.iotdb.service.rpc.thrift.TSSetSchemaTemplateReq;
+import org.apache.iotdb.service.rpc.thrift.TSSetTimeZoneReq;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
+import org.apache.iotdb.service.rpc.thrift.TSTracingInfo;
+import org.apache.iotdb.service.rpc.thrift.TSUnsetSchemaTemplateReq;
import
org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
@@ -73,11 +135,19 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.sql.SQLException;
import java.time.ZoneId;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
import java.util.Map.Entry;
import java.util.stream.Collectors;
-import static org.apache.iotdb.db.utils.ErrorHandlingUtils.*;
+import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onIoTDBException;
+import static
org.apache.iotdb.db.utils.ErrorHandlingUtils.onNPEOrUnexpectedException;
+import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onNonQueryException;
+import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onQueryException;
+import static
org.apache.iotdb.db.utils.ErrorHandlingUtils.tryCatchQueryException;
/** Thrift RPC implementation at server side. */
public class TSServiceImpl extends BasicServiceProvider implements
TSIService.Iface {
@@ -592,22 +662,13 @@ public class TSServiceImpl extends BasicServiceProvider implements TSIService.If
}
if (plan instanceof ShowPlan || plan instanceof AuthorPlan) {
- resp = getListDataSetHeaders(newDataSet);
+ resp = getListDataSetHeaders(plan, newDataSet);
} else if (plan instanceof UDFPlan
|| (plan instanceof QueryPlan && ((QueryPlan)
plan).isGroupByLevel())) {
resp = getQueryColumnHeaders(plan, username, isJdbcQuery);
}
resp.setOperationType(plan.getOperatorType().toString());
- if (plan.getOperatorType() == OperatorType.AGGREGATION
- || (plan instanceof UDAFPlan
- && ((UDAFPlan) plan).getInnerAggregationPlan().getOperatorType()
- == OperatorType.AGGREGATION)) {
- resp.setIgnoreTimeStamp(true);
- } else if (plan instanceof ShowQueryProcesslistPlan) {
- resp.setIgnoreTimeStamp(false);
- }
-
if (newDataSet instanceof DirectNonAlignDataSet) {
resp.setNonAlignQueryDataSet(fillRpcNonAlignReturnData(fetchSize,
newDataSet, username));
} else {
@@ -663,10 +724,15 @@ public class TSServiceImpl extends BasicServiceProvider implements TSIService.If
}
}
- private TSExecuteStatementResp getListDataSetHeaders(QueryDataSet dataSet) {
- return StaticResps.getNoTimeExecuteResp(
-
dataSet.getPaths().stream().map(Path::getFullPath).collect(Collectors.toList()),
-
dataSet.getDataTypes().stream().map(Enum::toString).collect(Collectors.toList()));
+ private TSExecuteStatementResp getListDataSetHeaders(PhysicalPlan plan,
QueryDataSet dataSet) {
+ TSExecuteStatementResp resp =
+ StaticResps.getNoTimeExecuteResp(
+
dataSet.getPaths().stream().map(Path::getFullPath).collect(Collectors.toList()),
+
dataSet.getDataTypes().stream().map(Enum::toString).collect(Collectors.toList()));
+ if (plan instanceof ShowQueryProcesslistPlan) {
+ resp.setIgnoreTimeStamp(false);
+ }
+ return resp;
}
/** get ResultSet schema */