This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 6f596586163 [fix](cluster key) fix cluster key duplicated key (#44776)
6f596586163 is described below
commit 6f596586163b5dedaf1b8e062c8e3ecd654cee22
Author: meiyi <[email protected]>
AuthorDate: Wed Dec 4 10:01:27 2024 +0800
[fix](cluster key) fix cluster key duplicated key (#44776)
Duplicated keys can occur when:
1. a schema change has been started but has not finished;
2. data is written and the publish phase skips calculating the delete bitmap;
3. the newly written rowsets, which have no delete bitmap, are compacted: a merge-on-write
(MoW) table with cluster keys must be compacted with the delete bitmap applied.
---
be/src/cloud/cloud_cumulative_compaction.cpp | 2 -
.../cloud/cloud_engine_calc_delete_bitmap_task.cpp | 2 +-
be/src/olap/compaction.cpp | 57 ++++++++
be/src/olap/compaction.h | 6 +
be/src/olap/cumulative_compaction.cpp | 2 +-
.../java/org/apache/doris/master/MasterImpl.java | 6 +-
.../test_schema_change_and_compaction.out | 8 ++
.../test_schema_change_and_compaction.groovy | 145 +++++++++++++++++++++
8 files changed, 221 insertions(+), 7 deletions(-)
diff --git a/be/src/cloud/cloud_cumulative_compaction.cpp
b/be/src/cloud/cloud_cumulative_compaction.cpp
index c466c35e2a2..4e25cd74209 100644
--- a/be/src/cloud/cloud_cumulative_compaction.cpp
+++ b/be/src/cloud/cloud_cumulative_compaction.cpp
@@ -375,11 +375,9 @@ Status CloudCumulativeCompaction::modify_rowsets() {
Status CloudCumulativeCompaction::process_old_version_delete_bitmap() {
// agg previously rowset old version delete bitmap
std::vector<RowsetSharedPtr> pre_rowsets {};
- std::vector<std::string> pre_rowset_ids {};
for (const auto& it : cloud_tablet()->rowset_map()) {
if (it.first.second < _input_rowsets.front()->start_version()) {
pre_rowsets.emplace_back(it.second);
- pre_rowset_ids.emplace_back(it.second->rowset_id().to_string());
}
}
std::sort(pre_rowsets.begin(), pre_rowsets.end(), Rowset::comparator);
diff --git a/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp
b/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp
index 91611d20c62..336117d1012 100644
--- a/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp
+++ b/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp
@@ -227,7 +227,7 @@ Status CloudTabletCalcDeleteBitmapTask::handle() const {
}
}
auto total_update_delete_bitmap_time_us = MonotonicMicros() - t3;
- LOG(INFO) << "calculate delete bitmap successfully on tablet"
+ LOG(INFO) << "finish calculate delete bitmap on tablet"
<< ", table_id=" << tablet->table_id() << ", transaction_id=" <<
_transaction_id
<< ", tablet_id=" << tablet->tablet_id()
<< ", get_tablet_time_us=" << get_tablet_time_us
diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index e71e1862dc8..c85ce36a103 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -191,6 +191,9 @@ Status Compaction::merge_input_rowsets() {
SCOPED_TIMER(_merge_rowsets_latency_timer);
// 1. Merge segment files and write bkd inverted index
if (_is_vertical) {
+ if (!_tablet->tablet_schema()->cluster_key_idxes().empty()) {
+ RETURN_IF_ERROR(update_delete_bitmap());
+ }
res = Merger::vertical_merge_rowsets(_tablet, compaction_type(),
*_cur_tablet_schema,
input_rs_readers,
_output_rs_writer.get(),
get_avg_segment_rows(),
way_num, &_stats);
@@ -872,6 +875,60 @@ void
Compaction::construct_index_compaction_columns(RowsetWriterContext& ctx) {
}
}
+// For merge-on-write tables with cluster keys, compaction reads its input rowsets
+// with the delete bitmap applied. While the tablet is TABLET_NOTREADY (e.g. during
+// an unfinished schema change) publish may have skipped calculating the delete
+// bitmap of newly written rowsets, so it is (re)computed here before merging.
+//
+// Returns OK immediately when the tablet is in any state other than TABLET_NOTREADY.
+// Returns the first error from update_delete_bitmap_without_lock(), which aborts the
+// compaction.
+Status CompactionMixin::update_delete_bitmap() {
+    {
+        std::shared_lock meta_rlock(_tablet->get_header_lock());
+        if (_tablet->tablet_state() != TABLET_NOTREADY) {
+            return Status::OK();
+        }
+    }
+    OlapStopWatch watch;
+    // Input rowsets already handled; passed to each subsequent call (presumably as
+    // the base rowsets the next rowset is checked against -- confirm in BaseTablet).
+    std::vector<RowsetSharedPtr> rowsets;
+    rowsets.reserve(_input_rowsets.size());
+    for (const auto& rowset : _input_rowsets) {
+        // Locks are taken per rowset so they are not held across the whole loop.
+        std::lock_guard rwlock(tablet()->get_rowset_update_lock());
+        std::shared_lock rlock(_tablet->get_header_lock());
+        Status st = _tablet->update_delete_bitmap_without_lock(_tablet, rowset, &rowsets);
+        if (!st.ok()) {
+            // The error aborts the compaction, so log it at WARNING, not INFO.
+            LOG(WARNING) << "failed update_delete_bitmap_without_lock for tablet_id="
+                         << _tablet->tablet_id()
+                         << ", rowset_id=" << rowset->rowset_id().to_string()
+                         << ", st=" << st.to_string();
+            return st;
+        }
+        rowsets.push_back(rowset);
+    }
+    LOG(INFO) << "finish update delete bitmap for tablet: " << _tablet->tablet_id()
+              << ", rowsets: " << _input_rowsets.size() << ", cost: " << watch.get_elapse_time_us()
+              << "(us)";
+    return Status::OK();
+}
+
+// Cloud counterpart of CompactionMixin::update_delete_bitmap(): for merge-on-write
+// tables with cluster keys, compaction reads input rowsets with the delete bitmap
+// applied, so when the tablet is TABLET_NOTREADY (e.g. during an unfinished schema
+// change) the delete bitmap of the input rowsets is (re)computed before merging.
+//
+// Returns OK immediately when the tablet is in any state other than TABLET_NOTREADY.
+// Returns the first error from update_delete_bitmap_without_lock(), which aborts the
+// compaction.
+// NOTE(review): unlike the non-cloud variant, no rowset-update or header lock is held
+// around update_delete_bitmap_without_lock() -- confirm this is safe for cloud tablets.
+Status CloudCompactionMixin::update_delete_bitmap() {
+    {
+        std::shared_lock meta_rlock(_tablet->get_header_lock());
+        if (_tablet->tablet_state() != TABLET_NOTREADY) {
+            return Status::OK();
+        }
+    }
+    OlapStopWatch watch;
+    // Input rowsets already handled; passed to each subsequent call (presumably as
+    // the base rowsets the next rowset is checked against -- confirm in BaseTablet).
+    std::vector<RowsetSharedPtr> rowsets;
+    rowsets.reserve(_input_rowsets.size());
+    for (const auto& rowset : _input_rowsets) {
+        Status st = _tablet->update_delete_bitmap_without_lock(_tablet, rowset, &rowsets);
+        if (!st.ok()) {
+            // The error aborts the compaction, so log it at WARNING, not INFO.
+            LOG(WARNING) << "failed update_delete_bitmap_without_lock for tablet_id="
+                         << _tablet->tablet_id()
+                         << ", rowset_id=" << rowset->rowset_id().to_string()
+                         << ", st=" << st.to_string();
+            return st;
+        }
+        rowsets.push_back(rowset);
+    }
+    LOG(INFO) << "finish update delete bitmap for tablet: " << _tablet->tablet_id()
+              << ", rowsets: " << _input_rowsets.size() << ", cost: " << watch.get_elapse_time_us()
+              << "(us)";
+    return Status::OK();
+}
+
Status CompactionMixin::construct_output_rowset_writer(RowsetWriterContext&
ctx) {
// only do index compaction for dup_keys and unique_keys with mow enabled
if (config::inverted_index_compaction_enable &&
diff --git a/be/src/olap/compaction.h b/be/src/olap/compaction.h
index 06ef4268529..7f92a6c5f4d 100644
--- a/be/src/olap/compaction.h
+++ b/be/src/olap/compaction.h
@@ -84,6 +84,8 @@ protected:
int64_t merge_way_num();
+ virtual Status update_delete_bitmap() = 0;
+
// the root tracker for this compaction
std::shared_ptr<MemTrackerLimiter> _mem_tracker;
@@ -146,6 +148,8 @@ protected:
virtual Status modify_rowsets();
+ Status update_delete_bitmap() override;
+
StorageEngine& _engine;
private:
@@ -175,6 +179,8 @@ public:
protected:
CloudTablet* cloud_tablet() { return
static_cast<CloudTablet*>(_tablet.get()); }
+ Status update_delete_bitmap() override;
+
virtual void garbage_collection();
CloudStorageEngine& _engine;
diff --git a/be/src/olap/cumulative_compaction.cpp
b/be/src/olap/cumulative_compaction.cpp
index b961c694ede..2dfd30fb86e 100644
--- a/be/src/olap/cumulative_compaction.cpp
+++ b/be/src/olap/cumulative_compaction.cpp
@@ -145,7 +145,7 @@ Status CumulativeCompaction::pick_rowsets_to_compact() {
DCHECK(missing_versions.size() % 2 == 0);
LOG(WARNING) << "There are missed versions among rowsets. "
<< "total missed version size: " <<
missing_versions.size() / 2
- << " first missed version prev rowset verison=" <<
missing_versions[0]
+ << ", first missed version prev rowset verison=" <<
missing_versions[0]
<< ", first missed version next rowset version=" <<
missing_versions[1]
<< ", tablet=" << _tablet->tablet_id();
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
index 4010a9b564d..09318af34bd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
@@ -681,9 +681,9 @@ public class MasterImpl {
CalcDeleteBitmapTask calcDeleteBitmapTask = (CalcDeleteBitmapTask)
task;
if (request.getTaskStatus().getStatusCode() != TStatusCode.OK) {
calcDeleteBitmapTask.countDownToZero(request.getTaskStatus().getStatusCode(),
- "backend: " + task.getBackendId() + ",
error_tablet_size: "
- + request.getErrorTabletIdsSize() + ",
err_msg: "
- +
request.getTaskStatus().getErrorMsgs().toString());
+ "backend: " + task.getBackendId() + ",
error_tablet_size: " + request.getErrorTabletIdsSize()
+ + ", error_tablets: " +
request.getErrorTabletIds()
+ + ", err_msg: " +
request.getTaskStatus().getErrorMsgs().toString());
} else if (request.isSetRespPartitions()
&&
calcDeleteBitmapTask.isFinishRequestStale(request.getRespPartitions())) {
LOG.warn("get staled response from backend: {}, report
version: {}. calcDeleteBitmapTask's"
diff --git
a/regression-test/data/unique_with_mow_c_p0/test_schema_change_and_compaction.out
b/regression-test/data/unique_with_mow_c_p0/test_schema_change_and_compaction.out
new file mode 100644
index 00000000000..35f26a488be
--- /dev/null
+++
b/regression-test/data/unique_with_mow_c_p0/test_schema_change_and_compaction.out
@@ -0,0 +1,8 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !select1 --
+10 20 35 40
+
+-- !select2 --
+10 20 40 37
+11 20 40 37
+
diff --git
a/regression-test/suites/unique_with_mow_c_p0/test_schema_change_and_compaction.groovy
b/regression-test/suites/unique_with_mow_c_p0/test_schema_change_and_compaction.groovy
new file mode 100644
index 00000000000..dfb7facf5ee
--- /dev/null
+++
b/regression-test/suites/unique_with_mow_c_p0/test_schema_change_and_compaction.groovy
@@ -0,0 +1,145 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// The cases is copied from https://github.com/trinodb/trino/tree/master
+// /testing/trino-product-tests/src/main/resources/sql-tests/testcases
+// and modified by Doris.
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+
+suite("test_schema_change_and_compaction", "nonConcurrent") {
+    def tableName = "test_schema_change_and_compaction"
+
+    // Polls the most recent column-alter job of ${tableName} every 2s until its state
+    // (column 9 of the SHOW ALTER TABLE COLUMN row) equals job_state; fails the case
+    // after 10 attempts.
+    def getAlterTableState = { job_state ->
+        def retry = 0
+        def last_state = ""
+        while (true) {
+            sleep(2000)
+            def state = sql " show alter table column where tablename = '${tableName}' order by CreateTime desc limit 1"
+            logger.info("alter table state: ${state}")
+            // Check for an empty result BEFORE indexing: on an empty list state[0] is
+            // null and state[0][9] would throw instead of retrying.
+            if (state.size() > 0) {
+                last_state = state[0][9]
+                if (last_state == job_state) {
+                    return
+                }
+            }
+            retry++
+            if (retry >= 10) {
+                break
+            }
+        }
+        assertTrue(false, "alter table job state is ${last_state}, not ${job_state} after retry ${retry} times")
+    }
+
+    // Enables the debug point that blocks converting historical rowsets, so the schema
+    // change stays RUNNING until unblock() is called.
+    def block_convert_historical_rowsets = {
+        if (isCloudMode()) {
+            GetDebugPoint().enableDebugPointForAllBEs("CloudSchemaChangeJob::_convert_historical_rowsets.block")
+        } else {
+            GetDebugPoint().enableDebugPointForAllBEs("SchemaChangeJob::_convert_historical_rowsets.block")
+        }
+    }
+
+    // Disables the debug point enabled by block_convert_historical_rowsets().
+    def unblock = {
+        if (isCloudMode()) {
+            GetDebugPoint().disableDebugPointForAllBEs("CloudSchemaChangeJob::_convert_historical_rowsets.block")
+        } else {
+            GetDebugPoint().disableDebugPointForAllBEs("SchemaChangeJob::_convert_historical_rowsets.block")
+        }
+    }
+
+    // Always release the debug point, even if an assertion fails midway.
+    onFinish {
+        unblock()
+    }
+
+    // Unique-key table with a cluster key on v1.
+    sql """ DROP TABLE IF EXISTS ${tableName} force """
+    sql """
+        CREATE TABLE ${tableName} ( `k1` int(11), `k2` int(11), `v1` int(11), `v2` int(11) ) ENGINE=OLAP
+        unique KEY(`k1`, `k2`) cluster by(v1) DISTRIBUTED BY HASH(`k1`) BUCKETS 1
+        PROPERTIES ( "replication_num" = "1" );
+    """
+    sql """ insert into ${tableName} values(10, 20, 30, 40); """
+
+    // alter table: start a schema change and keep it RUNNING
+    block_convert_historical_rowsets()
+    sql """ alter table ${tableName} order by(k1, k2, v2, v1); """
+    getAlterTableState("RUNNING")
+
+    // Expect the original tablet plus the new tablet, which is reported in ALTER state.
+    def tablets = sql_return_maparray """ show tablets from ${tableName}; """
+    logger.info("tablets: ${tablets}")
+    assertEquals(2, tablets.size())
+    String alterTabletId = ""
+    String alterTabletBackendId = ""
+    String alterTabletCompactionUrl = ""
+    for (Map<String, String> tablet : tablets) {
+        if (tablet["State"] == "ALTER") {
+            alterTabletId = tablet["TabletId"].toLong()
+            alterTabletBackendId = tablet["BackendId"]
+            alterTabletCompactionUrl = tablet["CompactionStatus"]
+        }
+    }
+    logger.info("alterTabletId: ${alterTabletId}, alterTabletBackendId: ${alterTabletBackendId}, alterTabletCompactionUrl: ${alterTabletCompactionUrl}")
+    assertTrue(!alterTabletId.isEmpty())
+
+    // write some data: all rows share the same unique key (10, 20)
+    sql """ insert into ${tableName} values(10, 20, 31, 40); """
+    sql """ insert into ${tableName} values(10, 20, 32, 40); """
+    sql """ insert into ${tableName} values(10, 20, 33, 40); """
+    sql """ insert into ${tableName} values(10, 20, 34, 40); """
+    sql """ insert into ${tableName} values(10, 20, 35, 40); """
+    order_qt_select1 """ select * from ${tableName}; """
+
+    // trigger cumulative compaction on the new (ALTER) tablet
+    def backendId_to_backendIP = [:]
+    def backendId_to_backendHttpPort = [:]
+    getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort)
+    logger.info("ip: " + backendId_to_backendIP.get(alterTabletBackendId) + ", port: " + backendId_to_backendHttpPort.get(alterTabletBackendId))
+    def (code, out, err) = be_run_cumulative_compaction(backendId_to_backendIP.get(alterTabletBackendId), backendId_to_backendHttpPort.get(alterTabletBackendId), alterTabletId+"")
+    logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
+
+    // wait for compaction done: the rowset count of the new tablet drops below 5
+    def enable_new_tablet_do_compaction = get_be_param.call("enable_new_tablet_do_compaction")
+    logger.info("enable_new_tablet_do_compaction: " + enable_new_tablet_do_compaction)
+    boolean enable = enable_new_tablet_do_compaction.get(alterTabletBackendId).toBoolean()
+    logger.info("enable: " + enable)
+    for (int i = 0; i < 10; i++) {
+        (code, out, err) = curl("GET", alterTabletCompactionUrl)
+        logger.info("Show tablets status: code=" + code + ", out=" + out + ", err=" + err)
+        assertEquals(0, code)
+        def tabletJson = parseJson(out.trim())
+        assert tabletJson.rowsets instanceof List
+        if (isCloudMode()) {
+            if (enable) {
+                if (tabletJson.rowsets.size() < 5) {
+                    break
+                }
+            } else {
+                // compaction on the new tablet is rejected:
+                // "msg": "invalid tablet state. tablet_id="
+                break
+            }
+        } else {
+            if (tabletJson.rowsets.size() < 5) {
+                break
+            }
+        }
+        sleep(2000)
+    }
+
+    // unblock, write more data, and let the schema change finish
+    unblock()
+    sql """ insert into ${tableName}(k1, k2, v1, v2) values(10, 20, 36, 40), (11, 20, 36, 40); """
+    sql """ insert into ${tableName}(k1, k2, v1, v2) values(10, 20, 37, 40), (11, 20, 37, 40); """
+    getAlterTableState("FINISHED")
+    // Each unique key must appear exactly once (no duplicated keys).
+    order_qt_select2 """ select * from ${tableName}; """
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]