This is an automated email from the ASF dual-hosted git repository.

zhangchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new dcc7c078c75 [Fix](partial update) Fix rowset not found error when 
doing partial update (#34112)
dcc7c078c75 is described below

commit dcc7c078c7536d8ad679d0f12517744aeae990d5
Author: abmdocrt <[email protected]>
AuthorDate: Sun Apr 28 14:57:17 2024 +0800

    [Fix](partial update) Fix rowset not found error when doing partial update 
(#34112)
    
    Cause: In the logic of partial column updates, the existing data columns
are read first, and then the data is supplemented and written back. During the
reading process, initialization involves initially fetching rowset IDs, and the
actual rowset object is fetched only when needed later. However, between
fetching the rowset IDs and the rowset object, compaction may occur, turning
the old rowset into a stale rowset. If too much time passes, the stale rowset
might be directly deleted, so the rowset can no longer be found when the data
is actually read, resulting in a "rowset not found" error.
    
    Solution: To avoid such issues during partial column updates, the 
initialization step should involve fetching both the rowset IDs and the shared 
pointer to the rowset object simultaneously. This ensures that the rowset can 
always be found during data retrieval.
---
 be/src/olap/base_tablet.cpp                        |  11 +-
 be/src/olap/base_tablet.h                          |   4 +-
 be/src/olap/cumulative_compaction_policy.cpp       |   2 +
 be/src/olap/olap_common.h                          |  10 +-
 be/src/olap/rowset/segment_v2/segment_writer.cpp   |  19 +---
 .../rowset/segment_v2/vertical_segment_writer.cpp  |  20 +---
 be/src/olap/rowset_builder.cpp                     |   6 +-
 ..._update_rowset_not_found_fault_injection.groovy | 112 +++++++++++++++++++++
 8 files changed, 133 insertions(+), 51 deletions(-)

diff --git a/be/src/olap/base_tablet.cpp b/be/src/olap/base_tablet.cpp
index 6128bdfb752..3dd5c1a3399 100644
--- a/be/src/olap/base_tablet.cpp
+++ b/be/src/olap/base_tablet.cpp
@@ -445,7 +445,7 @@ Status BaseTablet::calc_delete_bitmap_between_segments(
 }
 
 std::vector<RowsetSharedPtr> BaseTablet::get_rowset_by_ids(
-        const RowsetIdUnorderedSet* specified_rowset_ids, bool include_stale) {
+        const RowsetIdUnorderedSet* specified_rowset_ids) {
     std::vector<RowsetSharedPtr> rowsets;
     for (auto& rs : _rs_version_map) {
         if (!specified_rowset_ids ||
@@ -454,15 +454,6 @@ std::vector<RowsetSharedPtr> BaseTablet::get_rowset_by_ids(
         }
     }
 
-    if (include_stale && specified_rowset_ids != nullptr &&
-        rowsets.size() != specified_rowset_ids->size()) {
-        for (auto& rs : _stale_rs_version_map) {
-            if (specified_rowset_ids->find(rs.second->rowset_id()) != 
specified_rowset_ids->end()) {
-                rowsets.push_back(rs.second);
-            }
-        }
-    }
-
     std::sort(rowsets.begin(), rowsets.end(), [](RowsetSharedPtr& lhs, 
RowsetSharedPtr& rhs) {
         return lhs->end_version() > rhs->end_version();
     });
diff --git a/be/src/olap/base_tablet.h b/be/src/olap/base_tablet.h
index 8f41ef6a57b..397e93e8058 100644
--- a/be/src/olap/base_tablet.h
+++ b/be/src/olap/base_tablet.h
@@ -128,8 +128,8 @@ public:
     
////////////////////////////////////////////////////////////////////////////
     // begin MoW functions
     
////////////////////////////////////////////////////////////////////////////
-    std::vector<RowsetSharedPtr> get_rowset_by_ids(const RowsetIdUnorderedSet* 
specified_rowset_ids,
-                                                   bool include_stale = false);
+    std::vector<RowsetSharedPtr> get_rowset_by_ids(
+            const RowsetIdUnorderedSet* specified_rowset_ids);
 
     // Lookup a row with TupleDescriptor and fill Block
     Status lookup_row_data(const Slice& encoded_key, const RowLocation& 
row_location,
diff --git a/be/src/olap/cumulative_compaction_policy.cpp 
b/be/src/olap/cumulative_compaction_policy.cpp
index 0d35ae7ca4f..ee7a2b1812a 100644
--- a/be/src/olap/cumulative_compaction_policy.cpp
+++ b/be/src/olap/cumulative_compaction_policy.cpp
@@ -284,6 +284,8 @@ int SizeBasedCumulativeCompactionPolicy::pick_input_rowsets(
         transient_size += 1;
         input_rowsets->push_back(rowset);
     }
+    
DBUG_EXECUTE_IF("SizeBaseCumulativeCompactionPolicy.pick_input_rowsets.return_input_rowsets",
+                    { return transient_size; })
 
     if (total_size >= promotion_size) {
         return transient_size;
diff --git a/be/src/olap/olap_common.h b/be/src/olap/olap_common.h
index 4d0c035bb0f..4acf18a0834 100644
--- a/be/src/olap/olap_common.h
+++ b/be/src/olap/olap_common.h
@@ -36,6 +36,7 @@
 
 #include "io/io_common.h"
 #include "olap/olap_define.h"
+#include "olap/rowset/rowset_fwd.h"
 #include "util/hash_util.hpp"
 #include "util/time.h"
 #include "util/uid_util.h"
@@ -494,11 +495,16 @@ class DeleteBitmap;
 // merge on write context
 struct MowContext {
     MowContext(int64_t version, int64_t txnid, const RowsetIdUnorderedSet& ids,
-               std::shared_ptr<DeleteBitmap> db)
-            : max_version(version), txn_id(txnid), rowset_ids(ids), 
delete_bitmap(db) {}
+               const std::vector<RowsetSharedPtr>& rowset_ptrs, 
std::shared_ptr<DeleteBitmap> db)
+            : max_version(version),
+              txn_id(txnid),
+              rowset_ids(ids),
+              rowset_ptrs(rowset_ptrs),
+              delete_bitmap(db) {}
     int64_t max_version;
     int64_t txn_id;
     const RowsetIdUnorderedSet& rowset_ids;
+    std::vector<RowsetSharedPtr> rowset_ptrs;
     std::shared_ptr<DeleteBitmap> delete_bitmap;
 };
 
diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp 
b/be/src/olap/rowset/segment_v2/segment_writer.cpp
index 7a83496b7fb..10ce2137b9c 100644
--- a/be/src/olap/rowset/segment_v2/segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp
@@ -437,24 +437,7 @@ Status 
SegmentWriter::append_block_with_partial_content(const vectorized::Block*
     std::vector<RowsetSharedPtr> specified_rowsets;
     {
         std::shared_lock rlock(tablet->get_header_lock());
-
-        // Under normal circumstances, `get_rowset_by_ids` does not need to 
consider the stale
-        // rowset, in other word, if a rowset id is not found in the normal 
rowset, we can ignore
-        // it. This is because even if we handle stale rowset here, we need to 
recalculate the
-        // new rowset generated by the corresponding compaction in the publish 
phase.
-        // However, for partial update, ignoring the stale rowset may cause 
some keys to not be
-        // found in the flush phase (lookup_row_key returns KEY_NOT_FOUND), 
and thus be mistaken
-        // as new keys in the flush phase, which will cause the load to fail 
in the following
-        // two cases:
-        // 1. when strict_mode is enabled, new keys are not allowed to be 
added.
-        // 2. Some columns that need to be filled are neither nullable nor 
have a default value,
-        //    in which case the value of the field cannot be filled as a new 
key, leading to a
-        //    failure of the load.
-        bool should_include_stale =
-                _opts.rowset_ctx->partial_update_info->is_strict_mode ||
-                
!_opts.rowset_ctx->partial_update_info->can_insert_new_rows_in_partial_update;
-        specified_rowsets =
-                tablet->get_rowset_by_ids(&_mow_context->rowset_ids, 
should_include_stale);
+        specified_rowsets = _mow_context->rowset_ptrs;
         if (specified_rowsets.size() != _mow_context->rowset_ids.size()) {
             // Only when this is a strict mode partial update that missing 
rowsets here will lead to problems.
             // In other case, the missing rowsets will be calculated in later 
phases(commit phase/publish phase)
diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp 
b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
index 8f7bb9fe245..b0b24a79c0a 100644
--- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
@@ -368,24 +368,10 @@ Status 
VerticalSegmentWriter::_append_block_with_partial_content(RowsInBlock& da
 
     std::vector<RowsetSharedPtr> specified_rowsets;
     {
+        
DBUG_EXECUTE_IF("VerticalSegmentWriter._append_block_with_partial_content.sleep",
+                        { sleep(60); })
         std::shared_lock rlock(tablet->get_header_lock());
-        // Under normal circumstances, `get_rowset_by_ids` does not need to 
consider the stale
-        // rowset, in other word, if a rowset id is not found in the normal 
rowset, we can ignore
-        // it. This is because even if we handle stale rowset here, we need to 
recalculate the
-        // new rowset generated by the corresponding compaction in the publish 
phase.
-        // However, for partial update, ignoring the stale rowset may cause 
some keys to not be
-        // found in the flush phase (lookup_row_key returns KEY_NOT_FOUND), 
and thus be mistaken
-        // as new keys in the flush phase, which will cause the load to fail 
in the following
-        // two cases:
-        // 1. when strict_mode is enabled, new keys are not allowed to be 
added.
-        // 2. Some columns that need to be filled are neither nullable nor 
have a default value,
-        //    in which case the value of the field cannot be filled as a new 
key, leading to a
-        //    failure of the load.
-        bool should_include_stale =
-                _opts.rowset_ctx->partial_update_info->is_strict_mode ||
-                
!_opts.rowset_ctx->partial_update_info->can_insert_new_rows_in_partial_update;
-        specified_rowsets =
-                tablet->get_rowset_by_ids(&_mow_context->rowset_ids, 
should_include_stale);
+        specified_rowsets = _mow_context->rowset_ptrs;
         if (specified_rowsets.size() != _mow_context->rowset_ids.size()) {
             // Only when this is a strict mode partial update that missing 
rowsets here will lead to problems.
             // In other case, the missing rowsets will be calculated in later 
phases(commit phase/publish phase)
diff --git a/be/src/olap/rowset_builder.cpp b/be/src/olap/rowset_builder.cpp
index 4458d43c17e..9d8b8163b71 100644
--- a/be/src/olap/rowset_builder.cpp
+++ b/be/src/olap/rowset_builder.cpp
@@ -123,6 +123,7 @@ void RowsetBuilder::_garbage_collection() {
 Status BaseRowsetBuilder::init_mow_context(std::shared_ptr<MowContext>& 
mow_context) {
     std::lock_guard<std::shared_mutex> lck(tablet()->get_header_lock());
     int64_t cur_max_version = tablet()->max_version_unlocked();
+    std::vector<RowsetSharedPtr> rowset_ptrs;
     // tablet is under alter process. The delete bitmap will be calculated 
after conversion.
     if (tablet()->tablet_state() == TABLET_NOTREADY) {
         // Disable 'partial_update' when the tablet is undergoing a 'schema 
changing process'
@@ -134,10 +135,11 @@ Status 
BaseRowsetBuilder::init_mow_context(std::shared_ptr<MowContext>& mow_cont
         _rowset_ids.clear();
     } else {
         RETURN_IF_ERROR(tablet()->get_all_rs_id_unlocked(cur_max_version, 
&_rowset_ids));
+        rowset_ptrs = tablet()->get_rowset_by_ids(&_rowset_ids);
     }
     _delete_bitmap = std::make_shared<DeleteBitmap>(tablet()->tablet_id());
-    mow_context =
-            std::make_shared<MowContext>(cur_max_version, _req.txn_id, 
_rowset_ids, _delete_bitmap);
+    mow_context = std::make_shared<MowContext>(cur_max_version, _req.txn_id, 
_rowset_ids,
+                                               rowset_ptrs, _delete_bitmap);
     return Status::OK();
 }
 
diff --git 
a/regression-test/suites/fault_injection_p2/test_partial_update_rowset_not_found_fault_injection.groovy
 
b/regression-test/suites/fault_injection_p2/test_partial_update_rowset_not_found_fault_injection.groovy
new file mode 100644
index 00000000000..befad64da0a
--- /dev/null
+++ 
b/regression-test/suites/fault_injection_p2/test_partial_update_rowset_not_found_fault_injection.groovy
@@ -0,0 +1,112 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_partial_update_rowset_not_found_fault_injection", 
"p2,nonConcurrent") {
+    def testTable = "test_partial_update_rowset_not_found_fault_injection"
+    sql """ DROP TABLE IF EXISTS ${testTable}"""
+    sql """
+        create table ${testTable}
+        (
+        `k1` INT,
+        `v1` INT NOT NULL,
+        `v2` INT NOT NULL,
+        `v3` INT NOT NULL,
+        `v4` INT NOT NULL,
+        `v5` INT NOT NULL,
+        `v6` INT NOT NULL,
+        `v7` INT NOT NULL,
+        `v8` INT NOT NULL,
+        `v9` INT NOT NULL,
+        `v10` INT NOT NULL
+        )
+        UNIQUE KEY (`k1`)
+        DISTRIBUTED BY HASH(`k1`) BUCKETS 1
+        PROPERTIES (
+        "replication_num" = "1",
+        "disable_auto_compaction" = "true"
+        );
+    """
+
+    def load_data =  {
+        streamLoad {
+            table 'test_partial_update_rowset_not_found_fault_injection'
+            set 'column_separator', ','
+            set 'compress_type', 'GZ'
+
+
+            file 
"""${getS3Url()}/regression/fault_injection/test_partial_update_rowset_not_found_falut_injection1.csv.gz"""
+
+            time 300000 
+
+            check { result, exception, startTime, endTime ->
+                if (exception != null) {
+                    throw exception
+                }
+                log.info("Stream load result: ${result}".toString())
+                def json = parseJson(result)
+                assertEquals("success", json.Status.toLowerCase())
+                assertEquals(json.NumberTotalRows, json.NumberLoadedRows)
+                assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
+            }
+        }
+    }
+
+    def backendId_to_backendIP = [:]
+    def backendId_to_backendHttpPort = [:]
+    def backendId_to_params = [string:[:]]
+
+    getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
+
+    load_data()
+    def error = false
+
+
+    GetDebugPoint().clearDebugPointsForAllBEs()
+    try {
+        
GetDebugPoint().enableDebugPointForAllBEs("VerticalSegmentWriter._append_block_with_partial_content.sleep")
+        
GetDebugPoint().enableDebugPointForAllBEs("SizeBaseCumulativeCompactionPolicy.pick_input_rowsets.return_input_rowsets")
+        def thread = Thread.start{
+            try {
+                sql """update ${testTable} set v10=1"""
+            }
+            catch (Exception e){
+                logger.info(e.getMessage())
+                error = true
+            }
+        }
+
+        Thread.sleep(2000)
+        // trigger compactions for all tablets in ${tableName}
+        def tablets = sql_return_maparray """ show tablets from ${testTable}; 
"""
+        for (def tablet in tablets) {
+            String tablet_id = tablet.TabletId
+            backend_id = tablet.BackendId
+            (code, out, err) = 
be_run_cumulative_compaction(backendId_to_backendIP.get(backend_id), 
backendId_to_backendHttpPort.get(backend_id), tablet_id)
+            logger.info("Run compaction: code=" + code + ", out=" + out + ", 
err=" + err)
+            assertEquals(code, 0)
+        }
+
+        thread.join()
+        assertFalse(error)
+    } catch (Exception e){
+        logger.info(e.getMessage())
+        assertFalse(true)
+    } finally {
+        
GetDebugPoint().disableDebugPointForAllBEs("VerticalSegmentWriter._append_block_with_partial_content.sleep")
+        
GetDebugPoint().disableDebugPointForAllBEs("SizeBaseCumulativeCompactionPolicy.pick_input_rowsets.return_input_rowsets")
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to