This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 804ee600cd4 Pipe: Improved handling logic for transferred statements &
Allow root.__system.** schemas to reach the receiver side (#12276)
804ee600cd4 is described below
commit 804ee600cd4f708114d2ddea1dae0c1b66737f6b
Author: Caideyipi <[email protected]>
AuthorDate: Tue Apr 2 17:58:41 2024 +0800
Pipe: Improved handling logic for transferred statements & Allow
root.__system.** schemas to reach the receiver side (#12276)
---
.../extractor/ConfigRegionListeningFilter.java | 11 -----------
.../receiver/protocol/IoTDBConfigNodeReceiver.java | 8 --------
.../PipeConfigPhysicalPlanTSStatusVisitor.java | 3 +++
.../visitor/PipeStatementExceptionVisitor.java | 6 +++++-
.../visitor/PipeStatementTSStatusVisitor.java | 23 ++++++++++++----------
5 files changed, 21 insertions(+), 30 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/extractor/ConfigRegionListeningFilter.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/extractor/ConfigRegionListeningFilter.java
index 84cebf259fb..33b62aaa754 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/extractor/ConfigRegionListeningFilter.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/extractor/ConfigRegionListeningFilter.java
@@ -21,10 +21,8 @@ package org.apache.iotdb.confignode.manager.pipe.extractor;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.commons.schema.SchemaConstant;
import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
-import
org.apache.iotdb.confignode.consensus.request.write.database.DatabaseSchemaPlan;
import
org.apache.iotdb.confignode.consensus.request.write.template.CommitSetSchemaTemplatePlan;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
@@ -146,15 +144,6 @@ public class ConfigRegionListeningFilter {
return false;
}
- // Do not transfer system database plan
- if (type.equals(ConfigPhysicalPlanType.CreateDatabase)
- && ((DatabaseSchemaPlan) plan)
- .getSchema()
- .getName()
- .equals(SchemaConstant.SYSTEM_DATABASE)) {
- return false;
- }
-
// PipeEnriched & UnsetTemplate are not listened directly,
// but their inner plan or converted plan are listened.
return type.equals(ConfigPhysicalPlanType.PipeEnriched)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
index 864c04789ac..28e08e231a8 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
@@ -27,7 +27,6 @@ import
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeReques
import
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferFileSealReqV1;
import
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferFileSealReqV2;
import org.apache.iotdb.commons.pipe.receiver.IoTDBFileReceiver;
-import org.apache.iotdb.commons.schema.SchemaConstant;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
@@ -181,13 +180,6 @@ public class IoTDBConfigNodeReceiver extends
IoTDBFileReceiver {
private TSStatus executePlan(ConfigPhysicalPlan plan) throws
ConsensusException {
switch (plan.getType()) {
case CreateDatabase:
- if (((DatabaseSchemaPlan) plan)
- .getSchema()
- .getName()
- .equals(SchemaConstant.SYSTEM_DATABASE)) {
- // System database doesn't need transferring
- return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
- }
// Here we only reserve database name and substitute the sender's
local information
// with the receiver's default configurations
TDatabaseSchema schema = ((DatabaseSchemaPlan) plan).getSchema();
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/visitor/PipeConfigPhysicalPlanTSStatusVisitor.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/visitor/PipeConfigPhysicalPlanTSStatusVisitor.java
index 7046f3d8dc6..5a30b25e4d7 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/visitor/PipeConfigPhysicalPlanTSStatusVisitor.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/visitor/PipeConfigPhysicalPlanTSStatusVisitor.java
@@ -65,6 +65,9 @@ public class PipeConfigPhysicalPlanTSStatusVisitor
// Lower or higher level database has been created
return new
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
.setMessage(context.getMessage());
+ } else if (context.getCode() ==
TSStatusCode.SCHEMA_QUOTA_EXCEEDED.getStatusCode()) {
+ return new
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
+ .setMessage(context.getMessage());
} else if (context.getCode() ==
TSStatusCode.METADATA_ERROR.getStatusCode()) {
return new TSStatus(
TSStatusCode.PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode())
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementExceptionVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementExceptionVisitor.java
index 1ae7d106dc5..7391fa32fcc 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementExceptionVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementExceptionVisitor.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.db.exception.LoadRuntimeOutOfMemoryException;
import org.apache.iotdb.db.exception.sql.SemanticException;
+import org.apache.iotdb.db.exception.sql.StatementAnalyzeException;
import org.apache.iotdb.db.queryengine.plan.statement.Statement;
import org.apache.iotdb.db.queryengine.plan.statement.StatementNode;
import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor;
@@ -110,9 +111,12 @@ public class PipeStatementExceptionVisitor extends
StatementVisitor<TSStatus, Ex
return visitGeneralActivateTemplate(batchActivateTemplateStatement,
context);
}
+ // InternalBatchActivateTemplateNode is converted to
BatchActivateTemplateStatement
+ // No need to handle InternalBatchActivateTemplateStatement
+
private TSStatus visitGeneralActivateTemplate(
Statement activateTemplateStatement, Exception context) {
- if (context instanceof MetadataException) {
+ if (context instanceof MetadataException || context instanceof
StatementAnalyzeException) {
return new
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
.setMessage(context.getMessage());
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java
index 52a0c963278..117eae0a689 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java
@@ -81,10 +81,6 @@ public class PipeStatementTSStatusVisitor extends
StatementVisitor<TSStatus, TSS
@Override
public TSStatus visitCreateTimeseries(CreateTimeSeriesStatement statement,
TSStatus context) {
- if (context.getCode() ==
TSStatusCode.ALIGNED_TIMESERIES_ERROR.getStatusCode()) {
- return new
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
- .setMessage(context.getMessage());
- }
return visitGeneralCreateTimeSeries(statement, context);
}
@@ -99,7 +95,9 @@ public class PipeStatementTSStatusVisitor extends
StatementVisitor<TSStatus, TSS
|| context.getCode() ==
TSStatusCode.ALIAS_ALREADY_EXIST.getStatusCode()) {
return new
TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
.setMessage(context.getMessage());
- } else if (context.getCode() ==
TSStatusCode.PATH_ALREADY_EXIST.getStatusCode()) {
+ } else if (context.getCode() ==
TSStatusCode.PATH_ALREADY_EXIST.getStatusCode()
+ || context.getCode() ==
TSStatusCode.ALIGNED_TIMESERIES_ERROR.getStatusCode()
+ || context.getCode() ==
TSStatusCode.SCHEMA_QUOTA_EXCEEDED.getStatusCode()) {
return new
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
.setMessage(context.getMessage());
}
@@ -125,8 +123,7 @@ public class PipeStatementTSStatusVisitor extends
StatementVisitor<TSStatus, TSS
return
visitGeneralCreateMultiTimeseries(internalCreateMultiTimeSeriesStatement,
context);
}
- private TSStatus visitGeneralCreateMultiTimeseries(
- Statement internalCreateTimeSeriesStatement, TSStatus context) {
+ private TSStatus visitGeneralCreateMultiTimeseries(Statement statement,
TSStatus context) {
if (context.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
for (TSStatus status : context.getSubStatus()) {
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
@@ -135,13 +132,16 @@ public class PipeStatementTSStatusVisitor extends
StatementVisitor<TSStatus, TSS
return new
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
.setMessage(context.getMessage());
}
- return visitStatement(internalCreateTimeSeriesStatement, context);
+ return visitStatement(statement, context);
}
}
return new
TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
.setMessage(context.getMessage());
+ } else if (context.getCode() ==
TSStatusCode.SCHEMA_QUOTA_EXCEEDED.getStatusCode()) {
+ return new
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
+ .setMessage(context.getMessage());
}
- return visitStatement(internalCreateTimeSeriesStatement, context);
+ return visitStatement(statement, context);
}
@Override
@@ -166,7 +166,10 @@ public class PipeStatementTSStatusVisitor extends
StatementVisitor<TSStatus, TSS
@Override
public TSStatus visitCreateLogicalView(
CreateLogicalViewStatement createLogicalViewStatement, TSStatus context)
{
- if (context.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
+ if (context.getCode() ==
TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) {
+ return new
TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
+ .setMessage(context.getMessage());
+ } else if (context.getCode() ==
TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
for (TSStatus status : context.getSubStatus()) {
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
&& status.getCode() !=
TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) {