This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 7087250b4a1 [fix](insert) txn insert and group commit should write \N
string corr… (#27637)
7087250b4a1 is described below
commit 7087250b4a14fc33f4f55ad696eecdb15344ee55
Author: meiyi <[email protected]>
AuthorDate: Tue Nov 28 17:32:50 2023 +0800
[fix](insert) txn insert and group commit should write \N string corr…
(#27637)
---
.../plans/commands/InsertIntoTableCommand.java | 4 +-
.../apache/doris/planner/GroupCommitPlanner.java | 104 ++++++++++++++-------
.../java/org/apache/doris/qe/StmtExecutor.java | 62 ++----------
.../data/insert_p0/insert_with_null.out | 53 +++++++++++
.../suites/insert_p0/insert_with_null.groovy | 97 +++++++++++++++++++
5 files changed, 226 insertions(+), 94 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java
index ceb2ad9270a..b6a632cb24f 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java
@@ -81,7 +81,6 @@ import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
/**
* insert into select command implementation
@@ -400,8 +399,7 @@ public class InsertIntoTableCommand extends Command
implements ForwardWithSync,
}
GroupCommitPlanner groupCommitPlanner = new
GroupCommitPlanner(physicalOlapTableSink.getDatabase(),
physicalOlapTableSink.getTargetTable(), null, ctx.queryId());
- Future<PGroupCommitInsertResponse> future =
groupCommitPlanner.executeGroupCommitInsert(ctx, rows);
- PGroupCommitInsertResponse response = future.get();
+ PGroupCommitInsertResponse response =
groupCommitPlanner.executeGroupCommitInsert(ctx, rows);
TStatusCode code =
TStatusCode.findByValue(response.getStatus().getStatusCode());
if (code == TStatusCode.DATA_QUALITY_ERROR) {
LOG.info("group commit insert failed. query id: {}, backend id:
{}, status: {}, "
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java
index 15aa639abee..5dcfdc1ba44 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java
@@ -17,10 +17,12 @@
package org.apache.doris.planner;
-
import org.apache.doris.analysis.CastExpr;
import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.NativeInsertStmt;
import org.apache.doris.analysis.NullLiteral;
+import org.apache.doris.analysis.SelectStmt;
+import org.apache.doris.analysis.SlotRef;
import org.apache.doris.catalog.ArrayType;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
@@ -33,6 +35,7 @@ import
org.apache.doris.proto.InternalService.PGroupCommitInsertRequest;
import org.apache.doris.proto.InternalService.PGroupCommitInsertResponse;
import org.apache.doris.proto.Types;
import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.rpc.BackendServiceProxy;
import org.apache.doris.rpc.RpcException;
import org.apache.doris.system.Backend;
@@ -55,10 +58,12 @@ import org.apache.logging.log4j.Logger;
import org.apache.thrift.TException;
import org.apache.thrift.TSerializer;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
@@ -78,6 +83,11 @@ public class GroupCommitPlanner {
throws UserException, TException {
this.db = db;
this.table = table;
+ if
(Env.getCurrentEnv().getGroupCommitManager().isBlock(this.table.getId())) {
+ String msg = "insert table " + this.table.getId() + " is blocked
on schema change";
+ LOG.info(msg);
+ throw new DdlException(msg);
+ }
TStreamLoadPutRequest streamLoadPutRequest = new
TStreamLoadPutRequest();
if (targetColumnNames != null) {
streamLoadPutRequest.setColumns(String.join(",",
targetColumnNames));
@@ -91,7 +101,7 @@ public class GroupCommitPlanner {
.setTbl(table.getName())
.setFileType(TFileType.FILE_STREAM).setFormatType(TFileFormatType.FORMAT_CSV_PLAIN)
.setMergeType(TMergeType.APPEND).setThriftRpcTimeoutMs(5000).setLoadId(queryId)
- .setGroupCommit(true);
+ .setGroupCommit(true).setTrimDoubleQuotes(true);
StreamLoadTask streamLoadTask =
StreamLoadTask.fromTStreamLoadPutRequest(streamLoadPutRequest);
StreamLoadPlanner planner = new StreamLoadPlanner(db, table,
streamLoadTask);
// Will using load id as query id in fragment
@@ -114,17 +124,29 @@ public class GroupCommitPlanner {
execPlanFragmentParamsBytes = ByteString.copyFrom(new
TSerializer().serialize(paramsList));
}
- public Future<PGroupCommitInsertResponse>
executeGroupCommitInsert(ConnectContext ctx,
- List<InternalService.PDataRow> rows) throws TException,
DdlException, RpcException {
+ public PGroupCommitInsertResponse executeGroupCommitInsert(ConnectContext
ctx,
+ List<InternalService.PDataRow> rows)
+ throws DdlException, RpcException, ExecutionException,
InterruptedException {
backend = ctx.getInsertGroupCommit(this.table.getId());
- if (backend == null || !backend.isAlive()) {
+ if (backend == null || !backend.isAlive() ||
backend.isDecommissioned()) {
List<Long> allBackendIds =
Env.getCurrentSystemInfo().getAllBackendIds(true);
if (allBackendIds.isEmpty()) {
throw new DdlException("No alive backend");
}
Collections.shuffle(allBackendIds);
- backend =
Env.getCurrentSystemInfo().getBackend(allBackendIds.get(0));
- ctx.setInsertGroupCommit(this.table.getId(), backend);
+ boolean find = false;
+ for (Long beId : allBackendIds) {
+ backend = Env.getCurrentSystemInfo().getBackend(beId);
+ if (!backend.isDecommissioned()) {
+ ctx.setInsertGroupCommit(this.table.getId(), backend);
+ find = true;
+ LOG.debug("choose new be {}", backend.getId());
+ break;
+ }
+ }
+ if (!find) {
+ throw new DdlException("No suitable backend");
+ }
}
PGroupCommitInsertRequest request =
PGroupCommitInsertRequest.newBuilder()
.setDbId(db.getId())
@@ -138,7 +160,7 @@ public class GroupCommitPlanner {
.build();
Future<PGroupCommitInsertResponse> future =
BackendServiceProxy.getInstance()
.groupCommitInsert(new TNetworkAddress(backend.getHost(),
backend.getBrpcPort()), request);
- return future;
+ return future.get();
}
// only for nereids use
@@ -147,40 +169,30 @@ public class GroupCommitPlanner {
return null;
}
InternalService.PDataRow.Builder row =
InternalService.PDataRow.newBuilder();
- try {
- List<Expr> exprs = cols.subList(0, cols.size() - filterSize);
- for (Expr expr : exprs) {
- if (!expr.isLiteralOrCastExpr() && !(expr instanceof
CastExpr)) {
- if (expr.getChildren().get(0) instanceof NullLiteral) {
- row.addColBuilder().setValue("\\N");
- continue;
- }
- throw new UserException(
- "do not support non-literal expr in transactional
insert operation: " + expr.toSql());
- }
- if (expr instanceof NullLiteral) {
- row.addColBuilder().setValue("\\N");
- } else if (expr.getType() instanceof ArrayType) {
-
row.addColBuilder().setValue(expr.getStringValueForArray());
- } else if (!expr.getChildren().isEmpty()) {
- expr.getChildren().forEach(child -> processExprVal(child,
row));
- } else {
- row.addColBuilder().setValue(expr.getStringValue());
+ List<Expr> exprs = cols.subList(0, cols.size() - filterSize);
+ for (Expr expr : exprs) {
+ if (!expr.isLiteralOrCastExpr() && !(expr instanceof CastExpr)) {
+ if (expr.getChildren().get(0) instanceof NullLiteral) {
+
row.addColBuilder().setValue(StmtExecutor.NULL_VALUE_FOR_LOAD);
+ continue;
}
+ throw new UserException(
+ "do not support non-literal expr in transactional
insert operation: " + expr.toSql());
}
- } catch (UserException e) {
- throw new RuntimeException(e);
+ processExprVal(expr, row);
}
return row.build();
}
private static void processExprVal(Expr expr,
InternalService.PDataRow.Builder row) {
- if (expr.getChildren().isEmpty()) {
- row.addColBuilder().setValue(expr.getStringValue());
- return;
- }
- for (Expr child : expr.getChildren()) {
- processExprVal(child, row);
+ if (expr instanceof NullLiteral) {
+ row.addColBuilder().setValue(StmtExecutor.NULL_VALUE_FOR_LOAD);
+ } else if (expr.getType() instanceof ArrayType) {
+ row.addColBuilder().setValue(String.format("\"%s\"",
expr.getStringValueForArray()));
+ } else if (!expr.getChildren().isEmpty()) {
+ expr.getChildren().forEach(child -> processExprVal(child, row));
+ } else {
+ row.addColBuilder().setValue(String.format("\"%s\"",
expr.getStringValue()));
}
}
@@ -192,5 +204,27 @@ public class GroupCommitPlanner {
return paramsList;
}
+ public List<InternalService.PDataRow> getRows(NativeInsertStmt stmt)
throws UserException {
+ List<InternalService.PDataRow> rows = new ArrayList<>();
+ SelectStmt selectStmt = (SelectStmt) (stmt.getQueryStmt());
+ if (selectStmt.getValueList() != null) {
+ for (List<Expr> row : selectStmt.getValueList().getRows()) {
+ InternalService.PDataRow data =
StmtExecutor.getRowStringValue(row);
+ rows.add(data);
+ }
+ } else {
+ List<Expr> exprList = new ArrayList<>();
+ for (Expr resultExpr : selectStmt.getResultExprs()) {
+ if (resultExpr instanceof SlotRef) {
+ exprList.add(((SlotRef)
resultExpr).getDesc().getSourceExprs().get(0));
+ } else {
+ exprList.add(resultExpr);
+ }
+ }
+ InternalService.PDataRow data =
StmtExecutor.getRowStringValue(exprList);
+ rows.add(data);
+ }
+ return rows;
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 8340e3b13e5..237708183cb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -59,7 +59,6 @@ import org.apache.doris.analysis.SetStmt;
import org.apache.doris.analysis.SetVar;
import org.apache.doris.analysis.SetVar.SetVarType;
import org.apache.doris.analysis.ShowStmt;
-import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.SqlParser;
import org.apache.doris.analysis.SqlScanner;
import org.apache.doris.analysis.StatementBase;
@@ -151,7 +150,6 @@ import org.apache.doris.rpc.RpcException;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.statistics.ResultRow;
import org.apache.doris.statistics.util.InternalQueryBuffer;
-import org.apache.doris.system.Backend;
import org.apache.doris.task.LoadEtlTask;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TFileType;
@@ -194,7 +192,6 @@ import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
@@ -279,7 +276,7 @@ public class StmtExecutor {
this.profile = new Profile("Query",
context.getSessionVariable().enableProfile());
}
- private static InternalService.PDataRow getRowStringValue(List<Expr> cols)
throws UserException {
+ public static InternalService.PDataRow getRowStringValue(List<Expr> cols)
throws UserException {
if (cols.isEmpty()) {
return null;
}
@@ -292,9 +289,9 @@ public class StmtExecutor {
if (expr instanceof NullLiteral) {
row.addColBuilder().setValue(NULL_VALUE_FOR_LOAD);
} else if (expr instanceof ArrayLiteral) {
- row.addColBuilder().setValue(expr.getStringValueForArray());
+ row.addColBuilder().setValue(String.format("\"%s\"",
expr.getStringValueForArray()));
} else {
- row.addColBuilder().setValue(expr.getStringValue());
+ row.addColBuilder().setValue(String.format("\"%s\"",
expr.getStringValue()));
}
}
return row.build();
@@ -1830,7 +1827,7 @@ public class StmtExecutor {
.setFileType(TFileType.FILE_STREAM).setFormatType(TFileFormatType.FORMAT_CSV_PLAIN)
.setMergeType(TMergeType.APPEND).setThriftRpcTimeoutMs(5000).setLoadId(context.queryId())
.setExecMemLimit(maxExecMemByte).setTimeout((int)
timeoutSecond)
-
.setTimezone(timeZone).setSendBatchParallelism(sendBatchParallelism);
+
.setTimezone(timeZone).setSendBatchParallelism(sendBatchParallelism).setTrimDoubleQuotes(true);
if (parsedStmt instanceof NativeInsertStmt && ((NativeInsertStmt)
parsedStmt).getTargetColumnNames() != null) {
List<String> targetColumnNames = ((NativeInsertStmt)
parsedStmt).getTargetColumnNames();
if (targetColumnNames.contains(Column.SEQUENCE_COL) ||
targetColumnNames.contains(Column.DELETE_SIGN)) {
@@ -1896,59 +1893,12 @@ public class StmtExecutor {
txnId = context.getTxnEntry().getTxnConf().getTxnId();
} else if (insertStmt instanceof NativeInsertStmt &&
((NativeInsertStmt) insertStmt).isGroupCommit()) {
isGroupCommit = true;
- if
(Env.getCurrentEnv().getGroupCommitManager().isBlock(insertStmt.getTargetTable().getId()))
{
- String msg = "insert table " +
insertStmt.getTargetTable().getId() + " is blocked on schema change";
- LOG.info(msg);
- throw new DdlException(msg);
- }
NativeInsertStmt nativeInsertStmt = (NativeInsertStmt) insertStmt;
- Backend backend =
context.getInsertGroupCommit(insertStmt.getTargetTable().getId());
- if (backend == null || !backend.isAlive() ||
backend.isDecommissioned()) {
- List<Long> allBackendIds =
Env.getCurrentSystemInfo().getAllBackendIds(true);
- if (allBackendIds.isEmpty()) {
- throw new DdlException("No alive backend");
- }
- Collections.shuffle(allBackendIds);
- boolean find = false;
- for (Long beId : allBackendIds) {
- backend = Env.getCurrentSystemInfo().getBackend(beId);
- if (!backend.isDecommissioned()) {
-
context.setInsertGroupCommit(insertStmt.getTargetTable().getId(), backend);
- find = true;
- LOG.debug("choose new be {}", backend.getId());
- break;
- }
- }
- if (!find) {
- throw new DdlException("No suitable backend");
- }
- }
int maxRetry = 3;
for (int i = 0; i < maxRetry; i++) {
GroupCommitPlanner groupCommitPlanner =
nativeInsertStmt.planForGroupCommit(context.queryId);
- // handle rows
- List<InternalService.PDataRow> rows = new ArrayList<>();
- SelectStmt selectStmt = (SelectStmt) insertStmt.getQueryStmt();
- if (selectStmt.getValueList() != null) {
- for (List<Expr> row : selectStmt.getValueList().getRows())
{
- InternalService.PDataRow data = getRowStringValue(row);
- rows.add(data);
- }
- } else {
- List<Expr> exprList = new ArrayList<>();
- for (Expr resultExpr : selectStmt.getResultExprs()) {
- if (resultExpr instanceof SlotRef) {
- exprList.add(((SlotRef)
resultExpr).getDesc().getSourceExprs().get(0));
- } else {
- exprList.add(resultExpr);
- }
- }
- InternalService.PDataRow data =
getRowStringValue(exprList);
- rows.add(data);
- }
- Future<PGroupCommitInsertResponse> future = groupCommitPlanner
- .executeGroupCommitInsert(context, rows);
- PGroupCommitInsertResponse response = future.get();
+ List<InternalService.PDataRow> rows =
groupCommitPlanner.getRows(nativeInsertStmt);
+ PGroupCommitInsertResponse response =
groupCommitPlanner.executeGroupCommitInsert(context, rows);
TStatusCode code =
TStatusCode.findByValue(response.getStatus().getStatusCode());
if (code == TStatusCode.DATA_QUALITY_ERROR) {
LOG.info("group commit insert failed. stmt: {}, backend
id: {}, status: {}, "
diff --git a/regression-test/data/insert_p0/insert_with_null.out
b/regression-test/data/insert_p0/insert_with_null.out
new file mode 100644
index 00000000000..58a7e423197
--- /dev/null
+++ b/regression-test/data/insert_p0/insert_with_null.out
@@ -0,0 +1,53 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !sql --
+1 "b" ["k1=v1, k2=v2"]
+2 N ["k3=v3, k4=v4"]
+4 null []
+5 NULL ["k5, k6"]
+6 \N ["k7", "k8"]
+7 abc []
+
+-- !sql --
+6 \N ["k7", "k8"]
+
+-- !sql --
+
+-- !sql --
+1 "b" ["k1=v1, k2=v2"]
+2 N ["k3=v3, k4=v4"]
+4 null []
+5 NULL ["k5, k6"]
+6 \N ["k7", "k8"]
+
+-- !sql --
+6 \N ["k7", "k8"]
+
+-- !sql --
+
+-- !sql --
+1 "b" ["k1=v1, k2=v2"]
+2 N ["k3=v3, k4=v4"]
+4 null []
+5 NULL ["k5, k6"]
+6 \N ["k7", "k8"]
+7 abc []
+
+-- !sql --
+6 \N ["k7", "k8"]
+
+-- !sql --
+
+-- !sql --
+1 "b" ["k1=v1, k2=v2"]
+2 N ["k3=v3, k4=v4"]
+4 null []
+5 NULL ["k5, k6"]
+6 \N ["k7", "k8"]
+7 abc \N
+
+-- !sql --
+6 \N ["k7", "k8"]
+
+-- !sql --
+7 abc \N
+
diff --git a/regression-test/suites/insert_p0/insert_with_null.groovy
b/regression-test/suites/insert_p0/insert_with_null.groovy
new file mode 100644
index 00000000000..1ac8f46b8c6
--- /dev/null
+++ b/regression-test/suites/insert_p0/insert_with_null.groovy
@@ -0,0 +1,97 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// These cases are copied from https://github.com/trinodb/trino/tree/master
+// /testing/trino-product-tests/src/main/resources/sql-tests/testcases
+// and modified by Doris.
+
+suite("insert_with_null") {
+ def table = "insert_with_null"
+ sql """ DROP TABLE IF EXISTS $table """
+ sql """
+ CREATE TABLE ${table} (
+ `id` int(11) NOT NULL,
+ `name` varchar(50) NULL,
+ `score` int(11) NULL default "-1"
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`id`, `name`)
+ DISTRIBUTED BY HASH(`id`) BUCKETS 1
+ PROPERTIES (
+ "replication_num" = "1"
+ );
+ """
+
+ def getRowCount = { expectedRowCount ->
+ def retry = 0
+ while (retry < 30) {
+ sleep(2000)
+ def rowCount = sql "select count(*) from ${table}"
+ logger.info("rowCount: " + rowCount + ", retry: " + retry)
+ if (rowCount[0][0] >= expectedRowCount) {
+ break
+ }
+ retry++
+ }
+ }
+
+ def write_modes = ["insert", "txn_insert", "group_commit_legacy",
"group_commit_nereids"]
+
+ for (def write_mode : write_modes) {
+ sql """ DROP TABLE IF EXISTS ${table} """
+ sql """
+ CREATE TABLE ${table} (
+ `id` int(11) NOT NULL,
+ `name` varchar(50) NULL,
+ `desc` array<String> NULL
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`id`)
+ DISTRIBUTED BY HASH(`id`) BUCKETS 1
+ PROPERTIES (
+ "replication_num" = "1"
+ );
+ """
+ if (write_mode == "txn_insert") {
+ sql "begin"
+ } else if (write_mode == "group_commit_legacy") {
+ sql """ set enable_insert_group_commit = true; """
+ sql """ set enable_nereids_dml = false; """
+ } else if (write_mode == "group_commit_nereids") {
+ sql """ set enable_insert_group_commit = true; """
+ sql """ set enable_nereids_dml = true; """
+ sql """ set enable_nereids_planner=true; """
+ sql """ set enable_fallback_to_original_planner=false; """
+ }
+
+ sql """ insert into ${table} values(1, '"b"', ["k1=v1, k2=v2"]); """
+ sql """ insert into ${table} values(2, "\\N", ['k3=v3, k4=v4']); """
+ // sql """ insert into ${table} values(3, "\\\\N", []); """
+ sql """ insert into ${table} values(4, 'null', []); """
+ sql """ insert into ${table} values(5, 'NULL', ['k5, k6']); """
+ sql """ insert into ${table} values(6, null, ["k7", "k8"]); """
+ if (write_mode != "txn_insert") {
+ sql """ insert into ${table}(id, name) values(7, 'abc'); """
+ getRowCount(6)
+ } else {
+ sql "commit"
+ getRowCount(5)
+ }
+
+ qt_sql """ select * from ${table} order by id asc; """
+ qt_sql """ select * from ${table} where name is null order by id asc;
"""
+ qt_sql """ select * from ${table} where `desc` is null order by id
asc; """
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]