This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 4db4229ce36 [To dev/1.3] Pipe: Fixed the database cache parallel 
creation logic
4db4229ce36 is described below

commit 4db4229ce36d9c098fbbf6f2e643121e0e5c7d6f
Author: Caideyipi <[email protected]>
AuthorDate: Mon Jul 21 16:23:09 2025 +0800

    [To dev/1.3] Pipe: Fixed the database cache parallel creation logic
---
 .../configuration/PipeRuntimeEnvironment.java      |  2 +
 .../java/org/apache/iotdb/rpc/TSStatusCode.java    |  1 +
 .../PipeConfigPhysicalPlanTSStatusVisitor.java     | 16 ++----
 .../confignode/persistence/schema/ConfigMTree.java | 16 +++---
 .../metadata/DatabaseConflictException.java        | 52 ++++++++++++++++++++
 .../protocol/legacy/IoTDBLegacyPipeConnector.java  | 11 ++++-
 .../legacy/IoTDBLegacyPipeReceiverAgent.java       |  3 +-
 ...upCacheResult.java => DatabaseCacheResult.java} |  2 +-
 .../analyze/cache/partition/PartitionCache.java    | 57 +++++++++++-----------
 .../plan/analyze/load/LoadTsFileAnalyzer.java      |  9 +++-
 .../plugin/env/PipeTaskRuntimeEnvironment.java     |  1 +
 11 files changed, 117 insertions(+), 53 deletions(-)

diff --git 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/configuration/PipeRuntimeEnvironment.java
 
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/configuration/PipeRuntimeEnvironment.java
index 455d293dccc..8395568791c 100644
--- 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/configuration/PipeRuntimeEnvironment.java
+++ 
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/configuration/PipeRuntimeEnvironment.java
@@ -24,4 +24,6 @@ public interface PipeRuntimeEnvironment {
   String getPipeName();
 
   long getCreationTime();
+
+  int getRegionId();
 }
