This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 804ee600cd4 Pipe: Improved handling logic for transferred statements & Allow root.__system.** schemas to reach the receiver side (#12276)
804ee600cd4 is described below

commit 804ee600cd4f708114d2ddea1dae0c1b66737f6b
Author: Caideyipi <[email protected]>
AuthorDate: Tue Apr 2 17:58:41 2024 +0800

    Pipe: Improved handling logic for transferred statements & Allow root.__system.** schemas to reach the receiver side (#12276)
---
 .../extractor/ConfigRegionListeningFilter.java     | 11 -----------
 .../receiver/protocol/IoTDBConfigNodeReceiver.java |  8 --------
 .../PipeConfigPhysicalPlanTSStatusVisitor.java     |  3 +++
 .../visitor/PipeStatementExceptionVisitor.java     |  6 +++++-
 .../visitor/PipeStatementTSStatusVisitor.java      | 23 ++++++++++++----------
 5 files changed, 21 insertions(+), 30 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/extractor/ConfigRegionListeningFilter.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/extractor/ConfigRegionListeningFilter.java
index 84cebf259fb..33b62aaa754 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/extractor/ConfigRegionListeningFilter.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/extractor/ConfigRegionListeningFilter.java
@@ -21,10 +21,8 @@ package org.apache.iotdb.confignode.manager.pipe.extractor;
 
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.commons.schema.SchemaConstant;
 import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
 import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
-import 
org.apache.iotdb.confignode.consensus.request.write.database.DatabaseSchemaPlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.template.CommitSetSchemaTemplatePlan;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 
@@ -146,15 +144,6 @@ public class ConfigRegionListeningFilter {
       return false;
     }
 
-    // Do not transfer system database plan
-    if (type.equals(ConfigPhysicalPlanType.CreateDatabase)
-        && ((DatabaseSchemaPlan) plan)
-            .getSchema()
-            .getName()
-            .equals(SchemaConstant.SYSTEM_DATABASE)) {
-      return false;
-    }
-
     // PipeEnriched & UnsetTemplate are not listened directly,
     // but their inner plan or converted plan are listened.
     return type.equals(ConfigPhysicalPlanType.PipeEnriched)
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
index 864c04789ac..28e08e231a8 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
@@ -27,7 +27,6 @@ import 
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeReques
 import 
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferFileSealReqV1;
 import 
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferFileSealReqV2;
 import org.apache.iotdb.commons.pipe.receiver.IoTDBFileReceiver;
-import org.apache.iotdb.commons.schema.SchemaConstant;
 import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
 import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
 import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
@@ -181,13 +180,6 @@ public class IoTDBConfigNodeReceiver extends 
IoTDBFileReceiver {
   private TSStatus executePlan(ConfigPhysicalPlan plan) throws 
ConsensusException {
     switch (plan.getType()) {
       case CreateDatabase:
-        if (((DatabaseSchemaPlan) plan)
-            .getSchema()
-            .getName()
-            .equals(SchemaConstant.SYSTEM_DATABASE)) {
-          // System database doesn't need transferring
-          return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
-        }
         // Here we only reserve database name and substitute the sender's 
local information
         // with the receiver's default configurations
         TDatabaseSchema schema = ((DatabaseSchemaPlan) plan).getSchema();
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/visitor/PipeConfigPhysicalPlanTSStatusVisitor.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/visitor/PipeConfigPhysicalPlanTSStatusVisitor.java
index 7046f3d8dc6..5a30b25e4d7 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/visitor/PipeConfigPhysicalPlanTSStatusVisitor.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/visitor/PipeConfigPhysicalPlanTSStatusVisitor.java
@@ -65,6 +65,9 @@ public class PipeConfigPhysicalPlanTSStatusVisitor
       // Lower or higher level database has been created
       return new 
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
           .setMessage(context.getMessage());
+    } else if (context.getCode() == 
TSStatusCode.SCHEMA_QUOTA_EXCEEDED.getStatusCode()) {
+      return new 
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
+          .setMessage(context.getMessage());
     } else if (context.getCode() == 
TSStatusCode.METADATA_ERROR.getStatusCode()) {
       return new TSStatus(
               
TSStatusCode.PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode())
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementExceptionVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementExceptionVisitor.java
index 1ae7d106dc5..7391fa32fcc 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementExceptionVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementExceptionVisitor.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.db.exception.LoadRuntimeOutOfMemoryException;
 import org.apache.iotdb.db.exception.sql.SemanticException;
+import org.apache.iotdb.db.exception.sql.StatementAnalyzeException;
 import org.apache.iotdb.db.queryengine.plan.statement.Statement;
 import org.apache.iotdb.db.queryengine.plan.statement.StatementNode;
 import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor;
@@ -110,9 +111,12 @@ public class PipeStatementExceptionVisitor extends 
StatementVisitor<TSStatus, Ex
     return visitGeneralActivateTemplate(batchActivateTemplateStatement, 
context);
   }
 
+  // InternalBatchActivateTemplateNode is converted to 
BatchActivateTemplateStatement
+  // No need to handle InternalBatchActivateTemplateStatement
+
   private TSStatus visitGeneralActivateTemplate(
       Statement activateTemplateStatement, Exception context) {
-    if (context instanceof MetadataException) {
+    if (context instanceof MetadataException || context instanceof 
StatementAnalyzeException) {
       return new 
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
           .setMessage(context.getMessage());
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java
index 52a0c963278..117eae0a689 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java
@@ -81,10 +81,6 @@ public class PipeStatementTSStatusVisitor extends 
StatementVisitor<TSStatus, TSS
 
   @Override
   public TSStatus visitCreateTimeseries(CreateTimeSeriesStatement statement, 
TSStatus context) {
-    if (context.getCode() == 
TSStatusCode.ALIGNED_TIMESERIES_ERROR.getStatusCode()) {
-      return new 
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
-          .setMessage(context.getMessage());
-    }
     return visitGeneralCreateTimeSeries(statement, context);
   }
 
@@ -99,7 +95,9 @@ public class PipeStatementTSStatusVisitor extends 
StatementVisitor<TSStatus, TSS
         || context.getCode() == 
TSStatusCode.ALIAS_ALREADY_EXIST.getStatusCode()) {
       return new 
TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
           .setMessage(context.getMessage());
-    } else if (context.getCode() == 
TSStatusCode.PATH_ALREADY_EXIST.getStatusCode()) {
+    } else if (context.getCode() == 
TSStatusCode.PATH_ALREADY_EXIST.getStatusCode()
+        || context.getCode() == 
TSStatusCode.ALIGNED_TIMESERIES_ERROR.getStatusCode()
+        || context.getCode() == 
TSStatusCode.SCHEMA_QUOTA_EXCEEDED.getStatusCode()) {
       return new 
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
           .setMessage(context.getMessage());
     }
@@ -125,8 +123,7 @@ public class PipeStatementTSStatusVisitor extends 
StatementVisitor<TSStatus, TSS
     return 
visitGeneralCreateMultiTimeseries(internalCreateMultiTimeSeriesStatement, 
context);
   }
 
-  private TSStatus visitGeneralCreateMultiTimeseries(
-      Statement internalCreateTimeSeriesStatement, TSStatus context) {
+  private TSStatus visitGeneralCreateMultiTimeseries(Statement statement, 
TSStatus context) {
     if (context.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
       for (TSStatus status : context.getSubStatus()) {
         if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
@@ -135,13 +132,16 @@ public class PipeStatementTSStatusVisitor extends 
StatementVisitor<TSStatus, TSS
             return new 
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
                 .setMessage(context.getMessage());
           }
-          return visitStatement(internalCreateTimeSeriesStatement, context);
+          return visitStatement(statement, context);
         }
       }
       return new 
TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
           .setMessage(context.getMessage());
+    } else if (context.getCode() == 
TSStatusCode.SCHEMA_QUOTA_EXCEEDED.getStatusCode()) {
+      return new 
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
+          .setMessage(context.getMessage());
     }
-    return visitStatement(internalCreateTimeSeriesStatement, context);
+    return visitStatement(statement, context);
   }
 
   @Override
@@ -166,7 +166,10 @@ public class PipeStatementTSStatusVisitor extends 
StatementVisitor<TSStatus, TSS
   @Override
   public TSStatus visitCreateLogicalView(
       CreateLogicalViewStatement createLogicalViewStatement, TSStatus context) 
{
-    if (context.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
+    if (context.getCode() == 
TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) {
+      return new 
TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
+          .setMessage(context.getMessage());
+    } else if (context.getCode() == 
TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
       for (TSStatus status : context.getSubStatus()) {
         if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
             && status.getCode() != 
TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) {

Reply via email to