This is an automated email from the ASF dual-hosted git repository.
yangzhg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 2f192019d3 [bugfix](delete handler) delete predicate is merged and
could not find schema cause core dump (#12161)
2f192019d3 is described below
commit 2f192019d30aa0f48c192ebf672391b1207374ff
Author: yiguolei <[email protected]>
AuthorDate: Tue Aug 30 09:18:21 2022 +0800
[bugfix](delete handler) delete predicate is merged and could not find
schema cause core dump (#12161)
Co-authored-by: yiguolei <[email protected]>
---
be/src/exec/olap_scanner.cpp | 8 ++--
be/src/olap/base_compaction.cpp | 3 +-
be/src/olap/cumulative_compaction.cpp | 4 +-
be/src/olap/delete_handler.cpp | 15 +++----
be/src/olap/delete_handler.h | 5 ++-
be/src/olap/merger.cpp | 16 +++----
be/src/olap/reader.cpp | 2 +-
be/src/olap/reader.h | 2 +-
be/src/olap/schema_change.cpp | 33 +++++++-------
be/src/olap/schema_change.h | 7 +--
be/src/olap/tablet.cpp | 16 +++----
be/src/olap/tablet.h | 10 +++--
be/src/olap/tablet_meta.cpp | 50 ++++------------------
be/src/olap/tablet_meta.h | 7 +--
be/src/vec/exec/scan/new_olap_scanner.cpp | 4 +-
be/src/vec/exec/volap_scanner.cpp | 9 ++--
be/test/olap/cumulative_compaction_policy_test.cpp | 36 ++++++++++------
be/test/olap/delete_handler_test.cpp | 8 ++--
gensrc/proto/olap_file.proto | 2 +-
19 files changed, 103 insertions(+), 134 deletions(-)
diff --git a/be/src/exec/olap_scanner.cpp b/be/src/exec/olap_scanner.cpp
index e052125a49..8ff6db2ab6 100644
--- a/be/src/exec/olap_scanner.cpp
+++ b/be/src/exec/olap_scanner.cpp
@@ -203,13 +203,13 @@ Status OlapScanner::_init_tablet_reader_params(
std::copy(function_filters.cbegin(), function_filters.cend(),
std::inserter(_tablet_reader_params.function_filters,
_tablet_reader_params.function_filters.begin()));
- std::copy(_tablet->delete_predicates().cbegin(),
_tablet->delete_predicates().cend(),
+ auto& delete_preds = _tablet->delete_predicates();
+ std::copy(delete_preds.cbegin(), delete_preds.cend(),
std::inserter(_tablet_reader_params.delete_predicates,
_tablet_reader_params.delete_predicates.begin()));
// Merge the columns in delete predicate that not in latest schema in to
current tablet schema
- for (auto& del_pred_pb : _tablet_reader_params.delete_predicates) {
- _tablet_schema->merge_dropped_columns(
- _tablet->tablet_schema(Version(del_pred_pb.version(),
del_pred_pb.version())));
+ for (auto& del_pred_rs : _tablet_reader_params.delete_predicates) {
+
_tablet_schema->merge_dropped_columns(_tablet->tablet_schema(del_pred_rs->version()));
}
// Range
for (auto key_range : key_ranges) {
diff --git a/be/src/olap/base_compaction.cpp b/be/src/olap/base_compaction.cpp
index 22485c2b7a..2dca9b9d78 100644
--- a/be/src/olap/base_compaction.cpp
+++ b/be/src/olap/base_compaction.cpp
@@ -105,7 +105,8 @@ void BaseCompaction::_filter_input_rowset() {
Status BaseCompaction::pick_rowsets_to_compact() {
_input_rowsets.clear();
- _tablet->pick_candidate_rowsets_to_base_compaction(&_input_rowsets);
+ std::shared_lock rdlock(_tablet->get_header_lock());
+ _tablet->pick_candidate_rowsets_to_base_compaction(&_input_rowsets,
rdlock);
std::sort(_input_rowsets.begin(), _input_rowsets.end(),
Rowset::comparator);
RETURN_NOT_OK(check_version_continuity(_input_rowsets));
RETURN_NOT_OK(_check_rowset_overlapping(_input_rowsets));
diff --git a/be/src/olap/cumulative_compaction.cpp
b/be/src/olap/cumulative_compaction.cpp
index ba52fbfdcb..4461a240b5 100644
--- a/be/src/olap/cumulative_compaction.cpp
+++ b/be/src/olap/cumulative_compaction.cpp
@@ -96,8 +96,8 @@ Status CumulativeCompaction::execute_compact_impl() {
Status CumulativeCompaction::pick_rowsets_to_compact() {
std::vector<RowsetSharedPtr> candidate_rowsets;
-
-
_tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets);
+ std::shared_lock rdlock(_tablet->get_header_lock());
+
_tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets,
rdlock);
if (candidate_rowsets.empty()) {
return
Status::OLAPInternalError(OLAP_ERR_CUMULATIVE_NO_SUITABLE_VERSION);
diff --git a/be/src/olap/delete_handler.cpp b/be/src/olap/delete_handler.cpp
index 3c5f9ef5c6..bbb0735356 100644
--- a/be/src/olap/delete_handler.cpp
+++ b/be/src/olap/delete_handler.cpp
@@ -237,23 +237,22 @@ bool DeleteHandler::_parse_condition(const std::string&
condition_str, TConditio
return true;
}
-Status DeleteHandler::init(std::shared_ptr<Tablet> tablet, TabletSchemaSPtr
tablet_schema,
- const std::vector<DeletePredicatePB>&
delete_conditions,
- int64_t version) {
+Status DeleteHandler::init(TabletSchemaSPtr tablet_schema,
+ const std::vector<RowsetMetaSharedPtr>&
delete_preds, int64_t version) {
DCHECK(!_is_inited) << "reinitialize delete handler.";
DCHECK(version >= 0) << "invalid parameters. version=" << version;
_predicate_mem_pool.reset(new MemPool());
- for (const auto& delete_condition : delete_conditions) {
+ for (const auto& delete_pred : delete_preds) {
// Skip the delete condition with large version
- if (delete_condition.version() > version) {
+ if (delete_pred->version().first > version) {
continue;
}
// Need the tablet schema at the delete condition to parse the
accurate column unique id
- TabletSchemaSPtr delete_pred_related_schema = tablet->tablet_schema(
- Version(delete_condition.version(),
delete_condition.version()));
+ TabletSchemaSPtr delete_pred_related_schema =
delete_pred->tablet_schema();
+ auto& delete_condition = delete_pred->delete_predicate();
DeleteConditions temp;
- temp.filter_version = delete_condition.version();
+ temp.filter_version = delete_pred->version().first;
for (const auto& sub_predicate : delete_condition.sub_predicates()) {
TCondition condition;
if (!_parse_condition(sub_predicate, &condition)) {
diff --git a/be/src/olap/delete_handler.h b/be/src/olap/delete_handler.h
index 7d720a9701..3dc8e5b3b7 100644
--- a/be/src/olap/delete_handler.h
+++ b/be/src/olap/delete_handler.h
@@ -25,6 +25,7 @@
#include "olap/block_column_predicate.h"
#include "olap/column_predicate.h"
#include "olap/olap_define.h"
+#include "olap/rowset/rowset_meta.h"
#include "olap/tablet_schema.h"
namespace doris {
@@ -89,8 +90,8 @@ public:
// return:
// * Status::OLAPInternalError(OLAP_ERR_DELETE_INVALID_PARAMETERS):
input parameters are not valid
// * Status::OLAPInternalError(OLAP_ERR_MALLOC_ERROR): alloc memory
failed
- Status init(std::shared_ptr<Tablet> tablet, TabletSchemaSPtr tablet_schema,
- const std::vector<DeletePredicatePB>& delete_conditions,
int64_t version);
+ Status init(TabletSchemaSPtr tablet_schema,
+ const std::vector<RowsetMetaSharedPtr>& delete_conditions,
int64_t version);
bool empty() const { return _del_conds.empty(); }
diff --git a/be/src/olap/merger.cpp b/be/src/olap/merger.cpp
index 4ee7d18796..ace3d6b39a 100644
--- a/be/src/olap/merger.cpp
+++ b/be/src/olap/merger.cpp
@@ -43,16 +43,16 @@ Status Merger::merge_rowsets(TabletSharedPtr tablet,
ReaderType reader_type,
reader_params.version = dst_rowset_writer->version();
{
std::shared_lock rdlock(tablet->get_header_lock());
- std::copy(tablet->delete_predicates().cbegin(),
tablet->delete_predicates().cend(),
+ auto delete_preds = tablet->delete_predicates();
+ std::copy(delete_preds.cbegin(), delete_preds.cend(),
std::inserter(reader_params.delete_predicates,
reader_params.delete_predicates.begin()));
}
TabletSchemaSPtr merge_tablet_schema = std::make_shared<TabletSchema>();
merge_tablet_schema->copy_from(*cur_tablet_schema);
// Merge the columns in delete predicate that not in latest schema in to
current tablet schema
- for (auto& del_pred_pb : reader_params.delete_predicates) {
- merge_tablet_schema->merge_dropped_columns(
- tablet->tablet_schema(Version(del_pred_pb.version(),
del_pred_pb.version())));
+ for (auto& del_pred_rs : reader_params.delete_predicates) {
+
merge_tablet_schema->merge_dropped_columns(tablet->tablet_schema(del_pred_rs->version()));
}
reader_params.tablet_schema = merge_tablet_schema;
RETURN_NOT_OK(reader.init(reader_params));
@@ -116,16 +116,16 @@ Status Merger::vmerge_rowsets(TabletSharedPtr tablet,
ReaderType reader_type,
reader_params.version = dst_rowset_writer->version();
{
std::shared_lock rdlock(tablet->get_header_lock());
- std::copy(tablet->delete_predicates().cbegin(),
tablet->delete_predicates().cend(),
+ auto delete_preds = tablet->delete_predicates();
+ std::copy(delete_preds.cbegin(), delete_preds.cend(),
std::inserter(reader_params.delete_predicates,
reader_params.delete_predicates.begin()));
}
TabletSchemaSPtr merge_tablet_schema = std::make_shared<TabletSchema>();
merge_tablet_schema->copy_from(*cur_tablet_schema);
// Merge the columns in delete predicate that not in latest schema in to
current tablet schema
- for (auto& del_pred_pb : reader_params.delete_predicates) {
- merge_tablet_schema->merge_dropped_columns(
- tablet->tablet_schema(Version(del_pred_pb.version(),
del_pred_pb.version())));
+ for (auto& del_pred_rs : reader_params.delete_predicates) {
+
merge_tablet_schema->merge_dropped_columns(tablet->tablet_schema(del_pred_rs->version()));
}
reader_params.tablet_schema = merge_tablet_schema;
if (tablet->enable_unique_key_merge_on_write()) {
diff --git a/be/src/olap/reader.cpp b/be/src/olap/reader.cpp
index 2f7eb06243..a074ec6ffd 100644
--- a/be/src/olap/reader.cpp
+++ b/be/src/olap/reader.cpp
@@ -497,7 +497,7 @@ Status TabletReader::_init_delete_condition(const
ReaderParams& read_params) {
_filter_delete = true;
}
- return _delete_handler.init(_tablet, _tablet_schema,
read_params.delete_predicates,
+ return _delete_handler.init(_tablet_schema, read_params.delete_predicates,
read_params.version.second);
}
diff --git a/be/src/olap/reader.h b/be/src/olap/reader.h
index 15d1f26598..004e75c773 100644
--- a/be/src/olap/reader.h
+++ b/be/src/olap/reader.h
@@ -78,7 +78,7 @@ public:
std::vector<TCondition> conditions;
std::vector<std::pair<string, std::shared_ptr<IBloomFilterFuncBase>>>
bloom_filters;
std::vector<FunctionFilter> function_filters;
- std::vector<DeletePredicatePB> delete_predicates;
+ std::vector<RowsetMetaSharedPtr> delete_predicates;
// For unique key table with merge-on-write
DeleteBitmap* delete_bitmap {nullptr};
diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp
index 04683d75d1..65f9b61bb1 100644
--- a/be/src/olap/schema_change.cpp
+++ b/be/src/olap/schema_change.cpp
@@ -1850,15 +1850,15 @@ Status
SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2&
res =
Status::OLAPInternalError(OLAP_ERR_ALTER_DELTA_DOES_NOT_EXISTS);
break;
}
- for (auto& delete_pred : base_tablet->delete_predicates()) {
- if (delete_pred.version() > end_version) {
+ auto& all_del_preds = base_tablet->delete_predicates();
+ for (auto& delete_pred : all_del_preds) {
+ if (delete_pred->version().first > end_version) {
continue;
}
-
base_tablet_schema->merge_dropped_columns(base_tablet->tablet_schema(
- Version(delete_pred.version(),
delete_pred.version())));
+ base_tablet_schema->merge_dropped_columns(
+ base_tablet->tablet_schema(delete_pred->version()));
}
- res = delete_handler.init(base_tablet, base_tablet_schema,
- base_tablet->delete_predicates(),
end_version);
+ res = delete_handler.init(base_tablet_schema, all_del_preds,
end_version);
if (!res) {
LOG(WARNING) << "init delete handler failed. base_tablet="
<< base_tablet->full_name() << ", end_version="
<< end_version;
@@ -2013,9 +2013,7 @@ Status
SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangeParams
bool sc_directly = false;
// a.Parse the Alter request and convert it into an internal representation
- Status res = _parse_request(sc_params.base_tablet, sc_params.new_tablet,
&rb_changer,
- &sc_sorting, &sc_directly,
sc_params.materialized_params_map,
- *sc_params.desc_tbl,
sc_params.base_tablet_schema);
+ Status res = _parse_request(sc_params, &rb_changer, &sc_sorting,
&sc_directly);
auto process_alter_exit = [&]() -> Status {
{
@@ -2115,12 +2113,15 @@ Status
SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangeParams
// @static
// Analyze the mapping of the column and the mapping of the filter key
-Status SchemaChangeHandler::_parse_request(
- TabletSharedPtr base_tablet, TabletSharedPtr new_tablet,
RowBlockChanger* rb_changer,
- bool* sc_sorting, bool* sc_directly,
- const std::unordered_map<std::string, AlterMaterializedViewParam>&
- materialized_function_map,
- DescriptorTbl desc_tbl, TabletSchemaSPtr base_tablet_schema) {
+Status SchemaChangeHandler::_parse_request(const SchemaChangeParams& sc_params,
+ RowBlockChanger* rb_changer, bool*
sc_sorting,
+ bool* sc_directly) {
+ TabletSharedPtr base_tablet = sc_params.base_tablet;
+ TabletSharedPtr new_tablet = sc_params.new_tablet;
+ TabletSchemaSPtr base_tablet_schema = sc_params.base_tablet_schema;
+ const std::unordered_map<std::string, AlterMaterializedViewParam>&
materialized_function_map =
+ sc_params.materialized_params_map;
+ DescriptorTbl desc_tbl = *sc_params.desc_tbl;
// set column mapping
for (int i = 0, new_schema_size =
new_tablet->tablet_schema()->num_columns();
i < new_schema_size; ++i) {
@@ -2232,7 +2233,7 @@ Status SchemaChangeHandler::_parse_request(
}
}
- if (base_tablet->delete_predicates().size() != 0) {
+ if (!sc_params.delete_handler->empty()) {
// there exists delete condition in header, can't do linked schema
change
*sc_directly = true;
}
diff --git a/be/src/olap/schema_change.h b/be/src/olap/schema_change.h
index 45747a1400..1a8c26ced6 100644
--- a/be/src/olap/schema_change.h
+++ b/be/src/olap/schema_change.h
@@ -305,11 +305,8 @@ private:
static Status _convert_historical_rowsets(const SchemaChangeParams&
sc_params);
- static Status _parse_request(TabletSharedPtr base_tablet, TabletSharedPtr
new_tablet,
- RowBlockChanger* rb_changer, bool*
sc_sorting, bool* sc_directly,
- const std::unordered_map<std::string,
AlterMaterializedViewParam>&
- materialized_function_map,
- DescriptorTbl desc_tbl, TabletSchemaSPtr
base_tablet_schema);
+ static Status _parse_request(const SchemaChangeParams& sc_params,
RowBlockChanger* rb_changer,
+ bool* sc_sorting, bool* sc_directly);
// Initialization Settings for creating a default value
static Status _init_column_mapping(ColumnMapping* column_mapping,
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 49ae2fd2ea..6aaeff9632 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -177,9 +177,6 @@ Status Tablet::revise_tablet_meta(const
std::vector<RowsetMetaSharedPtr>& rowset
// delete versions from new local tablet_meta
for (const Version& version : versions_to_delete) {
new_tablet_meta->delete_rs_meta_by_version(version, nullptr);
- if (new_tablet_meta->version_for_delete_predicate(version)) {
- new_tablet_meta->remove_delete_predicate_by_version(version);
- }
LOG(INFO) << "delete version from new local tablet_meta when
clone. [table="
<< full_name() << ", version=" << version << "]";
}
@@ -1137,17 +1134,18 @@ TabletInfo Tablet::get_tablet_info() const {
}
void Tablet::pick_candidate_rowsets_to_cumulative_compaction(
- std::vector<RowsetSharedPtr>* candidate_rowsets) {
+ std::vector<RowsetSharedPtr>* candidate_rowsets,
+ std::shared_lock<std::shared_mutex>& /* meta lock*/) {
if (_cumulative_point == K_INVALID_CUMULATIVE_POINT) {
return;
}
- std::shared_lock rdlock(_meta_lock);
_cumulative_compaction_policy->pick_candidate_rowsets(_rs_version_map,
_cumulative_point,
candidate_rowsets);
}
-void
Tablet::pick_candidate_rowsets_to_base_compaction(vector<RowsetSharedPtr>*
candidate_rowsets) {
- std::shared_lock rdlock(_meta_lock);
+void Tablet::pick_candidate_rowsets_to_base_compaction(
+ vector<RowsetSharedPtr>* candidate_rowsets,
+ std::shared_lock<std::shared_mutex>& /* meta lock*/) {
for (auto& it : _rs_version_map) {
// Do compaction on local rowsets only.
if (it.first.first < _cumulative_point && it.second->is_local()) {
@@ -1753,10 +1751,6 @@ Status Tablet::cooldown() {
has_shutdown = tablet_state() == TABLET_SHUTDOWN;
if (!has_shutdown) {
modify_rowsets(to_add, to_delete);
- if (new_rowset_meta->has_delete_predicate()) {
-
_tablet_meta->add_delete_predicate(new_rowset_meta->delete_predicate(),
-
new_rowset_meta->start_version());
- }
_self_owned_remote_rowsets.insert(to_add.front());
save_meta();
}
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index 65cee05302..6326f3c13b 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -20,6 +20,7 @@
#include <functional>
#include <memory>
#include <set>
+#include <shared_mutex>
#include <string>
#include <unordered_map>
#include <unordered_set>
@@ -156,7 +157,7 @@ public:
Status capture_rs_readers(const std::vector<Version>& version_path,
std::vector<RowsetReaderSharedPtr>* rs_readers)
const;
- const std::vector<DeletePredicatePB>& delete_predicates() {
+ const std::vector<RowsetMetaSharedPtr> delete_predicates() {
return _tablet_meta->delete_predicates();
}
bool version_for_delete_predicate(const Version& version);
@@ -228,8 +229,11 @@ public:
TabletInfo get_tablet_info() const;
void pick_candidate_rowsets_to_cumulative_compaction(
- std::vector<RowsetSharedPtr>* candidate_rowsets);
- void
pick_candidate_rowsets_to_base_compaction(std::vector<RowsetSharedPtr>*
candidate_rowsets);
+ std::vector<RowsetSharedPtr>* candidate_rowsets,
+ std::shared_lock<std::shared_mutex>& /* meta lock*/);
+ void pick_candidate_rowsets_to_base_compaction(
+ std::vector<RowsetSharedPtr>* candidate_rowsets,
+ std::shared_lock<std::shared_mutex>& /* meta lock*/);
void calculate_cumulative_point();
// TODO(ygl):
diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp
index 6e2d23ddc6..47d409abcd 100644
--- a/be/src/olap/tablet_meta.cpp
+++ b/be/src/olap/tablet_meta.cpp
@@ -208,7 +208,6 @@ TabletMeta::TabletMeta(const TabletMeta& b)
_schema(b._schema),
_rs_metas(b._rs_metas),
_stale_rs_metas(b._stale_rs_metas),
- _del_predicates(b._del_predicates),
_in_restore_mode(b._in_restore_mode),
_preferred_rowset_type(b._preferred_rowset_type),
_storage_policy(b._storage_policy),
@@ -447,9 +446,6 @@ void TabletMeta::init_from_pb(const TabletMetaPB&
tablet_meta_pb) {
for (auto& it : tablet_meta_pb.rs_metas()) {
RowsetMetaSharedPtr rs_meta(new RowsetMeta());
rs_meta->init_from_pb(it);
- if (rs_meta->has_delete_predicate()) {
- add_delete_predicate(rs_meta->delete_predicate(),
rs_meta->version().first);
- }
_rs_metas.push_back(std::move(rs_meta));
}
@@ -607,12 +603,7 @@ Status TabletMeta::add_rs_meta(const RowsetMetaSharedPtr&
rs_meta) {
}
}
}
-
_rs_metas.push_back(rs_meta);
- if (rs_meta->has_delete_predicate()) {
- add_delete_predicate(rs_meta->delete_predicate(),
rs_meta->version().first);
- }
-
return Status::OK();
}
@@ -640,9 +631,6 @@ void TabletMeta::modify_rs_metas(const
std::vector<RowsetMetaSharedPtr>& to_add,
auto it = _rs_metas.begin();
while (it != _rs_metas.end()) {
if (rs_to_del->version() == (*it)->version()) {
- if ((*it)->has_delete_predicate()) {
- remove_delete_predicate_by_version((*it)->version());
- }
_rs_metas.erase(it);
// there should be only one rowset match the version
break;
@@ -721,36 +709,14 @@ RowsetMetaSharedPtr
TabletMeta::acquire_stale_rs_meta_by_version(const Version&
return nullptr;
}
-void TabletMeta::add_delete_predicate(const DeletePredicatePB&
delete_predicate, int64_t version) {
- for (auto& del_pred : _del_predicates) {
- if (del_pred.version() == version) {
- *del_pred.mutable_sub_predicates() =
delete_predicate.sub_predicates();
- return;
+const std::vector<RowsetMetaSharedPtr> TabletMeta::delete_predicates() const {
+ std::vector<RowsetMetaSharedPtr> res;
+ for (auto& del_pred : _rs_metas) {
+ if (del_pred->has_delete_predicate()) {
+ res.push_back(del_pred);
}
}
- DeletePredicatePB copied_pred = delete_predicate;
- copied_pred.set_version(version);
- _del_predicates.emplace_back(copied_pred);
-}
-
-void TabletMeta::remove_delete_predicate_by_version(const Version& version) {
- DCHECK(version.first == version.second) << "version=" << version;
- int pred_to_del = -1;
- for (int i = 0; i < _del_predicates.size(); ++i) {
- if (_del_predicates[i].version() == version.first) {
- pred_to_del = i;
- // one DeletePredicatePB stands for a nested predicate, such as
user submit a delete predicate a=1 and b=2
- // they could be saved as a one DeletePredicatePB
- break;
- }
- }
- if (pred_to_del > -1) {
- _del_predicates.erase(_del_predicates.begin() + pred_to_del);
- }
-}
-
-const std::vector<DeletePredicatePB>& TabletMeta::delete_predicates() const {
- return _del_predicates;
+ return res;
}
bool TabletMeta::version_for_delete_predicate(const Version& version) {
@@ -758,8 +724,8 @@ bool TabletMeta::version_for_delete_predicate(const
Version& version) {
return false;
}
- for (auto& del_pred : _del_predicates) {
- if (del_pred.version() == version.first) {
+ for (auto& del_pred : _rs_metas) {
+ if (del_pred->version().first == version.first &&
del_pred->has_delete_predicate()) {
return true;
}
}
diff --git a/be/src/olap/tablet_meta.h b/be/src/olap/tablet_meta.h
index d0f591e354..e42105a40a 100644
--- a/be/src/olap/tablet_meta.h
+++ b/be/src/olap/tablet_meta.h
@@ -169,10 +169,7 @@ public:
RowsetMetaSharedPtr acquire_rs_meta_by_version(const Version& version)
const;
void delete_stale_rs_meta_by_version(const Version& version);
RowsetMetaSharedPtr acquire_stale_rs_meta_by_version(const Version&
version) const;
-
- void add_delete_predicate(const DeletePredicatePB& delete_predicate,
int64_t version);
- void remove_delete_predicate_by_version(const Version& version);
- const std::vector<DeletePredicatePB>& delete_predicates() const;
+ const std::vector<RowsetMetaSharedPtr> delete_predicates() const;
bool version_for_delete_predicate(const Version& version);
std::string full_name() const;
@@ -241,8 +238,6 @@ private:
// These stale rowsets meta are been removed when rowsets' pathVersion is
expired,
// this policy is judged and computed by TimestampedVersionTracker.
std::vector<RowsetMetaSharedPtr> _stale_rs_metas;
-
- std::vector<DeletePredicatePB> _del_predicates;
bool _in_restore_mode = false;
RowsetTypePB _preferred_rowset_type = BETA_ROWSET;
diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp
b/be/src/vec/exec/scan/new_olap_scanner.cpp
index 3ae67a34be..b99ce8281d 100644
--- a/be/src/vec/exec/scan/new_olap_scanner.cpp
+++ b/be/src/vec/exec/scan/new_olap_scanner.cpp
@@ -172,8 +172,8 @@ Status NewOlapScanner::_init_tablet_reader_params(
std::copy(function_filters.cbegin(), function_filters.cend(),
std::inserter(_tablet_reader_params.function_filters,
_tablet_reader_params.function_filters.begin()));
-
- std::copy(_tablet->delete_predicates().cbegin(),
_tablet->delete_predicates().cend(),
+ auto& delete_preds = _tablet->delete_predicates();
+ std::copy(delete_preds.cbegin(), delete_preds.cend(),
std::inserter(_tablet_reader_params.delete_predicates,
_tablet_reader_params.delete_predicates.begin()));
diff --git a/be/src/vec/exec/volap_scanner.cpp
b/be/src/vec/exec/volap_scanner.cpp
index 835a3dd59b..efa80b36ac 100644
--- a/be/src/vec/exec/volap_scanner.cpp
+++ b/be/src/vec/exec/volap_scanner.cpp
@@ -192,15 +192,14 @@ Status VOlapScanner::_init_tablet_reader_params(
std::copy(function_filters.cbegin(), function_filters.cend(),
std::inserter(_tablet_reader_params.function_filters,
_tablet_reader_params.function_filters.begin()));
-
- std::copy(_tablet->delete_predicates().cbegin(),
_tablet->delete_predicates().cend(),
+ auto& delete_preds = _tablet->delete_predicates();
+ std::copy(delete_preds.cbegin(), delete_preds.cend(),
std::inserter(_tablet_reader_params.delete_predicates,
_tablet_reader_params.delete_predicates.begin()));
// Merge the columns in delete predicate that not in latest schema in to
current tablet schema
- for (auto& del_pred_pb : _tablet_reader_params.delete_predicates) {
- _tablet_schema->merge_dropped_columns(
- _tablet->tablet_schema(Version(del_pred_pb.version(),
del_pred_pb.version())));
+ for (auto& del_pred_rs : _tablet_reader_params.delete_predicates) {
+
_tablet_schema->merge_dropped_columns(_tablet->tablet_schema(del_pred_rs->version()));
}
// Range
diff --git a/be/test/olap/cumulative_compaction_policy_test.cpp
b/be/test/olap/cumulative_compaction_policy_test.cpp
index deff7a6190..a7117fba43 100644
--- a/be/test/olap/cumulative_compaction_policy_test.cpp
+++ b/be/test/olap/cumulative_compaction_policy_test.cpp
@@ -194,7 +194,8 @@ TEST_F(TestNumBasedCumulativeCompactionPolicy,
pick_candidate_rowsets) {
_tablet->calculate_cumulative_point();
std::vector<RowsetSharedPtr> candidate_rowsets;
-
_tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets);
+ std::shared_lock rdlock(_tablet->get_header_lock());
+
_tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets,
rdlock);
EXPECT_EQ(2, candidate_rowsets.size());
}
@@ -214,7 +215,8 @@ TEST_F(TestNumBasedCumulativeCompactionPolicy,
pick_input_rowsets_normal) {
NumBasedCumulativeCompactionPolicy policy;
std::vector<RowsetSharedPtr> candidate_rowsets;
-
_tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets);
+ std::shared_lock rdlock(_tablet->get_header_lock());
+
_tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets,
rdlock);
std::vector<RowsetSharedPtr> input_rowsets;
Version last_delete_version {-1, -1};
@@ -243,7 +245,8 @@ TEST_F(TestNumBasedCumulativeCompactionPolicy,
pick_input_rowsets_delete) {
NumBasedCumulativeCompactionPolicy policy;
std::vector<RowsetSharedPtr> candidate_rowsets;
-
_tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets);
+ std::shared_lock rdlock(_tablet->get_header_lock());
+
_tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets,
rdlock);
std::vector<RowsetSharedPtr> input_rowsets;
Version last_delete_version {-1, -1};
@@ -633,7 +636,8 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy,
pick_candidate_rowsets) {
_tablet->calculate_cumulative_point();
std::vector<RowsetSharedPtr> candidate_rowsets;
-
_tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets);
+ std::shared_lock rdlock(_tablet->get_header_lock());
+
_tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets,
rdlock);
EXPECT_EQ(3, candidate_rowsets.size());
}
@@ -651,7 +655,8 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy,
pick_candidate_rowsets_big_base)
_tablet->calculate_cumulative_point();
std::vector<RowsetSharedPtr> candidate_rowsets;
-
_tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets);
+ std::shared_lock rdlock(_tablet->get_header_lock());
+
_tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets,
rdlock);
EXPECT_EQ(3, candidate_rowsets.size());
}
@@ -670,7 +675,8 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy,
pick_input_rowsets_normal) {
std::vector<RowsetSharedPtr> candidate_rowsets;
-
_tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets);
+ std::shared_lock rdlock(_tablet->get_header_lock());
+
_tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets,
rdlock);
std::vector<RowsetSharedPtr> input_rowsets;
Version last_delete_version {-1, -1};
@@ -699,7 +705,8 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy,
pick_input_rowsets_big_base) {
std::vector<RowsetSharedPtr> candidate_rowsets;
-
_tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets);
+ std::shared_lock rdlock(_tablet->get_header_lock());
+
_tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets,
rdlock);
std::vector<RowsetSharedPtr> input_rowsets;
Version last_delete_version {-1, -1};
@@ -728,7 +735,8 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy,
pick_input_rowsets_promotion) {
std::vector<RowsetSharedPtr> candidate_rowsets;
-
_tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets);
+ std::shared_lock rdlock(_tablet->get_header_lock());
+
_tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets,
rdlock);
std::vector<RowsetSharedPtr> input_rowsets;
Version last_delete_version {-1, -1};
@@ -757,7 +765,8 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy,
pick_input_rowsets_not_same_leve
std::vector<RowsetSharedPtr> candidate_rowsets;
-
_tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets);
+ std::shared_lock rdlock(_tablet->get_header_lock());
+
_tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets,
rdlock);
std::vector<RowsetSharedPtr> input_rowsets;
Version last_delete_version {-1, -1};
@@ -786,7 +795,8 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy,
pick_input_rowsets_empty) {
std::vector<RowsetSharedPtr> candidate_rowsets;
-
_tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets);
+ std::shared_lock rdlock(_tablet->get_header_lock());
+
_tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets,
rdlock);
std::vector<RowsetSharedPtr> input_rowsets;
Version last_delete_version {-1, -1};
@@ -815,7 +825,8 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy,
pick_input_rowsets_not_reach_min
std::vector<RowsetSharedPtr> candidate_rowsets;
-
_tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets);
+ std::shared_lock rdlock(_tablet->get_header_lock());
+
_tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets,
rdlock);
std::vector<RowsetSharedPtr> input_rowsets;
Version last_delete_version {-1, -1};
@@ -844,7 +855,8 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy,
pick_input_rowsets_delete) {
std::vector<RowsetSharedPtr> candidate_rowsets;
-
_tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets);
+ std::shared_lock rdlock(_tablet->get_header_lock());
+
_tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets,
rdlock);
std::vector<RowsetSharedPtr> input_rowsets;
Version last_delete_version {-1, -1};
diff --git a/be/test/olap/delete_handler_test.cpp
b/be/test/olap/delete_handler_test.cpp
index b5a7c9f307..2bf6d74714 100644
--- a/be/test/olap/delete_handler_test.cpp
+++ b/be/test/olap/delete_handler_test.cpp
@@ -968,7 +968,7 @@ TEST_F(TestDeleteHandler, InitSuccess) {
add_delete_predicate(del_pred_4, 5);
// Get delete conditions which version <= 5
- res = _delete_handler.init(tablet, tablet->tablet_schema(),
tablet->delete_predicates(), 5);
+ res = _delete_handler.init(tablet->tablet_schema(),
tablet->delete_predicates(), 5);
EXPECT_EQ(Status::OK(), res);
_delete_handler.finalize();
}
@@ -1000,7 +1000,7 @@ TEST_F(TestDeleteHandler, FilterDataSubconditions) {
add_delete_predicate(del_pred, 2);
// 指定版本号为10以载入Header中的所有过滤条件(在这个case中,只有过滤条件1)
- res = _delete_handler.init(tablet, tablet->tablet_schema(),
tablet->delete_predicates(), 4);
+ res = _delete_handler.init(tablet->tablet_schema(),
tablet->delete_predicates(), 4);
EXPECT_EQ(Status::OK(), res);
// 构造一行测试数据
@@ -1084,7 +1084,7 @@ TEST_F(TestDeleteHandler, FilterDataConditions) {
add_delete_predicate(del_pred_3, 4);
// 指定版本号为4以载入meta中的所有过滤条件(在这个case中,只有过滤条件1)
- res = _delete_handler.init(tablet, tablet->tablet_schema(),
tablet->delete_predicates(), 4);
+ res = _delete_handler.init(tablet->tablet_schema(),
tablet->delete_predicates(), 4);
EXPECT_EQ(Status::OK(), res);
std::vector<string> data_str;
@@ -1146,7 +1146,7 @@ TEST_F(TestDeleteHandler, FilterDataVersion) {
add_delete_predicate(del_pred_2, 4);
// 指定版本号为4以载入meta中的所有过滤条件(过滤条件1,过滤条件2)
- res = _delete_handler.init(tablet, tablet->tablet_schema(),
tablet->delete_predicates(), 4);
+ res = _delete_handler.init(tablet->tablet_schema(),
tablet->delete_predicates(), 4);
EXPECT_EQ(Status::OK(), res);
// 构造一行测试数据
diff --git a/gensrc/proto/olap_file.proto b/gensrc/proto/olap_file.proto
index 020be00ead..f19228b935 100644
--- a/gensrc/proto/olap_file.proto
+++ b/gensrc/proto/olap_file.proto
@@ -149,7 +149,7 @@ enum KeysType {
}
message DeletePredicatePB {
- required int32 version = 1;
+ required int32 version = 1; // This field is useless, but could not
removed, not depend on it
repeated string sub_predicates = 2;
repeated InPredicatePB in_predicates = 3;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]