This is an automated email from the ASF dual-hosted git repository.
Caideyipi pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 9d0d750da74 Fix pipe permission retry and table parser progress (#17844) (#17848)
9d0d750da74 is described below
commit 9d0d750da7442cc146ee1541c014201fc78ef475
Author: Caideyipi <[email protected]>
AuthorDate: Fri Jun 5 14:51:19 2026 +0800
Fix pipe permission retry and table parser progress (#17844) (#17848)
(cherry picked from commit eaa5bcb012e48525424dcba0140b293747e7f564)
---
.../db/pipe/sink/protocol/writeback/WriteBackSink.java | 16 ++++++++++++++--
1 file changed, 14 insertions(+), 2 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java
index 21976f63ab6..269d0059c66 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.pipe.sink.protocol.writeback;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.exception.pipe.PipeRuntimeSinkNonReportTimeConfigurableException;
import org.apache.iotdb.db.auth.AuthorityChecker;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
@@ -131,7 +132,8 @@ public class WriteBackSink implements PipeConnector {
status = statement.isEmpty() ? RpcUtils.SUCCESS_STATUS : executeStatement(statement);
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- throw new PipeException(
+ throwWriteBackExceptionIfNecessary(
+ status,
String.format(
"Transfer PipeInsertNodeTabletInsertionEvent %s error, result status %s",
pipeInsertNodeTabletInsertionEvent, status));
@@ -162,13 +164,23 @@ public class WriteBackSink implements PipeConnector {
statement.isEmpty() ? RpcUtils.SUCCESS_STATUS : executeStatement(statement);
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- throw new PipeException(
+ throwWriteBackExceptionIfNecessary(
+ status,
String.format(
"Transfer PipeRawTabletInsertionEvent %s error, result status %s",
pipeRawTabletInsertionEvent, status));
}
}
+ private static void throwWriteBackExceptionIfNecessary(
+ final TSStatus status, final String exceptionMessage) {
+ if (status.getCode() == TSStatusCode.NO_PERMISSION.getStatusCode()) {
+ throw new PipeRuntimeSinkNonReportTimeConfigurableException(exceptionMessage, Long.MAX_VALUE);
+ }
+
+ throw new PipeException(exceptionMessage);
+ }
+
private TSStatus executeStatement(final InsertBaseStatement statement) {
return Coordinator.getInstance()
.executeForTreeModel(