This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new cdba4936b48 [feature](nereids) Support group commit insert (#26075)
cdba4936b48 is described below
commit cdba4936b48b4951d670649a68fef363c9cfab25
Author: 赵硕 <[email protected]>
AuthorDate: Fri Nov 10 14:20:14 2023 +0800
[feature](nereids) Support group commit insert (#26075)
---
.../apache/doris/analysis/NativeInsertStmt.java | 57 +--
.../doris/nereids/jobs/executor/Rewriter.java | 4 -
.../org/apache/doris/nereids/rules/RuleType.java | 2 -
.../rules/analysis/RejectGroupCommitInsert.java | 53 ---
.../plans/commands/InsertIntoTableCommand.java | 78 +++-
.../apache/doris/planner/GroupCommitPlanner.java | 196 ++++++++++
.../java/org/apache/doris/qe/StmtExecutor.java | 41 +--
.../data/insert_p0/insert_group_commit_into.out | 95 +++++
.../insert_p0/insert_group_commit_into.groovy | 348 +++++++++---------
.../insert_group_commit_with_exception.groovy | 402 +++++++++++----------
.../insert_group_commit_with_large_data.groovy | 89 ++---
11 files changed, 836 insertions(+), 529 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
index d4802ea956c..27e4c6bf36c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
@@ -50,24 +50,14 @@ import org.apache.doris.planner.DataSink;
import org.apache.doris.planner.ExportSink;
import org.apache.doris.planner.GroupCommitBlockSink;
import org.apache.doris.planner.GroupCommitOlapTableSink;
+import org.apache.doris.planner.GroupCommitPlanner;
import org.apache.doris.planner.OlapTableSink;
-import org.apache.doris.planner.StreamLoadPlanner;
import org.apache.doris.planner.external.jdbc.JdbcTableSink;
-import org.apache.doris.proto.InternalService;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.rewrite.ExprRewriter;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.tablefunction.GroupCommitTableValuedFunction;
-import org.apache.doris.task.StreamLoadTask;
-import org.apache.doris.thrift.TExecPlanFragmentParams;
-import org.apache.doris.thrift.TExecPlanFragmentParamsList;
-import org.apache.doris.thrift.TFileCompressType;
-import org.apache.doris.thrift.TFileFormatType;
-import org.apache.doris.thrift.TFileType;
-import org.apache.doris.thrift.TMergeType;
import org.apache.doris.thrift.TQueryOptions;
-import org.apache.doris.thrift.TScanRangeParams;
-import org.apache.doris.thrift.TStreamLoadPutRequest;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.transaction.TransactionState;
import org.apache.doris.transaction.TransactionState.LoadJobSourceType;
@@ -84,10 +74,8 @@ import org.apache.commons.collections.CollectionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.thrift.TException;
-import org.apache.thrift.TSerializer;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -167,6 +155,7 @@ public class NativeInsertStmt extends InsertStmt {
// true if be generates an insert from group commit tvf stmt and executes
to load data
public boolean isGroupCommitTvf = false;
public boolean isGroupCommitStreamLoadSql = false;
+ private GroupCommitPlanner groupCommitPlanner;
private boolean isFromDeleteOrUpdateStmt = false;
@@ -1134,10 +1123,10 @@ public class NativeInsertStmt extends InsertStmt {
return isGroupCommit;
}
- public void planForGroupCommit(TUniqueId queryId) throws UserException,
TException {
+ public GroupCommitPlanner planForGroupCommit(TUniqueId queryId) throws
UserException, TException {
OlapTable olapTable = (OlapTable) getTargetTable();
if (execPlanFragmentParamsBytes != null &&
olapTable.getBaseSchemaVersion() == baseSchemaVersion) {
- return;
+ return groupCommitPlanner;
}
if (!targetColumns.isEmpty()) {
Analyzer analyzerTmp = analyzer;
@@ -1145,45 +1134,11 @@ public class NativeInsertStmt extends InsertStmt {
this.analyzer = analyzerTmp;
}
analyzeSubquery(analyzer, true);
- TStreamLoadPutRequest streamLoadPutRequest = new
TStreamLoadPutRequest();
- if (targetColumnNames != null) {
- streamLoadPutRequest.setColumns(String.join(",",
targetColumnNames));
- if (targetColumnNames.stream().anyMatch(col ->
col.equalsIgnoreCase(Column.SEQUENCE_COL))) {
- streamLoadPutRequest.setSequenceCol(Column.SEQUENCE_COL);
- }
- }
- streamLoadPutRequest.setDb(db.getFullName()).setMaxFilterRatio(1)
- .setTbl(getTbl())
-
.setFileType(TFileType.FILE_STREAM).setFormatType(TFileFormatType.FORMAT_CSV_PLAIN)
-
.setMergeType(TMergeType.APPEND).setThriftRpcTimeoutMs(5000).setLoadId(queryId)
- .setGroupCommit(true);
- StreamLoadTask streamLoadTask =
StreamLoadTask.fromTStreamLoadPutRequest(streamLoadPutRequest);
- StreamLoadPlanner planner = new StreamLoadPlanner((Database)
getDbObj(), olapTable, streamLoadTask);
- // Will using load id as query id in fragment
- TExecPlanFragmentParams tRequest =
planner.plan(streamLoadTask.getId());
- for (Map.Entry<Integer, List<TScanRangeParams>> entry :
tRequest.params.per_node_scan_ranges.entrySet()) {
- for (TScanRangeParams scanRangeParams : entry.getValue()) {
-
scanRangeParams.scan_range.ext_scan_range.file_scan_range.params.setFormatType(
- TFileFormatType.FORMAT_PROTO);
-
scanRangeParams.scan_range.ext_scan_range.file_scan_range.params.setCompressType(
- TFileCompressType.PLAIN);
- }
- }
- List<TScanRangeParams> scanRangeParams =
tRequest.params.per_node_scan_ranges.values().stream()
- .flatMap(Collection::stream).collect(Collectors.toList());
- Preconditions.checkState(scanRangeParams.size() == 1);
+ groupCommitPlanner = new GroupCommitPlanner((Database) db, olapTable,
targetColumnNames, queryId);
// save plan message to be reused for prepare stmt
loadId = queryId;
baseSchemaVersion = olapTable.getBaseSchemaVersion();
- // see BackendServiceProxy#execPlanFragmentsAsync
- TExecPlanFragmentParamsList paramsList = new
TExecPlanFragmentParamsList();
- paramsList.addToParamsList(tRequest);
- execPlanFragmentParamsBytes = ByteString.copyFrom(new
TSerializer().serialize(paramsList));
- }
-
- public InternalService.PExecPlanFragmentRequest
getExecPlanFragmentRequest() {
- return
InternalService.PExecPlanFragmentRequest.newBuilder().setRequest(execPlanFragmentParamsBytes)
-
.setCompact(false).setVersion(InternalService.PFragmentRequestVersion.VERSION_2).build();
+ return groupCommitPlanner;
}
public TUniqueId getLoadId() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java
index fe6bc13b6c4..802ddaa6c0e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java
@@ -28,7 +28,6 @@ import org.apache.doris.nereids.rules.analysis.CheckAfterRewrite;
import org.apache.doris.nereids.rules.analysis.EliminateGroupByConstant;
import
org.apache.doris.nereids.rules.analysis.LogicalSubQueryAliasToLogicalProject;
import org.apache.doris.nereids.rules.analysis.NormalizeAggregate;
-import org.apache.doris.nereids.rules.analysis.RejectGroupCommitInsert;
import org.apache.doris.nereids.rules.expression.CheckLegalityAfterRewrite;
import org.apache.doris.nereids.rules.expression.ExpressionNormalization;
import org.apache.doris.nereids.rules.expression.ExpressionOptimization;
@@ -269,9 +268,6 @@ public class Rewriter extends AbstractBatchJobExecutor {
topDown(new BuildAggForUnion())
),
- // TODO remove it after Nereids support group commit insert
- topDown(new RejectGroupCommitInsert()),
-
// topic("Distinct",
//
costBased(custom(RuleType.PUSH_DOWN_DISTINCT_THROUGH_JOIN,
PushdownDistinctThroughJoin::new))
// ),
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
index 505c7db91a7..94ccef528fd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
@@ -77,8 +77,6 @@ public enum RuleType {
ANALYZE_CTE(RuleTypeClass.REWRITE),
RELATION_AUTHENTICATION(RuleTypeClass.VALIDATION),
- REJECT_GROUP_COMMIT_INSERT(RuleTypeClass.REWRITE),
-
ADJUST_NULLABLE_FOR_PROJECT_SLOT(RuleTypeClass.REWRITE),
ADJUST_NULLABLE_FOR_AGGREGATE_SLOT(RuleTypeClass.REWRITE),
ADJUST_NULLABLE_FOR_HAVING_SLOT(RuleTypeClass.REWRITE),
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/RejectGroupCommitInsert.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/RejectGroupCommitInsert.java
deleted file mode 100644
index 19c9706e03d..00000000000
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/RejectGroupCommitInsert.java
+++ /dev/null
@@ -1,53 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.nereids.rules.analysis;
-
-import org.apache.doris.nereids.exceptions.AnalysisException;
-import org.apache.doris.nereids.rules.Rule;
-import org.apache.doris.nereids.rules.RuleType;
-import org.apache.doris.nereids.rules.rewrite.RewriteRuleFactory;
-
-import com.google.common.collect.ImmutableList;
-
-import java.util.List;
-
-/**
- * reject group commit insert added by PR <a
href="https://github.com/apache/doris/pull/22829/files">#22829</a>
- */
-public class RejectGroupCommitInsert implements RewriteRuleFactory {
-
- @Override
- public List<Rule> buildRules() {
- return ImmutableList.of(
- logicalOlapTableSink(logicalOneRowRelation())
- .thenApply(ctx -> {
- if
(ctx.connectContext.getSessionVariable().enableInsertGroupCommit) {
- throw new AnalysisException("Nereids do not
support group commit now.");
- }
- return null;
- }).toRule(RuleType.REJECT_GROUP_COMMIT_INSERT),
- logicalOlapTableSink(logicalUnion().when(u -> u.arity() == 0))
- .thenApply(ctx -> {
- if
(ctx.connectContext.getSessionVariable().enableInsertGroupCommit) {
- throw new AnalysisException("Nereids do not
support group commit now.");
- }
- return null;
- }).toRule(RuleType.REJECT_GROUP_COMMIT_INSERT)
- );
- }
-}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java
index f7c30f3820b..79dc9485fc8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java
@@ -22,9 +22,11 @@ import org.apache.doris.analysis.AlterClause;
import org.apache.doris.analysis.AlterTableStmt;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.DropPartitionClause;
+import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.PartitionNames;
import org.apache.doris.analysis.ReplacePartitionClause;
import org.apache.doris.analysis.TableName;
+import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.common.DdlException;
@@ -39,6 +41,7 @@ import org.apache.doris.nereids.analyzer.UnboundOlapTableSink;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.glue.LogicalPlanAdapter;
import org.apache.doris.nereids.trees.TreeNode;
+import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.plans.Explainable;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.PlanType;
@@ -46,12 +49,19 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.nereids.txn.Transaction;
+import org.apache.doris.planner.GroupCommitPlanner;
import org.apache.doris.planner.OlapTableSink;
+import org.apache.doris.planner.UnionNode;
+import org.apache.doris.proto.InternalService;
+import org.apache.doris.proto.InternalService.PGroupCommitInsertResponse;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.DdlExecutor;
import org.apache.doris.qe.QueryState.MysqlStateType;
import org.apache.doris.qe.StmtExecutor;
+import org.apache.doris.rpc.RpcException;
+import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.transaction.TransactionState;
+import org.apache.doris.transaction.TransactionStatus;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
@@ -59,6 +69,7 @@ import com.google.common.collect.Lists;
import org.apache.commons.collections.CollectionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import org.apache.thrift.TException;
import java.util.ArrayList;
import java.util.HashMap;
@@ -68,6 +79,8 @@ import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
/**
* insert into select command implementation
@@ -152,7 +165,13 @@ public class InsertIntoTableCommand extends Command implements ForwardWithSync,
}
OlapTableSink sink = ((OlapTableSink)
planner.getFragments().get(0).getSink());
-
+ if (ctx.getSessionVariable().enableInsertGroupCommit) {
+ // group commit
+ if (analyzeGroupCommit(sink, physicalOlapTableSink)) {
+ handleGroupCommit(ctx, sink, physicalOlapTableSink);
+ return;
+ }
+ }
Preconditions.checkArgument(!isTxnBegin, "an insert command cannot
create more than one txn");
Transaction txn = new Transaction(ctx,
physicalOlapTableSink.getDatabase(),
@@ -352,6 +371,63 @@ public class InsertIntoTableCommand extends Command implements ForwardWithSync,
}
}
+ private void handleGroupCommit(ConnectContext ctx, OlapTableSink sink,
+ PhysicalOlapTableSink<?> physicalOlapTableSink)
+ throws UserException, RpcException, TException,
ExecutionException, InterruptedException {
+
+ List<InternalService.PDataRow> rows = new ArrayList<>();
+ List<List<Expr>> materializedConstExprLists = ((UnionNode)
sink.getFragment().getPlanRoot())
+ .getMaterializedConstExprLists();
+
+ int filterSize = 0;
+ for (Slot slot : physicalOlapTableSink.getOutput()) {
+ if (slot.getName().contains(Column.DELETE_SIGN)
+ || slot.getName().contains(Column.VERSION_COL)) {
+ filterSize += 1;
+ }
+ }
+ for (List<Expr> list : materializedConstExprLists) {
+ rows.add(GroupCommitPlanner.getRowStringValue(list, filterSize));
+ }
+ GroupCommitPlanner groupCommitPlanner = new
GroupCommitPlanner(physicalOlapTableSink.getDatabase(),
+ physicalOlapTableSink.getTargetTable(), null, ctx.queryId());
+ Future<PGroupCommitInsertResponse> future =
groupCommitPlanner.executeGroupCommitInsert(ctx, rows);
+ PGroupCommitInsertResponse response = future.get();
+ TStatusCode code =
TStatusCode.findByValue(response.getStatus().getStatusCode());
+ if (code == TStatusCode.DATA_QUALITY_ERROR) {
+ LOG.info("group commit insert failed. query id: {}, backend id:
{}, status: {}, "
+ + "schema version: {}", ctx.queryId(),
+ groupCommitPlanner.getBackend(), response.getStatus(),
+
physicalOlapTableSink.getTargetTable().getBaseSchemaVersion());
+ } else if (code != TStatusCode.OK) {
+ String errMsg = "group commit insert failed. backend id: "
+ + groupCommitPlanner.getBackend().getId() + ", status: "
+ + response.getStatus();
+ ErrorReport.reportDdlException(errMsg,
ErrorCode.ERR_FAILED_WHEN_INSERT);
+ }
+ TransactionStatus txnStatus = TransactionStatus.PREPARE;
+ StringBuilder sb = new StringBuilder();
+ sb.append("{'label':'").append(response.getLabel()).append("',
'status':'").append(txnStatus.name());
+ sb.append("', 'txnId':'").append(response.getTxnId()).append("'");
+ sb.append("', 'optimizer':'").append("nereids").append("'");
+ sb.append("}");
+
+ ctx.getState().setOk(response.getLoadedRows(), (int)
response.getFilteredRows(), sb.toString());
+ ctx.setOrUpdateInsertResult(response.getTxnId(), response.getLabel(),
+ physicalOlapTableSink.getDatabase().getFullName(),
physicalOlapTableSink.getTargetTable().getName(),
+ txnStatus, response.getLoadedRows(), (int)
response.getFilteredRows());
+ // update it, so that user can get loaded rows in fe.audit.log
+ ctx.updateReturnRows((int) response.getLoadedRows());
+ }
+
+ private boolean analyzeGroupCommit(OlapTableSink sink,
PhysicalOlapTableSink<?> physicalOlapTableSink) {
+ return
ConnectContext.get().getSessionVariable().enableInsertGroupCommit
+ && physicalOlapTableSink.getTargetTable() instanceof OlapTable
+ && !ConnectContext.get().isTxnModel()
+ && sink.getFragment().getPlanRoot() instanceof UnionNode
+ && physicalOlapTableSink.getPartitionIds().isEmpty();
+ }
+
@Override
public Plan getExplainPlan(ConnectContext ctx) {
return this.logicalQuery;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java
new file mode 100644
index 00000000000..15aa639abee
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java
@@ -0,0 +1,196 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.planner;
+
+
+import org.apache.doris.analysis.CastExpr;
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.NullLiteral;
+import org.apache.doris.catalog.ArrayType;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.UserException;
+import org.apache.doris.proto.InternalService;
+import org.apache.doris.proto.InternalService.PGroupCommitInsertRequest;
+import org.apache.doris.proto.InternalService.PGroupCommitInsertResponse;
+import org.apache.doris.proto.Types;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.rpc.BackendServiceProxy;
+import org.apache.doris.rpc.RpcException;
+import org.apache.doris.system.Backend;
+import org.apache.doris.task.StreamLoadTask;
+import org.apache.doris.thrift.TExecPlanFragmentParams;
+import org.apache.doris.thrift.TExecPlanFragmentParamsList;
+import org.apache.doris.thrift.TFileCompressType;
+import org.apache.doris.thrift.TFileFormatType;
+import org.apache.doris.thrift.TFileType;
+import org.apache.doris.thrift.TMergeType;
+import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TScanRangeParams;
+import org.apache.doris.thrift.TStreamLoadPutRequest;
+import org.apache.doris.thrift.TUniqueId;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.ByteString;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.thrift.TException;
+import org.apache.thrift.TSerializer;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+
+// Used to generate a plan fragment for a group commit
+// we only support OlapTable now.
+public class GroupCommitPlanner {
+ private static final Logger LOG =
LogManager.getLogger(GroupCommitPlanner.class);
+
+ private Database db;
+ private OlapTable table;
+ private TUniqueId loadId;
+ private Backend backend;
+ private TExecPlanFragmentParamsList paramsList;
+ private ByteString execPlanFragmentParamsBytes;
+
+ public GroupCommitPlanner(Database db, OlapTable table, List<String>
targetColumnNames, TUniqueId queryId)
+ throws UserException, TException {
+ this.db = db;
+ this.table = table;
+ TStreamLoadPutRequest streamLoadPutRequest = new
TStreamLoadPutRequest();
+ if (targetColumnNames != null) {
+ streamLoadPutRequest.setColumns(String.join(",",
targetColumnNames));
+ if (targetColumnNames.stream().anyMatch(col ->
col.equalsIgnoreCase(Column.SEQUENCE_COL))) {
+ streamLoadPutRequest.setSequenceCol(Column.SEQUENCE_COL);
+ }
+ }
+ streamLoadPutRequest
+ .setDb(db.getFullName())
+ .setMaxFilterRatio(1)
+ .setTbl(table.getName())
+
.setFileType(TFileType.FILE_STREAM).setFormatType(TFileFormatType.FORMAT_CSV_PLAIN)
+
.setMergeType(TMergeType.APPEND).setThriftRpcTimeoutMs(5000).setLoadId(queryId)
+ .setGroupCommit(true);
+ StreamLoadTask streamLoadTask =
StreamLoadTask.fromTStreamLoadPutRequest(streamLoadPutRequest);
+ StreamLoadPlanner planner = new StreamLoadPlanner(db, table,
streamLoadTask);
+ // Will using load id as query id in fragment
+ TExecPlanFragmentParams tRequest =
planner.plan(streamLoadTask.getId());
+ for (Map.Entry<Integer, List<TScanRangeParams>> entry :
tRequest.params.per_node_scan_ranges.entrySet()) {
+ for (TScanRangeParams scanRangeParams : entry.getValue()) {
+
scanRangeParams.scan_range.ext_scan_range.file_scan_range.params.setFormatType(
+ TFileFormatType.FORMAT_PROTO);
+
scanRangeParams.scan_range.ext_scan_range.file_scan_range.params.setCompressType(
+ TFileCompressType.PLAIN);
+ }
+ }
+ List<TScanRangeParams> scanRangeParams =
tRequest.params.per_node_scan_ranges.values().stream()
+ .flatMap(Collection::stream).collect(Collectors.toList());
+ Preconditions.checkState(scanRangeParams.size() == 1);
+ loadId = queryId;
+ // see BackendServiceProxy#execPlanFragmentsAsync
+ paramsList = new TExecPlanFragmentParamsList();
+ paramsList.addToParamsList(tRequest);
+ execPlanFragmentParamsBytes = ByteString.copyFrom(new
TSerializer().serialize(paramsList));
+ }
+
+ public Future<PGroupCommitInsertResponse>
executeGroupCommitInsert(ConnectContext ctx,
+ List<InternalService.PDataRow> rows) throws TException,
DdlException, RpcException {
+ backend = ctx.getInsertGroupCommit(this.table.getId());
+ if (backend == null || !backend.isAlive()) {
+ List<Long> allBackendIds =
Env.getCurrentSystemInfo().getAllBackendIds(true);
+ if (allBackendIds.isEmpty()) {
+ throw new DdlException("No alive backend");
+ }
+ Collections.shuffle(allBackendIds);
+ backend =
Env.getCurrentSystemInfo().getBackend(allBackendIds.get(0));
+ ctx.setInsertGroupCommit(this.table.getId(), backend);
+ }
+ PGroupCommitInsertRequest request =
PGroupCommitInsertRequest.newBuilder()
+ .setDbId(db.getId())
+ .setTableId(table.getId())
+ .setBaseSchemaVersion(table.getBaseSchemaVersion())
+
.setExecPlanFragmentRequest(InternalService.PExecPlanFragmentRequest.newBuilder()
+ .setRequest(execPlanFragmentParamsBytes)
+
.setCompact(false).setVersion(InternalService.PFragmentRequestVersion.VERSION_2).build())
+
.setLoadId(Types.PUniqueId.newBuilder().setHi(loadId.hi).setLo(loadId.lo)
+ .build()).addAllData(rows)
+ .build();
+ Future<PGroupCommitInsertResponse> future =
BackendServiceProxy.getInstance()
+ .groupCommitInsert(new TNetworkAddress(backend.getHost(),
backend.getBrpcPort()), request);
+ return future;
+ }
+
+ // only for nereids use
+ public static InternalService.PDataRow getRowStringValue(List<Expr> cols,
int filterSize) throws UserException {
+ if (cols.isEmpty()) {
+ return null;
+ }
+ InternalService.PDataRow.Builder row =
InternalService.PDataRow.newBuilder();
+ try {
+ List<Expr> exprs = cols.subList(0, cols.size() - filterSize);
+ for (Expr expr : exprs) {
+ if (!expr.isLiteralOrCastExpr() && !(expr instanceof
CastExpr)) {
+ if (expr.getChildren().get(0) instanceof NullLiteral) {
+ row.addColBuilder().setValue("\\N");
+ continue;
+ }
+ throw new UserException(
+ "do not support non-literal expr in transactional
insert operation: " + expr.toSql());
+ }
+ if (expr instanceof NullLiteral) {
+ row.addColBuilder().setValue("\\N");
+ } else if (expr.getType() instanceof ArrayType) {
+
row.addColBuilder().setValue(expr.getStringValueForArray());
+ } else if (!expr.getChildren().isEmpty()) {
+ expr.getChildren().forEach(child -> processExprVal(child,
row));
+ } else {
+ row.addColBuilder().setValue(expr.getStringValue());
+ }
+ }
+ } catch (UserException e) {
+ throw new RuntimeException(e);
+ }
+ return row.build();
+ }
+
+ private static void processExprVal(Expr expr,
InternalService.PDataRow.Builder row) {
+ if (expr.getChildren().isEmpty()) {
+ row.addColBuilder().setValue(expr.getStringValue());
+ return;
+ }
+ for (Expr child : expr.getChildren()) {
+ processExprVal(child, row);
+ }
+ }
+
+ public Backend getBackend() {
+ return backend;
+ }
+
+ public TExecPlanFragmentParamsList getParamsList() {
+ return paramsList;
+ }
+
+}
+
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 5656ce9d4ff..5fa8a8205cb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -126,13 +126,13 @@ import org.apache.doris.nereids.trees.plans.commands.CreateTableCommand;
import org.apache.doris.nereids.trees.plans.commands.Forward;
import org.apache.doris.nereids.trees.plans.commands.InsertIntoTableCommand;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
+import org.apache.doris.planner.GroupCommitPlanner;
import org.apache.doris.planner.OlapScanNode;
import org.apache.doris.planner.OriginalPlanner;
import org.apache.doris.planner.Planner;
import org.apache.doris.planner.ScanNode;
import org.apache.doris.proto.Data;
import org.apache.doris.proto.InternalService;
-import org.apache.doris.proto.InternalService.PGroupCommitInsertRequest;
import org.apache.doris.proto.InternalService.PGroupCommitInsertResponse;
import org.apache.doris.proto.Types;
import org.apache.doris.qe.CommonResultSet.CommonResultSetMetaData;
@@ -145,19 +145,16 @@ import org.apache.doris.resource.workloadgroup.QueryQueue;
import org.apache.doris.resource.workloadgroup.QueueOfferToken;
import org.apache.doris.rewrite.ExprRewriter;
import org.apache.doris.rewrite.mvrewrite.MVSelectFailedException;
-import org.apache.doris.rpc.BackendServiceProxy;
import org.apache.doris.rpc.RpcException;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.statistics.ResultRow;
import org.apache.doris.statistics.util.InternalQueryBuffer;
-import org.apache.doris.system.Backend;
import org.apache.doris.task.LoadEtlTask;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TFileType;
import org.apache.doris.thrift.TLoadTxnBeginRequest;
import org.apache.doris.thrift.TLoadTxnBeginResult;
import org.apache.doris.thrift.TMergeType;
-import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TQueryOptions;
import org.apache.doris.thrift.TQueryType;
import org.apache.doris.thrift.TResultBatch;
@@ -1828,19 +1825,9 @@ public class StmtExecutor {
txnId = context.getTxnEntry().getTxnConf().getTxnId();
} else if (insertStmt instanceof NativeInsertStmt &&
((NativeInsertStmt) insertStmt).isGroupCommit()) {
NativeInsertStmt nativeInsertStmt = (NativeInsertStmt) insertStmt;
- Backend backend =
context.getInsertGroupCommit(insertStmt.getTargetTable().getId());
- if (backend == null || !backend.isAlive()) {
- List<Long> allBackendIds =
Env.getCurrentSystemInfo().getAllBackendIds(true);
- if (allBackendIds.isEmpty()) {
- throw new DdlException("No alive backend");
- }
- Collections.shuffle(allBackendIds);
- backend =
Env.getCurrentSystemInfo().getBackend(allBackendIds.get(0));
-
context.setInsertGroupCommit(insertStmt.getTargetTable().getId(), backend);
- }
int maxRetry = 3;
for (int i = 0; i < maxRetry; i++) {
- nativeInsertStmt.planForGroupCommit(context.queryId);
+ GroupCommitPlanner groupCommitPlanner =
nativeInsertStmt.planForGroupCommit(context.queryId);
// handle rows
List<InternalService.PDataRow> rows = new ArrayList<>();
SelectStmt selectStmt = (SelectStmt) insertStmt.getQueryStmt();
@@ -1861,23 +1848,15 @@ public class StmtExecutor {
InternalService.PDataRow data =
getRowStringValue(exprList);
rows.add(data);
}
- TUniqueId loadId = nativeInsertStmt.getLoadId();
- PGroupCommitInsertRequest request =
PGroupCommitInsertRequest.newBuilder()
-
.setDbId(insertStmt.getTargetTable().getDatabase().getId())
- .setTableId(insertStmt.getTargetTable().getId())
-
.setBaseSchemaVersion(nativeInsertStmt.getBaseSchemaVersion())
-
.setExecPlanFragmentRequest(nativeInsertStmt.getExecPlanFragmentRequest())
-
.setLoadId(Types.PUniqueId.newBuilder().setHi(loadId.hi).setLo(loadId.lo)
- .build()).addAllData(rows)
- .build();
- Future<PGroupCommitInsertResponse> future =
BackendServiceProxy.getInstance()
- .groupCommitInsert(new
TNetworkAddress(backend.getHost(), backend.getBrpcPort()), request);
+ Future<PGroupCommitInsertResponse> future = groupCommitPlanner
+ .executeGroupCommitInsert(context, rows);
PGroupCommitInsertResponse response = future.get();
TStatusCode code =
TStatusCode.findByValue(response.getStatus().getStatusCode());
if (code == TStatusCode.DATA_QUALITY_ERROR) {
LOG.info("group commit insert failed. stmt: {}, backend
id: {}, status: {}, "
+ "schema version: {}, retry: {}",
insertStmt.getOrigStmt().originStmt,
- backend.getId(), response.getStatus(),
nativeInsertStmt.getBaseSchemaVersion(), i);
+ groupCommitPlanner.getBackend().getId(),
+ response.getStatus(),
nativeInsertStmt.getBaseSchemaVersion(), i);
if (i < maxRetry) {
List<TableIf> tables =
Lists.newArrayList(insertStmt.getTargetTable());
MetaLockUtils.readLockTables(tables);
@@ -1890,12 +1869,12 @@ public class StmtExecutor {
}
continue;
} else {
- errMsg = "group commit insert failed. backend id: " +
backend.getId() + ", status: "
- + response.getStatus();
+ errMsg = "group commit insert failed. backend id: "
+ + groupCommitPlanner.getBackend().getId() + ",
status: " + response.getStatus();
}
} else if (code != TStatusCode.OK) {
- errMsg = "group commit insert failed. backend id: " +
backend.getId() + ", status: "
- + response.getStatus();
+ errMsg = "group commit insert failed. backend id: " +
groupCommitPlanner.getBackend().getId()
+ + ", status: " + response.getStatus();
ErrorReport.reportDdlException(errMsg,
ErrorCode.ERR_FAILED_WHEN_INSERT);
}
label = response.getLabel();
diff --git a/regression-test/data/insert_p0/insert_group_commit_into.out b/regression-test/data/insert_p0/insert_group_commit_into.out
index 97fc1897552..dfb3a3b41c0 100644
--- a/regression-test/data/insert_p0/insert_group_commit_into.out
+++ b/regression-test/data/insert_p0/insert_group_commit_into.out
@@ -94,3 +94,98 @@ q 50
-- !sql --
0 service_46da0dab-e27d-4820-aea2-9bfc15741615 1697032066304 0
3229b7cd-f3a2-4359-aa24-946388c9cc54 0
CgEwEiQzMjI5YjdjZC1mM2EyLTQzNTktYWEyNC05NDYzODhjOWNjNTQaggQY/6n597ExIP+p+fexMWIWCgh0YWdLZXlfMBIKdGFnVmFsdWVfMGIWCgh0YWdLZXlfMRIKdGFnVmFsdWVfMWIWCgh0YWdLZXlfMhIKdGFnVmFsdWVfMmIWCgh0YWdLZXlfMxIKdGFnVmFsdWVfM2IWCgh0YWdLZXlfNBIKdGFnVmFsdWVfNGIWCgh0YWdLZXlfNRIKdGFnVmFsdWVfNWIWCgh0YWdLZXlfNhIKdGFnVmFsdWVfNmIWCgh0YWdLZXlfNxIKdGFnVmFsdWVfN2IWCgh0YWdLZXlfOBIKdGFnVmFsdWVfOGIWCgh0YWdLZXlfORIKdGFnVm
[...]
+-- !sql --
+1 a 10
+2 b -1
+3 c -1
+4 \N -1
+5 q 50
+6 \N -1
+
+-- !sql --
+1 a 10
+1 a 10
+2 b -1
+2 b -1
+3 c -1
+3 c -1
+4 e1 -1
+5 q 50
+5 q 50
+6 \N -1
+6 \N -1
+
+-- !sql --
+1 a \N 10
+1 a \N 10
+1 a \N 10
+2 b \N -1
+2 b \N -1
+2 b \N -1
+3 c \N -1
+3 c \N -1
+3 c \N -1
+4 \N \N -1
+4 e1 \N -1
+5 q \N 50
+5 q \N 50
+5 q \N 50
+6 \N \N -1
+6 \N \N -1
+6 \N \N -1
+
+-- !sql --
+2 b \N -1
+6 \N \N -1
+
+-- !sql --
+1 a 10 5
+2 b -1 \N
+2 b -1 \N
+3 c -1 \N
+4 \N -1 \N
+5 q 50 6
+6 \N -1 \N
+6 \N -1 \N
+
+-- !sql --
+1 a 10
+1 a 10
+2 b -1
+2 b -1
+2 b -1
+3 c -1
+3 c -1
+4 \N -1
+4 \N -1
+5 q 50
+5 q 50
+6 \N -1
+6 \N -1
+6 \N -1
+
+-- !sql --
+\N -1
+\N -1
+\N -1
+\N -1
+\N -1
+\N -1
+\N -1
+a 10
+a 10
+a 10
+b -1
+b -1
+b -1
+b -1
+c -1
+c -1
+c -1
+q 50
+q 50
+q 50
+
+-- !sql --
+0 service_46da0dab-e27d-4820-aea2-9bfc15741615 1697032066304 0
3229b7cd-f3a2-4359-aa24-946388c9cc54 0
CgEwEiQzMjI5YjdjZC1mM2EyLTQzNTktYWEyNC05NDYzODhjOWNjNTQaggQY/6n597ExIP+p+fexMWIWCgh0YWdLZXlfMBIKdGFnVmFsdWVfMGIWCgh0YWdLZXlfMRIKdGFnVmFsdWVfMWIWCgh0YWdLZXlfMhIKdGFnVmFsdWVfMmIWCgh0YWdLZXlfMxIKdGFnVmFsdWVfM2IWCgh0YWdLZXlfNBIKdGFnVmFsdWVfNGIWCgh0YWdLZXlfNRIKdGFnVmFsdWVfNWIWCgh0YWdLZXlfNhIKdGFnVmFsdWVfNmIWCgh0YWdLZXlfNxIKdGFnVmFsdWVfN2IWCgh0YWdLZXlfOBIKdGFnVmFsdWVfOGIWCgh0YWdLZXlfORIKdGFnVm
[...]
+
diff --git a/regression-test/suites/insert_p0/insert_group_commit_into.groovy
b/regression-test/suites/insert_p0/insert_group_commit_into.groovy
index 26cc9ca9776..bea130d1067 100644
--- a/regression-test/suites/insert_p0/insert_group_commit_into.groovy
+++ b/regression-test/suites/insert_p0/insert_group_commit_into.groovy
@@ -77,173 +77,193 @@ suite("insert_group_commit_into") {
assertTrue(serverInfo.contains("'status':'VISIBLE'"))
assertTrue(!serverInfo.contains("'label':'group_commit_"))
}
+ for (item in ["legacy", "nereids"]) {
+ try {
+ // create table
+ sql """ drop table if exists ${table}; """
- try {
- // create table
- sql """ drop table if exists ${table}; """
-
- sql """
- CREATE TABLE ${table} (
- `id` int(11) NOT NULL,
- `name` varchar(50) NULL,
- `score` int(11) NULL default "-1"
- ) ENGINE=OLAP
- DUPLICATE KEY(`id`, `name`)
- PARTITION BY RANGE(id)
- (
- FROM (1) TO (100) INTERVAL 10
- )
- DISTRIBUTED BY HASH(`id`) BUCKETS 1
- PROPERTIES (
- "replication_num" = "1"
- );
- """
-
- connect(user = context.config.jdbcUser, password =
context.config.jdbcPassword, url = context.config.jdbcUrl) {
- sql """ set enable_insert_group_commit = true; """
- // TODO
- sql """ set enable_nereids_dml = false; """
-
- // 1. insert into
- group_commit_insert """ insert into ${table}(name, id) values('c',
3); """, 1
- group_commit_insert """ insert into ${table}(id) values(4); """, 1
- group_commit_insert """ insert into ${table} values (1, 'a',
10),(5, 'q', 50); """, 2
- group_commit_insert """ insert into ${table}(id, name) values(2,
'b'); """, 1
- group_commit_insert """ insert into ${table}(id) select 6; """, 1
-
- getRowCount(6)
- qt_sql """ select * from ${table} order by id, name, score asc; """
-
- // 2. insert into and delete
- sql """ delete from ${table} where id = 4; """
- group_commit_insert """ insert into ${table}(name, id) values('c',
3); """, 1
- /*sql """ insert into ${table}(id, name) values(4, 'd1'); """
- sql """ insert into ${table}(id, name) values(4, 'd1'); """
- sql """ delete from ${table} where id = 4; """*/
- group_commit_insert """ insert into ${table}(id, name) values(4,
'e1'); """, 1
- group_commit_insert """ insert into ${table} values (1, 'a',
10),(5, 'q', 50); """, 2
- group_commit_insert """ insert into ${table}(id, name) values(2,
'b'); """, 1
- group_commit_insert """ insert into ${table}(id) select 6; """, 1
-
- getRowCount(11)
- qt_sql """ select * from ${table} order by id, name, score asc; """
-
- // 3. insert into and light schema change: add column
- group_commit_insert """ insert into ${table}(name, id) values('c',
3); """, 1
- group_commit_insert """ insert into ${table}(id) values(4); """, 1
- group_commit_insert """ insert into ${table} values (1, 'a',
10),(5, 'q', 50); """, 2
- sql """ alter table ${table} ADD column age int after name; """
- group_commit_insert """ insert into ${table}(id, name) values(2,
'b'); """, 1
- group_commit_insert """ insert into ${table}(id) select 6; """, 1
-
- assertTrue(getAlterTableState(), "add column should success")
- getRowCount(17)
- qt_sql """ select * from ${table} order by id, name,score asc; """
-
- // 4. insert into and truncate table
- /*sql """ insert into ${table}(name, id) values('c', 3); """
- sql """ insert into ${table}(id) values(4); """
- sql """ insert into ${table} values (1, 'a', 5, 10),(5, 'q', 6,
50); """*/
- sql """ truncate table ${table}; """
- group_commit_insert """ insert into ${table}(id, name) values(2,
'b'); """, 1
- group_commit_insert """ insert into ${table}(id) select 6; """, 1
-
- getRowCount(2)
- qt_sql """ select * from ${table} order by id, name, score asc; """
-
- // 5. insert into and schema change: modify column order
- group_commit_insert """ insert into ${table}(name, id) values('c',
3); """, 1
- group_commit_insert """ insert into ${table}(id) values(4); """, 1
- group_commit_insert """ insert into ${table} values (1, 'a', 5,
10),(5, 'q', 6, 50); """, 2
- // sql """ alter table ${table} order by (id, name, score, age);
"""
- group_commit_insert """ insert into ${table}(id, name) values(2,
'b'); """, 1
- group_commit_insert """ insert into ${table}(id) select 6; """, 1
-
- // assertTrue(getAlterTableState(), "modify column order should
success")
- getRowCount(8)
- qt_sql """ select id, name, score, age from ${table} order by id,
name, score asc; """
-
- // 6. insert into and light schema change: drop column
- group_commit_insert """ insert into ${table}(name, id) values('c',
3); """, 1
- group_commit_insert """ insert into ${table}(id) values(4); """, 1
- group_commit_insert """ insert into ${table} values (1, 'a', 5,
10),(5, 'q', 6, 50); """, 2
- sql """ alter table ${table} DROP column age; """
- group_commit_insert """ insert into ${table}(id, name) values(2,
'b'); """, 1
- group_commit_insert """ insert into ${table}(id) select 6; """, 1
-
- assertTrue(getAlterTableState(), "drop column should success")
- getRowCount(14)
- qt_sql """ select * from ${table} order by id, name, score asc; """
-
- // 7. insert into and add rollup
- group_commit_insert """ insert into ${table}(name, id) values('c',
3); """, 1
- group_commit_insert """ insert into ${table}(id) values(4); """, 1
- group_commit_insert """ insert into ${table} values (1, 'a',
10),(5, 'q', 50),(101, 'a', 100); """, 2
- // sql """ alter table ${table} ADD ROLLUP r1(name, score); """
- group_commit_insert """ insert into ${table}(id, name) values(2,
'b'); """, 1
- group_commit_insert """ insert into ${table}(id) select 6; """, 1
-
- getRowCount(20)
- qt_sql """ select name, score from ${table} order by name asc; """
-
- none_group_commit_insert """ insert into ${table}(id, name, score)
values(10 + 1, 'h', 100); """, 1
- none_group_commit_insert """ insert into ${table}(id, name, score)
select 10 + 2, 'h', 100; """, 1
- none_group_commit_insert """ insert into ${table} with label
test_gc_""" + System.currentTimeMillis() + """ (id, name, score) values(13,
'h', 100); """, 1
- def rowCount = sql "select count(*) from ${table}"
- logger.info("row count: " + rowCount)
- assertEquals(rowCount[0][0], 23)
+ sql """
+ CREATE TABLE ${table} (
+ `id` int(11) NOT NULL,
+ `name` varchar(50) NULL,
+ `score` int(11) NULL default "-1"
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`id`, `name`)
+ PARTITION BY RANGE(id)
+ (
+ FROM (1) TO (100) INTERVAL 10
+ )
+ DISTRIBUTED BY HASH(`id`) BUCKETS 1
+ PROPERTIES (
+ "replication_num" = "1"
+ );
+ """
+
+ connect(user = context.config.jdbcUser, password =
context.config.jdbcPassword, url = context.config.jdbcUrl) {
+ sql """ set enable_insert_group_commit = true; """
+ if (item == "nereids") {
+ sql """ set enable_nereids_dml = true; """
+ sql """ set enable_nereids_planner=true; """
+ sql """ set enable_fallback_to_original_planner=false; """
+ } else {
+ sql """ set enable_nereids_dml = false; """
+ }
+
+ // 1. insert into
+ group_commit_insert """ insert into ${table}(name, id)
values('c', 3); """, 1
+ group_commit_insert """ insert into ${table}(id) values(4);
""", 1
+ group_commit_insert """ insert into ${table} values (1, 'a',
10),(5, 'q', 50); """, 2
+ group_commit_insert """ insert into ${table}(id, name)
values(2, 'b'); """, 1
+ group_commit_insert """ insert into ${table}(id) select 6;
""", 1
+
+ getRowCount(6)
+ qt_sql """ select * from ${table} order by id, name, score
asc; """
+
+ // 2. insert into and delete
+ sql """ delete from ${table} where id = 4; """
+ group_commit_insert """ insert into ${table}(name, id)
values('c', 3); """, 1
+ /*sql """ insert into ${table}(id, name) values(4, 'd1'); """
+ sql """ insert into ${table}(id, name) values(4, 'd1'); """
+ sql """ delete from ${table} where id = 4; """*/
+ group_commit_insert """ insert into ${table}(id, name)
values(4, 'e1'); """, 1
+ group_commit_insert """ insert into ${table} values (1, 'a',
10),(5, 'q', 50); """, 2
+ group_commit_insert """ insert into ${table}(id, name)
values(2, 'b'); """, 1
+ group_commit_insert """ insert into ${table}(id) select 6;
""", 1
+
+ getRowCount(11)
+ qt_sql """ select * from ${table} order by id, name, score
asc; """
+
+ // 3. insert into and light schema change: add column
+ group_commit_insert """ insert into ${table}(name, id)
values('c', 3); """, 1
+ group_commit_insert """ insert into ${table}(id) values(4);
""", 1
+ group_commit_insert """ insert into ${table} values (1, 'a',
10),(5, 'q', 50); """, 2
+ sql """ alter table ${table} ADD column age int after name; """
+ group_commit_insert """ insert into ${table}(id, name)
values(2, 'b'); """, 1
+ group_commit_insert """ insert into ${table}(id) select 6;
""", 1
+
+ assertTrue(getAlterTableState(), "add column should success")
+ getRowCount(17)
+ qt_sql """ select * from ${table} order by id, name,score asc;
"""
+
+ // 4. insert into and truncate table
+ /*sql """ insert into ${table}(name, id) values('c', 3); """
+ sql """ insert into ${table}(id) values(4); """
+ sql """ insert into ${table} values (1, 'a', 5, 10),(5, 'q',
6, 50); """*/
+ sql """ truncate table ${table}; """
+ group_commit_insert """ insert into ${table}(id, name)
values(2, 'b'); """, 1
+ group_commit_insert """ insert into ${table}(id) select 6;
""", 1
+
+ getRowCount(2)
+ qt_sql """ select * from ${table} order by id, name, score
asc; """
+
+ // 5. insert into and schema change: modify column order
+ group_commit_insert """ insert into ${table}(name, id)
values('c', 3); """, 1
+ group_commit_insert """ insert into ${table}(id) values(4);
""", 1
+ group_commit_insert """ insert into ${table} values (1, 'a',
5, 10),(5, 'q', 6, 50); """, 2
+ // sql """ alter table ${table} order by (id, name, score,
age); """
+ group_commit_insert """ insert into ${table}(id, name)
values(2, 'b'); """, 1
+ group_commit_insert """ insert into ${table}(id) select 6;
""", 1
+
+ // assertTrue(getAlterTableState(), "modify column order
should success")
+ getRowCount(8)
+ qt_sql """ select id, name, score, age from ${table} order by
id, name, score asc; """
+
+ // 6. insert into and light schema change: drop column
+ group_commit_insert """ insert into ${table}(name, id)
values('c', 3); """, 1
+ group_commit_insert """ insert into ${table}(id) values(4);
""", 1
+ group_commit_insert """ insert into ${table} values (1, 'a',
5, 10),(5, 'q', 6, 50); """, 2
+ sql """ alter table ${table} DROP column age; """
+ group_commit_insert """ insert into ${table}(id, name)
values(2, 'b'); """, 1
+ group_commit_insert """ insert into ${table}(id) select 6;
""", 1
+
+ assertTrue(getAlterTableState(), "drop column should success")
+ getRowCount(14)
+ qt_sql """ select * from ${table} order by id, name, score
asc; """
+
+ // 7. insert into and add rollup
+ group_commit_insert """ insert into ${table}(name, id)
values('c', 3); """, 1
+ group_commit_insert """ insert into ${table}(id) values(4);
""", 1
+ group_commit_insert """ insert into ${table} values (1, 'a',
10),(5, 'q', 50),(101, 'a', 100); """, 2
+ // sql """ alter table ${table} ADD ROLLUP r1(name, score); """
+ group_commit_insert """ insert into ${table}(id, name)
values(2, 'b'); """, 1
+ group_commit_insert """ insert into ${table}(id) select 6;
""", 1
+
+ getRowCount(20)
+ qt_sql """ select name, score from ${table} order by name asc;
"""
+
+
+ if (item == "nereids") {
+ group_commit_insert """ insert into ${table}(id, name,
score) values(10 + 1, 'h', 100); """, 1
+ group_commit_insert """ insert into ${table}(id, name,
score) select 10 + 2, 'h', 100; """, 1
+ group_commit_insert """ insert into ${table} with label
test_gc_""" + System.currentTimeMillis() + """ (id, name, score) values(13,
'h', 100); """, 1
+ getRowCount(23)
+ } else {
+ none_group_commit_insert """ insert into ${table}(id,
name, score) values(10 + 1, 'h', 100); """, 1
+ none_group_commit_insert """ insert into ${table}(id,
name, score) select 10 + 2, 'h', 100; """, 1
+ none_group_commit_insert """ insert into ${table} with
label test_gc_""" + System.currentTimeMillis() + """ (id, name, score)
values(13, 'h', 100); """, 1
+ }
+
+ def rowCount = sql "select count(*) from ${table}"
+ logger.info("row count: " + rowCount)
+ assertEquals(rowCount[0][0], 23)
+ }
+ } finally {
+ // try_sql("DROP TABLE ${table}")
}
- } finally {
- // try_sql("DROP TABLE ${table}")
- }
- // table with array type
- tableName = "insert_group_commit_into_duplicate_array"
- table = dbName + "." + tableName
- try {
- // create table
- sql """ drop table if exists ${table}; """
-
- sql """
- CREATE table ${table} (
- teamID varchar(255),
- service_id varchar(255),
- start_time BigInt,
- time_bucket BigInt ,
- segment_id String ,
- trace_id String ,
- data_binary String ,
- end_time BigInt ,
- endpoint_id String ,
- endpoint_name String ,
- is_error Boolean ,
- latency Int ,
- service_instance_id String ,
- statement String ,
- tags Array<String>
- ) UNIQUE key (`teamID`,`service_id`, `start_time`)
- DISTRIBUTED BY hash(`start_time`)
- BUCKETS 1
- PROPERTIES ("replication_allocation" = "tag.location.default: 1")
- """
-
- connect(user = context.config.jdbcUser, password =
context.config.jdbcPassword, url = context.config.jdbcUrl) {
- sql """ set enable_insert_group_commit = true; """
- // TODO
- sql """ set enable_nereids_dml = false; """
-
- // 1. insert into
- group_commit_insert """
- INSERT INTO ${table} (`data_binary`, `end_time`, `endpoint_id`,
`endpoint_name`, `is_error`, `latency`, `segment_id`, `service_id`,
`service_instance_id`, `start_time`, `statement`, `tags`, `teamID`,
`time_bucket`, `trace_id`)
- VALUES
-
('CgEwEiQzMjI5YjdjZC1mM2EyLTQzNTktYWEyNC05NDYzODhjOWNjNTQaggQY/6n597ExIP+p+fexMWIWCgh0YWdLZXlfMBIKdGFnVmFsdWVfMGIWCgh0YWdLZXlfMRIKdGFnVmFsdWVfMWIWCgh0YWdLZXlfMhIKdGFnVmFsdWVfMmIWCgh0YWdLZXlfMxIKdGFnVmFsdWVfM2IWCgh0YWdLZXlfNBIKdGFnVmFsdWVfNGIWCgh0YWdLZXlfNRIKdGFnVmFsdWVfNWIWCgh0YWdLZXlfNhIKdGFnVmFsdWVfNmIWCgh0YWdLZXlfNxIKdGFnVmFsdWVfN2IWCgh0YWdLZXlfOBIKdGFnVmFsdWVfOGIWCgh0YWdLZXlfORIKdGFnVmFsdWVfOWIYCgl0YWdLZXlfMTASC3RhZ1ZhbHVlXzEwYhgKCXRhZ0tleV8xMRILdGFnVmFsdWVfMTFiGAoJdGFnS2
[...]
- 1697032066304, '36b2d9ff-4c25-49f3-a726-eea812564411',
'355f96cd-b1b1-4688-a5f6-a8e3f3a55c9a', false, 3,
'3229b7cd-f3a2-4359-aa24-946388c9cc54',
'service_46da0dab-e27d-4820-aea2-9bfc15741615',
'service_instanceac89a4b7-81f7-43e8-85ed-d2b578d98050', 1697032066304,
'statement: b9903670-3821-4f4c-a587-bbcf02c04b77', ['[tagKey_5=tagValue_5,
tagKey_3=tagValue_3, tagKey_1=tagValue_1, tagKey_16=tagValue_16,
tagKey_8=tagValue_8, tagKey_15=tagValue_15, tagKey_6=tagValue_6, tagKey_11=t
[...]
- """, 1
-
- getRowCount(1)
- qt_sql """ select * from ${table}; """
+ // table with array type
+ tableName = "insert_group_commit_into_duplicate_array"
+ table = dbName + "." + tableName
+ try {
+ // create table
+ sql """ drop table if exists ${table}; """
+
+ sql """
+ CREATE table ${table} (
+ teamID varchar(255),
+ service_id varchar(255),
+ start_time BigInt,
+ time_bucket BigInt ,
+ segment_id String ,
+ trace_id String ,
+ data_binary String ,
+ end_time BigInt ,
+ endpoint_id String ,
+ endpoint_name String ,
+ is_error Boolean ,
+ latency Int ,
+ service_instance_id String ,
+ statement String ,
+ tags Array<String>
+ ) UNIQUE key (`teamID`,`service_id`, `start_time`)
+ DISTRIBUTED BY hash(`start_time`)
+ BUCKETS 1
+ PROPERTIES ("replication_allocation" = "tag.location.default: 1")
+ """
+
+ connect(user = context.config.jdbcUser, password =
context.config.jdbcPassword, url = context.config.jdbcUrl) {
+ sql """ set enable_insert_group_commit = true; """
+ if (item == "nereids") {
+ sql """ set enable_nereids_dml = true; """
+ sql """ set enable_nereids_planner=true; """
+ sql """ set enable_fallback_to_original_planner=false; """
+ } else {
+ sql """ set enable_nereids_dml = false; """
+ }
+
+ // 1. insert into
+ group_commit_insert """
+ INSERT INTO ${table} (`data_binary`, `end_time`,
`endpoint_id`, `endpoint_name`, `is_error`, `latency`, `segment_id`,
`service_id`, `service_instance_id`, `start_time`, `statement`, `tags`,
`teamID`, `time_bucket`, `trace_id`)
+ VALUES
+
('CgEwEiQzMjI5YjdjZC1mM2EyLTQzNTktYWEyNC05NDYzODhjOWNjNTQaggQY/6n597ExIP+p+fexMWIWCgh0YWdLZXlfMBIKdGFnVmFsdWVfMGIWCgh0YWdLZXlfMRIKdGFnVmFsdWVfMWIWCgh0YWdLZXlfMhIKdGFnVmFsdWVfMmIWCgh0YWdLZXlfMxIKdGFnVmFsdWVfM2IWCgh0YWdLZXlfNBIKdGFnVmFsdWVfNGIWCgh0YWdLZXlfNRIKdGFnVmFsdWVfNWIWCgh0YWdLZXlfNhIKdGFnVmFsdWVfNmIWCgh0YWdLZXlfNxIKdGFnVmFsdWVfN2IWCgh0YWdLZXlfOBIKdGFnVmFsdWVfOGIWCgh0YWdLZXlfORIKdGFnVmFsdWVfOWIYCgl0YWdLZXlfMTASC3RhZ1ZhbHVlXzEwYhgKCXRhZ0tleV8xMRILdGFnVmFsdWVfMTFiGAoJdG
[...]
+ 1697032066304, '36b2d9ff-4c25-49f3-a726-eea812564411',
'355f96cd-b1b1-4688-a5f6-a8e3f3a55c9a', false, 3,
'3229b7cd-f3a2-4359-aa24-946388c9cc54',
'service_46da0dab-e27d-4820-aea2-9bfc15741615',
'service_instanceac89a4b7-81f7-43e8-85ed-d2b578d98050', 1697032066304,
'statement: b9903670-3821-4f4c-a587-bbcf02c04b77', ['[tagKey_5=tagValue_5,
tagKey_3=tagValue_3, tagKey_1=tagValue_1, tagKey_16=tagValue_16,
tagKey_8=tagValue_8, tagKey_15=tagValue_15, tagKey_6=tagValue_6, tagKey_ [...]
+ """, 1
+
+ getRowCount(1)
+ qt_sql """ select * from ${table}; """
+ }
+ } finally {
+ // try_sql("DROP TABLE ${table}")
}
- } finally {
- // try_sql("DROP TABLE ${table}")
}
}
diff --git
a/regression-test/suites/insert_p0/insert_group_commit_with_exception.groovy
b/regression-test/suites/insert_p0/insert_group_commit_with_exception.groovy
index 3eeaeb797f2..b2c2edb204d 100644
--- a/regression-test/suites/insert_p0/insert_group_commit_with_exception.groovy
+++ b/regression-test/suites/insert_p0/insert_group_commit_with_exception.groovy
@@ -52,231 +52,269 @@ suite("insert_group_commit_with_exception") {
}
return false
}
-
- try {
- // create table
- sql """ drop table if exists ${table}; """
-
- sql """
- CREATE TABLE `${table}` (
- `id` int(11) NOT NULL,
- `name` varchar(1100) NULL,
- `score` int(11) NULL default "-1"
- ) ENGINE=OLAP
- DUPLICATE KEY(`id`, `name`)
- DISTRIBUTED BY HASH(`id`) BUCKETS 1
- PROPERTIES (
- "replication_num" = "1"
- );
- """
-
- sql """ set enable_insert_group_commit = true; """
- // TODO
- sql """ set enable_nereids_dml = false; """
-
- // insert into without column
- try {
- def result = sql """ insert into ${table} values(1, 'a', 10, 100)
"""
- assertTrue(false)
- } catch (Exception e) {
- assertTrue(e.getMessage().contains("Column count doesn't match
value count"))
- }
-
- try {
- def result = sql """ insert into ${table} values(2, 'b') """
- assertTrue(false)
- } catch (Exception e) {
- assertTrue(e.getMessage().contains("Column count doesn't match
value count"))
- }
-
- result = sql """ insert into ${table} values(3, 'c', 30) """
- logger.info("insert result: " + result)
-
- // insert into with column
- result = sql """ insert into ${table}(id, name) values(4, 'd') """
- logger.info("insert result: " + result)
-
- getRowCount(2)
-
- try {
- result = sql """ insert into ${table}(id, name) values(5, 'd', 50)
"""
- assertTrue(false)
- } catch (Exception e) {
- assertTrue(e.getMessage().contains("Column count doesn't match
value count"))
- }
-
- try {
- result = sql """ insert into ${table}(id, name) values(6) """
- assertTrue(false)
- } catch (Exception e) {
- assertTrue(e.getMessage().contains("Column count doesn't match
value count"))
- }
-
+ for (item in ["legacy", "nereids"]) {
try {
- result = sql """ insert into ${table}(id, names) values(7, 'd')
"""
- assertTrue(false)
- } catch (Exception e) {
- assertTrue(e.getMessage().contains("Unknown column 'names'"))
- }
-
+ // create table
+ sql """ drop table if exists ${table}; """
+
+ sql """
+ CREATE TABLE `${table}` (
+ `id` int(11) NOT NULL,
+ `name` varchar(1100) NULL,
+ `score` int(11) NULL default "-1"
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`id`, `name`)
+ DISTRIBUTED BY HASH(`id`) BUCKETS 1
+ PROPERTIES (
+ "replication_num" = "1"
+ );
+ """
+
+ sql """ set enable_insert_group_commit = true; """
+ if (item == "nereids") {
+ sql """ set enable_nereids_dml = true; """
+ sql """ set enable_nereids_planner=true; """
+ sql """ set enable_fallback_to_original_planner=false; """
+ } else {
+ sql """ set enable_nereids_dml = false; """
+ }
- // prepare insert
- def db = context.config.defaultDb + "_insert_p0"
- String url = getServerPrepareJdbcUrl(context.config.jdbcUrl, db)
-
- try (Connection connection = DriverManager.getConnection(url,
context.config.jdbcUser, context.config.jdbcPassword)) {
- Statement statement = connection.createStatement();
- statement.execute("use ${db}");
- statement.execute("set enable_insert_group_commit = true;");
- // without column
- try (PreparedStatement ps = connection.prepareStatement("insert
into ${table} values(?, ?, ?, ?)")) {
- ps.setObject(1, 8);
- ps.setObject(2, "f");
- ps.setObject(3, 70);
- ps.setObject(4, "a");
- ps.addBatch();
- int[] result = ps.executeBatch();
+ // insert into without column
+ try {
+ def result = sql """ insert into ${table} values(1, 'a', 10,
100) """
assertTrue(false)
} catch (Exception e) {
- assertTrue(e.getMessage().contains("Column count doesn't match
value count"))
+ if (item == "nereids") {
+ assertTrue(e.getMessage().contains("insert into cols
should be corresponding to the query output"))
+ } else {
+ assertTrue(e.getMessage().contains("Column count doesn't
match value count"))
+ }
}
- try (PreparedStatement ps = connection.prepareStatement("insert
into ${table} values(?, ?)")) {
- ps.setObject(1, 9);
- ps.setObject(2, "f");
- ps.addBatch();
- int[] result = ps.executeBatch();
+ try {
+ def result = sql """ insert into ${table} values(2, 'b') """
assertTrue(false)
} catch (Exception e) {
- assertTrue(e.getMessage().contains("Column count doesn't match
value count"))
+ if (item == "nereids") {
+ assertTrue(e.getMessage().contains("insert into cols
should be corresponding to the query output"))
+ } else {
+ assertTrue(e.getMessage().contains("Column count doesn't
match value count"))
+ }
}
- try (PreparedStatement ps = connection.prepareStatement("insert
into ${table} values(?, ?, ?)")) {
- ps.setObject(1, 10);
- ps.setObject(2, "f");
- ps.setObject(3, 90);
- ps.addBatch();
- int[] result = ps.executeBatch();
- logger.info("prepare insert result: " + result)
- }
+ result = sql """ insert into ${table} values(3, 'c', 30) """
+ logger.info("insert result: " + result)
- // with columns
- try (PreparedStatement ps = connection.prepareStatement("insert
into ${table}(id, name) values(?, ?)")) {
- ps.setObject(1, 11);
- ps.setObject(2, "f");
- ps.addBatch();
- int[] result = ps.executeBatch();
- logger.info("prepare insert result: " + result)
- }
+ // insert into with column
+ result = sql """ insert into ${table}(id, name) values(4, 'd') """
+ logger.info("insert result: " + result)
+
+ getRowCount(2)
- try (PreparedStatement ps = connection.prepareStatement("insert
into ${table}(id, name) values(?, ?, ?)")) {
- ps.setObject(1, 12);
- ps.setObject(2, "f");
- ps.setObject(3, "f");
- ps.addBatch();
- int[] result = ps.executeBatch();
+ try {
+ result = sql """ insert into ${table}(id, name) values(5, 'd',
50) """
assertTrue(false)
} catch (Exception e) {
- assertTrue(e.getMessage().contains("Column count doesn't match
value count"))
+ if (item == "nereids") {
+ assertTrue(e.getMessage().contains("insert into cols
should be corresponding to the query output"))
+ } else {
+ assertTrue(e.getMessage().contains("Column count doesn't
match value count"))
+ }
}
- try (PreparedStatement ps = connection.prepareStatement("insert
into ${table}(id, name) values(?)")) {
- ps.setObject(1, 13);
- ps.addBatch();
- int[] result = ps.executeBatch();
+ try {
+ result = sql """ insert into ${table}(id, name) values(6) """
assertTrue(false)
} catch (Exception e) {
- assertTrue(e.getMessage().contains("Column count doesn't match
value count"))
+ if (item == "nereids") {
+ assertTrue(e.getMessage().contains("insert into cols
should be corresponding to the query output"))
+ } else {
+ assertTrue(e.getMessage().contains("Column count doesn't
match value count"))
+ }
}
- try (PreparedStatement ps = connection.prepareStatement("insert
into ${table}(id, names) values(?, ?)")) {
- ps.setObject(1, 12);
- ps.setObject(2, "f");
- ps.addBatch();
- int[] result = ps.executeBatch();
+ try {
+ result = sql """ insert into ${table}(id, names) values(7,
'd') """
assertTrue(false)
} catch (Exception e) {
- assertTrue(e.getMessage().contains("Unknown column 'names'"))
+ if (item == "nereids") {
+ assertTrue(e.getMessage().contains("column names is not
found in table"))
+ } else {
+ assertTrue(e.getMessage().contains("Unknown column
'names'"))
+ }
}
- getRowCount(4)
- // prepare insert with multi rows
- try (PreparedStatement ps = connection.prepareStatement("insert
into ${table} values(?, ?, ?)")) {
- for (int i = 0; i < 5; i++) {
- ps.setObject(1, 13 + i);
+ // prepare insert
+ def db = context.config.defaultDb + "_insert_p0"
+ String url = getServerPrepareJdbcUrl(context.config.jdbcUrl, db)
+
+ if (item == "nereids") {
+ println("nereids does not support prepare insert");
+ continue;
+ };
+
+ try (Connection connection = DriverManager.getConnection(url,
context.config.jdbcUser, context.config.jdbcPassword)) {
+ Statement statement = connection.createStatement();
+ statement.execute("use ${db}");
+ statement.execute("set enable_insert_group_commit = true;");
+ if (item == "nereids") {
+ statement.execute("set enable_nereids_dml = true;");
+ statement.execute("set enable_nereids_planner=true;");
+ statement.execute("set
enable_fallback_to_original_planner=false;");
+ } else {
+ statement.execute("set enable_nereids_dml = false;");
+ }
+ // without column
+ try (PreparedStatement ps =
connection.prepareStatement("insert into ${table} values(?, ?, ?, ?)")) {
+ ps.setObject(1, 8);
+ ps.setObject(2, "f");
+ ps.setObject(3, 70);
+ ps.setObject(4, "a");
+ ps.addBatch();
+ int[] result = ps.executeBatch();
+ assertTrue(false)
+ } catch (Exception e) {
+ assertTrue(e.getMessage().contains("Column count doesn't
match value count"))
+ }
+
+ try (PreparedStatement ps =
connection.prepareStatement("insert into ${table} values(?, ?)")) {
+ ps.setObject(1, 9);
+ ps.setObject(2, "f");
+ ps.addBatch();
+ int[] result = ps.executeBatch();
+ assertTrue(false)
+ } catch (Exception e) {
+ assertTrue(e.getMessage().contains("Column count doesn't
match value count"))
+ }
+
+ try (PreparedStatement ps =
connection.prepareStatement("insert into ${table} values(?, ?, ?)")) {
+ ps.setObject(1, 10);
ps.setObject(2, "f");
ps.setObject(3, 90);
ps.addBatch();
int[] result = ps.executeBatch();
logger.info("prepare insert result: " + result)
}
- }
- getRowCount(9)
- // prepare insert with multi rows
- try (PreparedStatement ps = connection.prepareStatement("insert
into ${table} values(?, ?, ?),(?, ?, ?)")) {
- for (int i = 0; i < 2; i++) {
- ps.setObject(1, 18 + i);
+ // with columns
+ try (PreparedStatement ps =
connection.prepareStatement("insert into ${table}(id, name) values(?, ?)")) {
+ ps.setObject(1, 11);
ps.setObject(2, "f");
- ps.setObject(3, 90);
- ps.setObject(4, 18 + i + 1);
- ps.setObject(5, "f");
- ps.setObject(6, 90);
ps.addBatch();
int[] result = ps.executeBatch();
logger.info("prepare insert result: " + result)
}
- }
- getRowCount(13)
-
- // prepare insert without column names, and do schema change
- try (PreparedStatement ps = connection.prepareStatement("insert
into ${table} values(?, ?, ?)")) {
- ps.setObject(1, 22)
- ps.setObject(2, "f")
- ps.setObject(3, 90)
- ps.addBatch()
- int[] result = ps.executeBatch()
- logger.info("prepare insert result: " + result)
-
- sql """ alter table ${table} ADD column age int after name; """
- assertTrue(getAlterTableState(), "add column should success")
-
- try {
- ps.setObject(1, 23)
+
+ try (PreparedStatement ps =
connection.prepareStatement("insert into ${table}(id, name) values(?, ?, ?)")) {
+ ps.setObject(1, 12);
+ ps.setObject(2, "f");
+ ps.setObject(3, "f");
+ ps.addBatch();
+ int[] result = ps.executeBatch();
+ assertTrue(false)
+ } catch (Exception e) {
+ assertTrue(e.getMessage().contains("Column count doesn't
match value count"))
+ }
+
+ try (PreparedStatement ps =
connection.prepareStatement("insert into ${table}(id, name) values(?)")) {
+ ps.setObject(1, 13);
+ ps.addBatch();
+ int[] result = ps.executeBatch();
+ assertTrue(false)
+ } catch (Exception e) {
+ assertTrue(e.getMessage().contains("Column count doesn't
match value count"))
+ }
+
+ try (PreparedStatement ps =
connection.prepareStatement("insert into ${table}(id, names) values(?, ?)")) {
+ ps.setObject(1, 12);
+ ps.setObject(2, "f");
+ ps.addBatch();
+ int[] result = ps.executeBatch();
+ assertTrue(false)
+ } catch (Exception e) {
+ assertTrue(e.getMessage().contains("Unknown column
'names'"))
+ }
+
+ getRowCount(4)
+
+ // prepare insert with multi rows
+ try (PreparedStatement ps =
connection.prepareStatement("insert into ${table} values(?, ?, ?)")) {
+ for (int i = 0; i < 5; i++) {
+ ps.setObject(1, 13 + i);
+ ps.setObject(2, "f");
+ ps.setObject(3, 90);
+ ps.addBatch();
+ int[] result = ps.executeBatch();
+ logger.info("prepare insert result: " + result)
+ }
+ }
+ getRowCount(9)
+
+ // prepare insert with multi rows
+ try (PreparedStatement ps =
connection.prepareStatement("insert into ${table} values(?, ?, ?),(?, ?, ?)")) {
+ for (int i = 0; i < 2; i++) {
+ ps.setObject(1, 18 + i);
+ ps.setObject(2, "f");
+ ps.setObject(3, 90);
+ ps.setObject(4, 18 + i + 1);
+ ps.setObject(5, "f");
+ ps.setObject(6, 90);
+ ps.addBatch();
+ int[] result = ps.executeBatch();
+ logger.info("prepare insert result: " + result)
+ }
+ }
+ getRowCount(13)
+
+ // prepare insert without column names, and do schema change
+ try (PreparedStatement ps =
connection.prepareStatement("insert into ${table} values(?, ?, ?)")) {
+ ps.setObject(1, 22)
+ ps.setObject(2, "f")
+ ps.setObject(3, 90)
+ ps.addBatch()
+ int[] result = ps.executeBatch()
+ logger.info("prepare insert result: " + result)
+
+ sql """ alter table ${table} ADD column age int after
name; """
+ assertTrue(getAlterTableState(), "add column should
success")
+
+ try {
+ ps.setObject(1, 23)
+ ps.setObject(2, "f")
+ ps.setObject(3, 90)
+ ps.addBatch()
+ result = ps.executeBatch()
+ assertTrue(false)
+ } catch (Exception e) {
+ assertTrue(e.getMessage().contains("Column count
doesn't match value count"))
+ }
+ }
+ getRowCount(14)
+
+ // prepare insert with column names, and do schema change
+ try (PreparedStatement ps =
connection.prepareStatement("insert into ${table}(id, name, score) values(?, ?,
?)")) {
+ ps.setObject(1, 24)
+ ps.setObject(2, "f")
+ ps.setObject(3, 90)
+ ps.addBatch()
+ int[] result = ps.executeBatch()
+ logger.info("prepare insert result: " + result)
+
+ sql """ alter table ${table} DROP column age; """
+ assertTrue(getAlterTableState(), "drop column should
success")
+
+ ps.setObject(1, 25)
ps.setObject(2, "f")
ps.setObject(3, 90)
ps.addBatch()
result = ps.executeBatch()
- assertTrue(false)
- } catch (Exception e) {
- assertTrue(e.getMessage().contains("Column count doesn't
match value count"))
+ logger.info("prepare insert result: " + result)
}
+ getRowCount(16)
}
- getRowCount(14)
-
- // prepare insert with column names, and do schema change
- try (PreparedStatement ps = connection.prepareStatement("insert
into ${table}(id, name, score) values(?, ?, ?)")) {
- ps.setObject(1, 24)
- ps.setObject(2, "f")
- ps.setObject(3, 90)
- ps.addBatch()
- int[] result = ps.executeBatch()
- logger.info("prepare insert result: " + result)
-
- sql """ alter table ${table} DROP column age; """
- assertTrue(getAlterTableState(), "drop column should success")
-
- ps.setObject(1, 25)
- ps.setObject(2, "f")
- ps.setObject(3, 90)
- ps.addBatch()
- result = ps.executeBatch()
- logger.info("prepare insert result: " + result)
- }
- getRowCount(16)
+ } finally {
+ // try_sql("DROP TABLE ${table}")
}
- } finally {
- // try_sql("DROP TABLE ${table}")
}
}
diff --git
a/regression-test/suites/insert_p0/insert_group_commit_with_large_data.groovy
b/regression-test/suites/insert_p0/insert_group_commit_with_large_data.groovy
index 1805905a2a1..c28b18d3797 100644
---
a/regression-test/suites/insert_p0/insert_group_commit_with_large_data.groovy
+++
b/regression-test/suites/insert_p0/insert_group_commit_with_large_data.groovy
@@ -48,52 +48,59 @@ suite("insert_group_commit_with_large_data") {
assertTrue(serverInfo.contains("'label':'group_commit_"))
}
- try {
- // create table
- sql """ drop table if exists ${table}; """
+ for (item in ["legacy", "nereids"]) {
+ try {
+ // create table
+ sql """ drop table if exists ${table}; """
- sql """
- CREATE TABLE `${table}` (
- `id` int(11) NOT NULL,
- `name` varchar(1100) NULL,
- `score` int(11) NULL default "-1"
- ) ENGINE=OLAP
- DUPLICATE KEY(`id`, `name`)
- DISTRIBUTED BY HASH(`id`) BUCKETS 1
- PROPERTIES (
- "replication_num" = "1"
- );
- """
+ sql """
+ CREATE TABLE `${table}` (
+ `id` int(11) NOT NULL,
+ `name` varchar(1100) NULL,
+ `score` int(11) NULL default "-1"
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`id`, `name`)
+ DISTRIBUTED BY HASH(`id`) BUCKETS 1
+ PROPERTIES (
+ "replication_num" = "1"
+ );
+ """
- connect(user = context.config.jdbcUser, password = context.config.jdbcPassword, url = context.config.jdbcUrl) {
- sql """ set enable_insert_group_commit = true; """
- // TODO
- sql """ set enable_nereids_dml = false; """
- sql """ use ${db}; """
+ connect(user = context.config.jdbcUser, password = context.config.jdbcPassword, url = context.config.jdbcUrl) {
+ sql """ set enable_insert_group_commit = true; """
+ if (item == "nereids") {
+ sql """ set enable_nereids_dml = true; """
+ sql """ set enable_nereids_planner=true; """
+ sql """ set enable_fallback_to_original_planner=false; """
+ } else {
+ sql """ set enable_nereids_dml = false; """
+ }
+ sql """ use ${db}; """
- // insert into 5000 rows
- def insert_sql = """ insert into ${table} values(1, 'a', 10) """
- for (def i in 2..5000) {
- insert_sql += """, (${i}, 'a', 10) """
- }
- group_commit_insert insert_sql, 5000
- getRowCount(5000)
+ // insert into 5000 rows
+ def insert_sql = """ insert into ${table} values(1, 'a', 10) """
+ for (def i in 2..5000) {
+ insert_sql += """, (${i}, 'a', 10) """
+ }
+ group_commit_insert insert_sql, 5000
+ getRowCount(5000)
- // data size is large than 4MB, need " set global max_allowed_packet = 5508950 "
- /*def name_value = ""
- for (def i in 0..1024) {
- name_value += 'a'
- }
- insert_sql = """ insert into ${table} values(1, '${name_value}', 10) """
- for (def i in 2..5000) {
- insert_sql += """, (${i}, '${name_value}', 10) """
+ // data size is large than 4MB, need " set global max_allowed_packet = 5508950 "
+ /*def name_value = ""
+ for (def i in 0..1024) {
+ name_value += 'a'
+ }
+ insert_sql = """ insert into ${table} values(1, '${name_value}', 10) """
+ for (def i in 2..5000) {
+ insert_sql += """, (${i}, '${name_value}', 10) """
+ }
+ result = sql """ ${insert_sql} """
+ group_commit_insert insert_sql, 5000
+ getRowCount(10000)
+ */
}
- result = sql """ ${insert_sql} """
- group_commit_insert insert_sql, 5000
- getRowCount(10000)
- */
+ } finally {
+ // try_sql("DROP TABLE ${table}")
}
- } finally {
- // try_sql("DROP TABLE ${table}")
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]