This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 789aa993442 [improve] (http api) Support calculating file information in the cloud(#37880) (#38626)
789aa993442 is described below
commit 789aa993442c6c012d2886219f7e72f5c640a419
Author: Sun Chenyang <[email protected]>
AuthorDate: Thu Aug 1 10:36:51 2024 +0800
[improve] (http api) Support calculating file information in the cloud(#37880) (#38626)
## Proposed changes
pick from master #37880
Issue Number: close #xxx
---
be/src/cloud/cloud_tablet.h | 12 -----
be/src/http/action/calc_file_crc_action.cpp | 25 ++++++++---
be/src/http/action/calc_file_crc_action.h | 6 +--
be/src/olap/base_tablet.cpp | 32 ++++++++++++++
be/src/olap/base_tablet.h | 15 +++++++
be/src/olap/rowset/beta_rowset.cpp | 51 ++++++++++------------
be/src/olap/rowset/beta_rowset.h | 2 +-
be/src/olap/tablet.cpp | 32 --------------
be/src/olap/tablet.h | 14 ------
be/src/service/http_service.cpp | 4 ++
.../test_calc_crc_fault_injection.groovy | 11 ++---
.../test_schema_change_storage_format.groovy | 3 --
.../test_variant_index_format_v1.groovy | 3 --
13 files changed, 101 insertions(+), 109 deletions(-)
diff --git a/be/src/cloud/cloud_tablet.h b/be/src/cloud/cloud_tablet.h
index 2e6938444d1..10ff1835e6c 100644
--- a/be/src/cloud/cloud_tablet.h
+++ b/be/src/cloud/cloud_tablet.h
@@ -147,18 +147,6 @@ public:
std::vector<RowsetSharedPtr> pick_candidate_rowsets_to_base_compaction();
- void traverse_rowsets(std::function<void(const RowsetSharedPtr&)> visitor,
- bool include_stale = false) {
- std::shared_lock rlock(_meta_lock);
- for (auto& [v, rs] : _rs_version_map) {
- visitor(rs);
- }
- if (!include_stale) return;
- for (auto& [v, rs] : _stale_rs_version_map) {
- visitor(rs);
- }
- }
-
inline Version max_version() const {
std::shared_lock rdlock(_meta_lock);
return _tablet_meta->max_version();
diff --git a/be/src/http/action/calc_file_crc_action.cpp b/be/src/http/action/calc_file_crc_action.cpp
index c713184ddfd..66ec96a2a9a 100644
--- a/be/src/http/action/calc_file_crc_action.cpp
+++ b/be/src/http/action/calc_file_crc_action.cpp
@@ -25,6 +25,7 @@
#include <exception>
#include <string>
+#include "cloud/cloud_storage_engine.h"
#include "common/logging.h"
#include "common/status.h"
#include "http/http_channel.h"
@@ -38,7 +39,7 @@
namespace doris {
using namespace ErrorCode;
-CalcFileCrcAction::CalcFileCrcAction(ExecEnv* exec_env, StorageEngine& engine,
+CalcFileCrcAction::CalcFileCrcAction(ExecEnv* exec_env, BaseStorageEngine&
engine,
TPrivilegeHier::type hier,
TPrivilegeType::type ptype)
: HttpHandlerWithAuth(exec_env, hier, ptype), _engine(engine) {}
@@ -58,16 +59,28 @@ Status CalcFileCrcAction::_handle_calc_crc(HttpRequest* req, uint32_t* crc_value
return Status::InternalError("convert tablet id or failed, {}",
e.what());
}
- TabletSharedPtr tablet = _engine.tablet_manager()->get_tablet(tablet_id);
+ BaseTabletSPtr tablet = nullptr;
+
+ if (auto cloudEngine = dynamic_cast<CloudStorageEngine*>(&_engine)) {
+ tablet = DORIS_TRY(cloudEngine->get_tablet(tablet_id));
+ // sync all rowsets
+
RETURN_IF_ERROR(std::dynamic_pointer_cast<CloudTablet>(tablet)->sync_rowsets(-1));
+ } else if (auto storageEngine = dynamic_cast<StorageEngine*>(&_engine)) {
+ auto tabletPtr =
storageEngine->tablet_manager()->get_tablet(tablet_id);
+ tablet = std::dynamic_pointer_cast<Tablet>(tabletPtr);
+ } else {
+ return Status::InternalError("convert _engine failed");
+ }
+
if (tablet == nullptr) {
- return Status::NotFound("Tablet not found. tablet_id={}", tablet_id);
+ return Status::NotFound("failed to get tablet {}", tablet_id);
}
const auto& req_start_version = req->param(PARAM_START_VERSION);
const auto& req_end_version = req->param(PARAM_END_VERSION);
*start_version = 0;
- *end_version = tablet->max_version().second;
+ *end_version = tablet->max_version_unlocked();
if (!req_start_version.empty()) {
try {
@@ -85,8 +98,8 @@ Status CalcFileCrcAction::_handle_calc_crc(HttpRequest* req, uint32_t* crc_value
}
}
- auto st = tablet->calc_local_file_crc(crc_value, *start_version,
*end_version, rowset_count,
- file_count);
+ auto st = tablet->calc_file_crc(crc_value, *start_version, *end_version,
rowset_count,
+ file_count);
if (!st.ok()) {
return st;
}
diff --git a/be/src/http/action/calc_file_crc_action.h b/be/src/http/action/calc_file_crc_action.h
index 2c0d19f0ca0..30df8bfe629 100644
--- a/be/src/http/action/calc_file_crc_action.h
+++ b/be/src/http/action/calc_file_crc_action.h
@@ -26,7 +26,7 @@
namespace doris {
class HttpRequest;
-class StorageEngine;
+class BaseStorageEngine;
class ExecEnv;
const std::string PARAM_START_VERSION = "start_version";
@@ -35,7 +35,7 @@ const std::string PARAM_END_VERSION = "end_version";
// This action is used to calculate the crc value of the files in the tablet.
class CalcFileCrcAction : public HttpHandlerWithAuth {
public:
- CalcFileCrcAction(ExecEnv* exec_env, StorageEngine& engine,
TPrivilegeHier::type hier,
+ CalcFileCrcAction(ExecEnv* exec_env, BaseStorageEngine& engine,
TPrivilegeHier::type hier,
TPrivilegeType::type ptype);
~CalcFileCrcAction() override = default;
@@ -47,7 +47,7 @@ private:
int64_t* end_version, int32_t* rowset_count,
int64_t* file_count);
private:
- StorageEngine& _engine;
+ BaseStorageEngine& _engine;
};
} // end namespace doris
diff --git a/be/src/olap/base_tablet.cpp b/be/src/olap/base_tablet.cpp
index 71ece1599d6..c4330667dfc 100644
--- a/be/src/olap/base_tablet.cpp
+++ b/be/src/olap/base_tablet.cpp
@@ -33,6 +33,7 @@
#include "olap/txn_manager.h"
#include "service/point_query_executor.h"
#include "util/bvar_helper.h"
+#include "util/crc32c.h"
#include "util/debug_points.h"
#include "util/doris_metrics.h"
#include "vec/common/schema_util.h"
@@ -1555,4 +1556,35 @@ void BaseTablet::calc_consecutive_empty_rowsets(
}
}
+Status BaseTablet::calc_file_crc(uint32_t* crc_value, int64_t start_version,
int64_t end_version,
+ int32_t* rowset_count, int64_t* file_count) {
+ Version v(start_version, end_version);
+ std::vector<RowsetSharedPtr> rowsets;
+ traverse_rowsets([&rowsets, &v](const auto& rs) {
+ // get all rowsets
+ if (v.contains(rs->version())) {
+ rowsets.emplace_back(rs);
+ }
+ });
+ std::sort(rowsets.begin(), rowsets.end(), Rowset::comparator);
+ *rowset_count = rowsets.size();
+
+ *crc_value = 0;
+ *file_count = 0;
+ for (const auto& rs : rowsets) {
+ uint32_t rs_crc_value = 0;
+ int64_t rs_file_count = 0;
+ auto rowset = std::static_pointer_cast<BetaRowset>(rs);
+ auto st = rowset->calc_file_crc(&rs_crc_value, &rs_file_count);
+ if (!st.ok()) {
+ return st;
+ }
+ // crc_value is calculated based on the crc_value of each rowset.
+ *crc_value = crc32c::Extend(*crc_value, reinterpret_cast<const
char*>(&rs_crc_value),
+ sizeof(rs_crc_value));
+ *file_count += rs_file_count;
+ }
+ return Status::OK();
+}
+
} // namespace doris
diff --git a/be/src/olap/base_tablet.h b/be/src/olap/base_tablet.h
index cefb31ccd11..fc75b5e31fd 100644
--- a/be/src/olap/base_tablet.h
+++ b/be/src/olap/base_tablet.h
@@ -256,6 +256,21 @@ public:
// Return the merged schema of all rowsets
virtual TabletSchemaSPtr merged_tablet_schema() const { return
_max_version_schema; }
+ void traverse_rowsets(std::function<void(const RowsetSharedPtr&)> visitor,
+ bool include_stale = false) {
+ std::shared_lock rlock(_meta_lock);
+ for (auto& [v, rs] : _rs_version_map) {
+ visitor(rs);
+ }
+ if (!include_stale) return;
+ for (auto& [v, rs] : _stale_rs_version_map) {
+ visitor(rs);
+ }
+ }
+
+ Status calc_file_crc(uint32_t* crc_value, int64_t start_version, int64_t
end_version,
+ int32_t* rowset_count, int64_t* file_count);
+
protected:
// Find the missed versions until the spec_version.
//
diff --git a/be/src/olap/rowset/beta_rowset.cpp b/be/src/olap/rowset/beta_rowset.cpp
index d16c1146142..992d437da4e 100644
--- a/be/src/olap/rowset/beta_rowset.cpp
+++ b/be/src/olap/rowset/beta_rowset.cpp
@@ -636,54 +636,51 @@ Status BetaRowset::add_to_binlog() {
return Status::OK();
}
-Status BetaRowset::calc_local_file_crc(uint32_t* crc_value, int64_t*
file_count) {
- if (!is_local()) {
- DCHECK(false) << _rowset_meta->tablet_id() << ' ' << rowset_id();
- return Status::OK();
- }
-
+Status BetaRowset::calc_file_crc(uint32_t* crc_value, int64_t* file_count) {
+ const auto& fs = _rowset_meta->fs();
+ DBUG_EXECUTE_IF("fault_inject::BetaRowset::calc_file_crc",
+ { return Status::Error<OS_ERROR>("fault_inject
calc_file_crc error"); });
if (num_segments() < 1) {
*crc_value = 0x92a8fc17; // magic code from crc32c table
return Status::OK();
}
// 1. pick up all the files including dat file and idx file
- std::vector<io::Path> local_paths;
- for (int i = 0; i < num_segments(); ++i) {
- auto local_seg_path = local_segment_path(_tablet_path,
rowset_id().to_string(), i);
- local_paths.emplace_back(local_seg_path);
+ std::vector<io::Path> file_paths;
+ for (int seg_id = 0; seg_id < num_segments(); ++seg_id) {
+ auto seg_path = DORIS_TRY(segment_path(seg_id));
+ file_paths.emplace_back(seg_path);
if (_schema->get_inverted_index_storage_format() ==
InvertedIndexStorageFormatPB::V1) {
for (auto& column : _schema->columns()) {
const TabletIndex* index_meta =
_schema->get_inverted_index(*column);
if (index_meta) {
- std::string local_inverted_index_file =
+ std::string inverted_index_file =
InvertedIndexDescriptor::get_index_file_path_v1(
-
InvertedIndexDescriptor::get_index_file_path_prefix(
- local_seg_path),
+
InvertedIndexDescriptor::get_index_file_path_prefix(seg_path),
index_meta->index_id(),
index_meta->get_index_suffix());
-
local_paths.emplace_back(std::move(local_inverted_index_file));
+ file_paths.emplace_back(std::move(inverted_index_file));
}
}
} else {
if (_schema->has_inverted_index()) {
- std::string local_inverted_index_file =
- InvertedIndexDescriptor::get_index_file_path_v2(
-
InvertedIndexDescriptor::get_index_file_path_prefix(
- local_seg_path));
- local_paths.emplace_back(std::move(local_inverted_index_file));
+ std::string inverted_index_file =
InvertedIndexDescriptor::get_index_file_path_v2(
+
InvertedIndexDescriptor::get_index_file_path_prefix(seg_path));
+ file_paths.emplace_back(std::move(inverted_index_file));
}
}
}
+ *crc_value = 0;
+ *file_count = file_paths.size();
+ if (!is_local()) {
+ return Status::OK();
+ }
// 2. calculate the md5sum of each file
const auto& local_fs = io::global_local_filesystem();
- DCHECK(!local_paths.empty());
+ DCHECK(!file_paths.empty());
std::vector<std::string> all_file_md5;
- all_file_md5.reserve(local_paths.size());
- for (const auto& file_path : local_paths) {
- DBUG_EXECUTE_IF("fault_inject::BetaRowset::calc_local_file_crc", {
- return Status::Error<OS_ERROR>("fault_inject calc_local_file_crc
error");
- });
+ all_file_md5.reserve(file_paths.size());
+ for (const auto& file_path : file_paths) {
std::string file_md5sum;
auto status = local_fs->md5sum(file_path, &file_md5sum);
if (!status.ok()) {
@@ -696,9 +693,7 @@ Status BetaRowset::calc_local_file_crc(uint32_t* crc_value, int64_t* file_count)
std::sort(all_file_md5.begin(), all_file_md5.end());
// 3. calculate the crc_value based on all_file_md5
- DCHECK(local_paths.size() == all_file_md5.size());
- *crc_value = 0;
- *file_count = local_paths.size();
+ DCHECK(file_paths.size() == all_file_md5.size());
for (auto& i : all_file_md5) {
*crc_value = crc32c::Extend(*crc_value, i.data(), i.size());
}
diff --git a/be/src/olap/rowset/beta_rowset.h b/be/src/olap/rowset/beta_rowset.h
index bf7daf8bdfa..238073f066d 100644
--- a/be/src/olap/rowset/beta_rowset.h
+++ b/be/src/olap/rowset/beta_rowset.h
@@ -84,7 +84,7 @@ public:
[[nodiscard]] virtual Status add_to_binlog() override;
- Status calc_local_file_crc(uint32_t* crc_value, int64_t* file_count);
+ Status calc_file_crc(uint32_t* crc_value, int64_t* file_count);
protected:
BetaRowset(const TabletSchemaSPtr& schema, const RowsetMetaSharedPtr&
rowset_meta,
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index d8919fba417..1a1d3be6bc9 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -108,7 +108,6 @@
#include "service/point_query_executor.h"
#include "tablet.h"
#include "util/bvar_helper.h"
-#include "util/crc32c.h"
#include "util/debug_points.h"
#include "util/defer_op.h"
#include "util/doris_metrics.h"
@@ -2652,35 +2651,4 @@ void Tablet::clear_cache() {
recycle_segment_cache(stale_rowset_map());
}
-Status Tablet::calc_local_file_crc(uint32_t* crc_value, int64_t start_version,
int64_t end_version,
- int32_t* rowset_count, int64_t* file_count)
{
- Version v(start_version, end_version);
- std::vector<RowsetSharedPtr> rowsets;
- traverse_rowsets([&rowsets, &v](const auto& rs) {
- // get local rowsets
- if (rs->is_local() && v.contains(rs->version())) {
- rowsets.emplace_back(rs);
- }
- });
- std::sort(rowsets.begin(), rowsets.end(), Rowset::comparator);
- *rowset_count = rowsets.size();
-
- *crc_value = 0;
- *file_count = 0;
- for (const auto& rs : rowsets) {
- uint32_t rs_crc_value;
- int64_t rs_file_count = 0;
- auto rowset = std::static_pointer_cast<BetaRowset>(rs);
- auto st = rowset->calc_local_file_crc(&rs_crc_value, &rs_file_count);
- if (!st.ok()) {
- return st;
- }
- // crc_value is calculated based on the crc_value of each rowset.
- *crc_value = crc32c::Extend(*crc_value, reinterpret_cast<const
char*>(&rs_crc_value),
- sizeof(rs_crc_value));
- *file_count += rs_file_count;
- }
- return Status::OK();
-}
-
} // namespace doris
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index 759e3e65614..fa11c2d8685 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -421,18 +421,6 @@ public:
int64_t start = -1);
bool should_skip_compaction(CompactionType compaction_type, int64_t now);
- void traverse_rowsets(std::function<void(const RowsetSharedPtr&)> visitor,
- bool include_stale = false) {
- std::shared_lock rlock(_meta_lock);
- for (auto& [v, rs] : _rs_version_map) {
- visitor(rs);
- }
- if (!include_stale) return;
- for (auto& [v, rs] : _stale_rs_version_map) {
- visitor(rs);
- }
- }
-
std::vector<std::string> get_binlog_filepath(std::string_view
binlog_version) const;
std::pair<std::string, int64_t> get_binlog_info(std::string_view
binlog_version) const;
std::string get_rowset_binlog_meta(std::string_view binlog_version,
@@ -483,8 +471,6 @@ public:
}
inline bool is_full_compaction_running() const { return
_is_full_compaction_running; }
void clear_cache() override;
- Status calc_local_file_crc(uint32_t* crc_value, int64_t start_version,
int64_t end_version,
- int32_t* rowset_count, int64_t* file_count);
private:
Status _init_once_action();
diff --git a/be/src/service/http_service.cpp b/be/src/service/http_service.cpp
index 0be4dbff832..86862e4dbc9 100644
--- a/be/src/service/http_service.cpp
+++ b/be/src/service/http_service.cpp
@@ -405,6 +405,10 @@ void HttpService::register_cloud_handler(CloudStorageEngine& engine) {
clear_file_cache_action);
auto* show_hotspot_action = _pool.add(new ShowHotspotAction(engine));
_ev_http_server->register_handler(HttpMethod::GET, "/api/hotspot/tablet",
show_hotspot_action);
+
+ CalcFileCrcAction* calc_crc_action = _pool.add(
+ new CalcFileCrcAction(_env, engine, TPrivilegeHier::GLOBAL,
TPrivilegeType::ADMIN));
+ _ev_http_server->register_handler(HttpMethod::GET, "/api/calc_crc",
calc_crc_action);
}
// NOLINTEND(readability-function-size)
diff --git a/regression-test/suites/fault_injection_p0/test_calc_crc_fault_injection.groovy b/regression-test/suites/fault_injection_p0/test_calc_crc_fault_injection.groovy
index d20e7079780..e238aa48b47 100644
--- a/regression-test/suites/fault_injection_p0/test_calc_crc_fault_injection.groovy
+++ b/regression-test/suites/fault_injection_p0/test_calc_crc_fault_injection.groovy
@@ -18,9 +18,6 @@
import org.codehaus.groovy.runtime.IOGroovyMethods
suite("test_calc_crc", "nonConcurrent") {
- if (isCloudMode()) {
- return;
- }
def calc_file_crc_on_tablet = { ip, port, tablet ->
return curl("GET",
String.format("http://%s:%s/api/calc_crc?tablet_id=%s", ip, port, tablet))
}
@@ -79,12 +76,12 @@ suite("test_calc_crc", "nonConcurrent") {
assertEquals("12", parseJson(out_0.trim()).file_count)
try {
-
GetDebugPoint().enableDebugPointForAllBEs("fault_inject::BetaRowset::calc_local_file_crc")
+
GetDebugPoint().enableDebugPointForAllBEs("fault_inject::BetaRowset::calc_file_crc")
def (code_1, out_1, err_1) = calc_file_crc_on_tablet(ip, port,
tablet_id)
logger.info("Run calc_file_crc_on_tablet: code=" + code_1 + ", out=" +
out_1 + ", err=" + err_1)
- assertTrue(out_1.contains("fault_inject calc_local_file_crc error"))
+ assertTrue(out_1.contains("fault_inject calc_file_crc error"))
} finally {
-
GetDebugPoint().disableDebugPointForAllBEs("fault_inject::BetaRowset::calc_local_file_crc")
+
GetDebugPoint().disableDebugPointForAllBEs("fault_inject::BetaRowset::calc_file_crc")
}
def (code_2, out_2, err_2) = calc_file_crc_on_tablet_with_start(ip, port,
tablet_id, 0)
@@ -125,7 +122,7 @@ suite("test_calc_crc", "nonConcurrent") {
def (code_6, out_6, err_6) = calc_file_crc_on_tablet(ip, port, 123)
logger.info("Run calc_file_crc_on_tablet: code=" + code_6 + ", out=" +
out_6 + ", err=" + err_6)
- assertTrue(out_6.contains("Tablet not found."))
+ assertTrue(out_6.contains("failed to get tablet"))
sql "DROP TABLE IF EXISTS ${tableName}"
}
diff --git a/regression-test/suites/inverted_index_p0/storage_format/test_schema_change_storage_format.groovy b/regression-test/suites/inverted_index_p0/storage_format/test_schema_change_storage_format.groovy
index a4ae74bd80a..fbccf0f8a62 100644
--- a/regression-test/suites/inverted_index_p0/storage_format/test_schema_change_storage_format.groovy
+++ b/regression-test/suites/inverted_index_p0/storage_format/test_schema_change_storage_format.groovy
@@ -16,9 +16,6 @@
// under the License.
suite("test_local_schema_change_storge_format", "p0") {
- if (isCloudMode()) {
- return;
- }
def calc_file_crc_on_tablet = { ip, port, tablet ->
return curl("GET",
String.format("http://%s:%s/api/calc_crc?tablet_id=%s", ip, port, tablet))
}
diff --git a/regression-test/suites/inverted_index_p0/test_variant_index_format_v1.groovy b/regression-test/suites/inverted_index_p0/test_variant_index_format_v1.groovy
index 627ed987e3a..153e8b82f56 100644
--- a/regression-test/suites/inverted_index_p0/test_variant_index_format_v1.groovy
+++ b/regression-test/suites/inverted_index_p0/test_variant_index_format_v1.groovy
@@ -16,9 +16,6 @@
// under the License.
suite("test_variant_index_format_v1", "p0") {
- if (isCloudMode()) {
- return;
- }
def calc_file_crc_on_tablet = { ip, port, tablet ->
return curl("GET",
String.format("http://%s:%s/api/calc_crc?tablet_id=%s", ip, port, tablet))
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]