This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 2cef18fb9b Fix some bugs (#5725)
2cef18fb9b is described below

commit 2cef18fb9bc84bffd6b4c91944f98b283fa0a521
Author: Zhang.Jinrui <[email protected]>
AuthorDate: Fri Apr 29 11:55:53 2022 +0800

    Fix some bugs (#5725)
---
 .../iotdb/confignode/manager/ConfigManager.java    |  5 ++--
 .../iotdb/commons/partition/DataPartition.java     |  2 +-
 .../iotdb/commons/partition/SchemaPartition.java   |  2 +-
 .../iotdb/db/mpp/common/schematree/SchemaTree.java |  2 +-
 .../scheduler/SimpleFragInstanceDispatcher.java    |  3 +++
 .../mpp/sql/analyze/ClusterPartitionFetcher.java   |  4 ++--
 .../db/mpp/sql/analyze/ClusterSchemaFetcher.java   | 28 ++++++++++++----------
 .../db/mpp/sql/planner/DistributionPlanner.java    |  1 +
 .../planner/plan/node/process/ExchangeNode.java    |  6 -----
 .../service/thrift/impl/InternalServiceImpl.java   |  2 +-
 10 files changed, 28 insertions(+), 27 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 94ac7c68c3..c8b55e1bdc 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -198,7 +198,6 @@ public class ConfigManager implements Manager {
     if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
       List<String> devicePaths = patternTree.findAllDevicePaths();
       List<String> storageGroups = getClusterSchemaManager().getStorageGroupNames();
-
       GetSchemaPartitionReq getSchemaPartitionReq = new GetSchemaPartitionReq();
       Map<String, List<TSeriesPartitionSlot>> partitionSlotsMap = new HashMap<>();
 
@@ -207,7 +206,7 @@ public class ConfigManager implements Manager {
       for (String devicePath : devicePaths) {
         boolean matchStorageGroup = false;
         for (String storageGroup : storageGroups) {
-          if (devicePath.contains(storageGroup)) {
+          if (devicePath.startsWith(storageGroup + ".")) {
             matchStorageGroup = true;
             if (devicePath.contains("*")) {
               // Get all SchemaPartitions of this StorageGroup if the devicePath contains "*"
@@ -263,7 +262,7 @@ public class ConfigManager implements Manager {
         if (!devicePath.contains("*")) {
           // Only check devicePaths that without "*"
           for (String storageGroup : storageGroups) {
-            if (devicePath.contains(storageGroup)) {
+            if (devicePath.startsWith(storageGroup + ".")) {
               partitionSlotsMap
                   .computeIfAbsent(storageGroup, key -> new ArrayList<>())
                   .add(getPartitionManager().getSeriesPartitionSlot(devicePath));
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
index 5156ba47af..8c150d0ba4 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
@@ -104,7 +104,7 @@ public class DataPartition extends Partition {
 
   private String getStorageGroupByDevice(String deviceName) {
     for (String storageGroup : dataPartitionMap.keySet()) {
-      if (deviceName.startsWith(storageGroup)) {
+      if (deviceName.startsWith(storageGroup + ".")) {
         return storageGroup;
       }
     }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaPartition.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaPartition.java
index a93048ff83..2c8e415199 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaPartition.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaPartition.java
@@ -63,7 +63,7 @@ public class SchemaPartition extends Partition {
 
   private String getStorageGroupByDevice(String deviceName) {
     for (String storageGroup : schemaPartitionMap.keySet()) {
-      if (deviceName.startsWith(storageGroup)) {
+      if (deviceName.startsWith(storageGroup + ".")) {
         return storageGroup;
       }
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/SchemaTree.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/SchemaTree.java
index 2bf2d8e72a..bc5b1c8994 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/SchemaTree.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/SchemaTree.java
@@ -228,7 +228,7 @@ public class SchemaTree {
    */
   public String getBelongedStorageGroup(PartialPath path) {
     for (String storageGroup : storageGroups) {
-      if (path.getFullPath().startsWith(storageGroup)) {
+      if (path.getFullPath().startsWith(storageGroup + ".")) {
         return storageGroup;
       }
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java
index 245dbcc435..6bf7fad34a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java
@@ -84,6 +84,9 @@ public class SimpleFragInstanceDispatcher implements IFragInstanceDispatcher {
                 client.close();
               }
               throw e;
+            } catch (Exception e) {
+              LOGGER.error("unexpected exception", e);
+              throw e;
             } finally {
               if (client != null) {
                 client.returnSelf();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterPartitionFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterPartitionFetcher.java
index bf8ed2debb..be3ba338c7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterPartitionFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterPartitionFetcher.java
@@ -388,7 +388,7 @@ public class ClusterPartitionFetcher implements IPartitionFetcher {
       for (String devicePath : devicePaths) {
         boolean hit = false;
         for (String storageGroup : storageGroupCache) {
-          if (devicePath.startsWith(storageGroup)) {
+          if (devicePath.startsWith(storageGroup + ".")) {
             deviceToStorageGroupMap.put(devicePath, storageGroup);
             hit = true;
             break;
@@ -514,7 +514,7 @@ public class ClusterPartitionFetcher implements IPartitionFetcher {
         if (!device.contains("*")) {
           String storageGroup = null;
           for (String storageGroupName : storageGroupNames) {
-            if (device.startsWith(storageGroupName)) {
+            if (device.startsWith(storageGroupName + ".")) {
               storageGroup = storageGroupName;
               break;
             }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterSchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterSchemaFetcher.java
index 969549998b..9fcf2a958d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterSchemaFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterSchemaFetcher.java
@@ -77,20 +77,24 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
     if (executionResult.status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
       throw new RuntimeException("cannot fetch schema, status is: " + executionResult.status);
     }
-    TsBlock tsBlock = coordinator.getQueryExecution(queryId).getBatchResult();
-    // TODO: (xingtanzjr) need to release this query's resource here. This is a temporary way
-    coordinator.getQueryExecution(queryId).stopAndCleanup();
-
     SchemaTree result = new SchemaTree();
-    result.setStorageGroups(storageGroups);
-    Binary binary;
-    SchemaTree fetchedSchemaTree;
-    Column column = tsBlock.getColumn(0);
-    for (int i = 0; i < column.getPositionCount(); i++) {
-      binary = column.getBinary(i);
-      fetchedSchemaTree = SchemaTree.deserialize(ByteBuffer.wrap(binary.getValues()));
-      result.mergeSchemaTree(fetchedSchemaTree);
+    while (coordinator.getQueryExecution(queryId).hasNextResult()) {
+      TsBlock tsBlock = coordinator.getQueryExecution(queryId).getBatchResult();
+      if (tsBlock == null) {
+        break;
+      }
+      result.setStorageGroups(storageGroups);
+      Binary binary;
+      SchemaTree fetchedSchemaTree;
+      Column column = tsBlock.getColumn(0);
+      for (int i = 0; i < column.getPositionCount(); i++) {
+        binary = column.getBinary(i);
+        fetchedSchemaTree = SchemaTree.deserialize(ByteBuffer.wrap(binary.getValues()));
+        result.mergeSchemaTree(fetchedSchemaTree);
+      }
     }
+    // TODO: (xingtanzjr) need to release this query's resource here. This is a temporary way
+    coordinator.getQueryExecution(queryId).stopAndCleanup();
     return result;
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
index a026cdfc48..0e15212224 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
@@ -532,6 +532,7 @@ public class DistributionPlanner {
         FragmentSinkNode sinkNode = new FragmentSinkNode(context.getQueryId().genPlanNodeId());
         sinkNode.setChild(exchangeNode.getChild());
         sinkNode.setDownStreamPlanNodeId(exchangeNode.getPlanNodeId());
+
         // Record the source node info in the ExchangeNode so that we can keep the connection of
         // these nodes/fragments
         exchangeNode.setRemoteSourceNode(sinkNode);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java
index 47746f0253..fadb9aed82 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
-import org.apache.iotdb.db.mpp.sql.planner.plan.PlanFragment;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
@@ -98,8 +97,6 @@ public class ExchangeNode extends PlanNode {
   }
 
   public static ExchangeNode deserialize(ByteBuffer byteBuffer) {
-    FragmentSinkNode fragmentSinkNode =
-        (FragmentSinkNode) PlanFragment.deserializeHelper(byteBuffer);
     TEndPoint endPoint =
         new TEndPoint(
             ReadWriteIOUtils.readString(byteBuffer), ReadWriteIOUtils.readInt(byteBuffer));
@@ -108,14 +105,12 @@ public class ExchangeNode extends PlanNode {
     PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
     ExchangeNode exchangeNode = new ExchangeNode(planNodeId);
     exchangeNode.setUpstream(endPoint, fragmentInstanceId, upstreamPlanNodeId);
-    exchangeNode.setRemoteSourceNode(fragmentSinkNode);
     return exchangeNode;
   }
 
   @Override
   protected void serializeAttributes(ByteBuffer byteBuffer) {
     PlanNodeType.EXCHANGE.serialize(byteBuffer);
-    remoteSourceNode.serialize(byteBuffer);
     ReadWriteIOUtils.write(upstreamEndpoint.getIp(), byteBuffer);
     ReadWriteIOUtils.write(upstreamEndpoint.getPort(), byteBuffer);
     upstreamInstanceId.serialize(byteBuffer);
@@ -182,7 +177,6 @@ public class ExchangeNode extends PlanNode {
     }
     ExchangeNode that = (ExchangeNode) o;
     return Objects.equals(child, that.child)
-        && Objects.equals(remoteSourceNode, that.remoteSourceNode)
         && Objects.equals(upstreamEndpoint, that.upstreamEndpoint)
         && Objects.equals(upstreamInstanceId, that.upstreamInstanceId)
         && Objects.equals(upstreamPlanNodeId, that.upstreamPlanNodeId);
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
index 493b8466a2..86810c8372 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
@@ -137,7 +137,7 @@ public class InternalServiceImpl implements InternalService.Iface {
 
     // TODO need to be implemented and currently in order not to print NotImplementedException log,
     // we simply return null
-    return null;
+    return new TCancelResp(true);
     //    throw new NotImplementedException();
   }
 

Reply via email to