This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 8093b4b3342 [branch-2.0-pick] "[Fix](partial-update) Correct the
alignment process when the table has sequence column and add cases #25346"
(#25789)
8093b4b3342 is described below
commit 8093b4b3342ebc3100ddc2480f131a998164d229
Author: bobhan1 <[email protected]>
AuthorDate: Mon Oct 23 23:35:21 2023 +0800
[branch-2.0-pick] "[Fix](partial-update) Correct the alignment process when
the table has sequence column and add cases #25346" (#25789)
---
be/src/olap/tablet.cpp | 27 ++-
.../test_partial_update_parallel.out | 21 ++
.../test_partial_update_parallel.groovy | 243 ++++++++++++++++++++-
3 files changed, 287 insertions(+), 4 deletions(-)
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 37bfedc3293..527a8b1c372 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -2965,6 +2965,15 @@ Status
Tablet::calc_segment_delete_bitmap(RowsetSharedPtr rowset,
Version dummy_version(end_version + 1, end_version + 1);
auto rowset_schema = rowset->tablet_schema();
bool is_partial_update = rowset_writer &&
rowset_writer->is_partial_update();
+ bool have_input_seq_column = false;
+ if (is_partial_update && rowset_schema->has_sequence_col()) {
+ std::vector<uint32_t> including_cids =
+ rowset_writer->get_partial_update_info()->update_cids;
+ have_input_seq_column =
+ rowset_schema->has_sequence_col() &&
+ (std::find(including_cids.cbegin(), including_cids.cend(),
+ rowset_schema->sequence_col_idx()) !=
including_cids.cend());
+ }
// use for partial update
PartialUpdateReadPlan read_plan_ori;
PartialUpdateReadPlan read_plan_update;
@@ -3033,12 +3042,24 @@ Status
Tablet::calc_segment_delete_bitmap(RowsetSharedPtr rowset,
continue;
}
- // sequence id smaller than the previous one, so delete current row
- if (st.is<KEY_ALREADY_EXISTS>()) {
+ if (st.is<KEY_ALREADY_EXISTS>() && (!is_partial_update ||
have_input_seq_column)) {
+ // `st.is<KEY_ALREADY_EXISTS>()` means that there exists a row
with the same key and larger value
+ // in sequence column.
+ // - If the current load is not a partial update, we just
delete current row.
+ // - Otherwise, it means that we are doing the alignment
process in publish phase due to conflicts
+ // during concurrent partial updates. And there exists another
load which introduces a row with
+ // the same keys and larger sequence column value published
successfully after the commit phase
+ // of the current load.
+ // - If the columns we update include sequence column, we
should delete the current row because the
+ // partial update on the current row has been
`overwritten` by the previous one with larger sequence
+ // column value.
+ // - Otherwise, we should combine the values of the
missing columns in the previous row and the values
+ // of the including columns in the current row into a
new row.
delete_bitmap->add({rowset_id, seg->id(),
DeleteBitmap::TEMP_VERSION_COMMON},
row_id);
continue;
- } else if (is_partial_update && rowset_writer != nullptr) {
+ }
+ if (is_partial_update && rowset_writer != nullptr) {
// In publish version, record rows to be deleted for
concurrent update
// For example, if version 5 and 6 update a row, but version 6
only see
// version 4 when write, and when publish version, version 5's
value will
diff --git
a/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_parallel.out
b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_parallel.out
index bcd7e86c53c..f4a51133a81 100644
---
a/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_parallel.out
+++
b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_parallel.out
@@ -6,3 +6,24 @@
4 "bbbbbbbb" 4444 499 40
5 "cccccccccccc" 5555 599 50
+-- !sql --
+1 "ddddddddddd" 1111 199 10
+2 "eeeeee" 2222 299 20
+3 "aaaaa" 3333 399 30
+4 "bbbbbbbb" 4444 499 40
+5 "cccccccccccc" 5555 599 50
+
+-- !sql --
+1 "ddddddddddd" 1111 199 10 0 5 10
+2 "eeeeee" 2222 299 20 0 5 20
+3 "aaaaa" 3333 399 30 0 5 30
+4 "bbbbbbbb" 4444 499 40 0 5 40
+5 "cccccccccccc" 5555 599 50 0 5 50
+
+-- !sql --
+1 "ddddddddddd" 1111 199 10 0 5 10
+2 "eeeeee" 2222 299 20 0 5 20
+3 "aaaaa" 3333 399 30 0 5 30
+4 "bbbbbbbb" 4444 499 40 0 5 40
+5 "cccccccccccc" 5555 599 50 0 5 50
+
diff --git
a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_parallel.groovy
b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_parallel.groovy
index 19522e8064e..ba0c1766aa1 100644
---
a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_parallel.groovy
+++
b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_parallel.groovy
@@ -17,7 +17,156 @@
suite("test_primary_key_partial_update_parallel", "p0") {
+ // case 1: concurrent partial update
def tableName = "test_primary_key_partial_update"
+ sql """ DROP TABLE IF EXISTS ${tableName} """
+ sql """
+ CREATE TABLE ${tableName} (
+ `id` int(11) NOT NULL COMMENT "用户 ID",
+ `name` varchar(65533) NOT NULL COMMENT "用户姓名",
+ `score` int(11) NOT NULL COMMENT "用户得分",
+ `test` int(11) NULL COMMENT "null test",
+ `dft` int(11) DEFAULT "4321")
+ UNIQUE KEY(`id`) DISTRIBUTED BY HASH(`id`) BUCKETS 1
+ PROPERTIES("replication_num" = "1",
"enable_unique_key_merge_on_write" = "true")
+ """
+
+ sql """insert into ${tableName} values
+ (2, "doris2", 2000, 223, 2),
+ (1, "doris", 1000, 123, 1),
+ (5, "doris5", 5000, 523, 5),
+ (4, "doris4", 4000, 423, 4),
+ (3, "doris3", 3000, 323, 3);"""
+
+ t1 = Thread.startDaemon {
+ streamLoad {
+ table "${tableName}"
+
+ set 'column_separator', ','
+ set 'format', 'csv'
+ set 'partial_columns', 'true'
+ set 'columns', 'id,name'
+
+ file 'partial_update_parallel1.csv'
+ time 10000 // limit inflight 10s
+ }
+ }
+
+ t2 = Thread.startDaemon {
+ streamLoad {
+ table "${tableName}"
+
+ set 'column_separator', ','
+ set 'format', 'csv'
+ set 'partial_columns', 'true'
+ set 'columns', 'id,score,test'
+
+ file 'partial_update_parallel2.csv'
+ time 10000 // limit inflight 10s
+ }
+ }
+
+ t3 = Thread.startDaemon {
+ streamLoad {
+ table "${tableName}"
+
+ set 'column_separator', ','
+ set 'format', 'csv'
+ set 'partial_columns', 'true'
+ set 'columns', 'id,dft'
+
+ file 'partial_update_parallel3.csv'
+ time 10000 // limit inflight 10s
+ }
+ }
+
+ t1.join()
+ t2.join()
+ t3.join()
+
+ sql "sync"
+
+ qt_sql """ select * from ${tableName} order by id;"""
+
+ sql """ DROP TABLE IF EXISTS ${tableName}; """
+
+
+ // case 2: concurrent partial update with row store column
+ tableName = "test_primary_key_row_store_partial_update"
+ sql """ DROP TABLE IF EXISTS ${tableName} """
+ sql """
+ CREATE TABLE ${tableName} (
+ `id` int(11) NOT NULL COMMENT "用户 ID",
+ `name` varchar(65533) NOT NULL COMMENT "用户姓名",
+ `score` int(11) NOT NULL COMMENT "用户得分",
+ `test` int(11) NULL COMMENT "null test",
+ `dft` int(11) DEFAULT "4321")
+ UNIQUE KEY(`id`) DISTRIBUTED BY HASH(`id`) BUCKETS 1
+ PROPERTIES("replication_num" = "1",
"enable_unique_key_merge_on_write" = "true", "store_row_column" = "true")
+ """
+
+ sql """insert into ${tableName} values
+ (2, "doris2", 2000, 223, 2),
+ (1, "doris", 1000, 123, 1),
+ (5, "doris5", 5000, 523, 5),
+ (4, "doris4", 4000, 423, 4),
+ (3, "doris3", 3000, 323, 3);"""
+
+ t1 = Thread.startDaemon {
+ streamLoad {
+ table "${tableName}"
+
+ set 'column_separator', ','
+ set 'format', 'csv'
+ set 'partial_columns', 'true'
+ set 'columns', 'id,name'
+
+ file 'partial_update_parallel1.csv'
+ time 10000 // limit inflight 10s
+ }
+ }
+
+ t2 = Thread.startDaemon {
+ streamLoad {
+ table "${tableName}"
+
+ set 'column_separator', ','
+ set 'format', 'csv'
+ set 'partial_columns', 'true'
+ set 'columns', 'id,score,test'
+
+ file 'partial_update_parallel2.csv'
+ time 10000 // limit inflight 10s
+ }
+ }
+
+ t3 = Thread.startDaemon {
+ streamLoad {
+ table "${tableName}"
+
+ set 'column_separator', ','
+ set 'format', 'csv'
+ set 'partial_columns', 'true'
+ set 'columns', 'id,dft'
+
+ file 'partial_update_parallel3.csv'
+ time 10000 // limit inflight 10s
+ }
+ }
+
+ t1.join()
+ t2.join()
+ t3.join()
+
+ sql "sync"
+
+ qt_sql """ select * from ${tableName} order by id;"""
+
+ sql """ DROP TABLE IF EXISTS ${tableName}; """
+
+
+ // case 3: concurrent partial update with sequence column
+ tableName = "test_primary_key_seq_partial_update"
// create table
sql """ DROP TABLE IF EXISTS ${tableName} """
@@ -29,14 +178,21 @@ suite("test_primary_key_partial_update_parallel", "p0") {
`test` int(11) NULL COMMENT "null test",
`dft` int(11) DEFAULT "4321")
UNIQUE KEY(`id`) DISTRIBUTED BY HASH(`id`) BUCKETS 1
- PROPERTIES("replication_num" = "1",
"enable_unique_key_merge_on_write" = "true")
+ PROPERTIES(
+ "replication_num" = "1",
+ "enable_unique_key_merge_on_write" = "true",
+ "function_column.sequence_col" = "dft")
"""
sql """insert into ${tableName} values
+ (2, "deprecated", 99999, 999, 1),
(2, "doris2", 2000, 223, 2),
(1, "doris", 1000, 123, 1),
+ (3, "deprecated", 99999, 999, 2),
(5, "doris5", 5000, 523, 5),
(4, "doris4", 4000, 423, 4),
+ (4, "deprecated", 99999, 999, 3),
+ (4, "deprecated", 99999, 999, 1),
(3, "doris3", 3000, 323, 3);"""
t1 = Thread.startDaemon {
@@ -85,9 +241,94 @@ suite("test_primary_key_partial_update_parallel", "p0") {
t2.join()
t3.join()
+ sql "set show_hidden_columns=true;"
sql "sync"
qt_sql """ select * from ${tableName} order by id;"""
+ sql "set show_hidden_columns=false;"
+ sql "sync"
+ sql """ DROP TABLE IF EXISTS ${tableName}; """
+
+
+ // case 4: concurrent partial update with row store column and sequence
column
+ tableName = "test_primary_key_row_store_seq_partial_update"
+ sql """ DROP TABLE IF EXISTS ${tableName} """
+ sql """
+ CREATE TABLE ${tableName} (
+ `id` int(11) NOT NULL COMMENT "用户 ID",
+ `name` varchar(65533) NOT NULL COMMENT "用户姓名",
+ `score` int(11) NOT NULL COMMENT "用户得分",
+ `test` int(11) NULL COMMENT "null test",
+ `dft` int(11) DEFAULT "4321")
+ UNIQUE KEY(`id`) DISTRIBUTED BY HASH(`id`) BUCKETS 1
+ PROPERTIES(
+ "replication_num" = "1",
+ "enable_unique_key_merge_on_write" = "true",
+ "function_column.sequence_col" = "dft",
+ "store_row_column" = "true")
+ """
+
+ sql """insert into ${tableName} values
+ (2, "deprecated", 99999, 999, 1),
+ (2, "doris2", 2000, 223, 2),
+ (1, "doris", 1000, 123, 1),
+ (3, "deprecated", 99999, 999, 2),
+ (5, "doris5", 5000, 523, 5),
+ (4, "doris4", 4000, 423, 4),
+ (4, "deprecated", 99999, 999, 3),
+ (4, "deprecated", 99999, 999, 1),
+ (3, "doris3", 3000, 323, 3);"""
+
+ t1 = Thread.startDaemon {
+ streamLoad {
+ table "${tableName}"
+
+ set 'column_separator', ','
+ set 'format', 'csv'
+ set 'partial_columns', 'true'
+ set 'columns', 'id,name'
+
+ file 'partial_update_parallel1.csv'
+ time 10000 // limit inflight 10s
+ }
+ }
+
+ t2 = Thread.startDaemon {
+ streamLoad {
+ table "${tableName}"
+
+ set 'column_separator', ','
+ set 'format', 'csv'
+ set 'partial_columns', 'true'
+ set 'columns', 'id,score,test'
+
+ file 'partial_update_parallel2.csv'
+ time 10000 // limit inflight 10s
+ }
+ }
+
+ t3 = Thread.startDaemon {
+ streamLoad {
+ table "${tableName}"
+
+ set 'column_separator', ','
+ set 'format', 'csv'
+ set 'partial_columns', 'true'
+ set 'columns', 'id,dft'
+
+ file 'partial_update_parallel3.csv'
+ time 10000 // limit inflight 10s
+ }
+ }
+
+ t1.join()
+ t2.join()
+ t3.join()
+
+ sql "set show_hidden_columns=true;"
+ sql "sync"
+
+ qt_sql """ select
id,name,score,test,dft,__DORIS_DELETE_SIGN__,__DORIS_VERSION_COL__,__DORIS_SEQUENCE_COL__
from ${tableName} order by id;"""
sql """ DROP TABLE IF EXISTS ${tableName}; """
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]