This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 6f596586163 [fix](cluster key) fix cluster key duplicated key (#44776) 6f596586163 is described below commit 6f596586163b5dedaf1b8e062c8e3ecd654cee22 Author: meiyi <me...@selectdb.com> AuthorDate: Wed Dec 4 10:01:27 2024 +0800 [fix](cluster key) fix cluster key duplicated key (#44776) Duplicated keys happen when: 1. a schema change is started and has not finished 2. data is written and the publish phase skips calculating the delete bitmap 3. the new rowsets are compacted without a delete bitmap: a merge-on-write (MoW) table with cluster keys needs to do compaction with the delete bitmap --- be/src/cloud/cloud_cumulative_compaction.cpp | 2 - .../cloud/cloud_engine_calc_delete_bitmap_task.cpp | 2 +- be/src/olap/compaction.cpp | 57 ++++++++ be/src/olap/compaction.h | 6 + be/src/olap/cumulative_compaction.cpp | 2 +- .../java/org/apache/doris/master/MasterImpl.java | 6 +- .../test_schema_change_and_compaction.out | 8 ++ .../test_schema_change_and_compaction.groovy | 145 +++++++++++++++++++++ 8 files changed, 221 insertions(+), 7 deletions(-) diff --git a/be/src/cloud/cloud_cumulative_compaction.cpp b/be/src/cloud/cloud_cumulative_compaction.cpp index c466c35e2a2..4e25cd74209 100644 --- a/be/src/cloud/cloud_cumulative_compaction.cpp +++ b/be/src/cloud/cloud_cumulative_compaction.cpp @@ -375,11 +375,9 @@ Status CloudCumulativeCompaction::modify_rowsets() { Status CloudCumulativeCompaction::process_old_version_delete_bitmap() { // agg previously rowset old version delete bitmap std::vector<RowsetSharedPtr> pre_rowsets {}; - std::vector<std::string> pre_rowset_ids {}; for (const auto& it : cloud_tablet()->rowset_map()) { if (it.first.second < _input_rowsets.front()->start_version()) { pre_rowsets.emplace_back(it.second); - pre_rowset_ids.emplace_back(it.second->rowset_id().to_string()); } } std::sort(pre_rowsets.begin(), pre_rowsets.end(), Rowset::comparator); diff --git a/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp 
b/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp index 91611d20c62..336117d1012 100644 --- a/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp +++ b/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp @@ -227,7 +227,7 @@ Status CloudTabletCalcDeleteBitmapTask::handle() const { } } auto total_update_delete_bitmap_time_us = MonotonicMicros() - t3; - LOG(INFO) << "calculate delete bitmap successfully on tablet" + LOG(INFO) << "finish calculate delete bitmap on tablet" << ", table_id=" << tablet->table_id() << ", transaction_id=" << _transaction_id << ", tablet_id=" << tablet->tablet_id() << ", get_tablet_time_us=" << get_tablet_time_us diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp index e71e1862dc8..c85ce36a103 100644 --- a/be/src/olap/compaction.cpp +++ b/be/src/olap/compaction.cpp @@ -191,6 +191,9 @@ Status Compaction::merge_input_rowsets() { SCOPED_TIMER(_merge_rowsets_latency_timer); // 1. Merge segment files and write bkd inverted index if (_is_vertical) { + if (!_tablet->tablet_schema()->cluster_key_idxes().empty()) { + RETURN_IF_ERROR(update_delete_bitmap()); + } res = Merger::vertical_merge_rowsets(_tablet, compaction_type(), *_cur_tablet_schema, input_rs_readers, _output_rs_writer.get(), get_avg_segment_rows(), way_num, &_stats); @@ -872,6 +875,60 @@ void Compaction::construct_index_compaction_columns(RowsetWriterContext& ctx) { } } +Status CompactionMixin::update_delete_bitmap() { + // for mow with cluster keys, compaction read data with delete bitmap + // if tablet is not ready(such as schema change), we need to update delete bitmap + { + std::shared_lock meta_rlock(_tablet->get_header_lock()); + if (_tablet->tablet_state() != TABLET_NOTREADY) { + return Status::OK(); + } + } + OlapStopWatch watch; + std::vector<RowsetSharedPtr> rowsets; + for (const auto& rowset : _input_rowsets) { + std::lock_guard rwlock(tablet()->get_rowset_update_lock()); + std::shared_lock rlock(_tablet->get_header_lock()); + Status st = 
_tablet->update_delete_bitmap_without_lock(_tablet, rowset, &rowsets); + if (!st.ok()) { + LOG(INFO) << "failed update_delete_bitmap_without_lock for tablet_id=" + << _tablet->tablet_id() << ", st=" << st.to_string(); + return st; + } + rowsets.push_back(rowset); + } + LOG(INFO) << "finish update delete bitmap for tablet: " << _tablet->tablet_id() + << ", rowsets: " << _input_rowsets.size() << ", cost: " << watch.get_elapse_time_us() + << "(us)"; + return Status::OK(); +} + +Status CloudCompactionMixin::update_delete_bitmap() { + // for mow with cluster keys, compaction read data with delete bitmap + // if tablet is not ready(such as schema change), we need to update delete bitmap + { + std::shared_lock meta_rlock(_tablet->get_header_lock()); + if (_tablet->tablet_state() != TABLET_NOTREADY) { + return Status::OK(); + } + } + OlapStopWatch watch; + std::vector<RowsetSharedPtr> rowsets; + for (const auto& rowset : _input_rowsets) { + Status st = _tablet->update_delete_bitmap_without_lock(_tablet, rowset, &rowsets); + if (!st.ok()) { + LOG(INFO) << "failed update_delete_bitmap_without_lock for tablet_id=" + << _tablet->tablet_id() << ", st=" << st.to_string(); + return st; + } + rowsets.push_back(rowset); + } + LOG(INFO) << "finish update delete bitmap for tablet: " << _tablet->tablet_id() + << ", rowsets: " << _input_rowsets.size() << ", cost: " << watch.get_elapse_time_us() + << "(us)"; + return Status::OK(); +} + Status CompactionMixin::construct_output_rowset_writer(RowsetWriterContext& ctx) { // only do index compaction for dup_keys and unique_keys with mow enabled if (config::inverted_index_compaction_enable && diff --git a/be/src/olap/compaction.h b/be/src/olap/compaction.h index 06ef4268529..7f92a6c5f4d 100644 --- a/be/src/olap/compaction.h +++ b/be/src/olap/compaction.h @@ -84,6 +84,8 @@ protected: int64_t merge_way_num(); + virtual Status update_delete_bitmap() = 0; + // the root tracker for this compaction std::shared_ptr<MemTrackerLimiter> _mem_tracker; 
@@ -146,6 +148,8 @@ protected: virtual Status modify_rowsets(); + Status update_delete_bitmap() override; + StorageEngine& _engine; private: @@ -175,6 +179,8 @@ public: protected: CloudTablet* cloud_tablet() { return static_cast<CloudTablet*>(_tablet.get()); } + Status update_delete_bitmap() override; + virtual void garbage_collection(); CloudStorageEngine& _engine; diff --git a/be/src/olap/cumulative_compaction.cpp b/be/src/olap/cumulative_compaction.cpp index b961c694ede..2dfd30fb86e 100644 --- a/be/src/olap/cumulative_compaction.cpp +++ b/be/src/olap/cumulative_compaction.cpp @@ -145,7 +145,7 @@ Status CumulativeCompaction::pick_rowsets_to_compact() { DCHECK(missing_versions.size() % 2 == 0); LOG(WARNING) << "There are missed versions among rowsets. " << "total missed version size: " << missing_versions.size() / 2 - << " first missed version prev rowset verison=" << missing_versions[0] + << ", first missed version prev rowset verison=" << missing_versions[0] << ", first missed version next rowset version=" << missing_versions[1] << ", tablet=" << _tablet->tablet_id(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java index 4010a9b564d..09318af34bd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java @@ -681,9 +681,9 @@ public class MasterImpl { CalcDeleteBitmapTask calcDeleteBitmapTask = (CalcDeleteBitmapTask) task; if (request.getTaskStatus().getStatusCode() != TStatusCode.OK) { calcDeleteBitmapTask.countDownToZero(request.getTaskStatus().getStatusCode(), - "backend: " + task.getBackendId() + ", error_tablet_size: " - + request.getErrorTabletIdsSize() + ", err_msg: " - + request.getTaskStatus().getErrorMsgs().toString()); + "backend: " + task.getBackendId() + ", error_tablet_size: " + request.getErrorTabletIdsSize() + + ", error_tablets: " + request.getErrorTabletIds() + 
+ ", err_msg: " + request.getTaskStatus().getErrorMsgs().toString()); } else if (request.isSetRespPartitions() && calcDeleteBitmapTask.isFinishRequestStale(request.getRespPartitions())) { LOG.warn("get staled response from backend: {}, report version: {}. calcDeleteBitmapTask's" diff --git a/regression-test/data/unique_with_mow_c_p0/test_schema_change_and_compaction.out b/regression-test/data/unique_with_mow_c_p0/test_schema_change_and_compaction.out new file mode 100644 index 00000000000..35f26a488be --- /dev/null +++ b/regression-test/data/unique_with_mow_c_p0/test_schema_change_and_compaction.out @@ -0,0 +1,8 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !select1 -- +10 20 35 40 + +-- !select2 -- +10 20 40 37 +11 20 40 37 + diff --git a/regression-test/suites/unique_with_mow_c_p0/test_schema_change_and_compaction.groovy b/regression-test/suites/unique_with_mow_c_p0/test_schema_change_and_compaction.groovy new file mode 100644 index 00000000000..dfb7facf5ee --- /dev/null +++ b/regression-test/suites/unique_with_mow_c_p0/test_schema_change_and_compaction.groovy @@ -0,0 +1,145 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +// The cases is copied from https://github.com/trinodb/trino/tree/master +// /testing/trino-product-tests/src/main/resources/sql-tests/testcases +// and modified by Doris. + +import org.codehaus.groovy.runtime.IOGroovyMethods + +suite("test_schema_change_and_compaction", "nonConcurrent") { + def tableName = "test_schema_change_and_compaction" + + def getAlterTableState = { job_state -> + def retry = 0 + def last_state = "" + while (true) { + sleep(2000) + def state = sql " show alter table column where tablename = '${tableName}' order by CreateTime desc limit 1" + logger.info("alter table state: ${state}") + last_state = state[0][9] + if (state.size() > 0 && state[0][9] == job_state) { + return + } + retry++ + if (retry >= 10) { + break + } + } + assertTrue(false, "alter table job state is ${last_state}, not ${job_state} after retry ${retry} times") + } + + def block_convert_historical_rowsets = { + if (isCloudMode()) { + GetDebugPoint().enableDebugPointForAllBEs("CloudSchemaChangeJob::_convert_historical_rowsets.block") + } else { + GetDebugPoint().enableDebugPointForAllBEs("SchemaChangeJob::_convert_historical_rowsets.block") + } + } + + def unblock = { + if (isCloudMode()) { + GetDebugPoint().disableDebugPointForAllBEs("CloudSchemaChangeJob::_convert_historical_rowsets.block") + } else { + GetDebugPoint().disableDebugPointForAllBEs("SchemaChangeJob::_convert_historical_rowsets.block") + } + } + + onFinish { + unblock() + } + + sql """ DROP TABLE IF EXISTS ${tableName} force """ + sql """ + CREATE TABLE ${tableName} ( `k1` int(11), `k2` int(11), `v1` int(11), `v2` int(11) ) ENGINE=OLAP + unique KEY(`k1`, `k2`) cluster by(v1) DISTRIBUTED BY HASH(`k1`) BUCKETS 1 + PROPERTIES ( "replication_num" = "1" ); + """ + sql """ insert into ${tableName} values(10, 20, 30, 40); """ + + // alter table + block_convert_historical_rowsets() + sql """ alter table ${tableName} order by(k1, k2, v2, v1); """ + getAlterTableState("RUNNING") + + def tablets = sql_return_maparray """ 
show tablets from ${tableName}; """ + logger.info("tablets: ${tablets}") + assertEquals(2, tablets.size()) + String alterTabletId = "" + String alterTabletBackendId = "" + String alterTabletCompactionUrl = "" + for (Map<String, String> tablet : tablets) { + if (tablet["State"] == "ALTER") { + alterTabletId = tablet["TabletId"].toLong() + alterTabletBackendId = tablet["BackendId"] + alterTabletCompactionUrl = tablet["CompactionStatus"] + } + } + logger.info("alterTabletId: ${alterTabletId}, alterTabletBackendId: ${alterTabletBackendId}, alterTabletCompactionUrl: ${alterTabletCompactionUrl}") + assertTrue(!alterTabletId.isEmpty()) + + // write some data + sql """ insert into ${tableName} values(10, 20, 31, 40); """ + sql """ insert into ${tableName} values(10, 20, 32, 40); """ + sql """ insert into ${tableName} values(10, 20, 33, 40); """ + sql """ insert into ${tableName} values(10, 20, 34, 40); """ + sql """ insert into ${tableName} values(10, 20, 35, 40); """ + order_qt_select1 """ select * from ${tableName}; """ + + // trigger compaction + def backendId_to_backendIP = [:] + def backendId_to_backendHttpPort = [:] + getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort) + logger.info("ip: " + backendId_to_backendIP.get(alterTabletBackendId) + ", port: " + backendId_to_backendHttpPort.get(alterTabletBackendId)) + def (code, out, err) = be_run_cumulative_compaction(backendId_to_backendIP.get(alterTabletBackendId), backendId_to_backendHttpPort.get(alterTabletBackendId), alterTabletId+"") + logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err) + + // wait for compaction done + def enable_new_tablet_do_compaction = get_be_param.call("enable_new_tablet_do_compaction") + logger.info("enable_new_tablet_do_compaction: " + enable_new_tablet_do_compaction) + boolean enable = enable_new_tablet_do_compaction.get(alterTabletBackendId).toBoolean() + logger.info("enable: " + enable) + for (int i = 0; i < 10; i++) { + (code, out, err) = 
curl("GET", alterTabletCompactionUrl) + logger.info("Show tablets status: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def tabletJson = parseJson(out.trim()) + assert tabletJson.rowsets instanceof List + if (isCloudMode()) { + if (enable) { + if(tabletJson.rowsets.size() < 5) { + break + } + } else { + // "msg": "invalid tablet state. tablet_id=" + break + } + } else { + if(tabletJson.rowsets.size() < 5) { + break + } + } + sleep(2000) + } + + // unblock + unblock() + sql """ insert into ${tableName}(k1, k2, v1, v2) values(10, 20, 36, 40), (11, 20, 36, 40); """ + sql """ insert into ${tableName}(k1, k2, v1, v2) values(10, 20, 37, 40), (11, 20, 37, 40); """ + getAlterTableState("FINISHED") + order_qt_select2 """ select * from ${tableName}; """ +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org