diff --git 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index 34f9872bb10..e7780a98f92 100644
--- 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -81,6 +81,7 @@ public enum TSStatusCode {
   SCHEMA_QUOTA_EXCEEDED(526),
   MEASUREMENT_ALREADY_EXISTS_IN_TEMPLATE(527),
   ONLY_LOGICAL_VIEW(528),
+  DATABASE_CONFLICT(529),
 
   // Storage Engine
   SYSTEM_READ_ONLY(600),
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/visitor/PipeConfigPhysicalPlanTSStatusVisitor.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/visitor/PipeConfigPhysicalPlanTSStatusVisitor.java
index 73e84632596..a772ab77457 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/visitor/PipeConfigPhysicalPlanTSStatusVisitor.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/visitor/PipeConfigPhysicalPlanTSStatusVisitor.java
@@ -52,20 +52,10 @@ public class PipeConfigPhysicalPlanTSStatusVisitor
   @Override
   public TSStatus visitCreateDatabase(final DatabaseSchemaPlan plan, final 
TSStatus context) {
     if (context.getCode() == 
TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode()) {
-      if (context
-          .getMessage()
-          .contains(
-              String.format(
-                  "%s has already been created as database", 
plan.getSchema().getName()))) {
-        // The same database has been created
-        return new TSStatus(
-                
TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
-            .setMessage(context.getMessage());
-      }
-      // Lower or higher level database has been created
-      return new 
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
+      return new 
TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
           .setMessage(context.getMessage());
-    } else if (context.getCode() == 
TSStatusCode.SCHEMA_QUOTA_EXCEEDED.getStatusCode()) {
+    } else if (context.getCode() == 
TSStatusCode.SCHEMA_QUOTA_EXCEEDED.getStatusCode()
+        || context.getCode() == 
TSStatusCode.DATABASE_CONFLICT.getStatusCode()) {
       return new 
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
           .setMessage(context.getMessage());
     } else if (context.getCode() == 
TSStatusCode.METADATA_ERROR.getStatusCode()) {
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/schema/ConfigMTree.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/schema/ConfigMTree.java
index 54f8d8663f0..6d0d13d4c65 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/schema/ConfigMTree.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/schema/ConfigMTree.java
@@ -31,6 +31,7 @@ import 
org.apache.iotdb.commons.utils.ThriftConfigNodeSerDeUtils;
 import org.apache.iotdb.confignode.persistence.schema.mnode.IConfigMNode;
 import 
org.apache.iotdb.confignode.persistence.schema.mnode.factory.ConfigMNodeFactory;
 import org.apache.iotdb.db.exception.metadata.DatabaseAlreadySetException;
+import org.apache.iotdb.db.exception.metadata.DatabaseConflictException;
 import org.apache.iotdb.db.exception.metadata.DatabaseNotSetException;
 import org.apache.iotdb.db.exception.metadata.PathNotExistException;
 import 
org.apache.iotdb.db.schemaengine.schemaregion.mtree.traverser.collector.DatabaseCollector;
@@ -114,7 +115,7 @@ public class ConfigMTree {
         store.addChild(cur, nodeNames[i], nodeFactory.createInternalMNode(cur, 
nodeNames[i]));
       } else if (temp.isDatabase()) {
         // before create database, check whether the database already exists
-        throw new DatabaseAlreadySetException(temp.getFullPath());
+        throw new DatabaseConflictException(temp.getFullPath(), false);
       }
       cur = store.getChild(cur, nodeNames[i]);
       i++;
@@ -128,7 +129,7 @@ public class ConfigMTree {
         if (store.getChild(cur, nodeNames[i]).isDatabase()) {
           throw new DatabaseAlreadySetException(path.getFullPath());
         } else {
-          throw new DatabaseAlreadySetException(path.getFullPath(), true);
+          throw new DatabaseConflictException(path.getFullPath(), true);
         }
       } else {
         IDatabaseMNode<IConfigMNode> databaseMNode =
@@ -137,7 +138,7 @@ public class ConfigMTree {
         IConfigMNode result = store.addChild(cur, nodeNames[i], 
databaseMNode.getAsMNode());
 
         if (result != databaseMNode) {
-          throw new DatabaseAlreadySetException(path.getFullPath(), true);
+          throw new DatabaseConflictException(path.getFullPath(), true);
         }
       }
     }
@@ -262,7 +263,7 @@ public class ConfigMTree {
         throw new DatabaseNotSetException(databasePath.getFullPath());
       }
       if (cur.isDatabase()) {
-        throw new DatabaseAlreadySetException(cur.getFullPath());
+        throw new DatabaseConflictException(cur.getFullPath(), false);
       }
     }
 
@@ -273,7 +274,7 @@ public class ConfigMTree {
     if (cur.isDatabase()) {
       return cur.getAsDatabaseMNode();
     } else {
-      throw new DatabaseAlreadySetException(databasePath.getFullPath(), true);
+      throw new DatabaseConflictException(databasePath.getFullPath(), true);
     }
   }
 
@@ -331,7 +332,8 @@ public class ConfigMTree {
    *
    * @param path a full path or a prefix path
    */
-  public void checkDatabaseAlreadySet(PartialPath path) throws 
DatabaseAlreadySetException {
+  public void checkDatabaseAlreadySet(PartialPath path)
+      throws DatabaseAlreadySetException, DatabaseConflictException {
     String[] nodeNames = path.getNodes();
     IConfigMNode cur = root;
     if (!nodeNames[0].equals(root.getName())) {
@@ -346,7 +348,7 @@ public class ConfigMTree {
         throw new DatabaseAlreadySetException(cur.getFullPath());
       }
     }
-    throw new DatabaseAlreadySetException(path.getFullPath(), true);
+    throw new DatabaseConflictException(path.getFullPath(), true);
   }
 
   // endregion
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/DatabaseConflictException.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/DatabaseConflictException.java
new file mode 100644
index 00000000000..ee08ef4e759
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/DatabaseConflictException.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.exception.metadata;
+
+import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+public class DatabaseConflictException extends MetadataException {
+
+  private final boolean isChild;
+
+  private final String databasePath;
+
+  public DatabaseConflictException(final String path, final boolean isChild) {
+    super(getMessage(path, isChild), 
TSStatusCode.DATABASE_CONFLICT.getStatusCode());
+    this.isChild = isChild;
+    databasePath = path;
+  }
+
+  public boolean isChild() {
+    return isChild;
+  }
+
+  public String getDatabasePath() {
+    return databasePath;
+  }
+
+  private static String getMessage(final String path, final boolean isChild) {
+    if (isChild) {
+      return String.format("some children of %s have already been created as 
database", path);
+    } else {
+      return String.format("%s has already been created as database", path);
+    }
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
index 58a929d30d0..82a5bb7dfbc 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
@@ -24,7 +24,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.client.property.ThriftClientProperty;
 import org.apache.iotdb.commons.conf.CommonConfig;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
-import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.consensus.DataRegionId;
 import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.connector.client.IoTDBSyncClient;
@@ -37,6 +37,7 @@ import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertio
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
 import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent;
 import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
+import org.apache.iotdb.db.storageengine.StorageEngine;
 import org.apache.iotdb.pipe.api.PipeConnector;
 import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
@@ -109,6 +110,7 @@ public class IoTDBLegacyPipeConnector implements 
PipeConnector {
   private String syncConnectorVersion;
 
   private String pipeName;
+  private String databaseName;
   private IoTDBSyncClient client;
 
   private SessionPool sessionPool;
@@ -217,6 +219,11 @@ public class IoTDBLegacyPipeConnector implements 
PipeConnector {
     useSSL = parameters.getBooleanOrDefault(SINK_IOTDB_SSL_ENABLE_KEY, false);
     trustStore = parameters.getString(SINK_IOTDB_SSL_TRUST_STORE_PATH_KEY);
     trustStorePwd = parameters.getString(SINK_IOTDB_SSL_TRUST_STORE_PWD_KEY);
+
+    databaseName =
+        StorageEngine.getInstance()
+            .getDataRegion(new 
DataRegionId(configuration.getRuntimeEnvironment().getRegionId()))
+            .getDatabaseName();
   }
 
   @Override
@@ -237,7 +244,7 @@ public class IoTDBLegacyPipeConnector implements 
PipeConnector {
               trustStorePwd);
       final TSyncIdentityInfo identityInfo =
           new TSyncIdentityInfo(
-              pipeName, System.currentTimeMillis(), syncConnectorVersion, 
IoTDBConstant.PATH_ROOT);
+              pipeName, System.currentTimeMillis(), syncConnectorVersion, 
databaseName);
       final TSStatus status = client.handshake(identityInfo);
       if (status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
         final String errorMsg =
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/IoTDBLegacyPipeReceiverAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/IoTDBLegacyPipeReceiverAgent.java
index a67d3e55815..cfc3bdcdd9e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/IoTDBLegacyPipeReceiverAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/IoTDBLegacyPipeReceiverAgent.java
@@ -146,7 +146,8 @@ public class IoTDBLegacyPipeReceiverAgent {
                   
IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold(),
                   false);
       if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()
-          && result.status.code != 
TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode()) {
+          && result.status.code != 
TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode()
+          && result.status.code != 
TSStatusCode.DATABASE_CONFLICT.getStatusCode()) {
         LOGGER.error(
             "Create Database error, statement: {}, result status : {}.", 
statement, result.status);
         return false;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/partition/StorageGroupCacheResult.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/partition/DatabaseCacheResult.java
similarity index 97%
rename from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/partition/StorageGroupCacheResult.java
rename to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/partition/DatabaseCacheResult.java
index 53771fa1871..52b6fd258a9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/partition/StorageGroupCacheResult.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/partition/DatabaseCacheResult.java
@@ -24,7 +24,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-public abstract class StorageGroupCacheResult<V> {
+public abstract class DatabaseCacheResult<V> {
   /** the result */
   private boolean success = true;
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/partition/PartitionCache.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/partition/PartitionCache.java
index e649822bac4..4ca1fee18cb 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/partition/PartitionCache.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/partition/PartitionCache.java
@@ -138,8 +138,8 @@ public class PartitionCache {
    */
   public Map<String, List<String>> getStorageGroupToDevice(
       List<String> devicePaths, boolean tryToFetch, boolean isAutoCreate, 
String userName) {
-    StorageGroupCacheResult<List<String>> result =
-        new StorageGroupCacheResult<List<String>>() {
+    DatabaseCacheResult<List<String>> result =
+        new DatabaseCacheResult<List<String>>() {
           @Override
           public void put(String device, String storageGroupName) {
             map.computeIfAbsent(storageGroupName, k -> new ArrayList<>());
@@ -160,8 +160,8 @@ public class PartitionCache {
    */
   public Map<String, String> getDeviceToStorageGroup(
       List<String> devicePaths, boolean tryToFetch, boolean isAutoCreate, 
String userName) {
-    StorageGroupCacheResult<String> result =
-        new StorageGroupCacheResult<String>() {
+    DatabaseCacheResult<String> result =
+        new DatabaseCacheResult<String>() {
           @Override
           public void put(String device, String storageGroupName) {
             map.put(device, storageGroupName);
@@ -195,13 +195,13 @@ public class PartitionCache {
    * @param devicePaths the devices that need to hit
    */
   private void fetchStorageGroupAndUpdateCache(
-      StorageGroupCacheResult<?> result, List<String> devicePaths)
+      DatabaseCacheResult<?> result, List<String> devicePaths)
       throws ClientManagerException, TException {
     storageGroupCacheLock.writeLock().lock();
     try (ConfigNodeClient client =
         configNodeClientManager.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) 
{
       result.reset();
-      getStorageGroupMap(result, devicePaths, true);
+      getDatabaseMap(result, devicePaths, true);
       if (!result.isSuccess()) {
         TGetDatabaseReq req = new TGetDatabaseReq(ROOT_PATH, 
SchemaConstant.ALL_MATCH_SCOPE_BINARY);
         TDatabaseSchemaResp storageGroupSchemaResp = 
client.getMatchedDatabaseSchemas(req);
@@ -210,7 +210,7 @@ public class PartitionCache {
           Set<String> storageGroupNames = 
storageGroupSchemaResp.getDatabaseSchemaMap().keySet();
           // update all database into cache
           updateStorageCache(storageGroupNames);
-          getStorageGroupMap(result, devicePaths, true);
+          getDatabaseMap(result, devicePaths, true);
         }
       }
     } finally {
@@ -227,7 +227,7 @@ public class PartitionCache {
    * @throws RuntimeException if failed to create database
    */
   private void createStorageGroupAndUpdateCache(
-      StorageGroupCacheResult<?> result, List<String> devicePaths, String 
userName)
+      DatabaseCacheResult<?> result, List<String> devicePaths, String userName)
       throws ClientManagerException, MetadataException, TException {
     storageGroupCacheLock.writeLock().lock();
     try (ConfigNodeClient client =
@@ -235,25 +235,25 @@ public class PartitionCache {
       // Try to check whether database need to be created
       result.reset();
       // Try to hit database with all missed devices
-      getStorageGroupMap(result, devicePaths, false);
+      getDatabaseMap(result, devicePaths, false);
       if (!result.isSuccess()) {
         // Try to get database needed to be created from missed device
-        Set<String> storageGroupNamesNeedCreated = new HashSet<>();
+        Set<String> databaseNamesNeedCreated = new HashSet<>();
         for (String devicePath : result.getMissedDevices()) {
           if (devicePath.equals(SchemaConstant.SYSTEM_DATABASE)
               || devicePath.startsWith(SchemaConstant.SYSTEM_DATABASE + ".")) {
-            storageGroupNamesNeedCreated.add(SchemaConstant.SYSTEM_DATABASE);
+            databaseNamesNeedCreated.add(SchemaConstant.SYSTEM_DATABASE);
           } else {
-            PartialPath storageGroupNameNeedCreated =
+            PartialPath databaseNameNeedCreated =
                 MetaUtils.getStorageGroupPathByLevel(
                     new PartialPath(devicePath), 
config.getDefaultStorageGroupLevel());
-            
storageGroupNamesNeedCreated.add(storageGroupNameNeedCreated.getFullPath());
+            
databaseNamesNeedCreated.add(databaseNameNeedCreated.getFullPath());
           }
         }
 
         // Try to create databases one by one until done or one database fail
-        Set<String> successFullyCreatedStorageGroup = new HashSet<>();
-        for (String storageGroupName : storageGroupNamesNeedCreated) {
+        Set<String> successFullyCreatedDatabases = new HashSet<>();
+        for (String databaseName : databaseNamesNeedCreated) {
           long startTime = System.nanoTime();
           try {
             if (!AuthorityChecker.SUPER_USER.equals(userName)) {
@@ -271,29 +271,30 @@ public class PartitionCache {
             
PerformanceOverviewMetrics.getInstance().recordAuthCost(System.nanoTime() - 
startTime);
           }
           TDatabaseSchema storageGroupSchema = new TDatabaseSchema();
-          storageGroupSchema.setName(storageGroupName);
-          if (SchemaConstant.SYSTEM_DATABASE.equals(storageGroupName)) {
+          storageGroupSchema.setName(databaseName);
+          if (SchemaConstant.SYSTEM_DATABASE.equals(databaseName)) {
             storageGroupSchema.setMinSchemaRegionGroupNum(1);
             storageGroupSchema.setMaxSchemaRegionGroupNum(1);
             storageGroupSchema.setMaxDataRegionGroupNum(1);
             storageGroupSchema.setMaxDataRegionGroupNum(1);
           }
           TSStatus tsStatus = client.setDatabase(storageGroupSchema);
-          if (TSStatusCode.SUCCESS_STATUS.getStatusCode() == 
tsStatus.getCode()) {
-            successFullyCreatedStorageGroup.add(storageGroupName);
-          } else {
+          if (TSStatusCode.SUCCESS_STATUS.getStatusCode() == tsStatus.getCode()
+              || TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode() == 
tsStatus.getCode()) {
+            successFullyCreatedDatabases.add(databaseName);
+          } else if (TSStatusCode.DATABASE_CONFLICT.getStatusCode() != 
tsStatus.getCode()) {
             // Try to update cache by databases successfully created
-            updateStorageCache(successFullyCreatedStorageGroup);
+            updateStorageCache(successFullyCreatedDatabases);
             logger.warn(
                 "[{} Cache] failed to create database {}",
                 CacheMetrics.STORAGE_GROUP_CACHE_NAME,
-                storageGroupName);
+                databaseName);
             throw new RuntimeException(new IoTDBException(tsStatus.message, 
tsStatus.code));
           }
         }
         // Try to update database cache when all databases has already been 
created
-        updateStorageCache(storageGroupNamesNeedCreated);
-        getStorageGroupMap(result, devicePaths, false);
+        updateStorageCache(databaseNamesNeedCreated);
+        getDatabaseMap(result, devicePaths, false);
       }
     } finally {
       storageGroupCacheLock.writeLock().unlock();
@@ -307,8 +308,8 @@ public class PartitionCache {
    * @param devicePaths the devices that need to hit
    * @param failFast if true, return when failed. if false, return when all 
devices hit
    */
-  private void getStorageGroupMap(
-      StorageGroupCacheResult<?> result, List<String> devicePaths, boolean 
failFast) {
+  private void getDatabaseMap(
+      DatabaseCacheResult<?> result, List<String> devicePaths, boolean 
failFast) {
     storageGroupCacheLock.readLock().lock();
     try {
       // reset result before try
@@ -355,7 +356,7 @@ public class PartitionCache {
    * @param userName
    */
   private void getStorageGroupCacheResult(
-      StorageGroupCacheResult<?> result,
+      DatabaseCacheResult<?> result,
       List<String> devicePaths,
       boolean tryToFetch,
       boolean isAutoCreate,
@@ -369,7 +370,7 @@ public class PartitionCache {
       }
     }
     // first try to hit database in fast-fail way
-    getStorageGroupMap(result, devicePaths, true);
+    getDatabaseMap(result, devicePaths, true);
     if (!result.isSuccess() && tryToFetch) {
       try {
         // try to fetch database from config node when miss
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
index 171f2b2f87f..22990e471ca 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
@@ -819,7 +819,14 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
                   
IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold(),
                   false);
       if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()
-          && result.status.code != 
TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode()) {
+          && result.status.code
+              != TSStatusCode.DATABASE_ALREADY_EXISTS
+                  .getStatusCode() // In tree model, if the user creates a 
conflict database
+          // concurrently, for instance, the
+          // database created by user is root.db.ss.a, the auto-creation 
failed database is root.db,
+          // we wait till "getOrCreatePartition" to judge if the time series 
(like root.db.ss.a.e /
+          // root.db.ss.a) conflicts with the created database. just do not 
throw exception here.
+          && result.status.code != 
TSStatusCode.DATABASE_CONFLICT.getStatusCode()) {
         LOGGER.warn(
             "Create database error, statement: {}, result status is: {}", 
statement, result.status);
         throw new LoadFileException(
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/plugin/env/PipeTaskRuntimeEnvironment.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/plugin/env/PipeTaskRuntimeEnvironment.java
index e14c73f2860..3e9abb67a94 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/plugin/env/PipeTaskRuntimeEnvironment.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/plugin/env/PipeTaskRuntimeEnvironment.java
@@ -43,6 +43,7 @@ public class PipeTaskRuntimeEnvironment implements 
PipeRuntimeEnvironment {
     return creationTime;
   }
 
+  @Override
   public int getRegionId() {
     return regionId;
   }

Reply via email to