This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 2cef18fb9b Fix some bugs (#5725)
2cef18fb9b is described below
commit 2cef18fb9bc84bffd6b4c91944f98b283fa0a521
Author: Zhang.Jinrui <[email protected]>
AuthorDate: Fri Apr 29 11:55:53 2022 +0800
Fix some bugs (#5725)
---
.../iotdb/confignode/manager/ConfigManager.java | 5 ++--
.../iotdb/commons/partition/DataPartition.java | 2 +-
.../iotdb/commons/partition/SchemaPartition.java | 2 +-
.../iotdb/db/mpp/common/schematree/SchemaTree.java | 2 +-
.../scheduler/SimpleFragInstanceDispatcher.java | 3 +++
.../mpp/sql/analyze/ClusterPartitionFetcher.java | 4 ++--
.../db/mpp/sql/analyze/ClusterSchemaFetcher.java | 28 ++++++++++++----------
.../db/mpp/sql/planner/DistributionPlanner.java | 1 +
.../planner/plan/node/process/ExchangeNode.java | 6 -----
.../service/thrift/impl/InternalServiceImpl.java | 2 +-
10 files changed, 28 insertions(+), 27 deletions(-)
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 94ac7c68c3..c8b55e1bdc 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -198,7 +198,6 @@ public class ConfigManager implements Manager {
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
List<String> devicePaths = patternTree.findAllDevicePaths();
List<String> storageGroups =
getClusterSchemaManager().getStorageGroupNames();
-
GetSchemaPartitionReq getSchemaPartitionReq = new
GetSchemaPartitionReq();
Map<String, List<TSeriesPartitionSlot>> partitionSlotsMap = new
HashMap<>();
@@ -207,7 +206,7 @@ public class ConfigManager implements Manager {
for (String devicePath : devicePaths) {
boolean matchStorageGroup = false;
for (String storageGroup : storageGroups) {
- if (devicePath.contains(storageGroup)) {
+ if (devicePath.startsWith(storageGroup + ".")) {
matchStorageGroup = true;
if (devicePath.contains("*")) {
// Get all SchemaPartitions of this StorageGroup if the
devicePath contains "*"
@@ -263,7 +262,7 @@ public class ConfigManager implements Manager {
if (!devicePath.contains("*")) {
// Only check devicePaths that without "*"
for (String storageGroup : storageGroups) {
- if (devicePath.contains(storageGroup)) {
+ if (devicePath.startsWith(storageGroup + ".")) {
partitionSlotsMap
.computeIfAbsent(storageGroup, key -> new ArrayList<>())
.add(getPartitionManager().getSeriesPartitionSlot(devicePath));
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
index 5156ba47af..8c150d0ba4 100644
---
a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
@@ -104,7 +104,7 @@ public class DataPartition extends Partition {
private String getStorageGroupByDevice(String deviceName) {
for (String storageGroup : dataPartitionMap.keySet()) {
- if (deviceName.startsWith(storageGroup)) {
+ if (deviceName.startsWith(storageGroup + ".")) {
return storageGroup;
}
}
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaPartition.java
b/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaPartition.java
index a93048ff83..2c8e415199 100644
---
a/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaPartition.java
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaPartition.java
@@ -63,7 +63,7 @@ public class SchemaPartition extends Partition {
private String getStorageGroupByDevice(String deviceName) {
for (String storageGroup : schemaPartitionMap.keySet()) {
- if (deviceName.startsWith(storageGroup)) {
+ if (deviceName.startsWith(storageGroup + ".")) {
return storageGroup;
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/SchemaTree.java
b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/SchemaTree.java
index 2bf2d8e72a..bc5b1c8994 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/SchemaTree.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/SchemaTree.java
@@ -228,7 +228,7 @@ public class SchemaTree {
*/
public String getBelongedStorageGroup(PartialPath path) {
for (String storageGroup : storageGroups) {
- if (path.getFullPath().startsWith(storageGroup)) {
+ if (path.getFullPath().startsWith(storageGroup + ".")) {
return storageGroup;
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java
index 245dbcc435..6bf7fad34a 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java
@@ -84,6 +84,9 @@ public class SimpleFragInstanceDispatcher implements
IFragInstanceDispatcher {
client.close();
}
throw e;
+ } catch (Exception e) {
+ LOGGER.error("unexpected exception", e);
+ throw e;
} finally {
if (client != null) {
client.returnSelf();
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterPartitionFetcher.java
b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterPartitionFetcher.java
index bf8ed2debb..be3ba338c7 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterPartitionFetcher.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterPartitionFetcher.java
@@ -388,7 +388,7 @@ public class ClusterPartitionFetcher implements
IPartitionFetcher {
for (String devicePath : devicePaths) {
boolean hit = false;
for (String storageGroup : storageGroupCache) {
- if (devicePath.startsWith(storageGroup)) {
+ if (devicePath.startsWith(storageGroup + ".")) {
deviceToStorageGroupMap.put(devicePath, storageGroup);
hit = true;
break;
@@ -514,7 +514,7 @@ public class ClusterPartitionFetcher implements
IPartitionFetcher {
if (!device.contains("*")) {
String storageGroup = null;
for (String storageGroupName : storageGroupNames) {
- if (device.startsWith(storageGroupName)) {
+ if (device.startsWith(storageGroupName + ".")) {
storageGroup = storageGroupName;
break;
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterSchemaFetcher.java
b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterSchemaFetcher.java
index 969549998b..9fcf2a958d 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterSchemaFetcher.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterSchemaFetcher.java
@@ -77,20 +77,24 @@ public class ClusterSchemaFetcher implements ISchemaFetcher
{
if (executionResult.status.getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
throw new RuntimeException("cannot fetch schema, status is: " +
executionResult.status);
}
- TsBlock tsBlock = coordinator.getQueryExecution(queryId).getBatchResult();
- // TODO: (xingtanzjr) need to release this query's resource here. This is
a temporary way
- coordinator.getQueryExecution(queryId).stopAndCleanup();
-
SchemaTree result = new SchemaTree();
- result.setStorageGroups(storageGroups);
- Binary binary;
- SchemaTree fetchedSchemaTree;
- Column column = tsBlock.getColumn(0);
- for (int i = 0; i < column.getPositionCount(); i++) {
- binary = column.getBinary(i);
- fetchedSchemaTree =
SchemaTree.deserialize(ByteBuffer.wrap(binary.getValues()));
- result.mergeSchemaTree(fetchedSchemaTree);
+ while (coordinator.getQueryExecution(queryId).hasNextResult()) {
+ TsBlock tsBlock =
coordinator.getQueryExecution(queryId).getBatchResult();
+ if (tsBlock == null) {
+ break;
+ }
+ result.setStorageGroups(storageGroups);
+ Binary binary;
+ SchemaTree fetchedSchemaTree;
+ Column column = tsBlock.getColumn(0);
+ for (int i = 0; i < column.getPositionCount(); i++) {
+ binary = column.getBinary(i);
+ fetchedSchemaTree =
SchemaTree.deserialize(ByteBuffer.wrap(binary.getValues()));
+ result.mergeSchemaTree(fetchedSchemaTree);
+ }
}
+ // TODO: (xingtanzjr) need to release this query's resource here. This is
a temporary way
+ coordinator.getQueryExecution(queryId).stopAndCleanup();
return result;
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
index a026cdfc48..0e15212224 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
@@ -532,6 +532,7 @@ public class DistributionPlanner {
FragmentSinkNode sinkNode = new
FragmentSinkNode(context.getQueryId().genPlanNodeId());
sinkNode.setChild(exchangeNode.getChild());
sinkNode.setDownStreamPlanNodeId(exchangeNode.getPlanNodeId());
+
// Record the source node info in the ExchangeNode so that we can keep
the connection of
// these nodes/fragments
exchangeNode.setRemoteSourceNode(sinkNode);
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java
b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java
index 47746f0253..fadb9aed82 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
-import org.apache.iotdb.db.mpp.sql.planner.plan.PlanFragment;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
@@ -98,8 +97,6 @@ public class ExchangeNode extends PlanNode {
}
public static ExchangeNode deserialize(ByteBuffer byteBuffer) {
- FragmentSinkNode fragmentSinkNode =
- (FragmentSinkNode) PlanFragment.deserializeHelper(byteBuffer);
TEndPoint endPoint =
new TEndPoint(
ReadWriteIOUtils.readString(byteBuffer),
ReadWriteIOUtils.readInt(byteBuffer));
@@ -108,14 +105,12 @@ public class ExchangeNode extends PlanNode {
PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
ExchangeNode exchangeNode = new ExchangeNode(planNodeId);
exchangeNode.setUpstream(endPoint, fragmentInstanceId, upstreamPlanNodeId);
- exchangeNode.setRemoteSourceNode(fragmentSinkNode);
return exchangeNode;
}
@Override
protected void serializeAttributes(ByteBuffer byteBuffer) {
PlanNodeType.EXCHANGE.serialize(byteBuffer);
- remoteSourceNode.serialize(byteBuffer);
ReadWriteIOUtils.write(upstreamEndpoint.getIp(), byteBuffer);
ReadWriteIOUtils.write(upstreamEndpoint.getPort(), byteBuffer);
upstreamInstanceId.serialize(byteBuffer);
@@ -182,7 +177,6 @@ public class ExchangeNode extends PlanNode {
}
ExchangeNode that = (ExchangeNode) o;
return Objects.equals(child, that.child)
- && Objects.equals(remoteSourceNode, that.remoteSourceNode)
&& Objects.equals(upstreamEndpoint, that.upstreamEndpoint)
&& Objects.equals(upstreamInstanceId, that.upstreamInstanceId)
&& Objects.equals(upstreamPlanNodeId, that.upstreamPlanNodeId);
diff --git
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
index 493b8466a2..86810c8372 100644
---
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
+++
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
@@ -137,7 +137,7 @@ public class InternalServiceImpl implements
InternalService.Iface {
// TODO need to be implemented and currently in order not to print
NotImplementedException log,
// we simply return null
- return null;
+ return new TCancelResp(true);
// throw new NotImplementedException();
}