This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 8d693cdcb90 [fix](group commit) fix group commit use prepared
statement and connect to observer (#46206) (#46344)
8d693cdcb90 is described below
commit 8d693cdcb90c83bb3ac7a8db07439bb4de03c39c
Author: meiyi <[email protected]>
AuthorDate: Fri Jan 3 15:48:47 2025 +0800
[fix](group commit) fix group commit use prepared statement and connect to
observer (#46206) (#46344)
pick https://github.com/apache/doris/pull/46206
---
.../commands/insert/InsertIntoTableCommand.java | 10 ++++++++++
.../insert/OlapGroupCommitInsertExecutor.java | 21 +++++++++++++++++++--
.../main/java/org/apache/doris/qe/StmtExecutor.java | 2 ++
.../org/apache/doris/regression/suite/Suite.groovy | 6 +++++-
.../insert_group_commit_with_exception.groovy | 4 ++--
.../insert_group_commit_with_prepare_stmt.groovy | 2 +-
6 files changed, 39 insertions(+), 6 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
index 17c84580d15..023b205ac53 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
@@ -17,6 +17,7 @@
package org.apache.doris.nereids.trees.plans.commands.insert;
+import org.apache.doris.analysis.RedirectStatus;
import org.apache.doris.analysis.StmtType;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
@@ -331,6 +332,15 @@ public class InsertIntoTableCommand extends Command
implements ForwardWithSync,
return StmtType.INSERT;
}
+ @Override
+ public RedirectStatus toRedirectStatus() {
+ if (ConnectContext.get().isGroupCommit()) {
+ return RedirectStatus.NO_FORWARD;
+ } else {
+ return RedirectStatus.FORWARD_WITH_SYNC;
+ }
+ }
+
private static class BuildInsertExecutorResult {
private final NereidsPlanner planner;
private final AbstractInsertExecutor executor;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapGroupCommitInsertExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapGroupCommitInsertExecutor.java
index 239328ce93d..ed13d5dcb23 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapGroupCommitInsertExecutor.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapGroupCommitInsertExecutor.java
@@ -33,6 +33,7 @@ import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.analyzer.UnboundTableSink;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.trees.plans.algebra.OneRowRelation;
+import org.apache.doris.nereids.trees.plans.logical.LogicalInlineTable;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.logical.LogicalUnion;
import org.apache.doris.planner.GroupCommitPlanner;
@@ -63,6 +64,20 @@ public class OlapGroupCommitInsertExecutor extends
OlapInsertExecutor {
super(ctx, table, labelName, planner, insertCtx, emptyInsert);
}
+ /**
+ * check if the sql can run in group commit mode
+ * @param logicalPlan plan of sql
+ */
+ public static void analyzeGroupCommit(LogicalPlan logicalPlan) {
+ ConnectContext ctx = ConnectContext.get();
+ if (ctx.getSessionVariable().isEnableInsertGroupCommit() &&
logicalPlan instanceof InsertIntoTableCommand) {
+ LogicalPlan logicalQuery = ((InsertIntoTableCommand)
logicalPlan).getLogicalQuery();
+ TableIf targetTableIf = InsertUtils.getTargetTable(logicalQuery,
ctx);
+ OlapGroupCommitInsertExecutor.analyzeGroupCommit(ctx,
targetTableIf, logicalQuery,
+ Optional.empty());
+ }
+ }
+
protected static void analyzeGroupCommit(ConnectContext ctx, TableIf
table, LogicalPlan logicalQuery,
Optional<InsertCommandContext> insertCtx) {
// The flag is set to false before execute sql, if it is true, this is
a http stream
@@ -91,8 +106,10 @@ public class OlapGroupCommitInsertExecutor extends
OlapInsertExecutor {
conditions.add(Pair.of(() -> !(insertCtx.isPresent() &&
insertCtx.get() instanceof OlapInsertCommandContext
&& ((OlapInsertCommandContext)
insertCtx.get()).isOverwrite()), () -> "is overwrite command"));
conditions.add(Pair.of(
- () -> tableSink.child() instanceof OneRowRelation ||
tableSink.child() instanceof LogicalUnion,
- () -> "not one row relation or union, class: " +
tableSink.child().getClass().getName()));
+ () -> tableSink.child() instanceof OneRowRelation ||
tableSink.child() instanceof LogicalUnion
+ || tableSink.child() instanceof LogicalInlineTable,
+ () -> "not one row relation or union or inline table,
class: " + tableSink.child().getClass()
+ .getName()));
ctx.setGroupCommit(conditions.stream().allMatch(p ->
p.first.getAsBoolean()));
if (!ctx.isGroupCommit() && LOG.isDebugEnabled()) {
for (Pair<BooleanSupplier, Supplier<String>> pair :
conditions) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 7ae5994e431..41d5ba98e6b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -160,6 +160,7 @@ import
org.apache.doris.nereids.trees.plans.commands.UpdateCommand;
import
org.apache.doris.nereids.trees.plans.commands.insert.BatchInsertIntoTableCommand;
import
org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
import
org.apache.doris.nereids.trees.plans.commands.insert.InsertOverwriteTableCommand;
+import
org.apache.doris.nereids.trees.plans.commands.insert.OlapGroupCommitInsertExecutor;
import org.apache.doris.nereids.trees.plans.commands.insert.OlapInsertExecutor;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalSqlCache;
@@ -723,6 +724,7 @@ public class StmtExecutor {
}
if (logicalPlan instanceof Command) {
if (logicalPlan instanceof Forward) {
+ OlapGroupCommitInsertExecutor.analyzeGroupCommit(logicalPlan);
redirectStatus = ((Forward) logicalPlan).toRedirectStatus();
if (isForwardToMaster()) {
// before forward to master, we also need to set
profileType in this node
diff --git
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
index a7b8421635e..333720bc5b9 100644
---
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
+++
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
@@ -1361,8 +1361,12 @@ class Suite implements GroovyInterceptable {
}
String getServerPrepareJdbcUrl(String jdbcUrl, String database) {
+ return getServerPrepareJdbcUrl(jdbcUrl, database, true)
+ }
+
+ String getServerPrepareJdbcUrl(String jdbcUrl, String database, boolean
useMasterIp) {
String urlWithoutSchema = jdbcUrl.substring(jdbcUrl.indexOf("://") + 3)
- def sql_ip = getMasterIp()
+ def sql_ip = useMasterIp ? getMasterIp() :
urlWithoutSchema.substring(0, urlWithoutSchema.indexOf(":"))
def sql_port
if (urlWithoutSchema.indexOf("/") >= 0) {
// e.g: jdbc:mysql://locahost:8080/?a=b
diff --git
a/regression-test/suites/insert_p0/insert_group_commit_with_exception.groovy
b/regression-test/suites/insert_p0/insert_group_commit_with_exception.groovy
index 054add11d9f..f14b28a7509 100644
--- a/regression-test/suites/insert_p0/insert_group_commit_with_exception.groovy
+++ b/regression-test/suites/insert_p0/insert_group_commit_with_exception.groovy
@@ -111,12 +111,12 @@ suite("insert_group_commit_with_exception") {
// prepare insert
def db = context.config.defaultDb + "_insert_p0"
- String url = getServerPrepareJdbcUrl(context.config.jdbcUrl, db)
+ String url = getServerPrepareJdbcUrl(context.config.jdbcUrl, db, false)
try (Connection connection = DriverManager.getConnection(url,
context.config.jdbcUser, context.config.jdbcPassword)) {
Statement statement = connection.createStatement();
statement.execute("use ${db}");
- statement.execute("set group_commit = eventual_consistency;");
+ statement.execute("set group_commit = sync_mode");
statement.execute("set enable_server_side_prepared_statement =
true")
// without column
try (PreparedStatement ps = connection.prepareStatement("insert
into ${table} values(?, ?, ?, ?)")) {
diff --git
a/regression-test/suites/insert_p0/insert_group_commit_with_prepare_stmt.groovy
b/regression-test/suites/insert_p0/insert_group_commit_with_prepare_stmt.groovy
index 7f2919f8118..e93e157aa5d 100644
---
a/regression-test/suites/insert_p0/insert_group_commit_with_prepare_stmt.groovy
+++
b/regression-test/suites/insert_p0/insert_group_commit_with_prepare_stmt.groovy
@@ -127,7 +127,7 @@ suite("insert_group_commit_with_prepare_stmt") {
return serverStatementIds
}
- def url = getServerPrepareJdbcUrl(context.config.jdbcUrl, realDb)
+ def url = getServerPrepareJdbcUrl(context.config.jdbcUrl, realDb, false)
logger.info("url: " + url)
def result1 = connect(user, password, url +
"&sessionVariables=group_commit=async_mode") {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]