This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 7cef068e674 Pipe: Fixed the NPE for the last table node in the 
snapshot parser & Added handling for the case where a table is also a device 
in the snapshot parser & Refactored & Fixed the missing trimming logic for 
TsFile & Enhanced the table idempotency logic & Fixed the TTL + table database 
privilege logic on the receiver side & Fixed the bug that the table write 
auto-create-db did not check privileges (#15135)
7cef068e674 is described below

commit 7cef068e674b01a8162093afc9b1213b30af0483
Author: Caideyipi <[email protected]>
AuthorDate: Fri Mar 21 18:58:25 2025 +0800

    Pipe: Fixed the NPE for the last table node in the snapshot parser & 
Added handling for the case where a table is also a device in the snapshot 
parser & Refactored & Fixed the missing trimming logic for TsFile & Enhanced 
the table idempotency logic & Fixed the TTL + table database privilege logic 
on the receiver side & Fixed the bug that the table write auto-create-db did 
not check privileges (#15135)
    
    Co-authored-by: Steve Yurong Su <[email protected]>
---
 .../receiver/protocol/IoTDBConfigNodeReceiver.java | 61 ++++++++++++++++++----
 .../PipeConfigPhysicalPlanTSStatusVisitor.java     | 20 +++----
 .../agent/task/connection/PipeEventCollector.java  | 15 ++++--
 .../common/tsfile/PipeTsFileInsertionEvent.java    |  5 +-
 .../protocol/thrift/IoTDBDataNodeReceiver.java     |  1 +
 .../db/tools/schema/SRStatementGenerator.java      |  8 ++-
 .../commons/pipe/receiver/IoTDBFileReceiver.java   |  2 +-
 7 files changed, 77 insertions(+), 35 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
index b1a3dfac009..8a86d679b14 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.auth.entity.PrivilegeType;
 import org.apache.iotdb.commons.auth.entity.PrivilegeUnion;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.path.PathPatternTree;
 import 
org.apache.iotdb.commons.pipe.connector.payload.airgap.AirGapPseudoTPipeTransferRequest;
@@ -264,11 +265,37 @@ public class IoTDBConfigNodeReceiver extends 
IoTDBFileReceiver {
   private TSStatus checkPermission(final ConfigPhysicalPlan plan) {
     switch (plan.getType()) {
       case CreateDatabase:
+        return PathUtils.isTableModelDatabase(((DatabaseSchemaPlan) 
plan).getSchema().getName())
+            ? configManager
+                .checkUserPrivileges(
+                    username,
+                    new PrivilegeUnion(
+                        ((DatabaseSchemaPlan) plan).getSchema().getName(), 
PrivilegeType.CREATE))
+                .getStatus()
+            : configManager
+                .checkUserPrivileges(username, new 
PrivilegeUnion(PrivilegeType.MANAGE_DATABASE))
+                .getStatus();
       case AlterDatabase:
+        return PathUtils.isTableModelDatabase(((DatabaseSchemaPlan) 
plan).getSchema().getName())
+            ? configManager
+                .checkUserPrivileges(
+                    username,
+                    new PrivilegeUnion(
+                        ((DatabaseSchemaPlan) plan).getSchema().getName(), 
PrivilegeType.ALTER))
+                .getStatus()
+            : configManager
+                .checkUserPrivileges(username, new 
PrivilegeUnion(PrivilegeType.MANAGE_DATABASE))
+                .getStatus();
       case DeleteDatabase:
-        return configManager
-            .checkUserPrivileges(username, new 
PrivilegeUnion(PrivilegeType.MANAGE_DATABASE))
-            .getStatus();
+        return PathUtils.isTableModelDatabase(((DeleteDatabasePlan) 
plan).getName())
+            ? configManager
+                .checkUserPrivileges(
+                    username,
+                    new PrivilegeUnion(((DeleteDatabasePlan) plan).getName(), 
PrivilegeType.DROP))
+                .getStatus()
+            : configManager
+                .checkUserPrivileges(username, new 
PrivilegeUnion(PrivilegeType.MANAGE_DATABASE))
+                .getStatus();
       case ExtendSchemaTemplate:
         return configManager
             .checkUserPrivileges(username, new 
PrivilegeUnion(PrivilegeType.EXTEND_TEMPLATE))
@@ -312,14 +339,26 @@ public class IoTDBConfigNodeReceiver extends 
IoTDBFileReceiver {
                     PrivilegeType.WRITE_SCHEMA))
             .getStatus();
       case SetTTL:
-        return configManager
-            .checkUserPrivileges(
-                username,
-                new PrivilegeUnion(
-                    Collections.singletonList(
-                        new PartialPath(((SetTTLPlan) plan).getPathPattern())),
-                    PrivilegeType.WRITE_SCHEMA))
-            .getStatus();
+        return Objects.equals(
+                configManager
+                    .getTTLManager()
+                    .getAllTTL()
+                    .get(
+                        String.join(
+                            String.valueOf(IoTDBConstant.PATH_SEPARATOR),
+                            ((SetTTLPlan) plan).getPathPattern())),
+                ((SetTTLPlan) plan).getTTL())
+            ? StatusUtils.OK
+            : configManager
+                .checkUserPrivileges(
+                    username,
+                    ((SetTTLPlan) plan).isDataBase()
+                        ? new PrivilegeUnion(PrivilegeType.MANAGE_DATABASE)
+                        : new PrivilegeUnion(
+                            Collections.singletonList(
+                                new PartialPath(((SetTTLPlan) 
plan).getPathPattern())),
+                            PrivilegeType.WRITE_SCHEMA))
+                .getStatus();
       case UpdateTriggerStateInTable:
       case DeleteTriggerInTable:
         return configManager
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/visitor/PipeConfigPhysicalPlanTSStatusVisitor.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/visitor/PipeConfigPhysicalPlanTSStatusVisitor.java
index faa8789ba67..adffa6b6778 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/visitor/PipeConfigPhysicalPlanTSStatusVisitor.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/visitor/PipeConfigPhysicalPlanTSStatusVisitor.java
@@ -535,13 +535,13 @@ public class PipeConfigPhysicalPlanTSStatusVisitor
   @Override
   public TSStatus visitCommitDeleteColumn(
       final CommitDeleteColumnPlan commitDeleteColumnPlan, final TSStatus 
context) {
-    return visitCommonTableColumnPlan(commitDeleteColumnPlan, context);
+    return visitCommonTablePlan(commitDeleteColumnPlan, context);
   }
 
   @Override
   public TSStatus visitRenameTableColumn(
       final RenameTableColumnPlan renameTableColumnPlan, final TSStatus 
context) {
-    return visitCommonTableColumnPlan(renameTableColumnPlan, context);
+    return visitCommonTablePlan(renameTableColumnPlan, context);
   }
 
   @Override
@@ -565,22 +565,14 @@ public class PipeConfigPhysicalPlanTSStatusVisitor
   @Override
   public TSStatus visitSetTableColumnComment(
       final SetTableColumnCommentPlan setTableColumnCommentPlan, final 
TSStatus context) {
-    return visitCommonTableColumnPlan(setTableColumnCommentPlan, context);
-  }
-
-  private TSStatus visitCommonTableColumnPlan(
-      final ConfigPhysicalPlan plan, final TSStatus context) {
-    if (context.getCode() == TSStatusCode.COLUMN_ALREADY_EXISTS.getStatusCode()
-        || context.getCode() == 
TSStatusCode.COLUMN_NOT_EXISTS.getStatusCode()) {
-      return new 
TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
-          .setMessage(context.getMessage());
-    }
-    return visitCommonTablePlan(plan, context);
+    return visitCommonTablePlan(setTableColumnCommentPlan, context);
   }
 
   private TSStatus visitCommonTablePlan(final ConfigPhysicalPlan plan, final 
TSStatus context) {
     if (context.getCode() == TSStatusCode.DATABASE_NOT_EXIST.getStatusCode()
-        || context.getCode() == TSStatusCode.TABLE_NOT_EXISTS.getStatusCode()) 
{
+        || context.getCode() == TSStatusCode.TABLE_NOT_EXISTS.getStatusCode()
+        || context.getCode() == 
TSStatusCode.COLUMN_ALREADY_EXISTS.getStatusCode()
+        || context.getCode() == 
TSStatusCode.COLUMN_NOT_EXISTS.getStatusCode()) {
       return new 
TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
           .setMessage(context.getMessage());
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
index 51221001a58..6e265711ad0 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
@@ -136,11 +136,8 @@ public class PipeEventCollector implements EventCollector {
     }
 
     if (!forceTabletFormat
-        && (!sourceEvent.shouldParseTimeOrPattern()
-            || (sourceEvent.isTableModelEvent()
-                && (sourceEvent.getTablePattern() == null
-                    || !sourceEvent.getTablePattern().hasTablePattern())
-                && !sourceEvent.shouldParseTime()))) {
+        && !sourceEvent.shouldParse4Privilege()
+        && canSkipParsing4TsFileEvent(sourceEvent)) {
       collectEvent(sourceEvent);
       return;
     }
@@ -154,6 +151,14 @@ public class PipeEventCollector implements EventCollector {
     }
   }
 
+  private boolean canSkipParsing4TsFileEvent(final PipeTsFileInsertionEvent 
sourceEvent) {
+    return !sourceEvent.shouldParseTimeOrPattern()
+        || (sourceEvent.isTableModelEvent()
+            && (sourceEvent.getTablePattern() == null
+                || !sourceEvent.getTablePattern().hasTablePattern())
+            && !sourceEvent.shouldParseTime());
+  }
+
   private void collectParsedRawTableEvent(final PipeRawTabletInsertionEvent 
parsedEvent) {
     if (!parsedEvent.hasNoNeedParsingAndIsEmpty()) {
       hasNoGeneratedEvent = false;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index 19a4913429f..357e11e2f39 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -390,9 +390,8 @@ public class PipeTsFileInsertionEvent extends 
PipeInsertionEvent
     }
   }
 
-  @Override
-  public boolean shouldParsePattern() {
-    return super.shouldParsePattern() || shouldParse4Privilege;
+  public boolean shouldParse4Privilege() {
+    return shouldParse4Privilege;
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index 66e95adeebb..5f588e0d207 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -987,6 +987,7 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
       return;
     }
 
+    
Coordinator.getInstance().getAccessControl().checkCanCreateDatabase(username, 
database);
     final TDatabaseSchema schema = new TDatabaseSchema(new 
TDatabaseSchema(database));
     schema.setIsTableModel(true);
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/schema/SRStatementGenerator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/schema/SRStatementGenerator.java
index d2dd6512ca7..d3e24d6eb6d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/schema/SRStatementGenerator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/schema/SRStatementGenerator.java
@@ -183,7 +183,9 @@ public class SRStatementGenerator implements 
Iterator<Object>, Iterable<Object>
               genAlignedTimeseriesStatement(
                   // skip common database
                   node, databaseFullPath.concatPath(node.getPartialPath(), 1));
-          statements.push(stmt);
+          if (Objects.nonNull(stmt)) {
+            statements.push(stmt);
+          }
         }
         cleanMTreeNode(node);
         if (ancestors.isEmpty()) {
@@ -308,6 +310,10 @@ public class SRStatementGenerator implements 
Iterator<Object>, Iterable<Object>
       case TABLE_MNODE_TYPE:
         childrenNum = ReadWriteIOUtils.readInt(inputStream);
         node = deserializer.deserializeTableDeviceMNode(inputStream);
+        if (ancestors.size() == 1) {
+          emitDevice();
+          this.tableName = node.getName();
+        }
         break;
       default:
         throw new IOException("Unrecognized MNode type" + type);
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
index 1e0f6305f2d..069f4a6a15d 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
@@ -648,7 +648,7 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
       return new TPipeTransferResp(
           RpcUtils.getStatus(
               TSStatusCode.PIPE_TRANSFER_FILE_ERROR,
-              String.format("Failed to seal file %s because %s", writingFile, 
e.getMessage())));
+              String.format("Failed to seal file %s because %s", files, 
e.getMessage())));
     } finally {
       // If the writing file is not sealed successfully, the writing file will 
be deleted.
       // All pieces of the writing file and its mod(if exists) should be 
retransmitted by the

Reply via email to