This is an automated email from the ASF dual-hosted git repository.
airborne pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 6841a547acd [fix](inverted index) resolve io_ctx heap-use-after-free
in concurrent reader access (#47634)
6841a547acd is described below
commit 6841a547acd4485c29c8c9cd7203c21e5b546854
Author: zzzxl <[email protected]>
AuthorDate: Mon Feb 10 10:25:23 2025 +0800
[fix](inverted index) resolve io_ctx heap-use-after-free in concurrent
reader access (#47634)
### What problem does this PR solve?
Issue Number: close #xxx
Related PR: #xxx
Problem Summary:
When multiple queries open inverted index readers concurrently, the
stream must be cloned before its io_ctx is set, so that each query uses
its own io_ctx. Without cloning, multiple queries might share the same
stream, which leads to heap-use-after-free errors when the io_ctx is
accessed.
---
.../segment_v2/inverted_index_compound_reader.cpp | 31 +++++++++++++++++++++-
.../segment_v2/inverted_index_compound_reader.h | 26 +++++++-----------
.../segment_v2/inverted_index_file_reader.cpp | 22 +++++----------
.../rowset/segment_v2/inverted_index_file_reader.h | 6 +++--
.../rowset/segment_v2/inverted_index_reader.cpp | 22 +++++++++++++--
.../test_index_io_context.groovy | 4 ++-
6 files changed, 72 insertions(+), 39 deletions(-)
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.cpp
b/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.cpp
index 7a993daacf1..86efe86ca43 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.cpp
@@ -152,11 +152,35 @@ void CSIndexInput::setIoContext(const void* io_ctx) {
_io_ctx = static_cast<const io::IOContext*>(io_ctx);
}
-DorisCompoundReader::DorisCompoundReader(CL_NS(store)::IndexInput* stream,
int32_t read_buffer_size)
+DorisCompoundReader::DorisCompoundReader(CL_NS(store)::IndexInput* stream,
+ EntriesType* entries_clone, int32_t
read_buffer_size,
+ const io::IOContext* io_ctx)
+ : _stream(stream),
+ _entries(_CLNEW EntriesType(true, true)),
+ _read_buffer_size(read_buffer_size) {
+ // After stream clone, the io_ctx needs to be reconfigured.
+ initialize(io_ctx);
+
+ for (auto& e : *entries_clone) {
+ auto* origin_entry = e.second;
+ auto* entry = _CLNEW ReaderFileEntry();
+ char* aid = strdup(e.first);
+ entry->file_name = origin_entry->file_name;
+ entry->offset = origin_entry->offset;
+ entry->length = origin_entry->length;
+ _entries->put(aid, entry);
+ }
+};
+
+DorisCompoundReader::DorisCompoundReader(CL_NS(store)::IndexInput* stream,
int32_t read_buffer_size,
+ const io::IOContext* io_ctx)
: _ram_dir(new lucene::store::RAMDirectory()),
_stream(stream),
_entries(_CLNEW EntriesType(true, true)),
_read_buffer_size(read_buffer_size) {
+ // After stream clone, the io_ctx needs to be reconfigured.
+ initialize(io_ctx);
+
try {
int32_t count = _stream->readVInt();
ReaderFileEntry* entry = nullptr;
@@ -383,5 +407,10 @@ CL_NS(store)::IndexInput*
DorisCompoundReader::getDorisIndexInput() {
return _stream;
}
+void DorisCompoundReader::initialize(const io::IOContext* io_ctx) {
+ _stream->setIoContext(io_ctx);
+ _stream->setIdxFileCache(true);
+}
+
} // namespace segment_v2
} // namespace doris
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.h
b/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.h
index 1c7bc159b9c..4a687e4ed3e 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.h
+++ b/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.h
@@ -34,6 +34,7 @@
#include <vector>
#include "io/fs/file_system.h"
+#include "io/io_common.h"
#include "olap/rowset/segment_v2/inverted_index_desc.h"
class CLuceneError;
@@ -78,24 +79,12 @@ protected:
bool doDeleteFile(const char* name) override;
public:
- explicit DorisCompoundReader(
- CL_NS(store)::IndexInput* stream, EntriesType* entries_clone,
- int32_t read_buffer_size =
CL_NS(store)::BufferedIndexInput::BUFFER_SIZE)
- : _stream(stream),
- _entries(_CLNEW EntriesType(true, true)),
- _read_buffer_size(read_buffer_size) {
- for (auto& e : *entries_clone) {
- auto* origin_entry = e.second;
- auto* entry = _CLNEW ReaderFileEntry();
- char* aid = strdup(e.first);
- entry->file_name = origin_entry->file_name;
- entry->offset = origin_entry->offset;
- entry->length = origin_entry->length;
- _entries->put(aid, entry);
- }
- };
+ DorisCompoundReader(CL_NS(store)::IndexInput* stream, EntriesType*
entries_clone,
+ int32_t read_buffer_size =
CL_NS(store)::BufferedIndexInput::BUFFER_SIZE,
+ const io::IOContext* io_ctx = nullptr);
DorisCompoundReader(CL_NS(store)::IndexInput* stream,
- int32_t read_buffer_size =
CL_NS(store)::BufferedIndexInput::BUFFER_SIZE);
+ int32_t read_buffer_size =
CL_NS(store)::BufferedIndexInput::BUFFER_SIZE,
+ const io::IOContext* io_ctx = nullptr);
~DorisCompoundReader() override;
void copyFile(const char* file, int64_t file_length, uint8_t* buffer,
int64_t buffer_length);
bool list(std::vector<std::string>* names) const override;
@@ -115,6 +104,9 @@ public:
static const char* getClassName();
const char* getObjectName() const override;
CL_NS(store)::IndexInput* getDorisIndexInput();
+
+private:
+ void initialize(const io::IOContext* io_ctx);
};
} // namespace segment_v2
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_file_reader.cpp
b/be/src/olap/rowset/segment_v2/inverted_index_file_reader.cpp
index e7838f1ffd0..30c3e178732 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_file_reader.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_file_reader.cpp
@@ -28,22 +28,13 @@
namespace doris::segment_v2 {
Status InvertedIndexFileReader::init(int32_t read_buffer_size, const
io::IOContext* io_ctx) {
+ std::unique_lock<std::shared_mutex> lock(_mutex); // Lock for writing
if (!_inited) {
_read_buffer_size = read_buffer_size;
if (_storage_format >= InvertedIndexStorageFormatPB::V2) {
- auto st = _init_from(read_buffer_size, io_ctx);
- if (!st.ok()) {
- return st;
- }
+ RETURN_IF_ERROR(_init_from(read_buffer_size, io_ctx));
}
_inited = true;
- } else {
- if (_storage_format == InvertedIndexStorageFormatPB::V2) {
- if (_stream) {
- _stream->setIoContext(io_ctx);
- _stream->setIndexFile(true);
- }
- }
}
return Status::OK();
}
@@ -51,7 +42,6 @@ Status InvertedIndexFileReader::init(int32_t
read_buffer_size, const io::IOConte
Status InvertedIndexFileReader::_init_from(int32_t read_buffer_size, const
io::IOContext* io_ctx) {
auto index_file_full_path =
InvertedIndexDescriptor::get_index_file_path_v2(_index_path_prefix);
- std::unique_lock<std::shared_mutex> lock(_mutex); // Lock for writing
try {
CLuceneError err;
CL_NS(store)::IndexInput* index_input = nullptr;
@@ -161,7 +151,7 @@ Result<InvertedIndexDirectoryMap>
InvertedIndexFileReader::get_all_directories()
}
Result<std::unique_ptr<DorisCompoundReader>> InvertedIndexFileReader::_open(
- int64_t index_id, const std::string& index_suffix) const {
+ int64_t index_id, const std::string& index_suffix, const
io::IOContext* io_ctx) const {
std::unique_ptr<DorisCompoundReader> compound_reader;
if (_storage_format == InvertedIndexStorageFormatPB::V1) {
@@ -231,16 +221,16 @@ Result<std::unique_ptr<DorisCompoundReader>>
InvertedIndexFileReader::_open(
}
// Need to clone resource here, because index searcher cache need it.
compound_reader = std::make_unique<DorisCompoundReader>(
- _stream->clone(), index_it->second.get(), _read_buffer_size);
+ _stream->clone(), index_it->second.get(), _read_buffer_size,
io_ctx);
}
return compound_reader;
}
Result<std::unique_ptr<DorisCompoundReader>> InvertedIndexFileReader::open(
- const TabletIndex* index_meta) const {
+ const TabletIndex* index_meta, const io::IOContext* io_ctx) const {
auto index_id = index_meta->index_id();
auto index_suffix = index_meta->get_index_suffix();
- return _open(index_id, index_suffix);
+ return _open(index_id, index_suffix, io_ctx);
}
std::string InvertedIndexFileReader::get_index_file_cache_key(const
TabletIndex* index_meta) const {
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_file_reader.h
b/be/src/olap/rowset/segment_v2/inverted_index_file_reader.h
index 5f3775649d3..63dd89cf975 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_file_reader.h
+++ b/be/src/olap/rowset/segment_v2/inverted_index_file_reader.h
@@ -60,7 +60,8 @@ public:
Status init(int32_t read_buffer_size =
config::inverted_index_read_buffer_size,
const io::IOContext* io_ctx = nullptr);
- Result<std::unique_ptr<DorisCompoundReader>> open(const TabletIndex*
index_meta) const;
+ Result<std::unique_ptr<DorisCompoundReader>> open(const TabletIndex*
index_meta,
+ const io::IOContext*
io_ctx = nullptr) const;
void debug_file_entries();
std::string get_index_file_cache_key(const TabletIndex* index_meta) const;
std::string get_index_file_path(const TabletIndex* index_meta) const;
@@ -74,7 +75,8 @@ public:
protected:
Status _init_from(int32_t read_buffer_size, const io::IOContext* io_ctx);
Result<std::unique_ptr<DorisCompoundReader>> _open(int64_t index_id,
- const std::string&
index_suffix) const;
+ const std::string&
index_suffix,
+ const io::IOContext*
io_ctx = nullptr) const;
private:
IndicesEntriesMap _indices_entries;
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_reader.cpp
b/be/src/olap/rowset/segment_v2/inverted_index_reader.cpp
index c1e3b10d882..5da74fd1dcf 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_reader.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_reader.cpp
@@ -127,7 +127,7 @@ Status InvertedIndexReader::read_null_bitmap(const
io::IOContext* io_ctx,
LOG(WARNING) << st;
return st;
}
- auto directory =
DORIS_TRY(_inverted_index_file_reader->open(&_index_meta));
+ auto directory =
DORIS_TRY(_inverted_index_file_reader->open(&_index_meta, io_ctx));
dir = directory.release();
owned_dir = true;
}
@@ -218,7 +218,25 @@ Status InvertedIndexReader::handle_searcher_cache(
LOG(WARNING) << st;
return st;
}
- auto dir = DORIS_TRY(_inverted_index_file_reader->open(&_index_meta));
+ auto dir = DORIS_TRY(_inverted_index_file_reader->open(&_index_meta,
io_ctx));
+
+ DBUG_EXECUTE_IF("InvertedIndexReader.handle_searcher_cache.io_ctx", ({
+ if (dir) {
+ auto* stream = dir->getDorisIndexInput();
+ const auto* cur_io_ctx =
+ (const
io::IOContext*)stream->getIoContext();
+ if (cur_io_ctx->file_cache_stats) {
+ if (cur_io_ctx->file_cache_stats !=
&stats->file_cache_stats) {
+ LOG(FATAL) << "io context file cache
stats is not equal to "
+ "stats file cache "
+ "stats: "
+ <<
cur_io_ctx->file_cache_stats << ", "
+ << &stats->file_cache_stats;
+ }
+ }
+ }
+ }));
+
// try to reuse index_searcher's directory to read null_bitmap to cache
// to avoid open directory additionally for null_bitmap
// TODO: handle null bitmap procedure in new format.
diff --git
a/regression-test/suites/fault_injection_p0/test_index_io_context.groovy
b/regression-test/suites/fault_injection_p0/test_index_io_context.groovy
index 8dd82ccb304..23096432a3a 100644
--- a/regression-test/suites/fault_injection_p0/test_index_io_context.groovy
+++ b/regression-test/suites/fault_injection_p0/test_index_io_context.groovy
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-suite("test_index_io_context", "p0") {
+suite("test_index_io_context", "nonConcurrent") {
def tableName1 = "test_index_io_context1"
def tableName2 = "test_index_io_context2"
@@ -79,6 +79,7 @@ suite("test_index_io_context", "p0") {
sql """ set enable_common_expr_pushdown = true; """
try {
+
GetDebugPoint().enableDebugPointForAllBEs("InvertedIndexReader.handle_searcher_cache.io_ctx")
qt_sql """ select count() from ${tableName1} where request
match_any 'ticket_quest_bg2.jpg'; """
qt_sql """ select count() from ${tableName1} where request
match_any 'ticket_quest_bg2.jpg'; """
qt_sql """ select count() from ${tableName1} where request
match_any 'ticket_quest_bg2.jpg'; """
@@ -104,6 +105,7 @@ suite("test_index_io_context", "p0") {
qt_sql """ select count() from ${tableName2} where request
match_phrase 'ticket_quest_bg2.jpg ~10+'; """
qt_sql """ select count() from ${tableName2} where request
match_phrase 'ticket_quest_bg2.jpg ~10+'; """
} finally {
+
GetDebugPoint().disableDebugPointForAllBEs("InvertedIndexReader.handle_searcher_cache.io_ctx")
}
} finally {
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]