This is an automated email from the ASF dual-hosted git repository.
pengxiangyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 6a1e3d3435 [fix](cooldown)Fix bug for single cooldown compaction, add remote meta (#16812)
6a1e3d3435 is described below
commit 6a1e3d3435ca95abbe7a62ba071a909041dba60d
Author: pengxiangyu <[email protected]>
AuthorDate: Fri Feb 17 15:13:06 2023 +0800
[fix](cooldown)Fix bug for single cooldown compaction, add remote meta (#16812)
* fix bug, add remote meta for compaction
* fix bug, add remote meta for compaction
* fix bug, add remote meta for compaction
* fix bug, add remote meta for compaction
* fix bug, add remote meta for compaction
* fix bug, add remote meta for compaction
* fix bug, add remote meta for compaction
* fix bug, add remote meta for compaction
* fix bug, add remote meta for compaction
* fix bug, add remote meta for compaction
* fix bug, add remote meta for compaction
* fix bug, add remote meta for compaction
* fix bug, add remote meta for compaction
* fix bug, add remote meta for compaction
* fix bug, add remote meta for compaction
* fix bug, add remote meta for compaction
* fix bug, add remote meta for compaction
* fix bug, add remote meta for compaction
* fix bug, add remote meta for compaction
* fix bug, add remote meta for compaction
* fix bug, add remote meta for compaction
* fix bug, add remote meta for compaction
* fix bug, add remote meta for compaction
* fix bug, add remote meta for compaction
* fix bug, add remote meta for compaction
* fix bug, add remote meta for compaction
---
be/src/olap/cold_data_compaction.cpp | 15 ++++-
be/src/olap/storage_policy.cpp | 18 ++++++
be/src/olap/storage_policy.h | 3 +
be/src/olap/tablet.cpp | 71 ++++++++++++++--------
be/src/olap/tablet.h | 13 ++--
.../apache/doris/catalog/TabletInvertedIndex.java | 5 +-
6 files changed, 88 insertions(+), 37 deletions(-)
diff --git a/be/src/olap/cold_data_compaction.cpp
b/be/src/olap/cold_data_compaction.cpp
index 4b06ee7616..9f24c9c170 100644
--- a/be/src/olap/cold_data_compaction.cpp
+++ b/be/src/olap/cold_data_compaction.cpp
@@ -19,6 +19,7 @@
#include "common/compiler_util.h"
#include "olap/compaction.h"
+#include "olap/rowset/beta_rowset.h"
#include "olap/rowset/rowset.h"
namespace doris {
@@ -60,14 +61,26 @@ Status ColdDataCompaction::pick_rowsets_to_compact() {
}
Status ColdDataCompaction::modify_rowsets() {
+ UniqueId cooldown_meta_id = UniqueId::gen_uid();
+
+ // write remote tablet meta
+ std::shared_ptr<io::RemoteFileSystem> fs;
+ RETURN_IF_ERROR(get_remote_file_system(_tablet->storage_policy_id(), &fs));
+ std::vector<RowsetMetaSharedPtr> to_deletes;
+ for (auto& rs : _input_rowsets) {
+ to_deletes.emplace_back(rs->rowset_meta());
+ }
+ RETURN_IF_ERROR(_tablet->write_cooldown_meta(fs, cooldown_meta_id,
+ _output_rowset->rowset_meta(), to_deletes));
{
std::lock_guard wlock(_tablet->get_header_lock());
// Merged cooldowned rowsets MUST NOT be managed by version graph, they will be reclaimed by `remove_unused_remote_files`.
_tablet->delete_rowsets(_input_rowsets, false);
_tablet->add_rowsets({_output_rowset});
// TODO(plat1ko): process primary key
- _tablet->tablet_meta()->set_cooldown_meta_id(UniqueId::gen_uid());
+ _tablet->tablet_meta()->set_cooldown_meta_id(cooldown_meta_id);
}
+
{
std::shared_lock rlock(_tablet->get_header_lock());
_tablet->save_meta();
diff --git a/be/src/olap/storage_policy.cpp b/be/src/olap/storage_policy.cpp
index c227534907..68f561397d 100644
--- a/be/src/olap/storage_policy.cpp
+++ b/be/src/olap/storage_policy.cpp
@@ -31,6 +31,24 @@ struct StoragePolicyMgr {
static StoragePolicyMgr s_storage_policy_mgr;
+Status get_remote_file_system(int64_t storage_policy_id,
+ std::shared_ptr<io::RemoteFileSystem>* fs) {
+ auto storage_policy = get_storage_policy(storage_policy_id);
+ if (storage_policy == nullptr) {
+ return Status::InternalError("could not find storage_policy, storage_policy_id={}",
+ storage_policy_id);
+ }
+ auto resource = get_storage_resource(storage_policy->resource_id);
+ *fs = std::static_pointer_cast<io::RemoteFileSystem>(resource.fs);
+ if (*fs == nullptr) {
+ return Status::InternalError("could not find resource, resouce_id={}",
+ storage_policy->resource_id);
+ }
+ DCHECK(atol((*fs)->id().c_str()) == storage_policy->resource_id);
+ DCHECK((*fs)->type() != io::FileSystemType::LOCAL);
+ return Status::OK();
+}
+
StoragePolicyPtr get_storage_policy(int64_t id) {
std::lock_guard lock(s_storage_policy_mgr.mtx);
if (auto it = s_storage_policy_mgr.map.find(id); it != s_storage_policy_mgr.map.end()) {
diff --git a/be/src/olap/storage_policy.h b/be/src/olap/storage_policy.h
index 4ddac3d36f..02d86603fa 100644
--- a/be/src/olap/storage_policy.h
+++ b/be/src/olap/storage_policy.h
@@ -18,6 +18,7 @@
#pragma once
#include "io/fs/file_system.h"
+#include "io/fs/remote_file_system.h"
namespace doris {
@@ -37,6 +38,8 @@ struct StoragePolicy {
using StoragePolicyPtr = std::shared_ptr<StoragePolicy>;
+Status get_remote_file_system(int64_t storage_policy_id, std::shared_ptr<io::RemoteFileSystem>* fs);
+
// return nullptr if not found
StoragePolicyPtr get_storage_policy(int64_t id);
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index f0ea777713..2a2ab4acf9 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -38,6 +38,7 @@
#include <set>
#include <shared_mutex>
#include <string>
+#include <unordered_set>
#include "agent/utils.h"
#include "common/config.h"
@@ -1709,23 +1710,14 @@ Status Tablet::cooldown() {
if (cooldown_replica_id <= 0) { // wait for FE to push cooldown conf
return Status::InternalError("invalid cooldown_replica_id");
}
- auto storage_policy = get_storage_policy(storage_policy_id());
- if (storage_policy == nullptr) {
- return Status::InternalError("could not find storage_policy, storage_policy_id={}",
- storage_policy_id());
- }
- auto resource = get_storage_resource(storage_policy->resource_id);
- auto dest_fs = std::static_pointer_cast<io::RemoteFileSystem>(resource.fs);
- if (dest_fs == nullptr) {
- return Status::InternalError("could not find resource, resouce_id={}",
- storage_policy->resource_id);
- }
- DCHECK(atol(dest_fs->id().c_str()) == storage_policy->resource_id);
- DCHECK(dest_fs->type() != io::FileSystemType::LOCAL);
+
+ std::shared_ptr<io::RemoteFileSystem> fs;
+ RETURN_IF_ERROR(get_remote_file_system(storage_policy_id(), &fs));
+
if (cooldown_replica_id == replica_id()) {
- RETURN_IF_ERROR(_cooldown_data(dest_fs));
+ RETURN_IF_ERROR(_cooldown_data(fs));
} else {
- RETURN_IF_ERROR(_follow_cooldowned_data(dest_fs.get(), cooldown_replica_id));
+ RETURN_IF_ERROR(_follow_cooldowned_data(fs, cooldown_replica_id));
}
return Status::OK();
}
@@ -1768,7 +1760,7 @@ Status Tablet::_cooldown_data(const
std::shared_ptr<io::RemoteFileSystem>& dest_
UniqueId cooldown_meta_id = UniqueId::gen_uid();
// upload cooldowned rowset meta to remote fs
- RETURN_IF_ERROR(_write_cooldown_meta(dest_fs.get(), cooldown_meta_id, new_rowset_meta.get()));
+ RETURN_IF_ERROR(write_cooldown_meta(dest_fs, cooldown_meta_id, new_rowset_meta, {}));
RowsetSharedPtr new_rowset;
RowsetFactory::create_rowset(_schema, _tablet_path, new_rowset_meta,
&new_rowset);
@@ -1789,8 +1781,8 @@ Status Tablet::_cooldown_data(const
std::shared_ptr<io::RemoteFileSystem>& dest_
return Status::OK();
}
-Status Tablet::_read_cooldown_meta(io::RemoteFileSystem* fs, int64_t
cooldown_replica_id,
- TabletMetaPB* tablet_meta_pb) {
+Status Tablet::_read_cooldown_meta(const
std::shared_ptr<io::RemoteFileSystem>& fs,
+ int64_t cooldown_replica_id, TabletMetaPB*
tablet_meta_pb) {
std::string remote_meta_path =
BetaRowset::remote_tablet_meta_path(tablet_id(),
cooldown_replica_id);
IOContext io_ctx;
@@ -1807,29 +1799,53 @@ Status
Tablet::_read_cooldown_meta(io::RemoteFileSystem* fs, int64_t cooldown_re
return Status::OK();
}
-Status Tablet::_write_cooldown_meta(io::RemoteFileSystem* fs, UniqueId
cooldown_meta_id,
- RowsetMeta* new_rs_meta) {
+Status Tablet::write_cooldown_meta(const
std::shared_ptr<io::RemoteFileSystem>& fs,
+ UniqueId cooldown_meta_id,
+ const RowsetMetaSharedPtr& new_rs_meta,
+ const std::vector<RowsetMetaSharedPtr>&
to_deletes) {
+ std::unordered_set<Version, HashOfVersion> to_delete_set;
+ for (auto& rs_meta : to_deletes) {
+ to_delete_set.emplace(rs_meta->version());
+ }
+
std::vector<RowsetMetaSharedPtr> cooldowned_rs_metas;
{
std::shared_lock meta_rlock(_meta_lock);
for (auto& rs_meta : _tablet_meta->all_rs_metas()) {
if (!rs_meta->is_local()) {
- cooldowned_rs_metas.push_back(rs_meta);
+ if (to_delete_set.find(rs_meta->version()) != to_delete_set.end()) {
+ continue;
+ }
+ cooldowned_rs_metas.emplace_back(rs_meta);
}
}
}
+ cooldowned_rs_metas.emplace_back(new_rs_meta);
std::sort(cooldowned_rs_metas.begin(), cooldowned_rs_metas.end(),
RowsetMeta::comparator);
- if (UNLIKELY(!cooldowned_rs_metas.empty() &&
- new_rs_meta->start_version() !=
cooldowned_rs_metas.back()->end_version() + 1)) {
- return Status::InternalError("version not continuous");
+
+ // check_version_continuity
+ if (!cooldowned_rs_metas.empty()) {
+ RowsetMetaSharedPtr prev_rowset_meta = cooldowned_rs_metas.front();
+ for (size_t i = 1; i < cooldowned_rs_metas.size(); ++i) {
+ RowsetMetaSharedPtr rowset_meta = cooldowned_rs_metas[i];
+ if (rowset_meta->start_version() !=
prev_rowset_meta->end_version() + 1) {
+ LOG(WARNING) << "There are missed versions among rowsets. "
+ << "prev_rowset_meta version=" <<
prev_rowset_meta->start_version()
+ << "-" << prev_rowset_meta->end_version()
+ << ", rowset_meta version=" <<
rowset_meta->start_version() << "-"
+ << rowset_meta->end_version();
+ return Status::Error<CUMULATIVE_MISS_VERSION>();
+ }
+ prev_rowset_meta = rowset_meta;
+ }
}
+
TabletMetaPB tablet_meta_pb;
auto rs_metas = tablet_meta_pb.mutable_rs_metas();
- rs_metas->Reserve(cooldowned_rs_metas.size() + 1);
+ rs_metas->Reserve(cooldowned_rs_metas.size());
for (auto& rs_meta : cooldowned_rs_metas) {
rs_metas->Add(rs_meta->get_rowset_pb());
}
- rs_metas->Add(new_rs_meta->get_rowset_pb());
tablet_meta_pb.mutable_cooldown_meta_id()->set_hi(cooldown_meta_id.hi);
tablet_meta_pb.mutable_cooldown_meta_id()->set_lo(cooldown_meta_id.lo);
@@ -1842,7 +1858,8 @@ Status Tablet::_write_cooldown_meta(io::RemoteFileSystem*
fs, UniqueId cooldown_
return tablet_meta_writer->close();
}
-Status Tablet::_follow_cooldowned_data(io::RemoteFileSystem* fs, int64_t
cooldown_replica_id) {
+Status Tablet::_follow_cooldowned_data(const
std::shared_ptr<io::RemoteFileSystem>& fs,
+ int64_t cooldown_replica_id) {
LOG(INFO) << "try to follow cooldowned data. tablet_id=" << tablet_id()
<< " cooldown_replica_id=" << cooldown_replica_id
<< " local replica=" << replica_id();
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index ed8ada05c7..adcc6368f8 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -391,6 +391,10 @@ public:
}
}
+ Status write_cooldown_meta(const std::shared_ptr<io::RemoteFileSystem>& fs,
+ UniqueId cooldown_meta_id, const
RowsetMetaSharedPtr& new_rs_meta,
+ const std::vector<RowsetMetaSharedPtr>&
to_deletes);
+
private:
Status _init_once_action();
void _print_missed_versions(const std::vector<Version>& missed_versions)
const;
@@ -431,11 +435,10 @@ private:
// begin cooldown functions
////////////////////////////////////////////////////////////////////////////
Status _cooldown_data(const std::shared_ptr<io::RemoteFileSystem>&
dest_fs);
- Status _follow_cooldowned_data(io::RemoteFileSystem* fs, int64_t
cooldown_replica_id);
- Status _read_cooldown_meta(io::RemoteFileSystem* fs, int64_t
cooldown_replica_id,
- TabletMetaPB* tablet_meta_pb);
- Status _write_cooldown_meta(io::RemoteFileSystem* fs, UniqueId
cooldown_meta_id,
- RowsetMeta* new_rs_meta);
+ Status _follow_cooldowned_data(const
std::shared_ptr<io::RemoteFileSystem>& fs,
+ int64_t cooldown_replica_id);
+ Status _read_cooldown_meta(const std::shared_ptr<io::RemoteFileSystem>& fs,
+ int64_t cooldown_replica_id, TabletMetaPB*
tablet_meta_pb);
////////////////////////////////////////////////////////////////////////////
// end cooldown functions
////////////////////////////////////////////////////////////////////////////
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java
index 8de447050c..7fe4556dec 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java
@@ -190,7 +190,7 @@ public class TabletInvertedIndex {
}
}
- if (Config.enable_storage_policy) {
+ if (Config.enable_storage_policy && backendTabletInfo.isSetCooldownReplicaId()) {
handleCooldownConf(tabletMeta,
backendTabletInfo, cooldownConfToPush,
cooldownConfToUpdate);
replica.setCooldownMetaId(backendTabletInfo.getCooldownMetaId());
@@ -340,9 +340,6 @@ public class TabletInvertedIndex {
private void handleCooldownConf(TabletMeta tabletMeta, TTabletInfo
beTabletInfo,
List<CooldownConf> cooldownConfToPush, List<CooldownConf>
cooldownConfToUpdate) {
- if (!beTabletInfo.isSetCooldownReplicaId()) {
- return;
- }
Tablet tablet;
try {
OlapTable table = (OlapTable)
Env.getCurrentInternalCatalog().getDbNullable(tabletMeta.getDbId())
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]