This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 6f596586163 [fix](cluster key) fix cluster key duplicated key (#44776)
6f596586163 is described below

commit 6f596586163b5dedaf1b8e062c8e3ecd654cee22
Author: meiyi <me...@selectdb.com>
AuthorDate: Wed Dec 4 10:01:27 2024 +0800

    [fix](cluster key) fix cluster key duplicated key (#44776)
    
    Duplicated keys can appear when:
    1. a schema change is started but not yet finished
    2. data is written and the publish phase skips the delete bitmap calculation
    3. the newly written rowsets are compacted without a delete bitmap; a
       merge-on-write table with a cluster key needs to do compaction with the
       delete bitmap (sketched below)
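    
    A minimal illustrative sketch of the rule this commit adds (a simplified
    stand-in with assumed names; it models the idea and is not an excerpt of
    the Doris source in the diff below):
    
        // sketch only: models the fix, does not reproduce Doris internals
        #include <cstdio>
        #include <vector>
        
        struct Rowset { int id; bool has_delete_bitmap; };
        
        // Before a vertical merge on a merge-on-write tablet with cluster keys,
        // any input rowset still missing its delete bitmap (publish skipped the
        // calculation while a schema change was running) gets the bitmap
        // computed first; only then is the merge safe from duplicated keys.
        void update_delete_bitmap_before_merge(std::vector<Rowset>& inputs, bool tablet_not_ready) {
            if (!tablet_not_ready) return;  // nothing to do once the tablet is ready
            for (auto& rs : inputs) {
                if (!rs.has_delete_bitmap) {
                    rs.has_delete_bitmap = true;  // stands in for update_delete_bitmap_without_lock()
                }
            }
        }
        
        int main() {
            // scenario from the commit message: rowsets written during the schema change
            std::vector<Rowset> inputs = {{1, false}, {2, false}, {3, false}};
            update_delete_bitmap_before_merge(inputs, /*tablet_not_ready=*/true);
            for (const auto& rs : inputs) std::printf("rowset %d bitmap=%d\n", rs.id, (int)rs.has_delete_bitmap);
        }
    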
---
 be/src/cloud/cloud_cumulative_compaction.cpp       |   2 -
 .../cloud/cloud_engine_calc_delete_bitmap_task.cpp |   2 +-
 be/src/olap/compaction.cpp                         |  57 ++++++++
 be/src/olap/compaction.h                           |   6 +
 be/src/olap/cumulative_compaction.cpp              |   2 +-
 .../java/org/apache/doris/master/MasterImpl.java   |   6 +-
 .../test_schema_change_and_compaction.out          |   8 ++
 .../test_schema_change_and_compaction.groovy       | 145 +++++++++++++++++++++
 8 files changed, 221 insertions(+), 7 deletions(-)

diff --git a/be/src/cloud/cloud_cumulative_compaction.cpp b/be/src/cloud/cloud_cumulative_compaction.cpp
index c466c35e2a2..4e25cd74209 100644
--- a/be/src/cloud/cloud_cumulative_compaction.cpp
+++ b/be/src/cloud/cloud_cumulative_compaction.cpp
@@ -375,11 +375,9 @@ Status CloudCumulativeCompaction::modify_rowsets() {
 Status CloudCumulativeCompaction::process_old_version_delete_bitmap() {
     // agg previously rowset old version delete bitmap
     std::vector<RowsetSharedPtr> pre_rowsets {};
-    std::vector<std::string> pre_rowset_ids {};
     for (const auto& it : cloud_tablet()->rowset_map()) {
         if (it.first.second < _input_rowsets.front()->start_version()) {
             pre_rowsets.emplace_back(it.second);
-            pre_rowset_ids.emplace_back(it.second->rowset_id().to_string());
         }
     }
     std::sort(pre_rowsets.begin(), pre_rowsets.end(), Rowset::comparator);
diff --git a/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp b/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp
index 91611d20c62..336117d1012 100644
--- a/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp
+++ b/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp
@@ -227,7 +227,7 @@ Status CloudTabletCalcDeleteBitmapTask::handle() const {
         }
     }
     auto total_update_delete_bitmap_time_us = MonotonicMicros() - t3;
-    LOG(INFO) << "calculate delete bitmap successfully on tablet"
+    LOG(INFO) << "finish calculate delete bitmap on tablet"
               << ", table_id=" << tablet->table_id() << ", transaction_id=" << 
_transaction_id
               << ", tablet_id=" << tablet->tablet_id()
               << ", get_tablet_time_us=" << get_tablet_time_us
diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index e71e1862dc8..c85ce36a103 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -191,6 +191,9 @@ Status Compaction::merge_input_rowsets() {
         SCOPED_TIMER(_merge_rowsets_latency_timer);
         // 1. Merge segment files and write bkd inverted index
         if (_is_vertical) {
+            if (!_tablet->tablet_schema()->cluster_key_idxes().empty()) {
+                RETURN_IF_ERROR(update_delete_bitmap());
+            }
             res = Merger::vertical_merge_rowsets(_tablet, compaction_type(), *_cur_tablet_schema,
                                                  input_rs_readers, _output_rs_writer.get(),
                                                  get_avg_segment_rows(), way_num, &_stats);
@@ -872,6 +875,60 @@ void Compaction::construct_index_compaction_columns(RowsetWriterContext& ctx) {
     }
 }
 
+Status CompactionMixin::update_delete_bitmap() {
+    // for mow with cluster keys, compaction read data with delete bitmap
+    // if tablet is not ready(such as schema change), we need to update delete bitmap
+    {
+        std::shared_lock meta_rlock(_tablet->get_header_lock());
+        if (_tablet->tablet_state() != TABLET_NOTREADY) {
+            return Status::OK();
+        }
+    }
+    OlapStopWatch watch;
+    std::vector<RowsetSharedPtr> rowsets;
+    for (const auto& rowset : _input_rowsets) {
+        std::lock_guard rwlock(tablet()->get_rowset_update_lock());
+        std::shared_lock rlock(_tablet->get_header_lock());
+        Status st = _tablet->update_delete_bitmap_without_lock(_tablet, rowset, &rowsets);
+        if (!st.ok()) {
+            LOG(INFO) << "failed update_delete_bitmap_without_lock for tablet_id="
+                      << _tablet->tablet_id() << ", st=" << st.to_string();
+            return st;
+        }
+        rowsets.push_back(rowset);
+    }
+    LOG(INFO) << "finish update delete bitmap for tablet: " << _tablet->tablet_id()
+              << ", rowsets: " << _input_rowsets.size() << ", cost: " << watch.get_elapse_time_us()
+              << "(us)";
+    return Status::OK();
+}
+
+Status CloudCompactionMixin::update_delete_bitmap() {
+    // for mow with cluster keys, compaction read data with delete bitmap
+    // if tablet is not ready(such as schema change), we need to update delete bitmap
+    {
+        std::shared_lock meta_rlock(_tablet->get_header_lock());
+        if (_tablet->tablet_state() != TABLET_NOTREADY) {
+            return Status::OK();
+        }
+    }
+    OlapStopWatch watch;
+    std::vector<RowsetSharedPtr> rowsets;
+    for (const auto& rowset : _input_rowsets) {
+        Status st = _tablet->update_delete_bitmap_without_lock(_tablet, rowset, &rowsets);
+        if (!st.ok()) {
+            LOG(INFO) << "failed update_delete_bitmap_without_lock for tablet_id="
+                      << _tablet->tablet_id() << ", st=" << st.to_string();
+            return st;
+        }
+        rowsets.push_back(rowset);
+    }
+    LOG(INFO) << "finish update delete bitmap for tablet: " << _tablet->tablet_id()
+              << ", rowsets: " << _input_rowsets.size() << ", cost: " << watch.get_elapse_time_us()
+              << "(us)";
+    return Status::OK();
+}
+
 Status CompactionMixin::construct_output_rowset_writer(RowsetWriterContext& ctx) {
     // only do index compaction for dup_keys and unique_keys with mow enabled
     if (config::inverted_index_compaction_enable &&
diff --git a/be/src/olap/compaction.h b/be/src/olap/compaction.h
index 06ef4268529..7f92a6c5f4d 100644
--- a/be/src/olap/compaction.h
+++ b/be/src/olap/compaction.h
@@ -84,6 +84,8 @@ protected:
 
     int64_t merge_way_num();
 
+    virtual Status update_delete_bitmap() = 0;
+
     // the root tracker for this compaction
     std::shared_ptr<MemTrackerLimiter> _mem_tracker;
 
@@ -146,6 +148,8 @@ protected:
 
     virtual Status modify_rowsets();
 
+    Status update_delete_bitmap() override;
+
     StorageEngine& _engine;
 
 private:
@@ -175,6 +179,8 @@ public:
 protected:
     CloudTablet* cloud_tablet() { return static_cast<CloudTablet*>(_tablet.get()); }
 
+    Status update_delete_bitmap() override;
+
     virtual void garbage_collection();
 
     CloudStorageEngine& _engine;
diff --git a/be/src/olap/cumulative_compaction.cpp b/be/src/olap/cumulative_compaction.cpp
index b961c694ede..2dfd30fb86e 100644
--- a/be/src/olap/cumulative_compaction.cpp
+++ b/be/src/olap/cumulative_compaction.cpp
@@ -145,7 +145,7 @@ Status CumulativeCompaction::pick_rowsets_to_compact() {
         DCHECK(missing_versions.size() % 2 == 0);
         LOG(WARNING) << "There are missed versions among rowsets. "
                      << "total missed version size: " << 
missing_versions.size() / 2
-                     << " first missed version prev rowset verison=" << 
missing_versions[0]
+                     << ", first missed version prev rowset verison=" << 
missing_versions[0]
                      << ", first missed version next rowset version=" << 
missing_versions[1]
                      << ", tablet=" << _tablet->tablet_id();
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
index 4010a9b564d..09318af34bd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
@@ -681,9 +681,9 @@ public class MasterImpl {
             CalcDeleteBitmapTask calcDeleteBitmapTask = (CalcDeleteBitmapTask) task;
             if (request.getTaskStatus().getStatusCode() != TStatusCode.OK) {
                 calcDeleteBitmapTask.countDownToZero(request.getTaskStatus().getStatusCode(),
-                        "backend: " + task.getBackendId() + ", error_tablet_size: "
-                                + request.getErrorTabletIdsSize() + ", err_msg: "
-                                + request.getTaskStatus().getErrorMsgs().toString());
+                        "backend: " + task.getBackendId() + ", error_tablet_size: " + request.getErrorTabletIdsSize()
+                                + ", error_tablets: " + request.getErrorTabletIds()
+                                + ", err_msg: " + request.getTaskStatus().getErrorMsgs().toString());
             } else if (request.isSetRespPartitions()
                     && calcDeleteBitmapTask.isFinishRequestStale(request.getRespPartitions())) {
                 LOG.warn("get staled response from backend: {}, report 
version: {}. calcDeleteBitmapTask's"
diff --git a/regression-test/data/unique_with_mow_c_p0/test_schema_change_and_compaction.out b/regression-test/data/unique_with_mow_c_p0/test_schema_change_and_compaction.out
new file mode 100644
index 00000000000..35f26a488be
--- /dev/null
+++ b/regression-test/data/unique_with_mow_c_p0/test_schema_change_and_compaction.out
@@ -0,0 +1,8 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !select1 --
+10     20      35      40
+
+-- !select2 --
+10     20      40      37
+11     20      40      37
+
diff --git a/regression-test/suites/unique_with_mow_c_p0/test_schema_change_and_compaction.groovy b/regression-test/suites/unique_with_mow_c_p0/test_schema_change_and_compaction.groovy
new file mode 100644
index 00000000000..dfb7facf5ee
--- /dev/null
+++ b/regression-test/suites/unique_with_mow_c_p0/test_schema_change_and_compaction.groovy
@@ -0,0 +1,145 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// The cases is copied from https://github.com/trinodb/trino/tree/master
+// /testing/trino-product-tests/src/main/resources/sql-tests/testcases
+// and modified by Doris.
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+
+suite("test_schema_change_and_compaction", "nonConcurrent") {
+    def tableName = "test_schema_change_and_compaction"
+
+    def getAlterTableState = { job_state ->
+        def retry = 0
+        def last_state = ""
+        while (true) {
+            sleep(2000)
+            def state = sql " show alter table column where tablename = '${tableName}' order by CreateTime desc limit 1"
+            logger.info("alter table state: ${state}")
+            last_state = state[0][9]
+            if (state.size() > 0 && state[0][9] == job_state) {
+                return
+            }
+            retry++
+            if (retry >= 10) {
+                break
+            }
+        }
+        assertTrue(false, "alter table job state is ${last_state}, not ${job_state} after retry ${retry} times")
+    }
+
+    def block_convert_historical_rowsets = {
+        if (isCloudMode()) {
+            GetDebugPoint().enableDebugPointForAllBEs("CloudSchemaChangeJob::_convert_historical_rowsets.block")
+        } else {
+            GetDebugPoint().enableDebugPointForAllBEs("SchemaChangeJob::_convert_historical_rowsets.block")
+        }
+    }
+
+    def unblock = {
+        if (isCloudMode()) {
+            GetDebugPoint().disableDebugPointForAllBEs("CloudSchemaChangeJob::_convert_historical_rowsets.block")
+        } else {
+            GetDebugPoint().disableDebugPointForAllBEs("SchemaChangeJob::_convert_historical_rowsets.block")
+        }
+    }
+
+    onFinish {
+        unblock()
+    }
+
+    sql """ DROP TABLE IF EXISTS ${tableName} force """
+    sql """
+        CREATE TABLE ${tableName} ( `k1` int(11), `k2` int(11), `v1` int(11), `v2` int(11) ) ENGINE=OLAP
+        unique KEY(`k1`, `k2`) cluster by(v1) DISTRIBUTED BY HASH(`k1`) BUCKETS 1
+        PROPERTIES ( "replication_num" = "1" );
+    """
+    sql """ insert into ${tableName} values(10, 20, 30, 40); """
+
+    // alter table
+    block_convert_historical_rowsets()
+    sql """ alter table ${tableName} order by(k1, k2, v2, v1); """
+    getAlterTableState("RUNNING")
+
+    def tablets = sql_return_maparray """ show tablets from ${tableName}; """
+    logger.info("tablets: ${tablets}")
+    assertEquals(2, tablets.size())
+    String alterTabletId = ""
+    String alterTabletBackendId = ""
+    String alterTabletCompactionUrl = ""
+    for (Map<String, String> tablet : tablets) {
+        if (tablet["State"] == "ALTER") {
+            alterTabletId = tablet["TabletId"].toLong()
+            alterTabletBackendId = tablet["BackendId"]
+            alterTabletCompactionUrl = tablet["CompactionStatus"]
+        }
+    }
+    logger.info("alterTabletId: ${alterTabletId}, alterTabletBackendId: 
${alterTabletBackendId}, alterTabletCompactionUrl: ${alterTabletCompactionUrl}")
+    assertTrue(!alterTabletId.isEmpty())
+
+    // write some data
+    sql """ insert into ${tableName} values(10, 20, 31, 40); """
+    sql """ insert into ${tableName} values(10, 20, 32, 40); """
+    sql """ insert into ${tableName} values(10, 20, 33, 40); """
+    sql """ insert into ${tableName} values(10, 20, 34, 40); """
+    sql """ insert into ${tableName} values(10, 20, 35, 40); """
+    order_qt_select1 """ select * from ${tableName}; """
+
+    // trigger compaction
+    def backendId_to_backendIP = [:]
+    def backendId_to_backendHttpPort = [:]
+    getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort)
+    logger.info("ip: " + backendId_to_backendIP.get(alterTabletBackendId) + ", 
port: " + backendId_to_backendHttpPort.get(alterTabletBackendId))
+    def (code, out, err) = 
be_run_cumulative_compaction(backendId_to_backendIP.get(alterTabletBackendId), 
backendId_to_backendHttpPort.get(alterTabletBackendId), alterTabletId+"")
+    logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + 
err)
+
+    // wait for compaction done
+    def enable_new_tablet_do_compaction = get_be_param.call("enable_new_tablet_do_compaction")
+    logger.info("enable_new_tablet_do_compaction: " + enable_new_tablet_do_compaction)
+    boolean enable = enable_new_tablet_do_compaction.get(alterTabletBackendId).toBoolean()
+    logger.info("enable: " + enable)
+    for (int i = 0; i < 10; i++) {
+        (code, out, err) = curl("GET", alterTabletCompactionUrl)
+        logger.info("Show tablets status: code=" + code + ", out=" + out + ", 
err=" + err)
+        assertEquals(code, 0)
+        def tabletJson = parseJson(out.trim())
+        assert tabletJson.rowsets instanceof List
+        if (isCloudMode()) {
+            if (enable) {
+                if(tabletJson.rowsets.size() < 5) {
+                    break
+                }
+            } else {
+                // "msg": "invalid tablet state. tablet_id="
+                break
+            }
+        } else {
+            if(tabletJson.rowsets.size() < 5) {
+                break
+            }
+        }
+        sleep(2000)
+    }
+
+    // unblock
+    unblock()
+    sql """ insert into ${tableName}(k1, k2, v1, v2) values(10, 20, 36, 40), 
(11, 20, 36, 40); """
+    sql """ insert into ${tableName}(k1, k2, v1, v2) values(10, 20, 37, 40), 
(11, 20, 37, 40); """
+    getAlterTableState("FINISHED")
+    order_qt_select2 """ select * from ${tableName}; """
+}

