This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch air-gap-ttl
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit c92c36c6541fdaacf45705417d467ad4b315e504
Author: Caideyipi <[email protected]>
AuthorDate: Fri Feb 6 10:50:39 2026 +0800

    idemp
---
 .../visitor/PipeStatementTSStatusVisitor.java      | 221 ++++++++++-----------
 1 file changed, 106 insertions(+), 115 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java
index 7d1b08b1238..49b9b9a87c7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.pipe.receiver.visitor;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.pipe.receiver.PipeReceiverStatusHandler;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException;
@@ -43,6 +44,8 @@ import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.template.BatchAct
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.view.CreateLogicalViewStatement;
 import org.apache.iotdb.rpc.TSStatusCode;
 
+import java.util.stream.Collectors;
+
 /**
  * This visitor translated some {@link TSStatus} to pipe related status to 
help sender classify them
  * and apply different error handling tactics. Please DO NOT modify the {@link 
TSStatus} returned by
@@ -53,227 +56,215 @@ public class PipeStatementTSStatusVisitor extends 
StatementVisitor<TSStatus, TSS
   private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
 
   @Override
-  public TSStatus visitNode(final StatementNode node, final TSStatus context) {
-    return context;
+  public TSStatus process(final StatementNode node, final TSStatus status) {
+    return status.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()
+        ? PipeReceiverStatusHandler.getPriorStatus(
+            status.getSubStatus().stream()
+                .map(subStatus -> node.accept(this, subStatus))
+                .collect(Collectors.toList()))
+        : node.accept(this, status);
+  }
+
+  @Override
+  public TSStatus visitNode(final StatementNode node, final TSStatus status) {
+    return status;
   }
 
   @Override
   public TSStatus visitLoadFile(
-      final LoadTsFileStatement loadTsFileStatement, final TSStatus context) {
-    if (context.getCode() == TSStatusCode.SYSTEM_READ_ONLY.getStatusCode()
-        || context.getCode() == TSStatusCode.LOAD_FILE_ERROR.getStatusCode()
-            && context.getMessage() != null
-            && context.getMessage().contains("memory")) {
+      final LoadTsFileStatement loadTsFileStatement, final TSStatus status) {
+    if (status.getCode() == TSStatusCode.SYSTEM_READ_ONLY.getStatusCode()
+        || status.getCode() == TSStatusCode.LOAD_FILE_ERROR.getStatusCode()
+            && status.getMessage() != null
+            && status.getMessage().contains("memory")) {
       return new TSStatus(
               
TSStatusCode.PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode())
-          .setMessage(context.getMessage());
+          .setMessage(status.getMessage());
     }
-    return super.visitLoadFile(loadTsFileStatement, context);
+    return super.visitLoadFile(loadTsFileStatement, status);
   }
 
   @Override
   public TSStatus visitInsertTablet(
-      final InsertTabletStatement insertTabletStatement, final TSStatus 
context) {
-    return visitInsertBase(insertTabletStatement, context);
+      final InsertTabletStatement insertTabletStatement, final TSStatus 
status) {
+    return visitInsertBase(insertTabletStatement, status);
   }
 
   @Override
   public TSStatus visitInsertRow(
-      final InsertRowStatement insertRowStatement, final TSStatus context) {
-    return visitInsertBase(insertRowStatement, context);
+      final InsertRowStatement insertRowStatement, final TSStatus status) {
+    return visitInsertBase(insertRowStatement, status);
   }
 
   @Override
   public TSStatus visitInsertRows(
-      final InsertRowsStatement insertRowsStatement, final TSStatus context) {
-    return visitInsertBase(insertRowsStatement, context);
+      final InsertRowsStatement insertRowsStatement, final TSStatus status) {
+    return visitInsertBase(insertRowsStatement, status);
   }
 
   @Override
   public TSStatus visitInsertMultiTablets(
-      final InsertMultiTabletsStatement insertMultiTabletsStatement, final 
TSStatus context) {
-    return visitInsertBase(insertMultiTabletsStatement, context);
+      final InsertMultiTabletsStatement insertMultiTabletsStatement, final 
TSStatus status) {
+    return visitInsertBase(insertMultiTabletsStatement, status);
   }
 
   @Override
   public TSStatus visitInsertBase(
-      final InsertBaseStatement insertBaseStatement, final TSStatus context) {
+      final InsertBaseStatement insertBaseStatement, final TSStatus status) {
     // If the system is read-only, we shall not classify it into temporary 
unavailable exception to
     // avoid to many logs
-    if (context.getCode() == 
TSStatusCode.WRITE_PROCESS_REJECT.getStatusCode()) {
+    if (status.getCode() == TSStatusCode.WRITE_PROCESS_REJECT.getStatusCode()) 
{
       return new TSStatus(
               
TSStatusCode.PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode())
-          .setMessage(context.getMessage());
-    } else if (context.getCode() == TSStatusCode.OUT_OF_TTL.getStatusCode()) {
+          .setMessage(status.getMessage());
+    } else if (status.getCode() == TSStatusCode.OUT_OF_TTL.getStatusCode()) {
       return new 
TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
-          .setMessage(context.getMessage());
-    } else if (context.getCode() == 
TSStatusCode.DATABASE_NOT_EXIST.getStatusCode()) {
+          .setMessage(status.getMessage());
+    } else if (status.getCode() == 
TSStatusCode.DATABASE_NOT_EXIST.getStatusCode()) {
       return new TSStatus(
               
TSStatusCode.PIPE_RECEIVER_PARALLEL_OR_USER_CONFLICT_EXCEPTION.getStatusCode())
-          .setMessage(context.getMessage());
-    } else if (context.getCode() == 
TSStatusCode.METADATA_ERROR.getStatusCode()) {
-      if 
(context.getMessage().contains(DataTypeMismatchException.REGISTERED_TYPE_STRING)
+          .setMessage(status.getMessage());
+    } else if (status.getCode() == 
TSStatusCode.METADATA_ERROR.getStatusCode()) {
+      if 
(status.getMessage().contains(DataTypeMismatchException.REGISTERED_TYPE_STRING)
           && config.isEnablePartialInsert()) {
         return new TSStatus(
                 
TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
-            .setMessage(context.getMessage());
+            .setMessage(status.getMessage());
       }
-      if (context.getMessage().contains("does not exist")) {
+      if (status.getMessage().contains("does not exist")) {
         return new TSStatus(
                 
TSStatusCode.PIPE_RECEIVER_PARALLEL_OR_USER_CONFLICT_EXCEPTION.getStatusCode())
-            .setMessage(context.getMessage());
+            .setMessage(status.getMessage());
       }
     }
-    return visitStatement(insertBaseStatement, context);
+    return visitStatement(insertBaseStatement, status);
   }
 
   @Override
   public TSStatus visitCreateTimeseries(
-      final CreateTimeSeriesStatement statement, final TSStatus context) {
-    return visitGeneralCreateTimeSeries(statement, context);
+      final CreateTimeSeriesStatement statement, final TSStatus status) {
+    return visitGeneralCreateTimeSeries(statement, status);
   }
 
   @Override
   public TSStatus visitCreateAlignedTimeseries(
-      final CreateAlignedTimeSeriesStatement statement, final TSStatus 
context) {
-    return visitGeneralCreateTimeSeries(statement, context);
+      final CreateAlignedTimeSeriesStatement statement, final TSStatus status) 
{
+    return visitGeneralCreateTimeSeries(statement, status);
   }
 
-  private TSStatus visitGeneralCreateTimeSeries(final Statement statement, 
final TSStatus context) {
-    if (context.getCode() == 
TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()
-        || context.getCode() == 
TSStatusCode.ALIAS_ALREADY_EXIST.getStatusCode()) {
+  private TSStatus visitGeneralCreateTimeSeries(final Statement statement, 
final TSStatus status) {
+    if (status.getCode() == 
TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()
+        || status.getCode() == 
TSStatusCode.ALIAS_ALREADY_EXIST.getStatusCode()) {
       return new 
TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
-          .setMessage(context.getMessage());
-    } else if (context.getCode() == 
TSStatusCode.PATH_ALREADY_EXIST.getStatusCode()
-        || context.getCode() == 
TSStatusCode.SCHEMA_QUOTA_EXCEEDED.getStatusCode()) {
+          .setMessage(status.getMessage());
+    } else if (status.getCode() == 
TSStatusCode.PATH_ALREADY_EXIST.getStatusCode()
+        || status.getCode() == 
TSStatusCode.SCHEMA_QUOTA_EXCEEDED.getStatusCode()) {
       return new 
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
-          .setMessage(context.getMessage());
+          .setMessage(status.getMessage());
     }
-    return visitStatement(statement, context);
+    return visitStatement(statement, status);
   }
 
   @Override
   public TSStatus visitCreateMultiTimeSeries(
-      final CreateMultiTimeSeriesStatement createMultiTimeSeriesStatement, 
final TSStatus context) {
-    return visitGeneralCreateMultiTimeseries(createMultiTimeSeriesStatement, 
context);
+      final CreateMultiTimeSeriesStatement createMultiTimeSeriesStatement, 
final TSStatus status) {
+    return visitGeneralCreateMultiTimeSeries(createMultiTimeSeriesStatement, 
status);
   }
 
   @Override
   public TSStatus visitInternalCreateTimeseries(
       final InternalCreateTimeSeriesStatement 
internalCreateTimeSeriesStatement,
-      final TSStatus context) {
-    return 
visitGeneralCreateMultiTimeseries(internalCreateTimeSeriesStatement, context);
+      final TSStatus status) {
+    return 
visitGeneralCreateMultiTimeSeries(internalCreateTimeSeriesStatement, status);
   }
 
   @Override
   public TSStatus visitInternalCreateMultiTimeSeries(
       final InternalCreateMultiTimeSeriesStatement 
internalCreateMultiTimeSeriesStatement,
-      final TSStatus context) {
-    return 
visitGeneralCreateMultiTimeseries(internalCreateMultiTimeSeriesStatement, 
context);
+      final TSStatus status) {
+    return 
visitGeneralCreateMultiTimeSeries(internalCreateMultiTimeSeriesStatement, 
status);
   }
 
-  private TSStatus visitGeneralCreateMultiTimeseries(
-      final Statement statement, final TSStatus context) {
-    if (context.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
-      for (final TSStatus status : context.getSubStatus()) {
-        if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
-            && status.getCode() != 
TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()
-            && status.getCode() != 
TSStatusCode.ALIAS_ALREADY_EXIST.getStatusCode()) {
-          return visitStatement(statement, context);
-        }
-      }
+  private TSStatus visitGeneralCreateMultiTimeSeries(
+      final Statement statement, final TSStatus status) {
+    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
+        || status.getCode() == 
TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()
+        || status.getCode() == 
TSStatusCode.ALIAS_ALREADY_EXIST.getStatusCode()) {
       return new 
TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
-          .setMessage(context.getMessage());
-    } else if (context.getCode() == 
TSStatusCode.SCHEMA_QUOTA_EXCEEDED.getStatusCode()) {
+          .setMessage(status.getMessage());
+    } else if (status.getCode() == 
TSStatusCode.SCHEMA_QUOTA_EXCEEDED.getStatusCode()) {
       return new 
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
-          .setMessage(context.getMessage());
+          .setMessage(status.getMessage());
     }
-    return visitStatement(statement, context);
+    return visitStatement(statement, status);
   }
 
   @Override
   public TSStatus visitAlterTimeSeries(
-      final AlterTimeSeriesStatement alterTimeSeriesStatement, final TSStatus 
context) {
-    if (context.getCode() == TSStatusCode.METADATA_ERROR.getStatusCode()) {
-      if (context.getMessage().contains("already")) {
+      final AlterTimeSeriesStatement alterTimeSeriesStatement, final TSStatus 
status) {
+    if (status.getCode() == TSStatusCode.METADATA_ERROR.getStatusCode()) {
+      if (status.getMessage().contains("already")) {
         return new TSStatus(
                 
TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
-            .setMessage(context.getMessage());
-      } else if (context.getMessage().contains("does not")) {
+            .setMessage(status.getMessage());
+      } else if (status.getMessage().contains("does not")) {
         return new 
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
-            .setMessage(context.getMessage());
+            .setMessage(status.getMessage());
       }
-    } else if (context.getCode() == 
TSStatusCode.PATH_NOT_EXIST.getStatusCode()) {
+    } else if (status.getCode() == 
TSStatusCode.PATH_NOT_EXIST.getStatusCode()) {
       return new 
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
-          .setMessage(context.getMessage());
+          .setMessage(status.getMessage());
     }
-    return visitStatement(alterTimeSeriesStatement, context);
+    return visitStatement(alterTimeSeriesStatement, status);
   }
 
   @Override
   public TSStatus visitCreateLogicalView(
-      final CreateLogicalViewStatement createLogicalViewStatement, final 
TSStatus context) {
-    if (context.getCode() == 
TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) {
-      return new 
TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
-          .setMessage(context.getMessage());
-    } else if (context.getCode() == 
TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
-      for (final TSStatus status : context.getSubStatus()) {
-        if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
-            && status.getCode() != 
TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) {
-          return visitStatement(createLogicalViewStatement, context);
-        }
-      }
+      final CreateLogicalViewStatement createLogicalViewStatement, final 
TSStatus status) {
+    if (status.getCode() == 
TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) {
       return new 
TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
-          .setMessage(context.getMessage());
+          .setMessage(status.getMessage());
     }
-    return super.visitCreateLogicalView(createLogicalViewStatement, context);
+    return super.visitCreateLogicalView(createLogicalViewStatement, status);
   }
 
   @Override
   public TSStatus visitActivateTemplate(
-      final ActivateTemplateStatement activateTemplateStatement, final 
TSStatus context) {
-    return visitGeneralActivateTemplate(activateTemplateStatement, context);
+      final ActivateTemplateStatement activateTemplateStatement, final 
TSStatus status) {
+    return visitGeneralActivateTemplate(activateTemplateStatement, status);
   }
 
   @Override
   public TSStatus visitBatchActivateTemplate(
-      final BatchActivateTemplateStatement batchActivateTemplateStatement, 
final TSStatus context) {
-    boolean userConflict = false;
-    if (context.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
-      for (final TSStatus status : context.getSubStatus()) {
-        if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
-            && status.getCode() != 
TSStatusCode.TEMPLATE_IS_IN_USE.getStatusCode()) {
-          return visitStatement(batchActivateTemplateStatement, context);
-        }
-        if (context.getCode() == TSStatusCode.METADATA_ERROR.getStatusCode()
-            && context.isSetMessage()
-            && context.getMessage().contains("has not been set any template")) 
{
-          userConflict = true;
-        }
-      }
-      return (userConflict
-              ? new TSStatus(
-                  
TSStatusCode.PIPE_RECEIVER_PARALLEL_OR_USER_CONFLICT_EXCEPTION.getStatusCode())
-              : new TSStatus(
-                  
TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode()))
-          .setMessage(context.getMessage());
+      final BatchActivateTemplateStatement batchActivateTemplateStatement, 
final TSStatus status) {
+    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
+        || status.getCode() == 
TSStatusCode.TEMPLATE_IS_IN_USE.getStatusCode()) {
+      return new 
TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
+          .setMessage(status.getMessage());
+    }
+    if (status.getCode() == TSStatusCode.METADATA_ERROR.getStatusCode()
+        && status.isSetMessage()
+        && status.getMessage().contains("has not been set any template")) {
+      return new TSStatus(
+              
TSStatusCode.PIPE_RECEIVER_PARALLEL_OR_USER_CONFLICT_EXCEPTION.getStatusCode())
+          .setMessage(status.getMessage());
     }
-    return visitGeneralActivateTemplate(batchActivateTemplateStatement, 
context);
+    return visitGeneralActivateTemplate(batchActivateTemplateStatement, 
status);
   }
 
   private TSStatus visitGeneralActivateTemplate(
-      final Statement activateTemplateStatement, final TSStatus context) {
-    if (context.getCode() == TSStatusCode.TEMPLATE_IS_IN_USE.getStatusCode()) {
+      final Statement activateTemplateStatement, final TSStatus status) {
+    if (status.getCode() == TSStatusCode.TEMPLATE_IS_IN_USE.getStatusCode()) {
       return new 
TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
-          .setMessage(context.getMessage());
+          .setMessage(status.getMessage());
     }
-    if (context.getCode() == TSStatusCode.METADATA_ERROR.getStatusCode()
-        && context.isSetMessage()
-        && context.getMessage().contains("has not been set any template")) {
+    if (status.getCode() == TSStatusCode.METADATA_ERROR.getStatusCode()
+        && status.isSetMessage()
+        && status.getMessage().contains("has not been set any template")) {
       return new TSStatus(
               
TSStatusCode.PIPE_RECEIVER_PARALLEL_OR_USER_CONFLICT_EXCEPTION.getStatusCode())
-          .setMessage(context.getMessage());
+          .setMessage(status.getMessage());
     }
-    return visitStatement(activateTemplateStatement, context);
+    return visitStatement(activateTemplateStatement, status);
   }
 }

Reply via email to