This is an automated email from the ASF dual-hosted git repository.

jt2594838 pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new ddb9c7cf0a9 [To dev/1.3] Fix pipe schema snapshot database creation (#17944)
ddb9c7cf0a9 is described below

commit ddb9c7cf0a9d8f51e9f2a6cdb98dc3ea31626b6b
Author: Caideyipi <[email protected]>
AuthorDate: Thu Jun 18 11:01:34 2026 +0800

    [To dev/1.3] Fix pipe schema snapshot database creation (#17944)
    
    * Fix pipe schema snapshot database creation (#17910)
    
    * Fix pipe schema snapshot database creation
    
    * Fix legacy pipe receiver database conflict handling
    
    * Fix pipe enriched config statement execution
    
    (cherry picked from commit 98c823461be83181e454d15152d9c67e61c7b06d)
    
    * Skip unrelated pipe schema snapshot databases
---
 .../pipe/sink/PipeConfigNodeThriftRequestTest.java | 18 ++++++++
 ...eConfigPhysicalPlanPatternParseVisitorTest.java | 42 +++++++++++++++++
 .../legacy/IoTDBLegacyPipeReceiverAgent.java       |  3 +-
 .../protocol/thrift/IoTDBDataNodeReceiver.java     | 54 ++++++++++++++++++++--
 .../iotdb/db/queryengine/plan/Coordinator.java     | 15 ++++--
 .../protocol/thrift/IoTDBDataNodeReceiverTest.java |  9 ++++
 6 files changed, 131 insertions(+), 10 deletions(-)

diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/sink/PipeConfigNodeThriftRequestTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/sink/PipeConfigNodeThriftRequestTest.java
index f014a8ae22a..f1d17d8a0f0 100644
--- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/sink/PipeConfigNodeThriftRequestTest.java
+++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/sink/PipeConfigNodeThriftRequestTest.java
@@ -25,6 +25,7 @@ import 
org.apache.iotdb.confignode.manager.pipe.sink.payload.PipeTransferConfigP
 import 
org.apache.iotdb.confignode.manager.pipe.sink.payload.PipeTransferConfigSnapshotPieceReq;
 import 
org.apache.iotdb.confignode.manager.pipe.sink.payload.PipeTransferConfigSnapshotSealReq;
 import org.apache.iotdb.confignode.persistence.schema.CNSnapshotFileType;
+import org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant;
 
 import org.junit.Assert;
 import org.junit.Test;
@@ -97,4 +98,21 @@ public class PipeConfigNodeThriftRequestTest {
     Assert.assertEquals(req.getFileLengths(), deserializeReq.getFileLengths());
     Assert.assertEquals(req.getParameters(), deserializeReq.getParameters());
   }
+
+  @Test
+  public void testPipeTransferConfigSnapshotSealReqPreservesPathPattern() 
throws IOException {
+    String snapshotName = "cluster_schema.bin";
+    String templateInfoName = "template_info.bin";
+    CNSnapshotFileType fileType = CNSnapshotFileType.SCHEMA;
+    String typeString = "200";
+
+    PipeTransferConfigSnapshotSealReq req =
+        PipeTransferConfigSnapshotSealReq.toTPipeTransferReq(
+            "root.ln.**", snapshotName, 100, templateInfoName, 10, fileType, 
typeString);
+    PipeTransferConfigSnapshotSealReq deserializeReq =
+        PipeTransferConfigSnapshotSealReq.fromTPipeTransferReq(req);
+
+    Assert.assertEquals(
+        "root.ln.**", 
deserializeReq.getParameters().get(ColumnHeaderConstant.PATH_PATTERN));
+  }
 }
diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/source/PipeConfigPhysicalPlanPatternParseVisitorTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/source/PipeConfigPhysicalPlanPatternParseVisitorTest.java
index 8125a980acd..a246ea9de90 100644
--- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/source/PipeConfigPhysicalPlanPatternParseVisitorTest.java
+++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/source/PipeConfigPhysicalPlanPatternParseVisitorTest.java
@@ -91,6 +91,28 @@ public class PipeConfigPhysicalPlanPatternParseVisitorTest {
             .orElseThrow(AssertionError::new));
   }
 
+  @Test
+  public void testCreateDatabaseWithProcess() {
+    final DatabaseSchemaPlan includedCreateDatabasePlan =
+        new DatabaseSchemaPlan(
+            ConfigPhysicalPlanType.CreateDatabase, new 
TDatabaseSchema("root.ln"));
+    final DatabaseSchemaPlan excludedCreateDatabasePlan =
+        new DatabaseSchemaPlan(
+            ConfigPhysicalPlanType.CreateDatabase, new 
TDatabaseSchema("root.db"));
+    final IoTDBPipePatternOperations rootLnPattern =
+        new UnionIoTDBPipePattern(new IoTDBPipePattern("root.ln.**"));
+
+    Assert.assertEquals(
+        includedCreateDatabasePlan,
+        IoTDBConfigRegionSource.PATTERN_PARSE_VISITOR
+            .process(includedCreateDatabasePlan, rootLnPattern)
+            .orElseThrow(AssertionError::new));
+    Assert.assertFalse(
+        IoTDBConfigRegionSource.PATTERN_PARSE_VISITOR
+            .process(excludedCreateDatabasePlan, rootLnPattern)
+            .isPresent());
+  }
+
   @Test
   public void testAlterDatabase() {
     final DatabaseSchemaPlan alterDatabasePlan =
@@ -201,6 +223,26 @@ public class PipeConfigPhysicalPlanPatternParseVisitorTest 
{
             .orElseThrow(AssertionError::new));
   }
 
+  @Test
+  public void testCommitSetSchemaTemplateWithRootLnPattern() {
+    final IoTDBPipePatternOperations rootLnPattern =
+        new UnionIoTDBPipePattern(new IoTDBPipePattern("root.ln.**"));
+    final CommitSetSchemaTemplatePlan includedSetTemplatePlan =
+        new CommitSetSchemaTemplatePlan("t1", "root.ln.wf01");
+    final CommitSetSchemaTemplatePlan excludedSetTemplatePlan =
+        new CommitSetSchemaTemplatePlan("t1", "root.db.wf01");
+
+    Assert.assertEquals(
+        includedSetTemplatePlan,
+        IoTDBConfigRegionSource.PATTERN_PARSE_VISITOR
+            .process(includedSetTemplatePlan, rootLnPattern)
+            .orElseThrow(AssertionError::new));
+    Assert.assertFalse(
+        IoTDBConfigRegionSource.PATTERN_PARSE_VISITOR
+            .process(excludedSetTemplatePlan, rootLnPattern)
+            .isPresent());
+  }
+
   @Test
   public void testPipeUnsetSchemaTemplate() {
     final PipeUnsetSchemaTemplatePlan pipeUnsetSchemaTemplatePlanOnPrefix =
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/IoTDBLegacyPipeReceiverAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/IoTDBLegacyPipeReceiverAgent.java
index cc85e2f4f10..b9e26ee0fa5 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/IoTDBLegacyPipeReceiverAgent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/IoTDBLegacyPipeReceiverAgent.java
@@ -160,8 +160,7 @@ public class IoTDBLegacyPipeReceiverAgent {
                   
IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold(),
                   false);
       if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()
-          && result.status.code != 
TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode()
-          && result.status.code != 
TSStatusCode.DATABASE_CONFLICT.getStatusCode()) {
+          && result.status.code != 
TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode()) {
         LOGGER.error(
             "Create Database error, statement: {}, result status : {}.", 
statement, result.status);
         return false;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index 8a5c4ad060f..4c1ed974811 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -83,6 +83,7 @@ import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsSta
 import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.DatabaseSchemaStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement;
 import org.apache.iotdb.db.storageengine.load.active.ActiveLoadPathHelper;
 import org.apache.iotdb.db.storageengine.load.active.ActiveLoadUtil;
@@ -536,17 +537,29 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
   private TSStatus loadSchemaSnapShot(
       final Map<String, String> parameters, final List<String> 
fileAbsolutePaths)
       throws IllegalPathException, IOException {
+    final String databaseName = parameters.get(ColumnHeaderConstant.DATABASE);
+    final PartialPath databasePath = new PartialPath(databaseName);
+
+    final String pathPattern = 
parameters.get(ColumnHeaderConstant.PATH_PATTERN);
+    if (!shouldLoadSchemaSnapshotDatabase(pathPattern, databaseName)) {
+      return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+    }
+    final PipePattern pipePattern =
+        PipePattern.parsePatternFromString(pathPattern, IoTDBPipePattern::new);
+
+    final TSStatus createDatabaseStatus = 
createSchemaSnapshotDatabaseIfNecessary(databasePath);
+    if (createDatabaseStatus.getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      return createDatabaseStatus;
+    }
+
     final SRStatementGenerator generator =
         SchemaRegionSnapshotParser.translate2Statements(
             Paths.get(fileAbsolutePaths.get(0)),
             fileAbsolutePaths.size() > 1 ? Paths.get(fileAbsolutePaths.get(1)) 
: null,
-            new PartialPath(parameters.get(ColumnHeaderConstant.DATABASE)));
+            databasePath);
     final Set<StatementType> executionTypes =
         PipeSchemaRegionSnapshotEvent.getStatementTypeSet(
             parameters.get(ColumnHeaderConstant.TYPE));
-    final PipePattern pipePattern =
-        PipePattern.parsePatternFromString(
-            parameters.get(ColumnHeaderConstant.PATH_PATTERN), 
IoTDBPipePattern::new);
 
     // Clear to avoid previous exceptions
     batchVisitor.clear();
@@ -571,6 +584,39 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
     return PipeReceiverStatusHandler.getPriorStatus(results);
   }
 
+  static boolean shouldLoadSchemaSnapshotDatabase(
+      final String pathPattern, final String databaseName) {
+    return PipePattern.parsePatternFromString(pathPattern, 
IoTDBPipePattern::new)
+        .mayOverlapWithDb(databaseName);
+  }
+
+  private TSStatus createSchemaSnapshotDatabaseIfNecessary(final PartialPath 
databasePath) {
+    final DatabaseSchemaStatement statement =
+        new 
DatabaseSchemaStatement(DatabaseSchemaStatement.DatabaseSchemaStatementType.CREATE);
+    statement.setDatabasePath(databasePath);
+
+    final TSStatus status = executeStatementAndClassifyExceptions(statement);
+    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      return status;
+    }
+
+    if (status.getCode() == 
TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode()) {
+      return Objects.equals(
+              status.getMessage(),
+              databasePath.getFullPath() + " has already been created as 
database")
+          ? RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS)
+          : new 
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
+              .setMessage(status.getMessage());
+    }
+
+    if (status.getCode() == TSStatusCode.DATABASE_CONFLICT.getStatusCode()) {
+      return new 
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
+          .setMessage(status.getMessage());
+    }
+
+    return status;
+  }
+
   private TPipeTransferResp handleTransferSchemaPlan(final 
PipeTransferPlanNodeReq req) {
     // We may be able to skip the alter logical view's exception parsing 
because
     // the "AlterLogicalViewNode" is itself idempotent
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
index 2f173861f22..2a548557d8c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
@@ -48,6 +48,7 @@ import 
org.apache.iotdb.db.queryengine.plan.execution.config.ConfigTaskVisitor;
 import org.apache.iotdb.db.queryengine.plan.planner.TreeModelPlanner;
 import org.apache.iotdb.db.queryengine.plan.statement.IConfigStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.Statement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement;
 import org.apache.iotdb.db.utils.SetThreadName;
 
 import org.slf4j.Logger;
@@ -208,13 +209,19 @@ public class Coordinator {
       long startTime) {
     queryContext.setTimeOut(timeOut);
     queryContext.setStartTime(startTime);
-    if (statement instanceof IConfigStatement) {
-      queryContext.setQueryType(((IConfigStatement) statement).getQueryType());
+    final Statement configStatement =
+        statement instanceof PipeEnrichedStatement
+                && ((PipeEnrichedStatement) statement).getInnerStatement()
+                    instanceof IConfigStatement
+            ? ((PipeEnrichedStatement) statement).getInnerStatement()
+            : statement;
+    if (configStatement instanceof IConfigStatement) {
+      queryContext.setQueryType(((IConfigStatement) 
configStatement).getQueryType());
       return new ConfigExecution(
           queryContext,
-          statement.getType(),
+          configStatement.getType(),
           executor,
-          statement.accept(new ConfigTaskVisitor(), queryContext));
+          configStatement.accept(new ConfigTaskVisitor(), queryContext));
     }
     TreeModelPlanner treeModelPlanner =
         new TreeModelPlanner(
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverTest.java
index 1e279a18feb..9eac46c8351 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverTest.java
@@ -110,6 +110,15 @@ public class IoTDBDataNodeReceiverTest {
     }
   }
 
+  @Test
+  public void testSchemaSnapshotDatabaseIsFilteredByPattern() {
+    Assert.assertTrue(
+        IoTDBDataNodeReceiver.shouldLoadSchemaSnapshotDatabase("root.ln.**", 
"root.ln"));
+    
Assert.assertTrue(IoTDBDataNodeReceiver.shouldLoadSchemaSnapshotDatabase("root.**",
 "root.db"));
+    Assert.assertFalse(
+        IoTDBDataNodeReceiver.shouldLoadSchemaSnapshotDatabase("root.ln.**", 
"root.db"));
+  }
+
   @Test
   public void testLoadTsFileSyncStatementVerifiesSchemaWhenConvertingType() 
throws Exception {
     final Path tsFile = 
Files.createTempFile("pipe-load-convert-verify-schema", ".tsfile");

Reply via email to