This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch priv-fix
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/priv-fix by this push:
new 23941cf8838 fix
23941cf8838 is described below
commit 23941cf8838f7fc595b00bca4473db2d34489f69
Author: Caideyipi <[email protected]>
AuthorDate: Wed Jan 14 11:26:46 2026 +0800
fix
---
.../subtask/processor/PipeProcessorSubtask.java | 21 ++++++++++++++++++++-
.../db/pipe/event/common/PipeInsertionEvent.java | 4 ++++
.../common/tsfile/PipeTsFileInsertionEvent.java | 4 ----
.../dataregion/tsfile/TsFileResource.java | 4 +++-
4 files changed, 27 insertions(+), 6 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
index d8fbac9e44e..b481117c5e0 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
@@ -33,6 +33,7 @@ import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
import org.apache.iotdb.db.pipe.agent.task.connection.PipeEventCollector;
import org.apache.iotdb.db.pipe.event.UserDefinedEnrichedEvent;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
+import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics;
import org.apache.iotdb.db.pipe.metric.processor.PipeProcessorMetrics;
@@ -143,7 +144,25 @@ public class PipeProcessorSubtask extends PipeReportableSubtask {
// event can be supplied after the subtask is closed, so we need to check isClosed here
if (!isClosed.get()) {
if (event instanceof TabletInsertionEvent) {
- pipeProcessor.process((TabletInsertionEvent) event, outputEventCollector);
+ if (event instanceof PipeInsertNodeTabletInsertionEvent
+ && ((PipeInsertNodeTabletInsertionEvent) event).shouldParse4Privilege()) {
+ final AtomicReference<Exception> ex = new AtomicReference<>();
+ ((PipeInsertNodeTabletInsertionEvent) event)
+ .toRawTabletInsertionEvents()
+ .forEach(
+ rawTabletInsertionEvent -> {
+ try {
+ pipeProcessor.process(rawTabletInsertionEvent, outputEventCollector);
+ } catch (Exception e) {
+ ex.set(e);
+ }
+ });
+ if (ex.get() != null) {
+ throw ex.get();
+ }
+ } else {
+ pipeProcessor.process((TabletInsertionEvent) event, outputEventCollector);
+ }
PipeProcessorMetrics.getInstance().markTabletEvent(taskID);
} else if (event instanceof TsFileInsertionEvent) {
// We have to parse the privilege first, to avoid passing no-privilege data to processor
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/PipeInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/PipeInsertionEvent.java
index ce491b92ef4..3e1f4b476ae 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/PipeInsertionEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/PipeInsertionEvent.java
@@ -170,4 +170,8 @@ public abstract class PipeInsertionEvent extends EnrichedEvent {
this.tableModelDatabaseName = tableModelDatabaseName.toLowerCase();
this.treeModelDatabaseName =
PathUtils.qualifyDatabaseName(tableModelDatabaseName);
}
+
+ public boolean shouldParse4Privilege() {
+ return shouldParse4Privilege;
+ }
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index d3d601d5754..6b3e505d2ea 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -404,10 +404,6 @@ public class PipeTsFileInsertionEvent extends PipeInsertionEvent
}
}
- public boolean shouldParse4Privilege() {
- return shouldParse4Privilege;
- }
-
@Override
public PipeTsFileInsertionEvent
shallowCopySelfAndBindPipeTaskMetaForProgressReport(
final String pipeName,
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
index 9a827360f70..408be43ade6 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
@@ -846,7 +846,9 @@ public class TsFileResource implements PersistentResource, Cloneable {
// To release the memory occupied by pipe if held by it
// Note that pipe can safely handle the case that the time index does not exist
isEmpty();
- degradeTimeIndex();
+ if (getStatus() != TsFileResourceStatus.UNCLOSED) {
+ degradeTimeIndex();
+ }
try {
fsFactory.deleteIfExists(file);
fsFactory.deleteIfExists(