This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 7cef068e674 Pipe: Fixed the NPE for last table node in snapshot parser
& Added judging logic when table is also device in snapshot parser & Refactor &
Fixed the missing trimming logic for tsFile & Enhanced the table idempotency
logic & Fixed the ttl + table database privilege logic at receiver side & Fixed
the bug that the table write auto-create-db does not check privilege (#15135)
7cef068e674 is described below
commit 7cef068e674b01a8162093afc9b1213b30af0483
Author: Caideyipi <[email protected]>
AuthorDate: Fri Mar 21 18:58:25 2025 +0800
Pipe: Fixed the NPE for last table node in snapshot parser & Added judging
logic when table is also device in snapshot parser & Refactor & Fixed the
missing trimming logic for tsFile & Enhanced the table idempotency logic &
Fixed the ttl + table database privilege logic at receiver side & Fixed the bug
that the table write auto-create-db does not check privilege (#15135)
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../receiver/protocol/IoTDBConfigNodeReceiver.java | 61 ++++++++++++++++++----
.../PipeConfigPhysicalPlanTSStatusVisitor.java | 20 +++----
.../agent/task/connection/PipeEventCollector.java | 15 ++++--
.../common/tsfile/PipeTsFileInsertionEvent.java | 5 +-
.../protocol/thrift/IoTDBDataNodeReceiver.java | 1 +
.../db/tools/schema/SRStatementGenerator.java | 8 ++-
.../commons/pipe/receiver/IoTDBFileReceiver.java | 2 +-
7 files changed, 77 insertions(+), 35 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
index b1a3dfac009..8a86d679b14 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.auth.entity.PrivilegeType;
import org.apache.iotdb.commons.auth.entity.PrivilegeUnion;
import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathPatternTree;
import
org.apache.iotdb.commons.pipe.connector.payload.airgap.AirGapPseudoTPipeTransferRequest;
@@ -264,11 +265,37 @@ public class IoTDBConfigNodeReceiver extends
IoTDBFileReceiver {
private TSStatus checkPermission(final ConfigPhysicalPlan plan) {
switch (plan.getType()) {
case CreateDatabase:
+ return PathUtils.isTableModelDatabase(((DatabaseSchemaPlan)
plan).getSchema().getName())
+ ? configManager
+ .checkUserPrivileges(
+ username,
+ new PrivilegeUnion(
+ ((DatabaseSchemaPlan) plan).getSchema().getName(),
PrivilegeType.CREATE))
+ .getStatus()
+ : configManager
+ .checkUserPrivileges(username, new
PrivilegeUnion(PrivilegeType.MANAGE_DATABASE))
+ .getStatus();
case AlterDatabase:
+ return PathUtils.isTableModelDatabase(((DatabaseSchemaPlan)
plan).getSchema().getName())
+ ? configManager
+ .checkUserPrivileges(
+ username,
+ new PrivilegeUnion(
+ ((DatabaseSchemaPlan) plan).getSchema().getName(),
PrivilegeType.ALTER))
+ .getStatus()
+ : configManager
+ .checkUserPrivileges(username, new
PrivilegeUnion(PrivilegeType.MANAGE_DATABASE))
+ .getStatus();
case DeleteDatabase:
- return configManager
- .checkUserPrivileges(username, new
PrivilegeUnion(PrivilegeType.MANAGE_DATABASE))
- .getStatus();
+ return PathUtils.isTableModelDatabase(((DeleteDatabasePlan)
plan).getName())
+ ? configManager
+ .checkUserPrivileges(
+ username,
+ new PrivilegeUnion(((DeleteDatabasePlan) plan).getName(),
PrivilegeType.DROP))
+ .getStatus()
+ : configManager
+ .checkUserPrivileges(username, new
PrivilegeUnion(PrivilegeType.MANAGE_DATABASE))
+ .getStatus();
case ExtendSchemaTemplate:
return configManager
.checkUserPrivileges(username, new
PrivilegeUnion(PrivilegeType.EXTEND_TEMPLATE))
@@ -312,14 +339,26 @@ public class IoTDBConfigNodeReceiver extends
IoTDBFileReceiver {
PrivilegeType.WRITE_SCHEMA))
.getStatus();
case SetTTL:
- return configManager
- .checkUserPrivileges(
- username,
- new PrivilegeUnion(
- Collections.singletonList(
- new PartialPath(((SetTTLPlan) plan).getPathPattern())),
- PrivilegeType.WRITE_SCHEMA))
- .getStatus();
+ return Objects.equals(
+ configManager
+ .getTTLManager()
+ .getAllTTL()
+ .get(
+ String.join(
+ String.valueOf(IoTDBConstant.PATH_SEPARATOR),
+ ((SetTTLPlan) plan).getPathPattern())),
+ ((SetTTLPlan) plan).getTTL())
+ ? StatusUtils.OK
+ : configManager
+ .checkUserPrivileges(
+ username,
+ ((SetTTLPlan) plan).isDataBase()
+ ? new PrivilegeUnion(PrivilegeType.MANAGE_DATABASE)
+ : new PrivilegeUnion(
+ Collections.singletonList(
+ new PartialPath(((SetTTLPlan)
plan).getPathPattern())),
+ PrivilegeType.WRITE_SCHEMA))
+ .getStatus();
case UpdateTriggerStateInTable:
case DeleteTriggerInTable:
return configManager
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/visitor/PipeConfigPhysicalPlanTSStatusVisitor.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/visitor/PipeConfigPhysicalPlanTSStatusVisitor.java
index faa8789ba67..adffa6b6778 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/visitor/PipeConfigPhysicalPlanTSStatusVisitor.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/visitor/PipeConfigPhysicalPlanTSStatusVisitor.java
@@ -535,13 +535,13 @@ public class PipeConfigPhysicalPlanTSStatusVisitor
@Override
public TSStatus visitCommitDeleteColumn(
final CommitDeleteColumnPlan commitDeleteColumnPlan, final TSStatus
context) {
- return visitCommonTableColumnPlan(commitDeleteColumnPlan, context);
+ return visitCommonTablePlan(commitDeleteColumnPlan, context);
}
@Override
public TSStatus visitRenameTableColumn(
final RenameTableColumnPlan renameTableColumnPlan, final TSStatus
context) {
- return visitCommonTableColumnPlan(renameTableColumnPlan, context);
+ return visitCommonTablePlan(renameTableColumnPlan, context);
}
@Override
@@ -565,22 +565,14 @@ public class PipeConfigPhysicalPlanTSStatusVisitor
@Override
public TSStatus visitSetTableColumnComment(
final SetTableColumnCommentPlan setTableColumnCommentPlan, final
TSStatus context) {
- return visitCommonTableColumnPlan(setTableColumnCommentPlan, context);
- }
-
- private TSStatus visitCommonTableColumnPlan(
- final ConfigPhysicalPlan plan, final TSStatus context) {
- if (context.getCode() == TSStatusCode.COLUMN_ALREADY_EXISTS.getStatusCode()
- || context.getCode() ==
TSStatusCode.COLUMN_NOT_EXISTS.getStatusCode()) {
- return new
TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
- .setMessage(context.getMessage());
- }
- return visitCommonTablePlan(plan, context);
+ return visitCommonTablePlan(setTableColumnCommentPlan, context);
}
private TSStatus visitCommonTablePlan(final ConfigPhysicalPlan plan, final
TSStatus context) {
if (context.getCode() == TSStatusCode.DATABASE_NOT_EXIST.getStatusCode()
- || context.getCode() == TSStatusCode.TABLE_NOT_EXISTS.getStatusCode())
{
+ || context.getCode() == TSStatusCode.TABLE_NOT_EXISTS.getStatusCode()
+ || context.getCode() ==
TSStatusCode.COLUMN_ALREADY_EXISTS.getStatusCode()
+ || context.getCode() ==
TSStatusCode.COLUMN_NOT_EXISTS.getStatusCode()) {
return new
TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
.setMessage(context.getMessage());
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
index 51221001a58..6e265711ad0 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
@@ -136,11 +136,8 @@ public class PipeEventCollector implements EventCollector {
}
if (!forceTabletFormat
- && (!sourceEvent.shouldParseTimeOrPattern()
- || (sourceEvent.isTableModelEvent()
- && (sourceEvent.getTablePattern() == null
- || !sourceEvent.getTablePattern().hasTablePattern())
- && !sourceEvent.shouldParseTime()))) {
+ && !sourceEvent.shouldParse4Privilege()
+ && canSkipParsing4TsFileEvent(sourceEvent)) {
collectEvent(sourceEvent);
return;
}
@@ -154,6 +151,14 @@ public class PipeEventCollector implements EventCollector {
}
}
+ private boolean canSkipParsing4TsFileEvent(final PipeTsFileInsertionEvent
sourceEvent) {
+ return !sourceEvent.shouldParseTimeOrPattern()
+ || (sourceEvent.isTableModelEvent()
+ && (sourceEvent.getTablePattern() == null
+ || !sourceEvent.getTablePattern().hasTablePattern())
+ && !sourceEvent.shouldParseTime());
+ }
+
private void collectParsedRawTableEvent(final PipeRawTabletInsertionEvent
parsedEvent) {
if (!parsedEvent.hasNoNeedParsingAndIsEmpty()) {
hasNoGeneratedEvent = false;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index 19a4913429f..357e11e2f39 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -390,9 +390,8 @@ public class PipeTsFileInsertionEvent extends
PipeInsertionEvent
}
}
- @Override
- public boolean shouldParsePattern() {
- return super.shouldParsePattern() || shouldParse4Privilege;
+ public boolean shouldParse4Privilege() {
+ return shouldParse4Privilege;
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index 66e95adeebb..5f588e0d207 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -987,6 +987,7 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
return;
}
+
Coordinator.getInstance().getAccessControl().checkCanCreateDatabase(username,
database);
final TDatabaseSchema schema = new TDatabaseSchema(new
TDatabaseSchema(database));
schema.setIsTableModel(true);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/schema/SRStatementGenerator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/schema/SRStatementGenerator.java
index d2dd6512ca7..d3e24d6eb6d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/schema/SRStatementGenerator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/schema/SRStatementGenerator.java
@@ -183,7 +183,9 @@ public class SRStatementGenerator implements
Iterator<Object>, Iterable<Object>
genAlignedTimeseriesStatement(
// skip common database
node, databaseFullPath.concatPath(node.getPartialPath(), 1));
- statements.push(stmt);
+ if (Objects.nonNull(stmt)) {
+ statements.push(stmt);
+ }
}
cleanMTreeNode(node);
if (ancestors.isEmpty()) {
@@ -308,6 +310,10 @@ public class SRStatementGenerator implements
Iterator<Object>, Iterable<Object>
case TABLE_MNODE_TYPE:
childrenNum = ReadWriteIOUtils.readInt(inputStream);
node = deserializer.deserializeTableDeviceMNode(inputStream);
+ if (ancestors.size() == 1) {
+ emitDevice();
+ this.tableName = node.getName();
+ }
break;
default:
throw new IOException("Unrecognized MNode type" + type);
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
index 1e0f6305f2d..069f4a6a15d 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
@@ -648,7 +648,7 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
return new TPipeTransferResp(
RpcUtils.getStatus(
TSStatusCode.PIPE_TRANSFER_FILE_ERROR,
- String.format("Failed to seal file %s because %s", writingFile,
e.getMessage())));
+ String.format("Failed to seal file %s because %s", files,
e.getMessage())));
} finally {
// If the writing file is not sealed successfully, the writing file will
be deleted.
// All pieces of the writing file and its mod(if exists) should be
retransmitted by the