This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new cd4b8666e74 branch-3.0: [fix](group commit) fix some group commit problem (#48621) (#48896)
cd4b8666e74 is described below
commit cd4b8666e7483c2ba2be24081cfd2e8b7b34a32a
Author: meiyi <[email protected]>
AuthorDate: Tue Mar 11 17:34:48 2025 +0800
branch-3.0: [fix](group commit) fix some group commit problem (#48621) (#48896)
pick https://github.com/apache/doris/pull/48621
---
.../insert/OlapGroupCommitInsertExecutor.java | 12 +++--
.../insert_group_commit_with_large_data.out | Bin 221 -> 383 bytes
.../ddl_p0/test_create_table_properties.groovy | 3 +-
regression-test/suites/ddl_p0/test_ctas.groovy | 2 +-
.../insert_group_commit_with_large_data.groovy | 54 +++++++++++++++++++--
5 files changed, 61 insertions(+), 10 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapGroupCommitInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapGroupCommitInsertExecutor.java
index 0387f308b15..b90a616cd7f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapGroupCommitInsertExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapGroupCommitInsertExecutor.java
@@ -32,6 +32,7 @@ import org.apache.doris.mtmv.MTMVUtil;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.analyzer.UnboundTableSink;
import org.apache.doris.nereids.exceptions.AnalysisException;
+import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.algebra.OneRowRelation;
import org.apache.doris.nereids.trees.plans.commands.PrepareCommand;
import org.apache.doris.nereids.trees.plans.logical.LogicalInlineTable;
@@ -131,11 +132,14 @@ public class OlapGroupCommitInsertExecutor extends OlapInsertExecutor {
() -> "not allowModifyMTMVData"));
conditions.add(Pair.of(() -> !(insertCtx.isPresent() &&
insertCtx.get() instanceof OlapInsertCommandContext
&& ((OlapInsertCommandContext)
insertCtx.get()).isOverwrite()), () -> "is overwrite command"));
+ Plan tableSinkChild = tableSink.child();
conditions.add(Pair.of(
- () -> tableSink.child() instanceof OneRowRelation ||
tableSink.child() instanceof LogicalUnion
- || tableSink.child() instanceof LogicalInlineTable,
- () -> "not one row relation or union or inline table,
class: " + tableSink.child().getClass()
- .getName()));
+ () -> tableSinkChild instanceof OneRowRelation ||
(tableSinkChild instanceof LogicalUnion
+ && tableSinkChild.getExpressions().size() > 0)
+ || tableSinkChild instanceof LogicalInlineTable,
+ () -> "should be one row relation or union or inline
table, class: "
+ + tableSinkChild.getClass().getName() +
(tableSinkChild instanceof LogicalUnion
+ ? ", expression size is 0" : "")));
ctx.setGroupCommit(conditions.stream().allMatch(p ->
p.first.getAsBoolean()));
if (!ctx.isGroupCommit() && LOG.isDebugEnabled()) {
for (Pair<BooleanSupplier, Supplier<String>> pair :
conditions) {
diff --git
a/regression-test/data/insert_p0/insert_group_commit_with_large_data.out
b/regression-test/data/insert_p0/insert_group_commit_with_large_data.out
index 06acc23ec64..5e28e927c1c 100644
Binary files a/regression-test/data/insert_p0/insert_group_commit_with_large_data.out and b/regression-test/data/insert_p0/insert_group_commit_with_large_data.out differ
diff --git a/regression-test/suites/ddl_p0/test_create_table_properties.groovy b/regression-test/suites/ddl_p0/test_create_table_properties.groovy
index 32fd0cabcaf..4a64ac55bcb 100644
--- a/regression-test/suites/ddl_p0/test_create_table_properties.groovy
+++ b/regression-test/suites/ddl_p0/test_create_table_properties.groovy
@@ -300,7 +300,8 @@ suite("test_create_table_properties") {
assertTrue(false, "should not be able to execute")
}
catch (Exception ex) {
- assertTrue(ex.getMessage().contains("Insert has filtered data in
strict mode"))
+ def exception_str = isGroupCommitMode() ? "too many filtered rows" :
"Insert has filtered data in strict mode"
+ assertTrue(ex.getMessage().contains(exception_str))
} finally {
}
// alter table add default partition
diff --git a/regression-test/suites/ddl_p0/test_ctas.groovy b/regression-test/suites/ddl_p0/test_ctas.groovy
index 3dc0afd73af..6d176e30b4f 100644
--- a/regression-test/suites/ddl_p0/test_ctas.groovy
+++ b/regression-test/suites/ddl_p0/test_ctas.groovy
@@ -106,7 +106,7 @@ suite("test_ctas") {
test {
sql """show load from ${dbname}"""
- rowNum 6
+ rowNum isGroupCommitMode() ? 4: 6
}
sql """
diff --git a/regression-test/suites/insert_p0/insert_group_commit_with_large_data.groovy b/regression-test/suites/insert_p0/insert_group_commit_with_large_data.groovy
index 39773f887cb..d43ad340f87 100644
--- a/regression-test/suites/insert_p0/insert_group_commit_with_large_data.groovy
+++ b/regression-test/suites/insert_p0/insert_group_commit_with_large_data.groovy
@@ -101,9 +101,10 @@ suite("insert_group_commit_with_large_data") {
sql """ drop table if exists ${testTable}; """
sql """create table ${testTable}(a int,b int,c double generated always as
(abs(a+b)) not null)
DISTRIBUTED BY HASH(a) PROPERTIES("replication_num" = "1",
"group_commit_interval_ms" = "40");"""
- sql "INSERT INTO ${testTable} values(6,7,default);"
- sql "INSERT INTO ${testTable}(a,b) values(1,2);"
- sql "INSERT INTO ${testTable} values(3,5,default);"
+ sql " set group_commit = async_mode; "
+ group_commit_insert "INSERT INTO ${testTable} values(6,7,default);", 1
+ group_commit_insert "INSERT INTO ${testTable}(a,b) values(1,2);", 1
+ group_commit_insert "INSERT INTO ${testTable} values(3,5,default);", 1
getRowCount(3)
qt_select1 "select * from ${testTable} order by 1,2,3;"
@@ -123,7 +124,6 @@ suite("insert_group_commit_with_large_data") {
if (exception != null) {
throw exception
}
- log.info("Stream load result: ${result}".toString())
def json = parseJson(result)
assertEquals("success", json.Status.toLowerCase())
assertEquals(4, json.NumberTotalRows)
@@ -131,4 +131,50 @@ suite("insert_group_commit_with_large_data") {
}
getRowCount(7)
qt_select2 "select * from ${testTable} order by 1,2,3;"
+
+ try {
+ sql """set group_commit = off_mode;"""
+ sql "drop table if exists gc_ctas1"
+ sql "drop table if exists gc_ctas2"
+ sql "drop table if exists gc_ctas3"
+ sql '''
+ CREATE TABLE IF NOT EXISTS `gc_ctas1` (
+ `k1` varchar(5) NULL,
+ `k2` varchar(5) NULL
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`k1`)
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 10
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1"
+ );
+ '''
+ sql '''
+ CREATE TABLE IF NOT EXISTS `gc_ctas2` (
+ `k1` varchar(10) NULL,
+ `k2` varchar(10) NULL
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`k1`)
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 10
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1"
+ );
+ '''
+ sql ''' insert into gc_ctas1 values('11111','11111'); '''
+ sql ''' insert into gc_ctas2 values('1111111111','1111111111'); '''
+ sql "sync"
+ order_qt_select_cte1 """ select * from gc_ctas1; """
+ order_qt_select_cte2 """ select * from gc_ctas2; """
+ sql """set group_commit = async_mode;"""
+ sql '''
+ create table `gc_ctas3`(k1, k2)
+ PROPERTIES("replication_num" = "1")
+ as select * from gc_ctas1
+ union all
+ select * from gc_ctas2;
+ '''
+ sql " insert into gc_ctas3 select * from gc_ctas1 union all select *
from gc_ctas2;"
+ sql "sync"
+ order_qt_select_cte3 """ select * from gc_ctas3; """
+ } finally {
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]