This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 51ccdfa4b57 branch-3.0: [enhancement](schema-change) Cloud schema
change do clean up when job failed #48426 (#48897)
51ccdfa4b57 is described below
commit 51ccdfa4b57eed0a6f72d6a9511028040ac0ecf3
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Mar 11 17:36:00 2025 +0800
branch-3.0: [enhancement](schema-change) Cloud schema change do clean up
when job failed #48426 (#48897)
Cherry-picked from #48426
Co-authored-by: Siyang Tang <[email protected]>
---
be/src/agent/task_worker_pool.cpp | 80 +++++++++++++++-----------------
be/src/cloud/cloud_schema_change_job.cpp | 12 +++++
be/src/cloud/cloud_schema_change_job.h | 2 +
be/src/cloud/cloud_tablet.cpp | 28 ++++-------
be/src/olap/rowset/rowset.cpp | 9 ++++
5 files changed, 68 insertions(+), 63 deletions(-)
diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp
index da6184ad6fd..86ece5125ed 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -268,39 +268,46 @@ void alter_cloud_tablet(CloudStorageEngine& engine, const TAgentTaskRequest& age
// Do not need to adjust delete success or not
// Because if delete failed create rollup will failed
TTabletId new_tablet_id = 0;
- if (status.ok()) {
- new_tablet_id = agent_task_req.alter_tablet_req_v2.new_tablet_id;
- auto mem_tracker = MemTrackerLimiter::create_shared(
- MemTrackerLimiter::Type::SCHEMA_CHANGE,
- fmt::format("EngineAlterTabletTask#baseTabletId={}:newTabletId={}",
- std::to_string(agent_task_req.alter_tablet_req_v2.base_tablet_id),
- std::to_string(agent_task_req.alter_tablet_req_v2.new_tablet_id),
- engine.memory_limitation_bytes_per_thread_for_schema_change()));
- SCOPED_ATTACH_TASK(mem_tracker);
- DorisMetrics::instance()->create_rollup_requests_total->increment(1);
- Status res = Status::OK();
- try {
- LOG_INFO("start {}", process_name)
- .tag("signature", agent_task_req.signature)
- .tag("base_tablet_id", agent_task_req.alter_tablet_req_v2.base_tablet_id)
- .tag("new_tablet_id", new_tablet_id)
- .tag("mem_limit",
- engine.memory_limitation_bytes_per_thread_for_schema_change());
- DCHECK(agent_task_req.alter_tablet_req_v2.__isset.job_id);
- CloudSchemaChangeJob job(engine,
- std::to_string(agent_task_req.alter_tablet_req_v2.job_id),
- agent_task_req.alter_tablet_req_v2.expiration);
- status = job.process_alter_tablet(agent_task_req.alter_tablet_req_v2);
- } catch (const Exception& e) {
- status = e.to_status();
- }
- if (!status.ok()) {
- DorisMetrics::instance()->create_rollup_requests_failed->increment(1);
- }
- }
+ new_tablet_id = agent_task_req.alter_tablet_req_v2.new_tablet_id;
+ auto mem_tracker = MemTrackerLimiter::create_shared(
+ MemTrackerLimiter::Type::SCHEMA_CHANGE,
+ fmt::format("EngineAlterTabletTask#baseTabletId={}:newTabletId={}",
+ std::to_string(agent_task_req.alter_tablet_req_v2.base_tablet_id),
+ std::to_string(agent_task_req.alter_tablet_req_v2.new_tablet_id),
+ engine.memory_limitation_bytes_per_thread_for_schema_change()));
+ SCOPED_ATTACH_TASK(mem_tracker);
+ DorisMetrics::instance()->create_rollup_requests_total->increment(1);
+
+ LOG_INFO("start {}", process_name)
+ .tag("signature", agent_task_req.signature)
+ .tag("base_tablet_id", agent_task_req.alter_tablet_req_v2.base_tablet_id)
+ .tag("new_tablet_id", new_tablet_id)
+ .tag("mem_limit", engine.memory_limitation_bytes_per_thread_for_schema_change());
+ DCHECK(agent_task_req.alter_tablet_req_v2.__isset.job_id);
+ CloudSchemaChangeJob job(engine, std::to_string(agent_task_req.alter_tablet_req_v2.job_id),
+ agent_task_req.alter_tablet_req_v2.expiration);
+ status = [&]() {
+ HANDLE_EXCEPTION_IF_CATCH_EXCEPTION(
+ job.process_alter_tablet(agent_task_req.alter_tablet_req_v2),
+ [&](const doris::Exception& ex) {
+ DorisMetrics::instance()->create_rollup_requests_failed->increment(1);
+ job.clean_up_on_failed();
+ });
+ return Status::OK();
+ }();
if (status.ok()) {
increase_report_version();
+ LOG_INFO("successfully {}", process_name)
+ .tag("signature", agent_task_req.signature)
+ .tag("base_tablet_id", agent_task_req.alter_tablet_req_v2.base_tablet_id)
+ .tag("new_tablet_id", new_tablet_id);
+ } else {
+ LOG_WARNING("failed to {}", process_name)
+ .tag("signature", agent_task_req.signature)
+ .tag("base_tablet_id", agent_task_req.alter_tablet_req_v2.base_tablet_id)
+ .tag("new_tablet_id", new_tablet_id)
+ .error(status);
}
// Return result to fe
@@ -308,19 +315,6 @@ void alter_cloud_tablet(CloudStorageEngine& engine, const TAgentTaskRequest& age
finish_task_request->__set_report_version(s_report_version);
finish_task_request->__set_task_type(task_type);
finish_task_request->__set_signature(signature);
-
- if (!status.ok() && !status.is<NOT_IMPLEMENTED_ERROR>()) {
- LOG_WARNING("failed to {}", process_name)
- .tag("signature", agent_task_req.signature)
- .tag("base_tablet_id", agent_task_req.alter_tablet_req_v2.base_tablet_id)
- .tag("new_tablet_id", new_tablet_id)
- .error(status);
- } else {
- LOG_INFO("successfully {}", process_name)
- .tag("signature", agent_task_req.signature)
- .tag("base_tablet_id", agent_task_req.alter_tablet_req_v2.base_tablet_id)
- .tag("new_tablet_id", new_tablet_id);
- }
finish_task_request->__set_task_status(status.to_thrift());
}
diff --git a/be/src/cloud/cloud_schema_change_job.cpp b/be/src/cloud/cloud_schema_change_job.cpp
index 1ec08acc9eb..b98dfb33a24 100644
--- a/be/src/cloud/cloud_schema_change_job.cpp
+++ b/be/src/cloud/cloud_schema_change_job.cpp
@@ -495,4 +495,16 @@ Status CloudSchemaChangeJob::_process_delete_bitmap(int64_t alter_version,
return Status::OK();
}
+void CloudSchemaChangeJob::clean_up_on_failed() {
+ for (const auto& output_rs : _output_rowsets) {
+ if (output_rs.use_count() > 2) {
+ LOG(WARNING) << "Rowset " << output_rs->rowset_id().to_string() << " has "
+ << output_rs.use_count()
+ << " references. File Cache won't be recycled when query is using it.";
+ return;
+ }
+ output_rs->clear_cache();
+ }
+}
+
} // namespace doris
diff --git a/be/src/cloud/cloud_schema_change_job.h b/be/src/cloud/cloud_schema_change_job.h
index c77aae48570..dee71cd3104 100644
--- a/be/src/cloud/cloud_schema_change_job.h
+++ b/be/src/cloud/cloud_schema_change_job.h
@@ -36,6 +36,8 @@ public:
// This method is idempotent for a same request.
Status process_alter_tablet(const TAlterTabletReqV2& request);
+ void clean_up_on_failed();
+
private:
Status _convert_historical_rowsets(const SchemaChangeParams& sc_params,
cloud::TabletJobInfoPB& job);
diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp
index bda4e8ea1a3..82a5927c1c4 100644
--- a/be/src/cloud/cloud_tablet.cpp
+++ b/be/src/cloud/cloud_tablet.cpp
@@ -33,6 +33,7 @@
#include "cloud/cloud_meta_mgr.h"
#include "cloud/cloud_storage_engine.h"
#include "cloud/cloud_tablet_mgr.h"
+#include "common/config.h"
#include "common/logging.h"
#include "io/cache/block_file_cache_downloader.h"
#include "io/cache/block_file_cache_factory.h"
@@ -452,27 +453,14 @@ void CloudTablet::clear_cache() {
}
void CloudTablet::recycle_cached_data(const std::vector<RowsetSharedPtr>& rowsets) {
- for (auto& rs : rowsets) {
- // Clear cached opened segments and inverted index cache in memory
- rs->clear_cache();
- }
-
- if (config::enable_file_cache) {
- for (const auto& rs : rowsets) {
- // rowsets and tablet._rs_version_map each hold a rowset shared_ptr, so at this point, the reference count of the shared_ptr is at least 2.
- if (rs.use_count() > 2) {
- LOG(WARNING) << "Rowset " << rs->rowset_id().to_string() << " has "
- << rs.use_count()
- << " references. File Cache won't be recycled when query is using it.";
- continue;
- }
- for (int seg_id = 0; seg_id < rs->num_segments(); ++seg_id) {
- // TODO: Segment::file_cache_key
- auto file_key = Segment::file_cache_key(rs->rowset_id().to_string(), seg_id);
- auto* file_cache = io::FileCacheFactory::instance()->get_by_path(file_key);
- file_cache->remove_if_cached_async(file_key);
- }
+ for (const auto& rs : rowsets) {
+ // rowsets and tablet._rs_version_map each hold a rowset shared_ptr, so at this point, the reference count of the shared_ptr is at least 2.
+ if (rs.use_count() > 2) {
+ LOG(WARNING) << "Rowset " << rs->rowset_id().to_string() << " has " << rs.use_count()
+ << " references. File Cache won't be recycled when query is using it.";
+ return;
}
+ rs->clear_cache();
}
}
diff --git a/be/src/olap/rowset/rowset.cpp b/be/src/olap/rowset/rowset.cpp
index ac3a2a7a1dc..3b86504090d 100644
--- a/be/src/olap/rowset/rowset.cpp
+++ b/be/src/olap/rowset/rowset.cpp
@@ -19,6 +19,8 @@
#include <gen_cpp/olap_file.pb.h>
+#include "common/config.h"
+#include "io/cache/block_file_cache_factory.h"
#include "olap/olap_define.h"
#include "olap/segment_loader.h"
#include "olap/tablet_schema.h"
@@ -120,6 +122,13 @@ void Rowset::clear_cache() {
SCOPED_SIMPLE_TRACE_IF_TIMEOUT(std::chrono::seconds(1));
clear_inverted_index_cache();
}
+ if (config::enable_file_cache) {
+ for (int seg_id = 0; seg_id < num_segments(); ++seg_id) {
+ auto file_key = segment_v2::Segment::file_cache_key(rowset_id().to_string(), seg_id);
+ auto* file_cache = io::FileCacheFactory::instance()->get_by_path(file_key);
+ file_cache->remove_if_cached_async(file_key);
+ }
+ }
}
Result<std::string> Rowset::segment_path(int64_t seg_id) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]