This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch air-gap-ttl in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit c92c36c6541fdaacf45705417d467ad4b315e504 Author: Caideyipi <[email protected]> AuthorDate: Fri Feb 6 10:50:39 2026 +0800 idemp --- .../visitor/PipeStatementTSStatusVisitor.java | 221 ++++++++++----------- 1 file changed, 106 insertions(+), 115 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java index 7d1b08b1238..49b9b9a87c7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.pipe.receiver.visitor; import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.pipe.receiver.PipeReceiverStatusHandler; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException; @@ -43,6 +44,8 @@ import org.apache.iotdb.db.queryengine.plan.statement.metadata.template.BatchAct import org.apache.iotdb.db.queryengine.plan.statement.metadata.view.CreateLogicalViewStatement; import org.apache.iotdb.rpc.TSStatusCode; +import java.util.stream.Collectors; + /** * This visitor translated some {@link TSStatus} to pipe related status to help sender classify them * and apply different error handling tactics. 
Please DO NOT modify the {@link TSStatus} returned by @@ -53,227 +56,215 @@ public class PipeStatementTSStatusVisitor extends StatementVisitor<TSStatus, TSS private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); @Override - public TSStatus visitNode(final StatementNode node, final TSStatus context) { - return context; + public TSStatus process(final StatementNode node, final TSStatus status) { + return status.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode() + ? PipeReceiverStatusHandler.getPriorStatus( + status.getSubStatus().stream() + .map(subStatus -> node.accept(this, subStatus)) + .collect(Collectors.toList())) + : node.accept(this, status); + } + + @Override + public TSStatus visitNode(final StatementNode node, final TSStatus status) { + return status; } @Override public TSStatus visitLoadFile( - final LoadTsFileStatement loadTsFileStatement, final TSStatus context) { - if (context.getCode() == TSStatusCode.SYSTEM_READ_ONLY.getStatusCode() - || context.getCode() == TSStatusCode.LOAD_FILE_ERROR.getStatusCode() - && context.getMessage() != null - && context.getMessage().contains("memory")) { + final LoadTsFileStatement loadTsFileStatement, final TSStatus status) { + if (status.getCode() == TSStatusCode.SYSTEM_READ_ONLY.getStatusCode() + || status.getCode() == TSStatusCode.LOAD_FILE_ERROR.getStatusCode() + && status.getMessage() != null + && status.getMessage().contains("memory")) { return new TSStatus( TSStatusCode.PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode()) - .setMessage(context.getMessage()); + .setMessage(status.getMessage()); } - return super.visitLoadFile(loadTsFileStatement, context); + return super.visitLoadFile(loadTsFileStatement, status); } @Override public TSStatus visitInsertTablet( - final InsertTabletStatement insertTabletStatement, final TSStatus context) { - return visitInsertBase(insertTabletStatement, context); + final InsertTabletStatement insertTabletStatement, final TSStatus status) { + 
return visitInsertBase(insertTabletStatement, status); } @Override public TSStatus visitInsertRow( - final InsertRowStatement insertRowStatement, final TSStatus context) { - return visitInsertBase(insertRowStatement, context); + final InsertRowStatement insertRowStatement, final TSStatus status) { + return visitInsertBase(insertRowStatement, status); } @Override public TSStatus visitInsertRows( - final InsertRowsStatement insertRowsStatement, final TSStatus context) { - return visitInsertBase(insertRowsStatement, context); + final InsertRowsStatement insertRowsStatement, final TSStatus status) { + return visitInsertBase(insertRowsStatement, status); } @Override public TSStatus visitInsertMultiTablets( - final InsertMultiTabletsStatement insertMultiTabletsStatement, final TSStatus context) { - return visitInsertBase(insertMultiTabletsStatement, context); + final InsertMultiTabletsStatement insertMultiTabletsStatement, final TSStatus status) { + return visitInsertBase(insertMultiTabletsStatement, status); } @Override public TSStatus visitInsertBase( - final InsertBaseStatement insertBaseStatement, final TSStatus context) { + final InsertBaseStatement insertBaseStatement, final TSStatus status) { // If the system is read-only, we shall not classify it into temporary unavailable exception to // avoid to many logs - if (context.getCode() == TSStatusCode.WRITE_PROCESS_REJECT.getStatusCode()) { + if (status.getCode() == TSStatusCode.WRITE_PROCESS_REJECT.getStatusCode()) { return new TSStatus( TSStatusCode.PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode()) - .setMessage(context.getMessage()); - } else if (context.getCode() == TSStatusCode.OUT_OF_TTL.getStatusCode()) { + .setMessage(status.getMessage()); + } else if (status.getCode() == TSStatusCode.OUT_OF_TTL.getStatusCode()) { return new TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode()) - .setMessage(context.getMessage()); - } else if (context.getCode() == 
TSStatusCode.DATABASE_NOT_EXIST.getStatusCode()) { + .setMessage(status.getMessage()); + } else if (status.getCode() == TSStatusCode.DATABASE_NOT_EXIST.getStatusCode()) { return new TSStatus( TSStatusCode.PIPE_RECEIVER_PARALLEL_OR_USER_CONFLICT_EXCEPTION.getStatusCode()) - .setMessage(context.getMessage()); - } else if (context.getCode() == TSStatusCode.METADATA_ERROR.getStatusCode()) { - if (context.getMessage().contains(DataTypeMismatchException.REGISTERED_TYPE_STRING) + .setMessage(status.getMessage()); + } else if (status.getCode() == TSStatusCode.METADATA_ERROR.getStatusCode()) { + if (status.getMessage().contains(DataTypeMismatchException.REGISTERED_TYPE_STRING) && config.isEnablePartialInsert()) { return new TSStatus( TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode()) - .setMessage(context.getMessage()); + .setMessage(status.getMessage()); } - if (context.getMessage().contains("does not exist")) { + if (status.getMessage().contains("does not exist")) { return new TSStatus( TSStatusCode.PIPE_RECEIVER_PARALLEL_OR_USER_CONFLICT_EXCEPTION.getStatusCode()) - .setMessage(context.getMessage()); + .setMessage(status.getMessage()); } } - return visitStatement(insertBaseStatement, context); + return visitStatement(insertBaseStatement, status); } @Override public TSStatus visitCreateTimeseries( - final CreateTimeSeriesStatement statement, final TSStatus context) { - return visitGeneralCreateTimeSeries(statement, context); + final CreateTimeSeriesStatement statement, final TSStatus status) { + return visitGeneralCreateTimeSeries(statement, status); } @Override public TSStatus visitCreateAlignedTimeseries( - final CreateAlignedTimeSeriesStatement statement, final TSStatus context) { - return visitGeneralCreateTimeSeries(statement, context); + final CreateAlignedTimeSeriesStatement statement, final TSStatus status) { + return visitGeneralCreateTimeSeries(statement, status); } - private TSStatus visitGeneralCreateTimeSeries(final Statement statement, 
final TSStatus context) { - if (context.getCode() == TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode() - || context.getCode() == TSStatusCode.ALIAS_ALREADY_EXIST.getStatusCode()) { + private TSStatus visitGeneralCreateTimeSeries(final Statement statement, final TSStatus status) { + if (status.getCode() == TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode() + || status.getCode() == TSStatusCode.ALIAS_ALREADY_EXIST.getStatusCode()) { return new TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode()) - .setMessage(context.getMessage()); - } else if (context.getCode() == TSStatusCode.PATH_ALREADY_EXIST.getStatusCode() - || context.getCode() == TSStatusCode.SCHEMA_QUOTA_EXCEEDED.getStatusCode()) { + .setMessage(status.getMessage()); + } else if (status.getCode() == TSStatusCode.PATH_ALREADY_EXIST.getStatusCode() + || status.getCode() == TSStatusCode.SCHEMA_QUOTA_EXCEEDED.getStatusCode()) { return new TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode()) - .setMessage(context.getMessage()); + .setMessage(status.getMessage()); } - return visitStatement(statement, context); + return visitStatement(statement, status); } @Override public TSStatus visitCreateMultiTimeSeries( - final CreateMultiTimeSeriesStatement createMultiTimeSeriesStatement, final TSStatus context) { - return visitGeneralCreateMultiTimeseries(createMultiTimeSeriesStatement, context); + final CreateMultiTimeSeriesStatement createMultiTimeSeriesStatement, final TSStatus status) { + return visitGeneralCreateMultiTimeSeries(createMultiTimeSeriesStatement, status); } @Override public TSStatus visitInternalCreateTimeseries( final InternalCreateTimeSeriesStatement internalCreateTimeSeriesStatement, - final TSStatus context) { - return visitGeneralCreateMultiTimeseries(internalCreateTimeSeriesStatement, context); + final TSStatus status) { + return visitGeneralCreateMultiTimeSeries(internalCreateTimeSeriesStatement, status); } @Override public TSStatus 
visitInternalCreateMultiTimeSeries( final InternalCreateMultiTimeSeriesStatement internalCreateMultiTimeSeriesStatement, - final TSStatus context) { - return visitGeneralCreateMultiTimeseries(internalCreateMultiTimeSeriesStatement, context); + final TSStatus status) { + return visitGeneralCreateMultiTimeSeries(internalCreateMultiTimeSeriesStatement, status); } - private TSStatus visitGeneralCreateMultiTimeseries( - final Statement statement, final TSStatus context) { - if (context.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) { - for (final TSStatus status : context.getSubStatus()) { - if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode() - && status.getCode() != TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode() - && status.getCode() != TSStatusCode.ALIAS_ALREADY_EXIST.getStatusCode()) { - return visitStatement(statement, context); - } - } + private TSStatus visitGeneralCreateMultiTimeSeries( + final Statement statement, final TSStatus status) { + if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode() + || status.getCode() == TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode() + || status.getCode() == TSStatusCode.ALIAS_ALREADY_EXIST.getStatusCode()) { return new TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode()) - .setMessage(context.getMessage()); - } else if (context.getCode() == TSStatusCode.SCHEMA_QUOTA_EXCEEDED.getStatusCode()) { + .setMessage(status.getMessage()); + } else if (status.getCode() == TSStatusCode.SCHEMA_QUOTA_EXCEEDED.getStatusCode()) { return new TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode()) - .setMessage(context.getMessage()); + .setMessage(status.getMessage()); } - return visitStatement(statement, context); + return visitStatement(statement, status); } @Override public TSStatus visitAlterTimeSeries( - final AlterTimeSeriesStatement alterTimeSeriesStatement, final TSStatus context) { - if (context.getCode() == 
TSStatusCode.METADATA_ERROR.getStatusCode()) { - if (context.getMessage().contains("already")) { + final AlterTimeSeriesStatement alterTimeSeriesStatement, final TSStatus status) { + if (status.getCode() == TSStatusCode.METADATA_ERROR.getStatusCode()) { + if (status.getMessage().contains("already")) { return new TSStatus( TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode()) - .setMessage(context.getMessage()); - } else if (context.getMessage().contains("does not")) { + .setMessage(status.getMessage()); + } else if (status.getMessage().contains("does not")) { return new TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode()) - .setMessage(context.getMessage()); + .setMessage(status.getMessage()); } - } else if (context.getCode() == TSStatusCode.PATH_NOT_EXIST.getStatusCode()) { + } else if (status.getCode() == TSStatusCode.PATH_NOT_EXIST.getStatusCode()) { return new TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode()) - .setMessage(context.getMessage()); + .setMessage(status.getMessage()); } - return visitStatement(alterTimeSeriesStatement, context); + return visitStatement(alterTimeSeriesStatement, status); } @Override public TSStatus visitCreateLogicalView( - final CreateLogicalViewStatement createLogicalViewStatement, final TSStatus context) { - if (context.getCode() == TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) { - return new TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode()) - .setMessage(context.getMessage()); - } else if (context.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) { - for (final TSStatus status : context.getSubStatus()) { - if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode() - && status.getCode() != TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) { - return visitStatement(createLogicalViewStatement, context); - } - } + final CreateLogicalViewStatement createLogicalViewStatement, final TSStatus status) { + 
if (status.getCode() == TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) { return new TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode()) - .setMessage(context.getMessage()); + .setMessage(status.getMessage()); } - return super.visitCreateLogicalView(createLogicalViewStatement, context); + return super.visitCreateLogicalView(createLogicalViewStatement, status); } @Override public TSStatus visitActivateTemplate( - final ActivateTemplateStatement activateTemplateStatement, final TSStatus context) { - return visitGeneralActivateTemplate(activateTemplateStatement, context); + final ActivateTemplateStatement activateTemplateStatement, final TSStatus status) { + return visitGeneralActivateTemplate(activateTemplateStatement, status); } @Override public TSStatus visitBatchActivateTemplate( - final BatchActivateTemplateStatement batchActivateTemplateStatement, final TSStatus context) { - boolean userConflict = false; - if (context.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) { - for (final TSStatus status : context.getSubStatus()) { - if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode() - && status.getCode() != TSStatusCode.TEMPLATE_IS_IN_USE.getStatusCode()) { - return visitStatement(batchActivateTemplateStatement, context); - } - if (context.getCode() == TSStatusCode.METADATA_ERROR.getStatusCode() - && context.isSetMessage() - && context.getMessage().contains("has not been set any template")) { - userConflict = true; - } - } - return (userConflict - ? 
new TSStatus( - TSStatusCode.PIPE_RECEIVER_PARALLEL_OR_USER_CONFLICT_EXCEPTION.getStatusCode()) - : new TSStatus( - TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())) - .setMessage(context.getMessage()); + final BatchActivateTemplateStatement batchActivateTemplateStatement, final TSStatus status) { + if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode() + || status.getCode() == TSStatusCode.TEMPLATE_IS_IN_USE.getStatusCode()) { + return new TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode()) + .setMessage(status.getMessage()); + } + if (status.getCode() == TSStatusCode.METADATA_ERROR.getStatusCode() + && status.isSetMessage() + && status.getMessage().contains("has not been set any template")) { + return new TSStatus( + TSStatusCode.PIPE_RECEIVER_PARALLEL_OR_USER_CONFLICT_EXCEPTION.getStatusCode()) + .setMessage(status.getMessage()); } - return visitGeneralActivateTemplate(batchActivateTemplateStatement, context); + return visitGeneralActivateTemplate(batchActivateTemplateStatement, status); } private TSStatus visitGeneralActivateTemplate( - final Statement activateTemplateStatement, final TSStatus context) { - if (context.getCode() == TSStatusCode.TEMPLATE_IS_IN_USE.getStatusCode()) { + final Statement activateTemplateStatement, final TSStatus status) { + if (status.getCode() == TSStatusCode.TEMPLATE_IS_IN_USE.getStatusCode()) { return new TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode()) - .setMessage(context.getMessage()); + .setMessage(status.getMessage()); } - if (context.getCode() == TSStatusCode.METADATA_ERROR.getStatusCode() - && context.isSetMessage() - && context.getMessage().contains("has not been set any template")) { + if (status.getCode() == TSStatusCode.METADATA_ERROR.getStatusCode() + && status.isSetMessage() + && status.getMessage().contains("has not been set any template")) { return new TSStatus( 
TSStatusCode.PIPE_RECEIVER_PARALLEL_OR_USER_CONFLICT_EXCEPTION.getStatusCode()) - .setMessage(context.getMessage()); + .setMessage(status.getMessage()); } - return visitStatement(activateTemplateStatement, context); + return visitStatement(activateTemplateStatement, status); } }
