This is an automated email from the ASF dual-hosted git repository.
gavinchou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new f278793173e [fix](cloud) retry read_at when corruption using file
cache (#48786)
f278793173e is described below
commit f278793173e482fd20ef32c3a9887c473774d097
Author: zhengyu <[email protected]>
AuthorDate: Fri Mar 28 19:17:19 2025 +0800
[fix](cloud) retry read_at when corruption using file cache (#48786)
---
be/src/cloud/injection_point_action.cpp | 22 +++++++
be/src/olap/rowset/segment_v2/column_reader.cpp | 23 ++++----
.../rowset/segment_v2/indexed_column_reader.cpp | 25 ++++----
.../olap/rowset/segment_v2/ordinal_page_index.cpp | 22 ++++---
be/src/olap/rowset/segment_v2/page_io.cpp | 67 ++++++++++++++++++++++
be/src/olap/rowset/segment_v2/page_io.h | 34 +++++++++--
be/src/olap/rowset/segment_v2/segment.cpp | 30 ++++------
7 files changed, 162 insertions(+), 61 deletions(-)
diff --git a/be/src/cloud/injection_point_action.cpp
b/be/src/cloud/injection_point_action.cpp
index bc6676313c1..4fce2609d8e 100644
--- a/be/src/cloud/injection_point_action.cpp
+++ b/be/src/cloud/injection_point_action.cpp
@@ -27,7 +27,9 @@
#include "http/http_channel.h"
#include "http/http_request.h"
#include "http/http_status.h"
+#include "io/cache/cached_remote_file_reader.h"
#include "olap/rowset/rowset.h"
+#include "olap/rowset/segment_v2/page_io.h"
#include "util/stack_util.h"
namespace doris {
@@ -133,6 +135,26 @@ void register_suites() {
*arg0 =
Status::Corruption<false>("test_file_segment_cache_corruption injection error");
});
});
+    // curl "be_ip:http_port/api/injection_point/apply_suite?name=PageIO::read_and_decompress_page:crc_failure"
+    // Forces a checksum mismatch for every page read that goes through the file cache
+    // (CachedRemoteFileReader); reads through any other reader are left untouched.
+    suite_map.emplace("PageIO::read_and_decompress_page:crc_failure", [] {
+        auto* sp = SyncPoint::get_instance();
+        sp->set_call_back("PageIO::read_and_decompress_page:crc_failure_inj", [](auto&& args) {
+            LOG(INFO) << "PageIO::read_and_decompress_page:crc_failure_inj";
+            auto* ctx = std::any_cast<segment_v2::InjectionContext*>(args[0]);
+            if (ctx == nullptr) {
+                LOG(WARNING) << "Failed to cast std::any to InjectionContext*";
+                return;
+            }
+            // if not cachedreader, then do nothing
+            if (dynamic_cast<io::CachedRemoteFileReader*>(ctx->opts->file_reader) == nullptr) {
+                return;
+            }
+            // `crc` points at ONE uint32_t (the locally computed checksum), so it must be
+            // modified in place rather than memset over 32 bytes, which would smash the
+            // caller's stack. Flipping every bit guarantees actual != expect.
+            *ctx->crc = ~*ctx->crc;
+        });
+    });
// curl
be_ip:http_port/api/injection_point/apply_suite?name=test_cloud_meta_mgr_commit_txn'
suite_map.emplace("test_cloud_meta_mgr_commit_txn", [] {
auto* sp = SyncPoint::get_instance();
diff --git a/be/src/olap/rowset/segment_v2/column_reader.cpp
b/be/src/olap/rowset/segment_v2/column_reader.cpp
index bede84b4bee..431ef97dfbf 100644
--- a/be/src/olap/rowset/segment_v2/column_reader.cpp
+++ b/be/src/olap/rowset/segment_v2/column_reader.cpp
@@ -356,18 +356,17 @@ Status ColumnReader::read_page(const
ColumnIteratorOptions& iter_opts, const Pag
PageHandle* handle, Slice* page_body,
PageFooterPB* footer,
BlockCompressionCodec* codec) const {
iter_opts.sanity_check();
- PageReadOptions opts {
- .verify_checksum = _opts.verify_checksum,
- .use_page_cache = iter_opts.use_page_cache,
- .kept_in_memory = _opts.kept_in_memory,
- .type = iter_opts.type,
- .file_reader = iter_opts.file_reader,
- .page_pointer = pp,
- .codec = codec,
- .stats = iter_opts.stats,
- .encoding_info = _encoding_info,
- .io_ctx = iter_opts.io_ctx,
- };
+ PageReadOptions opts(iter_opts.io_ctx);
+ opts.verify_checksum = _opts.verify_checksum;
+ opts.use_page_cache = iter_opts.use_page_cache;
+ opts.kept_in_memory = _opts.kept_in_memory;
+ opts.type = iter_opts.type;
+ opts.file_reader = iter_opts.file_reader;
+ opts.page_pointer = pp;
+ opts.codec = codec;
+ opts.stats = iter_opts.stats;
+ opts.encoding_info = _encoding_info;
+
// index page should not pre decode
if (iter_opts.type == INDEX_PAGE) opts.pre_decode = false;
return PageIO::read_and_decompress_page(opts, handle, page_body, footer);
diff --git a/be/src/olap/rowset/segment_v2/indexed_column_reader.cpp
b/be/src/olap/rowset/segment_v2/indexed_column_reader.cpp
index 3f582293ee4..154c5073cfc 100644
--- a/be/src/olap/rowset/segment_v2/indexed_column_reader.cpp
+++ b/be/src/olap/rowset/segment_v2/indexed_column_reader.cpp
@@ -123,19 +123,18 @@ Status IndexedColumnReader::read_page(const PagePointer&
pp, PageHandle* handle,
OlapReaderStatistics* stats) const {
OlapReaderStatistics tmp_stats;
OlapReaderStatistics* stats_ptr = stats != nullptr ? stats : &tmp_stats;
- PageReadOptions opts {
- .use_page_cache = _use_page_cache,
- .kept_in_memory = _kept_in_memory,
- .pre_decode = pre_decode,
- .type = type,
- .file_reader = _file_reader.get(),
- .page_pointer = pp,
- .codec = codec,
- .stats = stats_ptr,
- .encoding_info = _encoding_info,
- .io_ctx = io::IOContext {.is_index_data = true,
- .file_cache_stats =
&stats_ptr->file_cache_stats},
- };
+ PageReadOptions opts(io::IOContext {.is_index_data = true,
+ .file_cache_stats =
&stats_ptr->file_cache_stats});
+ opts.use_page_cache = _use_page_cache;
+ opts.kept_in_memory = _kept_in_memory;
+ opts.pre_decode = pre_decode;
+ opts.type = type;
+ opts.file_reader = _file_reader.get();
+ opts.page_pointer = pp;
+ opts.codec = codec;
+ opts.stats = stats_ptr;
+ opts.encoding_info = _encoding_info;
+
if (_is_pk_index) {
opts.type = PRIMARY_KEY_INDEX_PAGE;
}
diff --git a/be/src/olap/rowset/segment_v2/ordinal_page_index.cpp
b/be/src/olap/rowset/segment_v2/ordinal_page_index.cpp
index 4995e779892..8faab35cebb 100644
--- a/be/src/olap/rowset/segment_v2/ordinal_page_index.cpp
+++ b/be/src/olap/rowset/segment_v2/ordinal_page_index.cpp
@@ -91,18 +91,16 @@ Status OrdinalIndexReader::_load(bool use_page_cache, bool
kept_in_memory,
// need to read index page
OlapReaderStatistics tmp_stats;
OlapReaderStatistics* stats_ptr = stats != nullptr ? stats : &tmp_stats;
- PageReadOptions opts {
- .use_page_cache = use_page_cache,
- .kept_in_memory = kept_in_memory,
- .type = INDEX_PAGE,
- .file_reader = _file_reader.get(),
- .page_pointer = PagePointer(index_meta->root_page().root_page()),
- // ordinal index page uses NO_COMPRESSION right now
- .codec = nullptr,
- .stats = stats_ptr,
- .io_ctx = io::IOContext {.is_index_data = true,
- .file_cache_stats =
&stats_ptr->file_cache_stats},
- };
+ PageReadOptions opts(io::IOContext {.is_index_data = true,
+ .file_cache_stats =
&stats_ptr->file_cache_stats});
+ opts.use_page_cache = use_page_cache;
+ opts.kept_in_memory = kept_in_memory;
+ opts.type = INDEX_PAGE;
+ opts.file_reader = _file_reader.get();
+ opts.page_pointer = PagePointer(index_meta->root_page().root_page());
+ // ordinal index page uses NO_COMPRESSION right now
+ opts.codec = nullptr;
+ opts.stats = stats_ptr;
// read index page
PageHandle page_handle;
diff --git a/be/src/olap/rowset/segment_v2/page_io.cpp
b/be/src/olap/rowset/segment_v2/page_io.cpp
index d014f6627f5..343c395c875 100644
--- a/be/src/olap/rowset/segment_v2/page_io.cpp
+++ b/be/src/olap/rowset/segment_v2/page_io.cpp
@@ -27,8 +27,13 @@
#include <string>
#include <utility>
+#include "cloud/config.h"
#include "common/logging.h"
+#include "cpp/sync_point.h"
#include "gutil/strings/substitute.h"
+#include "io/cache/block_file_cache.h"
+#include "io/cache/block_file_cache_factory.h"
+#include "io/cache/cached_remote_file_reader.h"
#include "io/fs/file_reader.h"
#include "io/fs/file_writer.h"
#include "olap/olap_common.h"
@@ -111,6 +116,15 @@ Status PageIO::write_page(io::FileWriter* writer, const
std::vector<Slice>& body
return Status::OK();
}
+// Builds the file-cache key of a segment file by hashing only its file name, i.e. the
+// part of `seg_path` after the last '/' (the whole path when it contains no '/').
+io::UInt128Wrapper file_cache_key_from_path(const std::string& seg_path) {
+    const auto last_slash = seg_path.rfind('/');
+    std::string file_name =
+            (last_slash == std::string::npos) ? seg_path : seg_path.substr(last_slash + 1);
+    return io::BlockFileCache::hash(file_name);
+}
+
+// String rendering of the key computed by file_cache_key_from_path(seg_path).
+std::string file_cache_key_str(const std::string& seg_path) {
+    io::UInt128Wrapper key = file_cache_key_from_path(seg_path);
+    return key.to_string();
+}
+
Status PageIO::read_and_decompress_page_(const PageReadOptions& opts,
PageHandle* handle,
Slice* body, PageFooterPB* footer) {
opts.sanity_check();
@@ -161,6 +175,9 @@ Status PageIO::read_and_decompress_page_(const
PageReadOptions& opts, PageHandle
if (opts.verify_checksum) {
uint32_t expect = decode_fixed32_le((uint8_t*)page_slice.data +
page_slice.size - 4);
uint32_t actual = crc32c::Value(page_slice.data, page_slice.size - 4);
+ InjectionContext ctx = {&actual, const_cast<PageReadOptions*>(&opts)};
+ (void)ctx;
+
TEST_INJECTION_POINT_CALLBACK("PageIO::read_and_decompress_page:crc_failure_inj",
&ctx);
if (expect != actual) {
return Status::Corruption(
"Bad page: checksum mismatch (actual={} vs expect={}),
file={}", actual, expect,
@@ -231,5 +248,55 @@ Status PageIO::read_and_decompress_page_(const
PageReadOptions& opts, PageHandle
return Status::OK();
}
+// Reads, verifies and decompresses one page (see the declaration in page_io.h).
+// In cloud mode, a CORRUPTION error on a read served through a CachedRemoteFileReader
+// is retried: first after evicting the file from the file cache, then once more by
+// reading directly through the underlying remote reader. Any other status is returned
+// unchanged after the first attempt.
+Status PageIO::read_and_decompress_page(const PageReadOptions& opts, PageHandle* handle,
+                                        Slice* body, PageFooterPB* footer) {
+    // First try to read with file cache
+    Status st = do_read_and_decompress_page(opts, handle, body, footer);
+    if (!st.is<ErrorCode::CORRUPTION>() || !config::is_cloud_mode()) {
+        return st;
+    }
+
+    // Retrying only helps when the bad bytes may have come from the local file cache.
+    auto* cached_file_reader = dynamic_cast<io::CachedRemoteFileReader*>(opts.file_reader);
+    if (cached_file_reader == nullptr) {
+        return st;
+    }
+
+    // If we get CORRUPTION error and using file cache, clear cache and retry
+    LOG(WARNING) << "Bad page may be read from file cache, need retry."
+                 << " error msg: " << st.msg()
+                 << " file path: " << opts.file_reader->path().native()
+                 << " offset: " << opts.page_pointer.offset;
+
+    // Remove cache if exists
+    const std::string path = opts.file_reader->path().string();
+    auto file_key = file_cache_key_from_path(path);
+    auto* file_cache = io::FileCacheFactory::instance()->get_by_path(file_key);
+    if (file_cache) {
+        file_cache->remove_if_cached(file_key);
+    }
+
+    // Retry with file cache
+    st = do_read_and_decompress_page(opts, handle, body, footer);
+    if (!st.is<ErrorCode::CORRUPTION>()) {
+        return st;
+    }
+
+    LOG(WARNING) << "Corruption again with retry downloading cache,"
+                 << " error msg: " << st.msg()
+                 << " file path: " << opts.file_reader->path().native()
+                 << " offset: " << opts.page_pointer.offset;
+
+    // Last resort: bypass the file cache and read through the remote reader directly.
+    PageReadOptions new_opts = opts;
+    new_opts.file_reader = cached_file_reader->get_remote_reader();
+    st = do_read_and_decompress_page(new_opts, handle, body, footer);
+    if (!st.ok()) {
+        // The direct remote read can fail for reasons other than corruption (e.g. IO
+        // errors); only report "Corruption again" when that is what actually happened.
+        LOG(WARNING) << (st.is<ErrorCode::CORRUPTION>() ? "Corruption again" : "Failed again")
+                     << " with retry read directly from remote,"
+                     << " error msg: " << st.msg()
+                     << " file path: " << opts.file_reader->path().native()
+                     << " offset: " << opts.page_pointer.offset << " Give up.";
+    }
+    return st;
+}
+
} // namespace segment_v2
} // namespace doris
diff --git a/be/src/olap/rowset/segment_v2/page_io.h
b/be/src/olap/rowset/segment_v2/page_io.h
index b23af4b0b35..b3c9a1ca798 100644
--- a/be/src/olap/rowset/segment_v2/page_io.h
+++ b/be/src/olap/rowset/segment_v2/page_io.h
@@ -23,6 +23,7 @@
#include "common/logging.h"
#include "common/status.h"
+#include "io/cache/block_file_cache.h"
#include "io/io_common.h"
#include "olap/rowset/segment_v2/page_pointer.h"
#include "util/slice.h"
@@ -41,6 +42,9 @@ namespace segment_v2 {
class EncodingInfo;
class PageHandle;
+io::UInt128Wrapper file_cache_key_from_path(const std::string& seg_path);
+std::string file_cache_key_str(const std::string& seg_path);
+
struct PageReadOptions {
// whether to verify page checksum
bool verify_checksum = true;
@@ -66,12 +70,31 @@ struct PageReadOptions {
const EncodingInfo* encoding_info = nullptr;
- const io::IOContext& io_ctx;
+ const io::IOContext io_ctx;
void sanity_check() const {
CHECK_NOTNULL(file_reader);
CHECK_NOTNULL(stats);
}
+    // io_ctx is const, so it can only be set at construction; callers fill in the
+    // remaining fields after constructing the options.
+    PageReadOptions(const io::IOContext& ioctx) : io_ctx(ioctx) {}
+
+    // Memberwise copy. io_ctx is held by value, so the defaulted copy constructor is
+    // correct. Unlike a hand-written field list, it cannot silently drop fields that
+    // are added to this struct later. (Copy assignment remains unavailable because
+    // io_ctx is const.)
+    PageReadOptions(const PageReadOptions&) = default;
+};
+
+// Argument passed to the "PageIO::read_and_decompress_page:crc_failure_inj" test
+// injection point during checksum verification.
+// `crc` points at the locally computed checksum (a single uint32_t), which a callback
+// may overwrite to force a mismatch.
+// `opts` points at the options of the page being read.
+struct InjectionContext {
+ uint32_t* crc;
+ PageReadOptions* opts;
};
inline std::ostream& operator<<(std::ostream& os, const PageReadOptions& opt) {
@@ -124,13 +147,16 @@ public:
// `body' points to page body,
// `footer' stores the page footer.
// This method is exception safe, it will failed when allocate memory
failed.
+ // deal with CORRUPTION when using file cache, retry from remote
static Status read_and_decompress_page(const PageReadOptions& opts,
PageHandle* handle,
- Slice* body, PageFooterPB* footer) {
+ Slice* body, PageFooterPB* footer);
+
+private:
+ // One read attempt with no retry logic: runs read_and_decompress_page_() (which does
+ // not deal with exceptions) under RETURN_IF_CATCH_EXCEPTION, presumably so that
+ // exceptions such as allocation failures come back as a Status. The public
+ // read_and_decompress_page() calls this once per (re)try.
+ static Status do_read_and_decompress_page(const PageReadOptions& opts,
PageHandle* handle,
+ Slice* body, PageFooterPB*
footer) {
+ RETURN_IF_CATCH_EXCEPTION(
+ { return read_and_decompress_page_(opts, handle, body,
footer); });
}
-
-private:
// An internal method that not deal with exception.
static Status read_and_decompress_page_(const PageReadOptions& opts,
PageHandle* handle,
Slice* body, PageFooterPB* footer);
diff --git a/be/src/olap/rowset/segment_v2/segment.cpp
b/be/src/olap/rowset/segment_v2/segment.cpp
index 96a3a74e656..90135b5826d 100644
--- a/be/src/olap/rowset/segment_v2/segment.cpp
+++ b/be/src/olap/rowset/segment_v2/segment.cpp
@@ -78,15 +78,6 @@ namespace doris::segment_v2 {
class InvertedIndexIterator;
-io::UInt128Wrapper file_cache_key_from_path(const std::string& seg_path) {
- std::string base = seg_path.substr(seg_path.rfind('/') + 1); // tricky:
npos + 1 == 0
- return io::BlockFileCache::hash(base);
-}
-
-std::string file_cache_key_str(const std::string& seg_path) {
- return file_cache_key_from_path(seg_path).to_string();
-}
-
Status Segment::open(io::FileSystemSPtr fs, const std::string& path, int64_t
tablet_id,
uint32_t segment_id, RowsetId rowset_id, TabletSchemaSPtr
tablet_schema,
const io::FileReaderOptions& reader_options,
std::shared_ptr<Segment>* output,
@@ -527,17 +518,16 @@ Status Segment::load_index(OlapReaderStatistics* stats) {
// read and parse short key index page
OlapReaderStatistics tmp_stats;
OlapReaderStatistics* stats_ptr = stats != nullptr ? stats :
&tmp_stats;
- PageReadOptions opts {
- .use_page_cache = true,
- .type = INDEX_PAGE,
- .file_reader = _file_reader.get(),
- .page_pointer = PagePointer(_sk_index_page),
- // short key index page uses NO_COMPRESSION for now
- .codec = nullptr,
- .stats = &tmp_stats,
- .io_ctx = io::IOContext {.is_index_data = true,
- .file_cache_stats =
&stats_ptr->file_cache_stats},
- };
+ PageReadOptions opts(io::IOContext {.is_index_data = true,
+ .file_cache_stats =
&stats_ptr->file_cache_stats});
+ opts.use_page_cache = true;
+ opts.type = INDEX_PAGE;
+ opts.file_reader = _file_reader.get();
+ opts.page_pointer = PagePointer(_sk_index_page);
+ // short key index page uses NO_COMPRESSION for now
+ opts.codec = nullptr;
+ opts.stats = &tmp_stats;
+
Slice body;
PageFooterPB footer;
RETURN_IF_ERROR(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]