github-actions[bot] commented on code in PR #30045:
URL: https://github.com/apache/doris/pull/30045#discussion_r1454527696
##########
be/src/cloud/cloud_meta_mgr.cpp:
##########
@@ -270,12 +270,12 @@ Status CloudMetaMgr::get_tablet_meta(int64_t tablet_id,
TabletMetaSharedPtr* tab
return Status::OK();
}
-Status CloudMetaMgr::sync_tablet_rowsets(Tablet* tablet, bool
warmup_delta_data) {
+Status CloudMetaMgr::sync_tablet_rowsets(CloudTablet* tablet, bool
warmup_delta_data) {
Review Comment:
warning: method 'sync_tablet_rowsets' can be made static [readability-convert-member-functions-to-static]
be/src/cloud/cloud_meta_mgr.h:53:
```diff
- Status sync_tablet_rowsets(CloudTablet* tablet, bool warmup_delta_data = false);
+ static Status sync_tablet_rowsets(CloudTablet* tablet, bool warmup_delta_data = false);
```
##########
be/src/cloud/cloud_meta_mgr.cpp:
##########
@@ -425,15 +425,15 @@
return Status::OK();
}
-Status CloudMetaMgr::update_delete_bitmap(const Tablet* tablet, int64_t
lock_id, int64_t initiator,
- DeleteBitmap* delete_bitmap) {
- VLOG_DEBUG << "update_delete_bitmap , tablet_id: " << tablet->tablet_id();
+Status CloudMetaMgr::update_delete_bitmap(const CloudTablet& tablet, int64_t
lock_id,
Review Comment:
warning: method 'update_delete_bitmap' can be made static [readability-convert-member-functions-to-static]
be/src/cloud/cloud_meta_mgr.h:81:
```diff
- Status update_delete_bitmap(const CloudTablet& tablet, int64_t lock_id, int64_t initiator,
+ static Status update_delete_bitmap(const CloudTablet& tablet, int64_t lock_id, int64_t initiator,
```
##########
be/src/cloud/cloud_tablet.cpp:
##########
@@ -0,0 +1,448 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "cloud/cloud_tablet.h"
+
+#include "cloud/cloud_meta_mgr.h"
+#include "cloud/cloud_storage_engine.h"
+#include "io/cache/block/block_file_cache_factory.h"
+#include "olap/rowset/rowset.h"
+#include "olap/rowset/rowset_writer.h"
+#include "olap/rowset/segment_v2/inverted_index_desc.h"
+
+namespace doris {
+using namespace ErrorCode;
+
+CloudTablet::CloudTablet(CloudStorageEngine& engine, TabletMetaSharedPtr
tablet_meta)
+ : BaseTablet(std::move(tablet_meta)), _engine(engine) {}
+
+CloudTablet::~CloudTablet() = default;
+
+bool CloudTablet::exceed_version_limit(int32_t limit) {
+ return _approximate_num_rowsets.load(std::memory_order_relaxed) > limit;
+}
+
+Status CloudTablet::capture_rs_readers(const Version& spec_version,
+ std::vector<RowSetSplits>* rs_splits,
+ bool skip_missing_version) {
+ Versions version_path;
+ std::shared_lock rlock(_meta_lock);
+ auto st =
_timestamped_version_tracker.capture_consistent_versions(spec_version,
&version_path);
+ if (!st.ok()) {
+ rlock.unlock(); // avoid logging in lock range
+ // Check no missed versions or req version is merged
+ auto missed_versions = calc_missed_versions(spec_version.second);
+ if (missed_versions.empty()) {
+ st.set_code(VERSION_ALREADY_MERGED); // Reset error code
+ }
+ st.append(" tablet_id=" + std::to_string(tablet_id()));
+ // clang-format off
+ LOG(WARNING) << st << '\n' << [this]() { std::string json;
get_compaction_status(&json); return json; }();
+ // clang-format on
+ return st;
+ }
+ VLOG_DEBUG << "capture consitent versions: " << version_path;
+ return capture_rs_readers_unlocked(version_path, rs_splits);
+}
+
+// for example:
+// [0-4][5-5][8-8][9-9][13-13]
+// if spec_version = 12, it will return [6-7],[10-12]
+Versions CloudTablet::calc_missed_versions(int64_t spec_version) {
+ DCHECK(spec_version > 0) << "invalid spec_version: " << spec_version;
+
+ Versions missed_versions;
+ Versions existing_versions;
+ {
+ std::shared_lock rdlock(_meta_lock);
+ for (const auto& rs : _tablet_meta->all_rs_metas()) {
+ existing_versions.emplace_back(rs->version());
+ }
+ }
+
+ // sort the existing versions in ascending order
+ std::sort(existing_versions.begin(), existing_versions.end(),
+ [](const Version& a, const Version& b) {
+ // simple because 2 versions are certainly not overlapping
+ return a.first < b.first;
+ });
+
+ auto min_version = existing_versions.front().first;
+ if (min_version > 0) {
+ missed_versions.emplace_back(0, std::min(spec_version, min_version -
1));
+ }
+ for (auto it = existing_versions.begin(); it != existing_versions.end() -
1; ++it) {
+ auto prev_v = it->second;
+ if (prev_v >= spec_version) {
+ return missed_versions;
+ }
+ auto next_v = (it + 1)->first;
+ if (next_v > prev_v + 1) {
+ // there is a hole between versions
+ missed_versions.emplace_back(prev_v + 1, std::min(spec_version,
next_v - 1));
+ }
+ }
+ auto max_version = existing_versions.back().second;
+ if (max_version < spec_version) {
+ missed_versions.emplace_back(max_version + 1, spec_version);
+ }
+ return missed_versions;
+}
+
+Status CloudTablet::sync_meta() {
+ // TODO(lightman): FileCache
+ return Status::NotSupported("CloudTablet::sync_meta is not implemented");
+}
+
+// There are only two tablet_states RUNNING and NOT_READY in cloud mode
+// This function will erase the tablet from `CloudTabletMgr` when it can't
find this tablet in MS.
+Status CloudTablet::sync_rowsets(int64_t query_version, bool
warmup_delta_data) {
+ RETURN_IF_ERROR(sync_if_not_running());
+
+ if (query_version > 0) {
+ std::shared_lock rlock(_meta_lock);
+ if (_max_version >= query_version) {
+ return Status::OK();
+ }
+ }
+
+ // serially execute sync to reduce unnecessary network overhead
+ std::lock_guard lock(_sync_meta_lock);
+ if (query_version > 0) {
+ std::shared_lock rlock(_meta_lock);
+ if (_max_version >= query_version) {
+ return Status::OK();
+ }
+ }
+
+ auto st = _engine.meta_mgr().sync_tablet_rowsets(this, warmup_delta_data);
+ if (st.is<ErrorCode::NOT_FOUND>()) {
+ recycle_cached_data();
+ }
+ return st;
+}
+
+// Sync tablet meta and all rowset meta if not running.
+// This could happen when BE didn't finish schema change job and another BE
committed this schema change job.
+// It should be a quite rare situation.
+Status CloudTablet::sync_if_not_running() {
+ if (tablet_state() == TABLET_RUNNING) {
+ return Status::OK();
+ }
+
+ // Serially execute sync to reduce unnecessary network overhead
+ std::lock_guard lock(_sync_meta_lock);
+
+ {
+ std::shared_lock rlock(_meta_lock);
+ if (tablet_state() == TABLET_RUNNING) {
+ return Status::OK();
+ }
+ }
+
+ TabletMetaSharedPtr tablet_meta;
+ auto st = _engine.meta_mgr().get_tablet_meta(tablet_id(), &tablet_meta);
+ if (!st.ok()) {
+ if (st.is<ErrorCode::NOT_FOUND>()) {
+ recycle_cached_data();
+ }
+ return st;
+ }
+
+ if (tablet_meta->tablet_state() != TABLET_RUNNING) [[unlikely]] {
+ // MoW may go to here when load while schema change
+ return Status::Error<INVALID_TABLET_STATE>("invalid tablet state.
tablet_id={}",
+ tablet_id());
+ }
+
+ TimestampedVersionTracker empty_tracker;
+ {
+ std::lock_guard wlock(_meta_lock);
+ RETURN_IF_ERROR(set_tablet_state(TABLET_RUNNING));
+ _rs_version_map.clear();
+ _stale_rs_version_map.clear();
+ std::swap(_timestamped_version_tracker, empty_tracker);
+ _tablet_meta->clear_rowsets();
+ _tablet_meta->clear_stale_rowset();
+ _max_version = -1;
+ }
+
+ st = _engine.meta_mgr().sync_tablet_rowsets(this);
+ if (st.is<ErrorCode::NOT_FOUND>()) {
+ recycle_cached_data();
+ }
+ return st;
+}
+
+void CloudTablet::add_rowsets(std::vector<RowsetSharedPtr> to_add, bool
version_overlap,
Review Comment:
warning: function 'add_rowsets' exceeds recommended size/complexity thresholds [readability-function-size]
```cpp
void CloudTablet::add_rowsets(std::vector<RowsetSharedPtr> to_add, bool version_overlap,
^
```
<details>
<summary>Additional context</summary>
**be/src/cloud/cloud_tablet.cpp:190:** 81 lines including whitespace and comments (threshold 80)
```cpp
void CloudTablet::add_rowsets(std::vector<RowsetSharedPtr> to_add, bool version_overlap,
^
```
</details>
##########
be/src/cloud/cloud_tablet.cpp:
##########
@@ -0,0 +1,448 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "cloud/cloud_tablet.h"
+
+#include "cloud/cloud_meta_mgr.h"
+#include "cloud/cloud_storage_engine.h"
+#include "io/cache/block/block_file_cache_factory.h"
+#include "olap/rowset/rowset.h"
+#include "olap/rowset/rowset_writer.h"
+#include "olap/rowset/segment_v2/inverted_index_desc.h"
+
+namespace doris {
+using namespace ErrorCode;
+
+CloudTablet::CloudTablet(CloudStorageEngine& engine, TabletMetaSharedPtr
tablet_meta)
+ : BaseTablet(std::move(tablet_meta)), _engine(engine) {}
+
+CloudTablet::~CloudTablet() = default;
+
+bool CloudTablet::exceed_version_limit(int32_t limit) {
+ return _approximate_num_rowsets.load(std::memory_order_relaxed) > limit;
+}
+
+Status CloudTablet::capture_rs_readers(const Version& spec_version,
+ std::vector<RowSetSplits>* rs_splits,
+ bool skip_missing_version) {
+ Versions version_path;
+ std::shared_lock rlock(_meta_lock);
+ auto st =
_timestamped_version_tracker.capture_consistent_versions(spec_version,
&version_path);
+ if (!st.ok()) {
+ rlock.unlock(); // avoid logging in lock range
+ // Check no missed versions or req version is merged
+ auto missed_versions = calc_missed_versions(spec_version.second);
+ if (missed_versions.empty()) {
+ st.set_code(VERSION_ALREADY_MERGED); // Reset error code
+ }
+ st.append(" tablet_id=" + std::to_string(tablet_id()));
+ // clang-format off
+ LOG(WARNING) << st << '\n' << [this]() { std::string json;
get_compaction_status(&json); return json; }();
+ // clang-format on
+ return st;
+ }
+ VLOG_DEBUG << "capture consitent versions: " << version_path;
+ return capture_rs_readers_unlocked(version_path, rs_splits);
+}
+
+// for example:
+// [0-4][5-5][8-8][9-9][13-13]
+// if spec_version = 12, it will return [6-7],[10-12]
+Versions CloudTablet::calc_missed_versions(int64_t spec_version) {
+ DCHECK(spec_version > 0) << "invalid spec_version: " << spec_version;
+
+ Versions missed_versions;
+ Versions existing_versions;
+ {
+ std::shared_lock rdlock(_meta_lock);
+ for (const auto& rs : _tablet_meta->all_rs_metas()) {
+ existing_versions.emplace_back(rs->version());
+ }
+ }
+
+ // sort the existing versions in ascending order
+ std::sort(existing_versions.begin(), existing_versions.end(),
+ [](const Version& a, const Version& b) {
+ // simple because 2 versions are certainly not overlapping
+ return a.first < b.first;
+ });
+
+ auto min_version = existing_versions.front().first;
+ if (min_version > 0) {
+ missed_versions.emplace_back(0, std::min(spec_version, min_version -
1));
+ }
+ for (auto it = existing_versions.begin(); it != existing_versions.end() -
1; ++it) {
+ auto prev_v = it->second;
+ if (prev_v >= spec_version) {
+ return missed_versions;
+ }
+ auto next_v = (it + 1)->first;
+ if (next_v > prev_v + 1) {
+ // there is a hole between versions
+ missed_versions.emplace_back(prev_v + 1, std::min(spec_version,
next_v - 1));
+ }
+ }
+ auto max_version = existing_versions.back().second;
+ if (max_version < spec_version) {
+ missed_versions.emplace_back(max_version + 1, spec_version);
+ }
+ return missed_versions;
+}
+
+Status CloudTablet::sync_meta() {
+ // TODO(lightman): FileCache
+ return Status::NotSupported("CloudTablet::sync_meta is not implemented");
+}
+
+// There are only two tablet_states RUNNING and NOT_READY in cloud mode
+// This function will erase the tablet from `CloudTabletMgr` when it can't
find this tablet in MS.
+Status CloudTablet::sync_rowsets(int64_t query_version, bool
warmup_delta_data) {
+ RETURN_IF_ERROR(sync_if_not_running());
+
+ if (query_version > 0) {
+ std::shared_lock rlock(_meta_lock);
+ if (_max_version >= query_version) {
+ return Status::OK();
+ }
+ }
+
+ // serially execute sync to reduce unnecessary network overhead
+ std::lock_guard lock(_sync_meta_lock);
+ if (query_version > 0) {
+ std::shared_lock rlock(_meta_lock);
+ if (_max_version >= query_version) {
+ return Status::OK();
+ }
+ }
+
+ auto st = _engine.meta_mgr().sync_tablet_rowsets(this, warmup_delta_data);
+ if (st.is<ErrorCode::NOT_FOUND>()) {
+ recycle_cached_data();
+ }
+ return st;
+}
+
+// Sync tablet meta and all rowset meta if not running.
+// This could happen when BE didn't finish schema change job and another BE
committed this schema change job.
+// It should be a quite rare situation.
+Status CloudTablet::sync_if_not_running() {
+ if (tablet_state() == TABLET_RUNNING) {
+ return Status::OK();
+ }
+
+ // Serially execute sync to reduce unnecessary network overhead
+ std::lock_guard lock(_sync_meta_lock);
+
+ {
+ std::shared_lock rlock(_meta_lock);
+ if (tablet_state() == TABLET_RUNNING) {
+ return Status::OK();
+ }
+ }
+
+ TabletMetaSharedPtr tablet_meta;
+ auto st = _engine.meta_mgr().get_tablet_meta(tablet_id(), &tablet_meta);
+ if (!st.ok()) {
+ if (st.is<ErrorCode::NOT_FOUND>()) {
+ recycle_cached_data();
+ }
+ return st;
+ }
+
+ if (tablet_meta->tablet_state() != TABLET_RUNNING) [[unlikely]] {
+ // MoW may go to here when load while schema change
+ return Status::Error<INVALID_TABLET_STATE>("invalid tablet state.
tablet_id={}",
+ tablet_id());
+ }
+
+ TimestampedVersionTracker empty_tracker;
+ {
+ std::lock_guard wlock(_meta_lock);
+ RETURN_IF_ERROR(set_tablet_state(TABLET_RUNNING));
+ _rs_version_map.clear();
+ _stale_rs_version_map.clear();
+ std::swap(_timestamped_version_tracker, empty_tracker);
+ _tablet_meta->clear_rowsets();
+ _tablet_meta->clear_stale_rowset();
+ _max_version = -1;
+ }
+
+ st = _engine.meta_mgr().sync_tablet_rowsets(this);
+ if (st.is<ErrorCode::NOT_FOUND>()) {
+ recycle_cached_data();
+ }
+ return st;
+}
+
+void CloudTablet::add_rowsets(std::vector<RowsetSharedPtr> to_add, bool
version_overlap,
+ bool warmup_delta_data) {
+ if (to_add.empty()) {
+ return;
+ }
+
+ auto add_rowsets_directly = [=, this](std::vector<RowsetSharedPtr>&
add_rowsets) {
+ for (auto& rs : add_rowsets) {
+ _rs_version_map.emplace(rs->version(), rs);
+ _timestamped_version_tracker.add_version(rs->version());
+ _max_version = std::max(rs->end_version(), _max_version);
+ update_base_size(*rs);
+ }
+ _tablet_meta->add_rowsets_unchecked(add_rowsets);
+ // TODO(plat1ko): Warmup delta rowset data in background
+ };
+
+ if (!version_overlap) {
+ add_rowsets_directly(to_add);
+ return;
+ }
+
+ // Filter out existed rowsets
+ auto remove_it =
+ std::remove_if(to_add.begin(), to_add.end(), [this](const
RowsetSharedPtr& rs) {
+ if (auto find_it = _rs_version_map.find(rs->version());
+ find_it == _rs_version_map.end()) {
+ return false;
+ } else if (find_it->second->rowset_id() == rs->rowset_id()) {
+ return true; // Same rowset
+ }
+
+ // If version of rowset in `to_add` is equal to rowset in
tablet but rowset_id is not equal,
+ // replace existed rowset with `to_add` rowset. This may occur
when:
+ // 1. schema change converts rowsets which have been double
written to new tablet
+ // 2. cumu compaction picks single overlapping input rowset
to perform compaction
+ _tablet_meta->delete_rs_meta_by_version(rs->version(),
nullptr);
+ _rs_version_map[rs->version()] = rs;
+ _tablet_meta->add_rowsets_unchecked({rs});
+ update_base_size(*rs);
+ return true;
+ });
+
+ to_add.erase(remove_it, to_add.end());
+
+ // delete rowsets with overlapped version
+ std::vector<RowsetSharedPtr> to_add_directly;
+ std::vector<RowsetSharedPtr> to_delete;
+ for (auto& to_add_rs : to_add) {
+ // delete rowsets with overlapped version
+ std::vector<RowsetSharedPtr> to_delete;
+ Version to_add_v = to_add_rs->version();
+ // if start_version > max_version, we can skip checking overlap here.
+ if (to_add_v.first > _max_version) {
+ // if start_version > max_version, we can skip checking overlap
here.
+ to_add_directly.push_back(to_add_rs);
+ } else if (to_add_v.second > _max_version) {
+ // if start_version <= max_version and end_version > max_version,
+ // we need to add it directly
+ to_add_directly.push_back(to_add_rs);
+ for (auto& [v, rs] : _rs_version_map) {
+ if (to_add_v.contains(v)) {
+ to_delete.push_back(rs);
+ }
+ }
+ delete_rowsets(to_delete);
+ continue;
+ } else {
+ to_add_directly.push_back(to_add_rs);
+ for (auto& [v, rs] : _rs_version_map) {
+ if (to_add_v.contains(v)) {
+ to_delete.push_back(rs);
+ }
+ }
+ delete_rowsets(to_delete);
+ continue;
+ // TODO(liuchangliang)
+ // _engine.tablet_mgr()->sumbit_overlap_rowsets(tablet_id(),
to_add_rs, std::move(to_delete));
+ }
+ }
+
+ add_rowsets_directly(to_add_directly);
+}
+
+void CloudTablet::delete_rowsets(const std::vector<RowsetSharedPtr>&
to_delete) {
Review Comment:
warning: method 'delete_rowsets' can be made static [readability-convert-member-functions-to-static]
```suggestion
static void CloudTablet::delete_rowsets(const std::vector<RowsetSharedPtr>& to_delete) {
```
##########
be/src/olap/base_tablet.cpp:
##########
@@ -79,4 +81,42 @@
return Status::OK();
}
+Status BaseTablet::capture_rs_readers_unlocked(const std::vector<Version>&
version_path,
+ std::vector<RowSetSplits>*
rs_splits) const {
+ DCHECK(rs_splits != nullptr && rs_splits->empty());
+ for (auto version : version_path) {
+ auto it = _rs_version_map.find(version);
+ if (it == _rs_version_map.end()) {
+ VLOG_NOTICE << "fail to find Rowset in rs_version for version.
tablet=" << tablet_id()
+ << ", version='" << version.first << "-" <<
version.second;
+
+ it = _stale_rs_version_map.find(version);
+ if (it == _stale_rs_version_map.end()) {
+ return Status::Error<CAPTURE_ROWSET_READER_ERROR>(
+ "fail to find Rowset in stale_rs_version for version.
tablet={}, "
+ "version={}-{}",
+ tablet_id(), version.first, version.second);
+ }
+ }
+ RowsetReaderSharedPtr rs_reader;
+ auto res = it->second->create_reader(&rs_reader);
+ if (!res.ok()) {
+ return Status::Error<CAPTURE_ROWSET_READER_ERROR>(
+ "failed to create reader for rowset:{}",
it->second->rowset_id().to_string());
+ }
+ rs_splits->push_back(RowSetSplits(std::move(rs_reader)));
+ }
+ return Status::OK();
+}
+
+bool BaseTablet::_reconstruct_version_tracker_if_necessary() {
Review Comment:
warning: method '_reconstruct_version_tracker_if_necessary' can be made static [readability-convert-member-functions-to-static]
be/src/olap/base_tablet.h:91:
```diff
- bool _reconstruct_version_tracker_if_necessary();
+ static bool _reconstruct_version_tracker_if_necessary();
```
##########
be/src/olap/base_tablet.cpp:
##########
@@ -79,4 +81,42 @@
return Status::OK();
}
+Status BaseTablet::capture_rs_readers_unlocked(const std::vector<Version>&
version_path,
+ std::vector<RowSetSplits>*
rs_splits) const {
+ DCHECK(rs_splits != nullptr && rs_splits->empty());
+ for (auto version : version_path) {
+ auto it = _rs_version_map.find(version);
+ if (it == _rs_version_map.end()) {
+ VLOG_NOTICE << "fail to find Rowset in rs_version for version.
tablet=" << tablet_id()
+ << ", version='" << version.first << "-" <<
version.second;
+
+ it = _stale_rs_version_map.find(version);
+ if (it == _stale_rs_version_map.end()) {
+ return Status::Error<CAPTURE_ROWSET_READER_ERROR>(
+ "fail to find Rowset in stale_rs_version for version.
tablet={}, "
+ "version={}-{}",
+ tablet_id(), version.first, version.second);
+ }
+ }
+ RowsetReaderSharedPtr rs_reader;
+ auto res = it->second->create_reader(&rs_reader);
+ if (!res.ok()) {
+ return Status::Error<CAPTURE_ROWSET_READER_ERROR>(
+ "failed to create reader for rowset:{}",
it->second->rowset_id().to_string());
+ }
+ rs_splits->push_back(RowSetSplits(std::move(rs_reader)));
+ }
+ return Status::OK();
+}
+
+bool BaseTablet::_reconstruct_version_tracker_if_necessary() {
+ double orphan_vertex_ratio =
_timestamped_version_tracker.get_orphan_vertex_ratio();
+ if (orphan_vertex_ratio >=
config::tablet_version_graph_orphan_vertex_ratio) {
+ _timestamped_version_tracker.construct_versioned_tracker(
+ _tablet_meta->all_rs_metas(),
_tablet_meta->all_stale_rs_metas());
+ return true;
Review Comment:
warning: redundant boolean literal in conditional return statement [readability-simplify-boolean-expr]
be/src/olap/base_tablet.cpp:113:
```diff
- if (orphan_vertex_ratio >= config::tablet_version_graph_orphan_vertex_ratio) {
- _timestamped_version_tracker.construct_versioned_tracker(
- _tablet_meta->all_rs_metas(), _tablet_meta->all_stale_rs_metas());
- return true;
- }
- return false;
+ return orphan_vertex_ratio >= config::tablet_version_graph_orphan_vertex_ratio;
```
##########
be/src/cloud/cloud_meta_mgr.cpp:
##########
@@ -451,18 +451,18 @@
if (res.status().code() == MetaServiceCode::LOCK_EXPIRED) {
return Status::Error<ErrorCode::DELETE_BITMAP_LOCK_ERROR, false>(
"lock expired when update delete bitmap, tablet_id: {},
lock_id: {}",
- tablet->tablet_id(), lock_id);
+ tablet.tablet_id(), lock_id);
}
return st;
}
-Status CloudMetaMgr::get_delete_bitmap_update_lock(const Tablet* tablet,
int64_t lock_id,
+Status CloudMetaMgr::get_delete_bitmap_update_lock(const CloudTablet& tablet,
int64_t lock_id,
Review Comment:
warning: method 'get_delete_bitmap_update_lock' can be made static [readability-convert-member-functions-to-static]
be/src/cloud/cloud_meta_mgr.h:84:
```diff
- Status get_delete_bitmap_update_lock(const CloudTablet& tablet, int64_t lock_id,
+ static Status get_delete_bitmap_update_lock(const CloudTablet& tablet, int64_t lock_id,
```
##########
be/src/cloud/cloud_tablet.cpp:
##########
@@ -0,0 +1,448 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "cloud/cloud_tablet.h"
+
+#include "cloud/cloud_meta_mgr.h"
+#include "cloud/cloud_storage_engine.h"
+#include "io/cache/block/block_file_cache_factory.h"
+#include "olap/rowset/rowset.h"
+#include "olap/rowset/rowset_writer.h"
+#include "olap/rowset/segment_v2/inverted_index_desc.h"
+
+namespace doris {
+using namespace ErrorCode;
+
+CloudTablet::CloudTablet(CloudStorageEngine& engine, TabletMetaSharedPtr
tablet_meta)
+ : BaseTablet(std::move(tablet_meta)), _engine(engine) {}
+
+CloudTablet::~CloudTablet() = default;
+
+bool CloudTablet::exceed_version_limit(int32_t limit) {
+ return _approximate_num_rowsets.load(std::memory_order_relaxed) > limit;
+}
+
+Status CloudTablet::capture_rs_readers(const Version& spec_version,
+ std::vector<RowSetSplits>* rs_splits,
+ bool skip_missing_version) {
+ Versions version_path;
+ std::shared_lock rlock(_meta_lock);
+ auto st =
_timestamped_version_tracker.capture_consistent_versions(spec_version,
&version_path);
+ if (!st.ok()) {
+ rlock.unlock(); // avoid logging in lock range
+ // Check no missed versions or req version is merged
+ auto missed_versions = calc_missed_versions(spec_version.second);
+ if (missed_versions.empty()) {
+ st.set_code(VERSION_ALREADY_MERGED); // Reset error code
+ }
+ st.append(" tablet_id=" + std::to_string(tablet_id()));
+ // clang-format off
+ LOG(WARNING) << st << '\n' << [this]() { std::string json;
get_compaction_status(&json); return json; }();
+ // clang-format on
+ return st;
+ }
+ VLOG_DEBUG << "capture consitent versions: " << version_path;
+ return capture_rs_readers_unlocked(version_path, rs_splits);
+}
+
+// for example:
+// [0-4][5-5][8-8][9-9][13-13]
+// if spec_version = 12, it will return [6-7],[10-12]
+Versions CloudTablet::calc_missed_versions(int64_t spec_version) {
+ DCHECK(spec_version > 0) << "invalid spec_version: " << spec_version;
+
+ Versions missed_versions;
+ Versions existing_versions;
+ {
+ std::shared_lock rdlock(_meta_lock);
+ for (const auto& rs : _tablet_meta->all_rs_metas()) {
+ existing_versions.emplace_back(rs->version());
+ }
+ }
+
+ // sort the existing versions in ascending order
+ std::sort(existing_versions.begin(), existing_versions.end(),
+ [](const Version& a, const Version& b) {
+ // simple because 2 versions are certainly not overlapping
+ return a.first < b.first;
+ });
+
+ auto min_version = existing_versions.front().first;
+ if (min_version > 0) {
+ missed_versions.emplace_back(0, std::min(spec_version, min_version -
1));
+ }
+ for (auto it = existing_versions.begin(); it != existing_versions.end() -
1; ++it) {
+ auto prev_v = it->second;
+ if (prev_v >= spec_version) {
+ return missed_versions;
+ }
+ auto next_v = (it + 1)->first;
+ if (next_v > prev_v + 1) {
+ // there is a hole between versions
+ missed_versions.emplace_back(prev_v + 1, std::min(spec_version,
next_v - 1));
+ }
+ }
+ auto max_version = existing_versions.back().second;
+ if (max_version < spec_version) {
+ missed_versions.emplace_back(max_version + 1, spec_version);
+ }
+ return missed_versions;
+}
+
+Status CloudTablet::sync_meta() {
Review Comment:
warning: method 'sync_meta' can be made static
[readability-convert-member-functions-to-static]
be/src/cloud/cloud_tablet.h:70:
```diff
- Status sync_meta();
+ static Status sync_meta();
```
##########
be/src/olap/base_tablet.cpp:
##########
@@ -79,4 +81,42 @@ Status BaseTablet::update_by_least_common_schema(const
TabletSchemaSPtr& update_
return Status::OK();
}
+Status BaseTablet::capture_rs_readers_unlocked(const std::vector<Version>&
version_path,
+ std::vector<RowSetSplits>*
rs_splits) const {
Review Comment:
warning: method 'capture_rs_readers_unlocked' can be made static [readability-convert-member-functions-to-static]
```suggestion
static Status BaseTablet::capture_rs_readers_unlocked(const std::vector<Version>& version_path,
std::vector<RowSetSplits>* rs_splits) {
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]