This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 52f9e03eea [fix](cooldown) Use `pending_remote_rowsets` to avoid
deleting rowset files being uploaded (#16803)
52f9e03eea is described below
commit 52f9e03eea17b3ed3351f6229e3e901d154031ed
Author: plat1ko <[email protected]>
AuthorDate: Tue Feb 21 21:58:20 2023 +0800
[fix](cooldown) Use `pending_remote_rowsets` to avoid deleting rowset
files being uploaded (#16803)
---
be/src/olap/cold_data_compaction.cpp | 2 +-
be/src/olap/compaction.cpp | 62 ++++++++++++++++--------------------
be/src/olap/tablet.cpp | 56 +++++++++++++++++++-------------
be/src/olap/tablet.h | 7 ++--
4 files changed, 67 insertions(+), 60 deletions(-)
diff --git a/be/src/olap/cold_data_compaction.cpp
b/be/src/olap/cold_data_compaction.cpp
index 9f24c9c170..9a92d9eead 100644
--- a/be/src/olap/cold_data_compaction.cpp
+++ b/be/src/olap/cold_data_compaction.cpp
@@ -80,7 +80,7 @@ Status ColdDataCompaction::modify_rowsets() {
// TODO(plat1ko): process primary key
_tablet->tablet_meta()->set_cooldown_meta_id(cooldown_meta_id);
}
-
+
Tablet::erase_pending_remote_rowset(_output_rowset->rowset_id().to_string());
{
std::shared_lock rlock(_tablet->get_header_lock());
_tablet->save_meta();
diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index d585516954..ecfc3d8e56 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -272,6 +272,9 @@ Status Compaction::do_compaction_impl(int64_t permits) {
bool vertical_compaction = should_vertical_compaction();
RETURN_NOT_OK(construct_input_rowset_readers());
RETURN_NOT_OK(construct_output_rowset_writer(vertical_compaction));
+ if (compaction_type() == ReaderType::READER_COLD_DATA_COMPACTION) {
+
Tablet::add_pending_remote_rowset(_output_rs_writer->rowset_id().to_string());
+ }
TRACE("prepare finished");
// 2. write merged rows to output rowset
@@ -282,45 +285,35 @@ Status Compaction::do_compaction_impl(int64_t permits) {
stats.rowid_conversion = &_rowid_conversion;
}
- auto build_output_rowset = [&]() {
- Status res;
- if (use_vectorized_compaction) {
- if (vertical_compaction) {
- res = Merger::vertical_merge_rowsets(_tablet,
compaction_type(), _cur_tablet_schema,
- _input_rs_readers,
_output_rs_writer.get(),
- get_avg_segment_rows(),
&stats);
- } else {
- res = Merger::vmerge_rowsets(_tablet, compaction_type(),
_cur_tablet_schema,
- _input_rs_readers,
_output_rs_writer.get(), &stats);
- }
+ Status res;
+ if (use_vectorized_compaction) {
+ if (vertical_compaction) {
+ res = Merger::vertical_merge_rowsets(_tablet, compaction_type(),
_cur_tablet_schema,
+ _input_rs_readers,
_output_rs_writer.get(),
+ get_avg_segment_rows(),
&stats);
} else {
- LOG(FATAL) << "Only support vectorized compaction";
+ res = Merger::vmerge_rowsets(_tablet, compaction_type(),
_cur_tablet_schema,
+ _input_rs_readers,
_output_rs_writer.get(), &stats);
}
+ } else {
+ LOG(FATAL) << "Only support vectorized compaction";
+ }
- if (!res.ok()) {
- LOG(WARNING) << "fail to do " << merge_type << compaction_name()
<< ". res=" << res
- << ", tablet=" << _tablet->full_name()
- << ", output_version=" << _output_version;
- return res;
- }
- TRACE("merge rowsets finished");
- TRACE_COUNTER_INCREMENT("merged_rows", stats.merged_rows);
- TRACE_COUNTER_INCREMENT("filtered_rows", stats.filtered_rows);
-
- _output_rowset = _output_rs_writer->build();
- if (_output_rowset == nullptr) {
- LOG(WARNING) << "rowset writer build failed. writer version:"
- << ", output_version=" << _output_version;
- return Status::Error<ROWSET_BUILDER_INIT>();
- }
+ if (!res.ok()) {
+ LOG(WARNING) << "fail to do " << merge_type << compaction_name() << ".
res=" << res
+ << ", tablet=" << _tablet->full_name()
+ << ", output_version=" << _output_version;
return res;
- };
+ }
+ TRACE("merge rowsets finished");
+ TRACE_COUNTER_INCREMENT("merged_rows", stats.merged_rows);
+ TRACE_COUNTER_INCREMENT("filtered_rows", stats.filtered_rows);
- if (compaction_type() == ReaderType::READER_COLD_DATA_COMPACTION) {
- std::shared_lock slock(_tablet->get_remote_files_lock());
- RETURN_IF_ERROR(build_output_rowset());
- } else {
- RETURN_IF_ERROR(build_output_rowset());
+ _output_rowset = _output_rs_writer->build();
+ if (_output_rowset == nullptr) {
+ LOG(WARNING) << "rowset writer build failed. writer version:"
+ << ", output_version=" << _output_version;
+ return Status::Error<ROWSET_BUILDER_INIT>();
}
TRACE_COUNTER_INCREMENT("output_rowset_data_size",
_output_rowset->data_disk_size());
@@ -457,6 +450,7 @@ Status Compaction::modify_rowsets() {
void Compaction::gc_output_rowset() {
if (_state != CompactionState::SUCCESS && _output_rowset != nullptr) {
if (!_output_rowset->is_local()) {
+
Tablet::erase_pending_remote_rowset(_output_rowset->rowset_id().to_string());
_tablet->record_unused_remote_rowset(_output_rowset->rowset_id(),
_output_rowset->rowset_meta()->resource_id(),
_output_rowset->num_segments());
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 8cbafa7f6e..75c1bff53e 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -1728,20 +1728,17 @@ Status Tablet::_cooldown_data(const
std::shared_ptr<io::RemoteFileSystem>& dest_
return Status::InternalError("cannot pick cooldown rowset in tablet
{}", tablet_id());
}
RowsetId new_rowset_id = StorageEngine::instance()->next_rowset_id();
-
- auto start = std::chrono::steady_clock::now();
-
+ add_pending_remote_rowset(new_rowset_id.to_string());
Status st;
- {
- std::shared_lock slock(_remote_files_lock, std::try_to_lock);
- if (!slock.owns_lock()) {
- return Status::Status::Error<TRY_LOCK_FAILED>("try
remote_files_lock failed");
+ Defer defer {[&] {
+ if (!st.ok()) {
+ erase_pending_remote_rowset(new_rowset_id.to_string());
+ // reclaim the incomplete rowset data in remote storage
+ record_unused_remote_rowset(new_rowset_id, dest_fs->id(),
old_rowset->num_segments());
}
- st = old_rowset->upload_to(dest_fs.get(), new_rowset_id);
- }
- if (!st.ok()) {
- // reclaim the incomplete rowset data in remote storage
- record_unused_remote_rowset(new_rowset_id, dest_fs->id(),
old_rowset->num_segments());
+ }};
+ auto start = std::chrono::steady_clock::now();
+ if (st = old_rowset->upload_to(dest_fs.get(), new_rowset_id); !st.ok()) {
return st;
}
@@ -1760,7 +1757,10 @@ Status Tablet::_cooldown_data(const
std::shared_ptr<io::RemoteFileSystem>& dest_
UniqueId cooldown_meta_id = UniqueId::gen_uid();
// upload cooldowned rowset meta to remote fs
- RETURN_IF_ERROR(write_cooldown_meta(dest_fs, cooldown_meta_id,
new_rowset_meta, {}));
+ st = write_cooldown_meta(dest_fs, cooldown_meta_id, new_rowset_meta, {});
+ if (!st.ok()) {
+ return st;
+ }
RowsetSharedPtr new_rowset;
RowsetFactory::create_rowset(_schema, _tablet_path, new_rowset_meta,
&new_rowset);
@@ -1774,6 +1774,7 @@ Status Tablet::_cooldown_data(const
std::shared_ptr<io::RemoteFileSystem>& dest_
_tablet_meta->set_cooldown_meta_id(cooldown_meta_id);
}
}
+ erase_pending_remote_rowset(new_rowset_id.to_string());
{
std::unique_lock meta_rlock(_meta_lock);
save_meta();
@@ -2047,6 +2048,18 @@ Status Tablet::remove_all_remote_rowsets() {
gc_pb.SerializeAsString());
}
+static std::unordered_set<std::string> s_pending_remote_rowsets; // ids (as strings) of rowsets currently being written to remote storage; remove_unused_remote_files seeds its keep-set from a copy of this
+static std::mutex s_pending_remote_rowsets_mtx; // held around each visible read/write of s_pending_remote_rowsets
+
+void Tablet::add_pending_remote_rowset(std::string rowset_id) { // marks rowset_id as pending; callers in this patch invoke it before starting the upload
+ std::lock_guard lock(s_pending_remote_rowsets_mtx); // serialize with erase_pending_remote_rowset and with the snapshot copy in remove_unused_remote_files
+ s_pending_remote_rowsets.insert(std::move(rowset_id)); // rowset_id is taken by value so it can be moved in; inserting an id already present leaves the set unchanged
+}
+void Tablet::erase_pending_remote_rowset(const std::string& rowset_id) { // removes rowset_id from the pending set; callers in this patch invoke it on upload failure or once the rowset is recorded in the tablet
+ std::lock_guard lock(s_pending_remote_rowsets_mtx); // same mutex as add_pending_remote_rowset
+ s_pending_remote_rowsets.erase(rowset_id); // erasing an id that is not present is a no-op
+}
+
void Tablet::remove_unused_remote_files() {
auto tablets =
StorageEngine::instance()->tablet_manager()->get_all_tablet([](Tablet* t) {
return t->tablet_meta()->cooldown_meta_id().initialized() &&
t->is_used() &&
@@ -2080,16 +2093,9 @@ void Tablet::remove_unused_remote_files() {
Status st;
std::vector<io::Path> files;
- {
- std::unique_lock xlock(t->_remote_files_lock, std::try_to_lock);
- if (!xlock.owns_lock()) {
- LOG(WARNING) << "try remote_files_lock failed. tablet_id=" <<
t->tablet_id();
- return;
- }
- // FIXME(plat1ko): What if user reset resource in storage policy
to another resource?
- // Maybe we should also list files in previously uploaded
resources.
- st = dest_fs->list(BetaRowset::remote_tablet_path(t->tablet_id()),
&files);
- }
+ // FIXME(plat1ko): What if the user resets the resource in the storage policy to
another resource?
+ // Maybe we should also list files in previously uploaded resources.
+ st = dest_fs->list(BetaRowset::remote_tablet_path(t->tablet_id()),
&files);
if (!st.ok()) {
LOG(WARNING) << "encounter error when remove unused remote files,
tablet_id="
<< t->tablet_id() << " : " << st;
@@ -2099,6 +2105,10 @@ void Tablet::remove_unused_remote_files() {
}
// get all cooldowned rowsets
std::unordered_set<std::string> cooldowned_rowsets;
+ {
+ std::lock_guard lock(s_pending_remote_rowsets_mtx);
+ cooldowned_rowsets = s_pending_remote_rowsets;
+ }
UniqueId cooldown_meta_id;
{
std::shared_lock rlock(t->_meta_lock);
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index 82f42dccdd..73a200b383 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -328,7 +328,11 @@ public:
static void remove_unused_remote_files();
- std::shared_mutex& get_remote_files_lock() { return _remote_files_lock; }
+ // If a rowset is to be written to remote filesystem, MUST add it to
`pending_remote_rowsets` before uploading,
+ // and then erase it from `pending_remote_rowsets` after it has been
inserted into the Tablet.
+ // `remove_unused_remote_files` MUST NOT delete files of these pending
rowsets.
+ static void add_pending_remote_rowset(std::string rowset_id);
+ static void erase_pending_remote_rowset(const std::string& rowset_id);
uint32_t calc_cold_data_compaction_score() const;
@@ -524,7 +528,6 @@ private:
// cooldown related
int64_t _cooldown_replica_id = -1;
int64_t _cooldown_term = -1;
- std::shared_mutex _remote_files_lock;
std::mutex _cold_compaction_lock;
DISALLOW_COPY_AND_ASSIGN(Tablet);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]