This is an automated email from the ASF dual-hosted git repository.
zhangchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new dcc7c078c75 [Fix](partial update) Fix rowset not found error when
doing partial update (#34112)
dcc7c078c75 is described below
commit dcc7c078c7536d8ad679d0f12517744aeae990d5
Author: abmdocrt <[email protected]>
AuthorDate: Sun Apr 28 14:57:17 2024 +0800
[Fix](partial update) Fix rowset not found error when doing partial update
(#34112)
Cause: In the logic of partial column updates, the existing data columns
are read first, and then the data is supplemented and written back. During the
reading process, initialization involves initially fetching rowset IDs, and the
actual rowset object is fetched only when needed later. However, between
fetching the rowset IDs and the rowset object, compaction may occur, turning
the old rowset into a stale rowset. If too much time passes, the stale rowset
might be directly deleted. This can cause a "rowset not found" error when the
rowset object is actually fetched later, making the partial update fail.
Solution: To avoid such issues during partial column updates, the
initialization step should involve fetching both the rowset IDs and the shared
pointer to the rowset object simultaneously. This ensures that the rowset can
always be found during data retrieval.
---
be/src/olap/base_tablet.cpp | 11 +-
be/src/olap/base_tablet.h | 4 +-
be/src/olap/cumulative_compaction_policy.cpp | 2 +
be/src/olap/olap_common.h | 10 +-
be/src/olap/rowset/segment_v2/segment_writer.cpp | 19 +---
.../rowset/segment_v2/vertical_segment_writer.cpp | 20 +---
be/src/olap/rowset_builder.cpp | 6 +-
..._update_rowset_not_found_fault_injection.groovy | 112 +++++++++++++++++++++
8 files changed, 133 insertions(+), 51 deletions(-)
diff --git a/be/src/olap/base_tablet.cpp b/be/src/olap/base_tablet.cpp
index 6128bdfb752..3dd5c1a3399 100644
--- a/be/src/olap/base_tablet.cpp
+++ b/be/src/olap/base_tablet.cpp
@@ -445,7 +445,7 @@ Status BaseTablet::calc_delete_bitmap_between_segments(
}
std::vector<RowsetSharedPtr> BaseTablet::get_rowset_by_ids(
- const RowsetIdUnorderedSet* specified_rowset_ids, bool include_stale) {
+ const RowsetIdUnorderedSet* specified_rowset_ids) {
std::vector<RowsetSharedPtr> rowsets;
for (auto& rs : _rs_version_map) {
if (!specified_rowset_ids ||
@@ -454,15 +454,6 @@ std::vector<RowsetSharedPtr> BaseTablet::get_rowset_by_ids(
}
}
- if (include_stale && specified_rowset_ids != nullptr &&
- rowsets.size() != specified_rowset_ids->size()) {
- for (auto& rs : _stale_rs_version_map) {
- if (specified_rowset_ids->find(rs.second->rowset_id()) !=
specified_rowset_ids->end()) {
- rowsets.push_back(rs.second);
- }
- }
- }
-
std::sort(rowsets.begin(), rowsets.end(), [](RowsetSharedPtr& lhs,
RowsetSharedPtr& rhs) {
return lhs->end_version() > rhs->end_version();
});
diff --git a/be/src/olap/base_tablet.h b/be/src/olap/base_tablet.h
index 8f41ef6a57b..397e93e8058 100644
--- a/be/src/olap/base_tablet.h
+++ b/be/src/olap/base_tablet.h
@@ -128,8 +128,8 @@ public:
////////////////////////////////////////////////////////////////////////////
// begin MoW functions
////////////////////////////////////////////////////////////////////////////
- std::vector<RowsetSharedPtr> get_rowset_by_ids(const RowsetIdUnorderedSet*
specified_rowset_ids,
- bool include_stale = false);
+ std::vector<RowsetSharedPtr> get_rowset_by_ids(
+ const RowsetIdUnorderedSet* specified_rowset_ids);
// Lookup a row with TupleDescriptor and fill Block
Status lookup_row_data(const Slice& encoded_key, const RowLocation&
row_location,
diff --git a/be/src/olap/cumulative_compaction_policy.cpp
b/be/src/olap/cumulative_compaction_policy.cpp
index 0d35ae7ca4f..ee7a2b1812a 100644
--- a/be/src/olap/cumulative_compaction_policy.cpp
+++ b/be/src/olap/cumulative_compaction_policy.cpp
@@ -284,6 +284,8 @@ int SizeBasedCumulativeCompactionPolicy::pick_input_rowsets(
transient_size += 1;
input_rowsets->push_back(rowset);
}
+
DBUG_EXECUTE_IF("SizeBaseCumulativeCompactionPolicy.pick_input_rowsets.return_input_rowsets",
+ { return transient_size; })
if (total_size >= promotion_size) {
return transient_size;
diff --git a/be/src/olap/olap_common.h b/be/src/olap/olap_common.h
index 4d0c035bb0f..4acf18a0834 100644
--- a/be/src/olap/olap_common.h
+++ b/be/src/olap/olap_common.h
@@ -36,6 +36,7 @@
#include "io/io_common.h"
#include "olap/olap_define.h"
+#include "olap/rowset/rowset_fwd.h"
#include "util/hash_util.hpp"
#include "util/time.h"
#include "util/uid_util.h"
@@ -494,11 +495,16 @@ class DeleteBitmap;
// merge on write context
struct MowContext {
MowContext(int64_t version, int64_t txnid, const RowsetIdUnorderedSet& ids,
- std::shared_ptr<DeleteBitmap> db)
- : max_version(version), txn_id(txnid), rowset_ids(ids),
delete_bitmap(db) {}
+ const std::vector<RowsetSharedPtr>& rowset_ptrs,
std::shared_ptr<DeleteBitmap> db)
+ : max_version(version),
+ txn_id(txnid),
+ rowset_ids(ids),
+ rowset_ptrs(rowset_ptrs),
+ delete_bitmap(db) {}
int64_t max_version;
int64_t txn_id;
const RowsetIdUnorderedSet& rowset_ids;
+ std::vector<RowsetSharedPtr> rowset_ptrs;
std::shared_ptr<DeleteBitmap> delete_bitmap;
};
diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp
b/be/src/olap/rowset/segment_v2/segment_writer.cpp
index 7a83496b7fb..10ce2137b9c 100644
--- a/be/src/olap/rowset/segment_v2/segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp
@@ -437,24 +437,7 @@ Status
SegmentWriter::append_block_with_partial_content(const vectorized::Block*
std::vector<RowsetSharedPtr> specified_rowsets;
{
std::shared_lock rlock(tablet->get_header_lock());
-
- // Under normal circumstances, `get_rowset_by_ids` does not need to
consider the stale
- // rowset, in other word, if a rowset id is not found in the normal
rowset, we can ignore
- // it. This is because even if we handle stale rowset here, we need to
recalculate the
- // new rowset generated by the corresponding compaction in the publish
phase.
- // However, for partial update, ignoring the stale rowset may cause
some keys to not be
- // found in the flush phase (lookup_row_key returns KEY_NOT_FOUND),
and thus be mistaken
- // as new keys in the flush phase, which will cause the load to fail
in the following
- // two cases:
- // 1. when strict_mode is enabled, new keys are not allowed to be
added.
- // 2. Some columns that need to be filled are neither nullable nor
have a default value,
- // in which case the value of the field cannot be filled as a new
key, leading to a
- // failure of the load.
- bool should_include_stale =
- _opts.rowset_ctx->partial_update_info->is_strict_mode ||
-
!_opts.rowset_ctx->partial_update_info->can_insert_new_rows_in_partial_update;
- specified_rowsets =
- tablet->get_rowset_by_ids(&_mow_context->rowset_ids,
should_include_stale);
+ specified_rowsets = _mow_context->rowset_ptrs;
if (specified_rowsets.size() != _mow_context->rowset_ids.size()) {
// Only when this is a strict mode partial update that missing
rowsets here will lead to problems.
// In other case, the missing rowsets will be calculated in later
phases(commit phase/publish phase)
diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
index 8f7bb9fe245..b0b24a79c0a 100644
--- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
@@ -368,24 +368,10 @@ Status
VerticalSegmentWriter::_append_block_with_partial_content(RowsInBlock& da
std::vector<RowsetSharedPtr> specified_rowsets;
{
+
DBUG_EXECUTE_IF("VerticalSegmentWriter._append_block_with_partial_content.sleep",
+ { sleep(60); })
std::shared_lock rlock(tablet->get_header_lock());
- // Under normal circumstances, `get_rowset_by_ids` does not need to
consider the stale
- // rowset, in other word, if a rowset id is not found in the normal
rowset, we can ignore
- // it. This is because even if we handle stale rowset here, we need to
recalculate the
- // new rowset generated by the corresponding compaction in the publish
phase.
- // However, for partial update, ignoring the stale rowset may cause
some keys to not be
- // found in the flush phase (lookup_row_key returns KEY_NOT_FOUND),
and thus be mistaken
- // as new keys in the flush phase, which will cause the load to fail
in the following
- // two cases:
- // 1. when strict_mode is enabled, new keys are not allowed to be
added.
- // 2. Some columns that need to be filled are neither nullable nor
have a default value,
- // in which case the value of the field cannot be filled as a new
key, leading to a
- // failure of the load.
- bool should_include_stale =
- _opts.rowset_ctx->partial_update_info->is_strict_mode ||
-
!_opts.rowset_ctx->partial_update_info->can_insert_new_rows_in_partial_update;
- specified_rowsets =
- tablet->get_rowset_by_ids(&_mow_context->rowset_ids,
should_include_stale);
+ specified_rowsets = _mow_context->rowset_ptrs;
if (specified_rowsets.size() != _mow_context->rowset_ids.size()) {
// Only when this is a strict mode partial update that missing
rowsets here will lead to problems.
// In other case, the missing rowsets will be calculated in later
phases(commit phase/publish phase)
diff --git a/be/src/olap/rowset_builder.cpp b/be/src/olap/rowset_builder.cpp
index 4458d43c17e..9d8b8163b71 100644
--- a/be/src/olap/rowset_builder.cpp
+++ b/be/src/olap/rowset_builder.cpp
@@ -123,6 +123,7 @@ void RowsetBuilder::_garbage_collection() {
Status BaseRowsetBuilder::init_mow_context(std::shared_ptr<MowContext>&
mow_context) {
std::lock_guard<std::shared_mutex> lck(tablet()->get_header_lock());
int64_t cur_max_version = tablet()->max_version_unlocked();
+ std::vector<RowsetSharedPtr> rowset_ptrs;
// tablet is under alter process. The delete bitmap will be calculated
after conversion.
if (tablet()->tablet_state() == TABLET_NOTREADY) {
// Disable 'partial_update' when the tablet is undergoing a 'schema
changing process'
@@ -134,10 +135,11 @@ Status
BaseRowsetBuilder::init_mow_context(std::shared_ptr<MowContext>& mow_cont
_rowset_ids.clear();
} else {
RETURN_IF_ERROR(tablet()->get_all_rs_id_unlocked(cur_max_version,
&_rowset_ids));
+ rowset_ptrs = tablet()->get_rowset_by_ids(&_rowset_ids);
}
_delete_bitmap = std::make_shared<DeleteBitmap>(tablet()->tablet_id());
- mow_context =
- std::make_shared<MowContext>(cur_max_version, _req.txn_id,
_rowset_ids, _delete_bitmap);
+ mow_context = std::make_shared<MowContext>(cur_max_version, _req.txn_id,
_rowset_ids,
+ rowset_ptrs, _delete_bitmap);
return Status::OK();
}
diff --git
a/regression-test/suites/fault_injection_p2/test_partial_update_rowset_not_found_fault_injection.groovy
b/regression-test/suites/fault_injection_p2/test_partial_update_rowset_not_found_fault_injection.groovy
new file mode 100644
index 00000000000..befad64da0a
--- /dev/null
+++
b/regression-test/suites/fault_injection_p2/test_partial_update_rowset_not_found_fault_injection.groovy
@@ -0,0 +1,112 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_partial_update_rowset_not_found_fault_injection",
"p2,nonConcurrent") {
+ def testTable = "test_partial_update_rowset_not_found_fault_injection"
+ sql """ DROP TABLE IF EXISTS ${testTable}"""
+ sql """
+ create table ${testTable}
+ (
+ `k1` INT,
+ `v1` INT NOT NULL,
+ `v2` INT NOT NULL,
+ `v3` INT NOT NULL,
+ `v4` INT NOT NULL,
+ `v5` INT NOT NULL,
+ `v6` INT NOT NULL,
+ `v7` INT NOT NULL,
+ `v8` INT NOT NULL,
+ `v9` INT NOT NULL,
+ `v10` INT NOT NULL
+ )
+ UNIQUE KEY (`k1`)
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 1
+ PROPERTIES (
+ "replication_num" = "1",
+ "disable_auto_compaction" = "true"
+ );
+ """
+
+ def load_data = {
+ streamLoad {
+ table 'test_partial_update_rowset_not_found_fault_injection'
+ set 'column_separator', ','
+ set 'compress_type', 'GZ'
+
+
+ file
"""${getS3Url()}/regression/fault_injection/test_partial_update_rowset_not_found_falut_injection1.csv.gz"""
+
+ time 300000
+
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("success", json.Status.toLowerCase())
+ assertEquals(json.NumberTotalRows, json.NumberLoadedRows)
+ assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
+ }
+ }
+ }
+
+ def backendId_to_backendIP = [:]
+ def backendId_to_backendHttpPort = [:]
+ def backendId_to_params = [string:[:]]
+
+ getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
+
+ load_data()
+ def error = false
+
+
+ GetDebugPoint().clearDebugPointsForAllBEs()
+ try {
+
GetDebugPoint().enableDebugPointForAllBEs("VerticalSegmentWriter._append_block_with_partial_content.sleep")
+
GetDebugPoint().enableDebugPointForAllBEs("SizeBaseCumulativeCompactionPolicy.pick_input_rowsets.return_input_rowsets")
+ def thread = Thread.start{
+ try {
+ sql """update ${testTable} set v10=1"""
+ }
+ catch (Exception e){
+ logger.info(e.getMessage())
+ error = true
+ }
+ }
+
+ Thread.sleep(2000)
+ // trigger compactions for all tablets in ${tableName}
+ def tablets = sql_return_maparray """ show tablets from ${testTable};
"""
+ for (def tablet in tablets) {
+ String tablet_id = tablet.TabletId
+ backend_id = tablet.BackendId
+ (code, out, err) =
be_run_cumulative_compaction(backendId_to_backendIP.get(backend_id),
backendId_to_backendHttpPort.get(backend_id), tablet_id)
+ logger.info("Run compaction: code=" + code + ", out=" + out + ",
err=" + err)
+ assertEquals(code, 0)
+ }
+
+ thread.join()
+ assertFalse(error)
+ } catch (Exception e){
+ logger.info(e.getMessage())
+ assertFalse(true)
+ } finally {
+
GetDebugPoint().disableDebugPointForAllBEs("VerticalSegmentWriter._append_block_with_partial_content.sleep")
+
GetDebugPoint().disableDebugPointForAllBEs("SizeBaseCumulativeCompactionPolicy.pick_input_rowsets.return_input_rowsets")
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]