This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 07e0d21478a [opt](query) Improve the logic to calculate
TRegionReplicaSets for devices (#15102)
07e0d21478a is described below
commit 07e0d21478a7145c9aa36f1a280b4960c64680dd
Author: Beyyes <[email protected]>
AuthorDate: Mon Mar 17 17:01:47 2025 +0800
[opt](query) Improve the logic to calculate TRegionReplicaSets for devices
(#15102)
---
.../plan/relational/analyzer/Analysis.java | 17 ----
.../distribute/TableDistributedPlanGenerator.java | 102 +++++++++++++--------
.../plan/relational/analyzer/AnalyzerTest.java | 10 +-
.../iotdb/commons/utils/TimePartitionUtils.java | 7 +-
4 files changed, 73 insertions(+), 63 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java
index 83771367b49..317679fdcd0 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java
@@ -20,7 +20,6 @@
package org.apache.iotdb.db.queryengine.plan.relational.analyzer;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.partition.DataPartition;
import org.apache.iotdb.commons.partition.SchemaPartition;
@@ -73,10 +72,8 @@ import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Multimap;
import com.google.common.collect.Streams;
import com.google.errorprone.annotations.Immutable;
-import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.read.common.block.TsBlock;
import org.apache.tsfile.read.common.type.Type;
-import org.apache.tsfile.read.filter.basic.Filter;
import org.apache.tsfile.utils.TimeDuration;
import javax.annotation.Nullable;
@@ -105,7 +102,6 @@ import static java.util.Collections.unmodifiableList;
import static java.util.Collections.unmodifiableMap;
import static java.util.Collections.unmodifiableSet;
import static java.util.Objects.requireNonNull;
-import static org.apache.iotdb.commons.partition.DataPartition.NOT_ASSIGNED;
public class Analysis implements IAnalysis {
@@ -213,10 +209,6 @@ public class Analysis implements IAnalysis {
private boolean isQuery = false;
- public DataPartition getDataPartition() {
- return dataPartition;
- }
-
public Analysis(@Nullable Statement root, Map<NodeRef<Parameter>,
Expression> parameters) {
this.root = root;
this.parameters = ImmutableMap.copyOf(requireNonNull(parameters,
"parameters is null"));
@@ -850,15 +842,6 @@ public class Analysis implements IAnalysis {
redirectNodeList.add(endPoint);
}
- public List<TRegionReplicaSet> getDataRegionReplicaSetWithTimeFilter(
- final String database, final IDeviceID deviceId, final Filter
timeFilter) {
- if (dataPartition == null) {
- return Collections.singletonList(NOT_ASSIGNED);
- } else {
- return dataPartition.getDataRegionReplicaSetWithTimeFilter(database,
deviceId, timeFilter);
- }
- }
-
public void setTableFunctionAnalysis(
TableFunctionInvocation node, TableFunctionInvocationAnalysis analysis) {
tableFunctionAnalyses.put(NodeRef.of(node), analysis);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
index ef4e3936aa0..0baefe039b5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
@@ -271,8 +271,7 @@ public class TableDistributedPlanGenerator
}
TopKNode newTopKNode = (TopKNode) node.clone();
- for (int i = 0; i < childrenNodes.size(); i++) {
- PlanNode child = childrenNodes.get(i);
+ for (PlanNode child : childrenNodes) {
TopKNode subTopKNode =
new TopKNode(
queryId.genPlanNodeId(),
@@ -461,15 +460,30 @@ public class TableDistributedPlanGenerator
public List<PlanNode> visitDeviceTableScan(
final DeviceTableScanNode node, final PlanContext context) {
- final Map<TRegionReplicaSet, DeviceTableScanNode> tableScanNodeMap = new
HashMap<>();
+ DataPartition dataPartition = analysis.getDataPartitionInfo();
+ if (dataPartition == null) {
+ node.setRegionReplicaSet(NOT_ASSIGNED);
+ return Collections.singletonList(node);
+ }
+
+ String dbName = node.getQualifiedObjectName().getDatabaseName();
+ Map<TSeriesPartitionSlot, Map<TTimePartitionSlot,
List<TRegionReplicaSet>>> seriesSlotMap =
+ dataPartition.getDataPartitionMap().get(dbName);
+ if (seriesSlotMap == null) {
+ throw new SemanticException(
+ String.format("Given queried database: %s is not exist!", dbName));
+ }
+ final Map<TRegionReplicaSet, DeviceTableScanNode> tableScanNodeMap = new
HashMap<>();
+ Map<Integer, List<TRegionReplicaSet>> cachedSeriesSlotWithRegions = new
HashMap<>();
for (final DeviceEntry deviceEntry : node.getDeviceEntries()) {
- final List<TRegionReplicaSet> regionReplicaSets =
- analysis.getDataRegionReplicaSetWithTimeFilter(
- node.getQualifiedObjectName().getDatabaseName(),
+ List<TRegionReplicaSet> regionReplicaSets =
+ getDeviceReplicaSets(
+ dataPartition,
+ seriesSlotMap,
deviceEntry.getDeviceID(),
- node.getTimeFilter());
-
+ node.getTimeFilter(),
+ cachedSeriesSlotWithRegions);
for (final TRegionReplicaSet regionReplicaSet : regionReplicaSets) {
final DeviceTableScanNode deviceTableScanNode =
tableScanNodeMap.computeIfAbsent(
@@ -528,13 +542,31 @@ public class TableDistributedPlanGenerator
@Override
public List<PlanNode> visitTreeDeviceViewScan(TreeDeviceViewScanNode node,
PlanContext context) {
+ DataPartition dataPartition = analysis.getDataPartitionInfo();
+ if (dataPartition == null) {
+ node.setRegionReplicaSet(NOT_ASSIGNED);
+ return Collections.singletonList(node);
+ }
+
+ String dbName = node.getTreeDBName();
+ Map<TSeriesPartitionSlot, Map<TTimePartitionSlot,
List<TRegionReplicaSet>>> seriesSlotMap =
+ dataPartition.getDataPartitionMap().get(dbName);
+ if (seriesSlotMap == null) {
+ throw new SemanticException(
+ String.format("Given queried database: %s is not exist!", dbName));
+ }
+
Map<TRegionReplicaSet, Pair<TreeAlignedDeviceViewScanNode,
TreeNonAlignedDeviceViewScanNode>>
tableScanNodeMap = new HashMap<>();
-
+ Map<Integer, List<TRegionReplicaSet>> cachedSeriesSlotWithRegions = new
HashMap<>();
for (DeviceEntry deviceEntry : node.getDeviceEntries()) {
List<TRegionReplicaSet> regionReplicaSets =
- analysis.getDataRegionReplicaSetWithTimeFilter(
- node.getTreeDBName(), deviceEntry.getDeviceID(),
node.getTimeFilter());
+ getDeviceReplicaSets(
+ dataPartition,
+ seriesSlotMap,
+ deviceEntry.getDeviceID(),
+ node.getTimeFilter(),
+ cachedSeriesSlotWithRegions);
for (TRegionReplicaSet regionReplicaSet : regionReplicaSets) {
boolean aligned = deviceEntry instanceof AlignedDeviceEntry;
@@ -723,21 +755,21 @@ public class TableDistributedPlanGenerator
node instanceof AggregationTreeDeviceViewScanNode
? ((AggregationTreeDeviceViewScanNode) node).getTreeDBName()
: node.getQualifiedObjectName().getDatabaseName();
- DataPartition dataPartition = analysis.getDataPartition();
+ DataPartition dataPartition = analysis.getDataPartitionInfo();
boolean needSplit = false;
List<List<TRegionReplicaSet>> regionReplicaSetsList = new ArrayList<>();
- if (dataPartition == null) {
- // do nothing
- } else if (!dataPartition.getDataPartitionMap().containsKey(dbName)) {
- throw new SemanticException(
- String.format("Given queried database: %s is not exist!", dbName));
- } else {
+ if (dataPartition != null) {
Map<TSeriesPartitionSlot, Map<TTimePartitionSlot,
List<TRegionReplicaSet>>> seriesSlotMap =
dataPartition.getDataPartitionMap().get(dbName);
+ if (seriesSlotMap == null) {
+ throw new SemanticException(
+ String.format("Given queried database: %s is not exist!", dbName));
+ }
+
Map<Integer, List<TRegionReplicaSet>> cachedSeriesSlotWithRegions = new
HashMap<>();
for (DeviceEntry deviceEntry : node.getDeviceEntries()) {
List<TRegionReplicaSet> regionReplicaSets =
- getReplicaSetWithTimeFilter(
+ getDeviceReplicaSets(
dataPartition,
seriesSlotMap,
deviceEntry.getDeviceID(),
@@ -814,7 +846,7 @@ public class TableDistributedPlanGenerator
return resultTableScanNodeList;
}
- private List<TRegionReplicaSet> getReplicaSetWithTimeFilter(
+ private List<TRegionReplicaSet> getDeviceReplicaSets(
DataPartition dataPartition,
Map<TSeriesPartitionSlot, Map<TTimePartitionSlot,
List<TRegionReplicaSet>>> seriesSlotMap,
IDeviceID deviceId,
@@ -823,23 +855,23 @@ public class TableDistributedPlanGenerator
// given seriesPartitionSlot has already been calculated
final TSeriesPartitionSlot seriesPartitionSlot =
dataPartition.calculateDeviceGroupId(deviceId);
- if
(cachedSeriesSlotWithRegions.containsKey(seriesPartitionSlot.getSlotId())) {
- return cachedSeriesSlotWithRegions.get(seriesPartitionSlot.getSlotId());
- }
-
- if (!seriesSlotMap.containsKey(seriesPartitionSlot)) {
- cachedSeriesSlotWithRegions.put(
- seriesPartitionSlot.getSlotId(),
Collections.singletonList(NOT_ASSIGNED));
- return cachedSeriesSlotWithRegions.get(seriesPartitionSlot.getSlotId());
+ List<TRegionReplicaSet> regionReplicaSets =
+ cachedSeriesSlotWithRegions.get(seriesPartitionSlot.getSlotId());
+ if (regionReplicaSets != null) {
+ return regionReplicaSets;
}
+ // given seriesPartitionSlot has not been calculated
Map<TTimePartitionSlot, List<TRegionReplicaSet>> timeSlotMap =
seriesSlotMap.get(seriesPartitionSlot);
+ if (timeSlotMap == null) {
+ List<TRegionReplicaSet> cachedReplicaSets =
Collections.singletonList(NOT_ASSIGNED);
+ cachedSeriesSlotWithRegions.put(seriesPartitionSlot.getSlotId(),
cachedReplicaSets);
+ return cachedReplicaSets;
+ }
if (timeSlotMap.size() == 1) {
TTimePartitionSlot timePartitionSlot =
timeSlotMap.keySet().iterator().next();
- if (timeFilter == null
- || TimePartitionUtils.satisfyPartitionStartTime(
- timeFilter, timePartitionSlot.startTime)) {
+ if (TimePartitionUtils.satisfyPartitionStartTime(timeFilter,
timePartitionSlot.startTime)) {
cachedSeriesSlotWithRegions.put(
seriesPartitionSlot.getSlotId(),
timeSlotMap.values().iterator().next());
return timeSlotMap.values().iterator().next();
@@ -859,14 +891,6 @@ public class TableDistributedPlanGenerator
List<TRegionReplicaSet> resultList = new ArrayList<>(resultSet);
cachedSeriesSlotWithRegions.put(seriesPartitionSlot.getSlotId(),
resultList);
return resultList;
- // return seriesSlotMap.get(seriesPartitionSlot).entrySet().stream()
- // .filter(
- // entry ->
- // TimePartitionUtils.satisfyPartitionStartTime(timeFilter,
- // entry.getKey().startTime))
- // .flatMap(entry -> entry.getValue().stream())
- // .distinct()
- // .collect(toList());
}
@Override
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AnalyzerTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AnalyzerTest.java
index 07672ab95d0..3f1df86076a 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AnalyzerTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AnalyzerTest.java
@@ -1125,11 +1125,11 @@ public class AnalyzerTest {
context,
new SqlParser(),
sessionInfo);
- assertEquals(1, analysis.getDataPartition().getDataPartitionMap().size());
+ assertEquals(1,
analysis.getDataPartitionInfo().getDataPartitionMap().size());
Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>>
partitionSlotMapMap =
analysis
- .getDataPartition()
+ .getDataPartitionInfo()
.getDataPartitionMap()
.get(sessionInfo.getDatabaseName().orElse(null));
assertEquals(3, partitionSlotMapMap.size());
@@ -1178,12 +1178,12 @@ public class AnalyzerTest {
context,
new SqlParser(),
sessionInfo);
- assertEquals(1, analysis.getDataPartition().getDataPartitionMap().size());
- assertEquals(1, analysis.getDataPartition().getDataPartitionMap().size());
+ assertEquals(1,
analysis.getDataPartitionInfo().getDataPartitionMap().size());
+ assertEquals(1,
analysis.getDataPartitionInfo().getDataPartitionMap().size());
final Map<TSeriesPartitionSlot, Map<TTimePartitionSlot,
List<TRegionReplicaSet>>>
partitionSlotMapMap =
analysis
- .getDataPartition()
+ .getDataPartitionInfo()
.getDataPartitionMap()
.get(sessionInfo.getDatabaseName().orElse(null));
assertEquals(1, partitionSlotMapMap.size());
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/TimePartitionUtils.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/TimePartitionUtils.java
index ed01e8a5f3f..058f7433929 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/TimePartitionUtils.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/TimePartitionUtils.java
@@ -148,12 +148,15 @@ public class TimePartitionUtils {
}
public static boolean satisfyPartitionStartTime(Filter timeFilter, long
partitionStartTime) {
+ if (timeFilter == null) {
+ return true;
+ }
+
long partitionEndTime =
partitionStartTime >= timePartitionLowerBoundWithoutOverflow
? Long.MAX_VALUE
: (partitionStartTime + timePartitionInterval - 1);
- return timeFilter == null
- || timeFilter.satisfyStartEndTime(partitionStartTime,
partitionEndTime);
+ return timeFilter.satisfyStartEndTime(partitionStartTime,
partitionEndTime);
}
public static boolean satisfyTimePartition(Filter timeFilter, long
partitionId) {