This is an automated email from the ASF dual-hosted git repository.
airborne pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new fdbd9caf977 branch-3.0: [opt](inverted index) add performance
profiling for remote io access in inverted index #43542 (#44093)
fdbd9caf977 is described below
commit fdbd9caf9775e166ede720246dae10c9a38c6d66
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Mon Nov 18 11:59:23 2024 +0800
branch-3.0: [opt](inverted index) add performance profiling for remote io
access in inverted index #43542 (#44093)
Cherry-picked from #43542
Co-authored-by: zzzxl <[email protected]>
---
be/src/clucene | 2 +-
be/src/index-tools/index_tool.cpp | 2 +-
be/src/olap/compaction.cpp | 9 +-
be/src/olap/rowset/segment_v2/column_reader.cpp | 2 +-
.../inverted_index/query/conjunction_query.cpp | 5 +-
.../inverted_index/query/conjunction_query.h | 3 +-
.../inverted_index/query/disjunction_query.cpp | 6 +-
.../inverted_index/query/disjunction_query.h | 3 +-
.../inverted_index/query/phrase_edge_query.cpp | 2 +-
.../inverted_index/query/phrase_edge_query.h | 2 +-
.../inverted_index/query/phrase_prefix_query.cpp | 3 +-
.../inverted_index/query/phrase_prefix_query.h | 2 +-
.../inverted_index/query/phrase_query.cpp | 8 +-
.../segment_v2/inverted_index/query/phrase_query.h | 3 +-
.../rowset/segment_v2/inverted_index/query/query.h | 1 +
.../inverted_index/query/regexp_query.cpp | 4 +-
.../segment_v2/inverted_index/query/regexp_query.h | 2 +-
.../segment_v2/inverted_index_compound_reader.cpp | 15 +++
.../segment_v2/inverted_index_file_reader.cpp | 5 +-
.../rowset/segment_v2/inverted_index_file_reader.h | 4 +-
.../segment_v2/inverted_index_fs_directory.cpp | 37 ++++++-
.../segment_v2/inverted_index_fs_directory.h | 7 +-
.../rowset/segment_v2/inverted_index_reader.cpp | 94 +++++++++--------
.../olap/rowset/segment_v2/inverted_index_reader.h | 83 ++++++++-------
.../compaction/index_compaction_test.cpp | 4 +-
.../index_compaction_with_deleted_term.cpp | 4 +-
.../fault_injection_p0/test_index_io_context.out | 73 +++++++++++++
.../test_index_io_context.groovy | 113 +++++++++++++++++++++
28 files changed, 373 insertions(+), 125 deletions(-)
diff --git a/be/src/clucene b/be/src/clucene
index 7cf6cf410d4..48fa9cc4ec3 160000
--- a/be/src/clucene
+++ b/be/src/clucene
@@ -1 +1 @@
-Subproject commit 7cf6cf410d41d95456edba263cc55b7b6f5ab027
+Subproject commit 48fa9cc4ec32b40bf3b02338d0a1b2cdbc6408cf
diff --git a/be/src/index-tools/index_tool.cpp
b/be/src/index-tools/index_tool.cpp
index adea2cd84c9..ca0575dc545 100644
--- a/be/src/index-tools/index_tool.cpp
+++ b/be/src/index-tools/index_tool.cpp
@@ -170,7 +170,7 @@ void search(lucene::store::Directory* dir, std::string&
field, std::string& toke
std::vector<std::string> terms = split(token, '|');
doris::TQueryOptions queryOptions;
- ConjunctionQuery conjunct_query(s, queryOptions);
+ ConjunctionQuery conjunct_query(s, queryOptions, nullptr);
conjunct_query.add(field_ws, terms);
conjunct_query.search(result);
diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index abbd84001c8..e608033701e 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -607,11 +607,9 @@ Status Compaction::do_inverted_index_compaction() {
fs, std::string
{InvertedIndexDescriptor::get_index_file_path_prefix(seg_path)},
_cur_tablet_schema->get_inverted_index_storage_format(),
rowset->rowset_meta()->inverted_index_file_info(seg_id));
- bool open_idx_file_cache = false;
RETURN_NOT_OK_STATUS_WITH_WARN(
-
inverted_index_file_reader->init(config::inverted_index_read_buffer_size,
- open_idx_file_cache),
- "inverted_index_file_reader init failed");
+
inverted_index_file_reader->init(config::inverted_index_read_buffer_size),
+ "inverted_index_file_reader init failed");
inverted_index_file_readers[m.second] =
std::move(inverted_index_file_reader);
}
@@ -779,9 +777,8 @@ void
Compaction::construct_index_compaction_columns(RowsetWriterContext& ctx) {
InvertedIndexDescriptor::get_index_file_path_prefix(*seg_path)},
_cur_tablet_schema->get_inverted_index_storage_format(),
rowset->rowset_meta()->inverted_index_file_info(i));
- bool open_idx_file_cache = false;
auto st = inverted_index_file_reader->init(
- config::inverted_index_read_buffer_size,
open_idx_file_cache);
+ config::inverted_index_read_buffer_size);
index_file_path =
inverted_index_file_reader->get_index_file_path(index_meta);
DBUG_EXECUTE_IF(
"Compaction::construct_skip_inverted_index_index_file_reader_init_"
diff --git a/be/src/olap/rowset/segment_v2/column_reader.cpp
b/be/src/olap/rowset/segment_v2/column_reader.cpp
index e2e6e93f602..66e24fc105e 100644
--- a/be/src/olap/rowset/segment_v2/column_reader.cpp
+++ b/be/src/olap/rowset/segment_v2/column_reader.cpp
@@ -336,7 +336,7 @@ Status ColumnReader::new_inverted_index_iterator(
{
std::shared_lock<std::shared_mutex> rlock(_load_index_lock);
if (_inverted_index) {
- RETURN_IF_ERROR(_inverted_index->new_iterator(read_options.stats,
+ RETURN_IF_ERROR(_inverted_index->new_iterator(read_options.io_ctx,
read_options.stats,
read_options.runtime_state, iterator));
}
}
diff --git
a/be/src/olap/rowset/segment_v2/inverted_index/query/conjunction_query.cpp
b/be/src/olap/rowset/segment_v2/inverted_index/query/conjunction_query.cpp
index fb247951716..6e9d61db7fd 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index/query/conjunction_query.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index/query/conjunction_query.cpp
@@ -20,8 +20,9 @@
namespace doris::segment_v2 {
ConjunctionQuery::ConjunctionQuery(const
std::shared_ptr<lucene::search::IndexSearcher>& searcher,
- const TQueryOptions& query_options)
+ const TQueryOptions& query_options, const
io::IOContext* io_ctx)
: _searcher(searcher),
+ _io_ctx(io_ctx),
_index_version(_searcher->getReader()->getIndexVersion()),
_conjunction_ratio(query_options.inverted_index_conjunction_opt_threshold) {}
@@ -48,7 +49,7 @@ void ConjunctionQuery::add(const std::wstring& field_name,
const std::vector<std
std::wstring ws_term = StringUtil::string_to_wstring(term);
Term* t = _CLNEW Term(field_name.c_str(), ws_term.c_str());
_terms.push_back(t);
- TermDocs* term_doc = _searcher->getReader()->termDocs(t);
+ TermDocs* term_doc = _searcher->getReader()->termDocs(t, _io_ctx);
_term_docs.push_back(term_doc);
iterators.emplace_back(term_doc);
}
diff --git
a/be/src/olap/rowset/segment_v2/inverted_index/query/conjunction_query.h
b/be/src/olap/rowset/segment_v2/inverted_index/query/conjunction_query.h
index 2571392d529..b9bfee2bfb1 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index/query/conjunction_query.h
+++ b/be/src/olap/rowset/segment_v2/inverted_index/query/conjunction_query.h
@@ -27,7 +27,7 @@ namespace doris::segment_v2 {
class ConjunctionQuery : public Query {
public:
ConjunctionQuery(const std::shared_ptr<lucene::search::IndexSearcher>&
searcher,
- const TQueryOptions& query_options);
+ const TQueryOptions& query_options, const io::IOContext*
io_ctx);
~ConjunctionQuery() override;
void add(const std::wstring& field_name, const std::vector<std::string>&
terms) override;
@@ -41,6 +41,7 @@ private:
public:
std::shared_ptr<lucene::search::IndexSearcher> _searcher;
+ const io::IOContext* _io_ctx = nullptr;
IndexVersion _index_version = IndexVersion::kV0;
int32_t _conjunction_ratio = 1000;
diff --git
a/be/src/olap/rowset/segment_v2/inverted_index/query/disjunction_query.cpp
b/be/src/olap/rowset/segment_v2/inverted_index/query/disjunction_query.cpp
index 650a88c0646..852357073d3 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index/query/disjunction_query.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index/query/disjunction_query.cpp
@@ -20,8 +20,8 @@
namespace doris::segment_v2 {
DisjunctionQuery::DisjunctionQuery(const
std::shared_ptr<lucene::search::IndexSearcher>& searcher,
- const TQueryOptions& query_options)
- : _searcher(searcher) {}
+ const TQueryOptions& query_options, const
io::IOContext* io_ctx)
+ : _searcher(searcher), _io_ctx(io_ctx) {}
void DisjunctionQuery::add(const std::wstring& field_name, const
std::vector<std::string>& terms) {
if (terms.empty()) {
@@ -36,7 +36,7 @@ void DisjunctionQuery::search(roaring::Roaring& roaring) {
auto func = [this, &roaring](const std::string& term, bool first) {
std::wstring ws_term = StringUtil::string_to_wstring(term);
auto* t = _CLNEW Term(_field_name.c_str(), ws_term.c_str());
- auto* term_doc = _searcher->getReader()->termDocs(t);
+ auto* term_doc = _searcher->getReader()->termDocs(t, _io_ctx);
TermIterator iterator(term_doc);
DocRange doc_range;
diff --git
a/be/src/olap/rowset/segment_v2/inverted_index/query/disjunction_query.h
b/be/src/olap/rowset/segment_v2/inverted_index/query/disjunction_query.h
index 35783146157..8d0559ee4b0 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index/query/disjunction_query.h
+++ b/be/src/olap/rowset/segment_v2/inverted_index/query/disjunction_query.h
@@ -27,7 +27,7 @@ namespace doris::segment_v2 {
class DisjunctionQuery : public Query {
public:
DisjunctionQuery(const std::shared_ptr<lucene::search::IndexSearcher>&
searcher,
- const TQueryOptions& query_options);
+ const TQueryOptions& query_options, const io::IOContext*
io_ctx);
~DisjunctionQuery() override = default;
void add(const std::wstring& field_name, const std::vector<std::string>&
terms) override;
@@ -35,6 +35,7 @@ public:
private:
std::shared_ptr<lucene::search::IndexSearcher> _searcher;
+ const io::IOContext* _io_ctx = nullptr;
std::wstring _field_name;
std::vector<std::string> _terms;
diff --git
a/be/src/olap/rowset/segment_v2/inverted_index/query/phrase_edge_query.cpp
b/be/src/olap/rowset/segment_v2/inverted_index/query/phrase_edge_query.cpp
index ec1b5bdd9e4..f82433826e9 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index/query/phrase_edge_query.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index/query/phrase_edge_query.cpp
@@ -30,7 +30,7 @@
namespace doris::segment_v2 {
PhraseEdgeQuery::PhraseEdgeQuery(const
std::shared_ptr<lucene::search::IndexSearcher>& searcher,
- const TQueryOptions& query_options)
+ const TQueryOptions& query_options, const
io::IOContext* io_ctx)
: _searcher(searcher),
_query(std::make_unique<CL_NS(search)::MultiPhraseQuery>()),
_max_expansions(query_options.inverted_index_max_expansions) {}
diff --git
a/be/src/olap/rowset/segment_v2/inverted_index/query/phrase_edge_query.h
b/be/src/olap/rowset/segment_v2/inverted_index/query/phrase_edge_query.h
index 5daf382e0d0..9eb3bd57c4a 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index/query/phrase_edge_query.h
+++ b/be/src/olap/rowset/segment_v2/inverted_index/query/phrase_edge_query.h
@@ -31,7 +31,7 @@ namespace doris::segment_v2 {
class PhraseEdgeQuery : public Query {
public:
PhraseEdgeQuery(const std::shared_ptr<lucene::search::IndexSearcher>&
searcher,
- const TQueryOptions& query_options);
+ const TQueryOptions& query_options, const io::IOContext*
io_ctx);
~PhraseEdgeQuery() override = default;
void add(const std::wstring& field_name, const std::vector<std::string>&
terms) override;
diff --git
a/be/src/olap/rowset/segment_v2/inverted_index/query/phrase_prefix_query.cpp
b/be/src/olap/rowset/segment_v2/inverted_index/query/phrase_prefix_query.cpp
index 407e515dc92..88bb3c1171f 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index/query/phrase_prefix_query.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index/query/phrase_prefix_query.cpp
@@ -23,7 +23,8 @@
namespace doris::segment_v2 {
PhrasePrefixQuery::PhrasePrefixQuery(const
std::shared_ptr<lucene::search::IndexSearcher>& searcher,
- const TQueryOptions& query_options)
+ const TQueryOptions& query_options,
+ const io::IOContext* io_ctx)
: _searcher(searcher),
_query(std::make_unique<CL_NS(search)::MultiPhraseQuery>()),
_max_expansions(query_options.inverted_index_max_expansions) {}
diff --git
a/be/src/olap/rowset/segment_v2/inverted_index/query/phrase_prefix_query.h
b/be/src/olap/rowset/segment_v2/inverted_index/query/phrase_prefix_query.h
index e565c0409cf..5cac597951e 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index/query/phrase_prefix_query.h
+++ b/be/src/olap/rowset/segment_v2/inverted_index/query/phrase_prefix_query.h
@@ -31,7 +31,7 @@ namespace doris::segment_v2 {
class PhrasePrefixQuery : public Query {
public:
PhrasePrefixQuery(const std::shared_ptr<lucene::search::IndexSearcher>&
searcher,
- const TQueryOptions& query_options);
+ const TQueryOptions& query_options, const io::IOContext*
io_ctx);
~PhrasePrefixQuery() override = default;
void add(const std::wstring& field_name, const std::vector<std::string>&
terms) override;
diff --git
a/be/src/olap/rowset/segment_v2/inverted_index/query/phrase_query.cpp
b/be/src/olap/rowset/segment_v2/inverted_index/query/phrase_query.cpp
index 0ca2dce94e3..13081869712 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index/query/phrase_query.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index/query/phrase_query.cpp
@@ -118,8 +118,8 @@ bool
OrderedSloppyPhraseMatcher::stretch_to_order(PostingsAndPosition* prev_post
}
PhraseQuery::PhraseQuery(const std::shared_ptr<lucene::search::IndexSearcher>&
searcher,
- const TQueryOptions& query_options)
- : _searcher(searcher) {}
+ const TQueryOptions& query_options, const
io::IOContext* io_ctx)
+ : _searcher(searcher), _io_ctx(io_ctx) {}
PhraseQuery::~PhraseQuery() {
for (auto& term_doc : _term_docs) {
@@ -166,7 +166,7 @@ void PhraseQuery::add(const std::wstring& field_name, const
std::vector<std::str
std::wstring ws_term = StringUtil::string_to_wstring(terms[0]);
Term* t = _CLNEW Term(field_name.c_str(), ws_term.c_str());
_terms.push_back(t);
- TermDocs* term_doc = _searcher->getReader()->termDocs(t);
+ TermDocs* term_doc = _searcher->getReader()->termDocs(t, _io_ctx);
_term_docs.push_back(term_doc);
_lead1 = TermIterator(term_doc);
return;
@@ -177,7 +177,7 @@ void PhraseQuery::add(const std::wstring& field_name, const
std::vector<std::str
std::wstring ws_term = StringUtil::string_to_wstring(term);
Term* t = _CLNEW Term(field_name.c_str(), ws_term.c_str());
_terms.push_back(t);
- TermPositions* term_pos = _searcher->getReader()->termPositions(t);
+ TermPositions* term_pos = _searcher->getReader()->termPositions(t,
_io_ctx);
_term_docs.push_back(term_pos);
iterators.emplace_back(term_pos);
return term_pos;
diff --git a/be/src/olap/rowset/segment_v2/inverted_index/query/phrase_query.h
b/be/src/olap/rowset/segment_v2/inverted_index/query/phrase_query.h
index 253ba782b78..006d1eddec1 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index/query/phrase_query.h
+++ b/be/src/olap/rowset/segment_v2/inverted_index/query/phrase_query.h
@@ -85,7 +85,7 @@ using Matcher = std::variant<ExactPhraseMatcher,
OrderedSloppyPhraseMatcher, Phr
class PhraseQuery : public Query {
public:
PhraseQuery(const std::shared_ptr<lucene::search::IndexSearcher>& searcher,
- const TQueryOptions& query_options);
+ const TQueryOptions& query_options, const io::IOContext*
io_ctx);
~PhraseQuery() override;
void add(const InvertedIndexQueryInfo& query_info) override;
@@ -106,6 +106,7 @@ public:
private:
std::shared_ptr<lucene::search::IndexSearcher> _searcher;
+ const io::IOContext* _io_ctx = nullptr;
TermIterator _lead1;
TermIterator _lead2;
diff --git a/be/src/olap/rowset/segment_v2/inverted_index/query/query.h
b/be/src/olap/rowset/segment_v2/inverted_index/query/query.h
index cef7fd51f72..786abf8acd9 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index/query/query.h
+++ b/be/src/olap/rowset/segment_v2/inverted_index/query/query.h
@@ -27,6 +27,7 @@
#include <memory>
#include "common/status.h"
+#include "io/io_common.h"
#include "roaring/roaring.hh"
CL_NS_USE(index)
diff --git
a/be/src/olap/rowset/segment_v2/inverted_index/query/regexp_query.cpp
b/be/src/olap/rowset/segment_v2/inverted_index/query/regexp_query.cpp
index 007da8289dc..69de4b7818b 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index/query/regexp_query.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index/query/regexp_query.cpp
@@ -25,10 +25,10 @@
namespace doris::segment_v2 {
RegexpQuery::RegexpQuery(const std::shared_ptr<lucene::search::IndexSearcher>&
searcher,
- const TQueryOptions& query_options)
+ const TQueryOptions& query_options, const
io::IOContext* io_ctx)
: _searcher(searcher),
_max_expansions(query_options.inverted_index_max_expansions),
- _query(searcher, query_options) {}
+ _query(searcher, query_options, io_ctx) {}
void RegexpQuery::add(const std::wstring& field_name, const
std::vector<std::string>& patterns) {
if (patterns.size() != 1) {
diff --git a/be/src/olap/rowset/segment_v2/inverted_index/query/regexp_query.h
b/be/src/olap/rowset/segment_v2/inverted_index/query/regexp_query.h
index 336b2d0b6a6..650ad2bf10b 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index/query/regexp_query.h
+++ b/be/src/olap/rowset/segment_v2/inverted_index/query/regexp_query.h
@@ -28,7 +28,7 @@ namespace doris::segment_v2 {
class RegexpQuery : public Query {
public:
RegexpQuery(const std::shared_ptr<lucene::search::IndexSearcher>& searcher,
- const TQueryOptions& query_options);
+ const TQueryOptions& query_options, const io::IOContext*
io_ctx);
~RegexpQuery() override = default;
void add(const std::wstring& field_name, const std::vector<std::string>&
patterns) override;
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.cpp
b/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.cpp
index 7613df112ed..60006ea8455 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.cpp
@@ -59,6 +59,8 @@ private:
CL_NS(store)::IndexInput* base;
int64_t fileOffset;
int64_t _length;
+ const io::IOContext* _io_ctx = nullptr;
+ bool _is_index_file = false; // Indicates if the file is a TII file
protected:
void readInternal(uint8_t* /*b*/, const int32_t /*len*/) override;
@@ -75,6 +77,8 @@ public:
const char* getDirectoryType() const override { return
DorisCompoundReader::getClassName(); }
const char* getObjectName() const override { return getClassName(); }
static const char* getClassName() { return "CSIndexInput"; }
+ void setIoContext(const void* io_ctx) override;
+ void setIndexFile(bool isIndexFile) override;
};
CSIndexInput::CSIndexInput(CL_NS(store)::IndexInput* base, const int64_t
fileOffset,
@@ -92,9 +96,12 @@ void CSIndexInput::readInternal(uint8_t* b, const int32_t
len) {
if (start + len > _length) {
_CLTHROWA(CL_ERR_IO, "read past EOF");
}
+ base->setIoContext(_io_ctx);
+ base->setIndexFile(_is_index_file);
base->seek(fileOffset + start);
bool read_from_buffer = true;
base->readBytes(b, len, read_from_buffer);
+ base->setIoContext(nullptr);
}
CSIndexInput::~CSIndexInput() = default;
@@ -111,6 +118,14 @@ CSIndexInput::CSIndexInput(const CSIndexInput& clone) :
BufferedIndexInput(clone
void CSIndexInput::close() {}
+void CSIndexInput::setIoContext(const void* io_ctx) {
+ _io_ctx = static_cast<const io::IOContext*>(io_ctx);
+}
+
+void CSIndexInput::setIndexFile(bool isIndexFile) {
+ _is_index_file = isIndexFile;
+}
+
DorisCompoundReader::DorisCompoundReader(CL_NS(store)::IndexInput* stream,
int32_t read_buffer_size)
: _ram_dir(new lucene::store::RAMDirectory()),
_stream(stream),
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_file_reader.cpp
b/be/src/olap/rowset/segment_v2/inverted_index_file_reader.cpp
index e0c75922c98..113833d560f 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_file_reader.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_file_reader.cpp
@@ -27,10 +27,9 @@
namespace doris::segment_v2 {
-Status InvertedIndexFileReader::init(int32_t read_buffer_size, bool
open_idx_file_cache) {
+Status InvertedIndexFileReader::init(int32_t read_buffer_size) {
if (!_inited) {
_read_buffer_size = read_buffer_size;
- _open_idx_file_cache = open_idx_file_cache;
if (_storage_format == InvertedIndexStorageFormatPB::V2) {
auto st = _init_from_v2(read_buffer_size);
if (!st.ok()) {
@@ -76,7 +75,6 @@ Status InvertedIndexFileReader::_init_from_v2(int32_t
read_buffer_size) {
"CLuceneError occur when open idx file {}, error msg: {}",
index_file_full_path,
err.what());
}
- index_input->setIdxFileCache(_open_idx_file_cache);
_stream = std::unique_ptr<CL_NS(store)::IndexInput>(index_input);
// 3. read file
@@ -198,7 +196,6 @@ Result<std::unique_ptr<DorisCompoundReader>>
InvertedIndexFileReader::_open(
}
// 3. read file in DorisCompoundReader
- index_input->setIdxFileCache(_open_idx_file_cache);
compound_reader =
std::make_unique<DorisCompoundReader>(index_input, _read_buffer_size);
} catch (CLuceneError& err) {
return
ResultError(Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_file_reader.h
b/be/src/olap/rowset/segment_v2/inverted_index_file_reader.h
index 8bc28b1882f..3b7161c7643 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_file_reader.h
+++ b/be/src/olap/rowset/segment_v2/inverted_index_file_reader.h
@@ -58,8 +58,7 @@ public:
_storage_format(storage_format),
_idx_file_info(idx_file_info) {}
- Status init(int32_t read_buffer_size =
config::inverted_index_read_buffer_size,
- bool open_idx_file_cache = false);
+ Status init(int32_t read_buffer_size =
config::inverted_index_read_buffer_size);
Result<std::unique_ptr<DorisCompoundReader>> open(const TabletIndex*
index_meta) const;
void debug_file_entries();
std::string get_index_file_cache_key(const TabletIndex* index_meta) const;
@@ -80,7 +79,6 @@ private:
const io::FileSystemSPtr _fs;
std::string _index_path_prefix;
int32_t _read_buffer_size = -1;
- bool _open_idx_file_cache = false;
InvertedIndexStorageFormatPB _storage_format;
mutable std::shared_mutex _mutex; // Use mutable for const read operations
bool _inited = false;
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp
b/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp
index ded71c8a6cc..29caf29936d 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp
@@ -219,6 +219,27 @@ void DorisFSDirectory::FSIndexInput::close() {
}*/
}
+void DorisFSDirectory::FSIndexInput::setIoContext(const void* io_ctx) {
+ if (io_ctx) {
+ const auto& ctx = static_cast<const io::IOContext*>(io_ctx);
+ _io_ctx.reader_type = ctx->reader_type;
+ _io_ctx.query_id = ctx->query_id;
+ _io_ctx.file_cache_stats = ctx->file_cache_stats;
+ } else {
+ _io_ctx.reader_type = ReaderType::UNKNOWN;
+ _io_ctx.query_id = nullptr;
+ _io_ctx.file_cache_stats = nullptr;
+ }
+}
+
+const void* DorisFSDirectory::FSIndexInput::getIoContext() {
+ return &_io_ctx;
+}
+
+void DorisFSDirectory::FSIndexInput::setIndexFile(bool isIndexFile) {
+ _io_ctx.is_index_data = isIndexFile;
+}
+
void DorisFSDirectory::FSIndexInput::seekInternal(const int64_t position) {
CND_PRECONDITION(position >= 0 && position < _handle->_length, "Seeking
out of range");
_pos = position;
@@ -239,9 +260,23 @@ void DorisFSDirectory::FSIndexInput::readInternal(uint8_t*
b, const int32_t len)
_handle->_fpos = _pos;
}
+ DBUG_EXECUTE_IF(
+ "DorisFSDirectory::FSIndexInput::readInternal", ({
+ static thread_local std::unordered_map<const TUniqueId*,
io::FileCacheStatistics*>
+ thread_file_cache_map;
+ auto it = thread_file_cache_map.find(_io_ctx.query_id);
+ if (it != thread_file_cache_map.end()) {
+ if (_io_ctx.file_cache_stats != it->second) {
+ _CLTHROWA(CL_ERR_IO, "File cache statistics mismatch");
+ }
+ } else {
+ thread_file_cache_map[_io_ctx.query_id] =
_io_ctx.file_cache_stats;
+ }
+ }));
+
Slice result {b, (size_t)len};
size_t bytes_read = 0;
- auto st = _handle->_reader->read_at(_pos, result, &bytes_read, &_io_ctx);
+ Status st = _handle->_reader->read_at(_pos, result, &bytes_read, &_io_ctx);
DBUG_EXECUTE_IF("DorisFSDirectory::FSIndexInput::readInternal_reader_read_at_error",
{
st = Status::InternalError(
"debug point:
DorisFSDirectory::FSIndexInput::readInternal_reader_read_at_error");
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.h
b/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.h
index 59ae6db1a96..fd92873c970 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.h
+++ b/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.h
@@ -180,8 +180,6 @@ class DorisFSDirectory::FSIndexInput : public
lucene::store::BufferedIndexInput
: BufferedIndexInput(buffer_size) {
this->_pos = 0;
this->_handle = std::move(handle);
- this->_io_ctx.reader_type = ReaderType::READER_QUERY;
- this->_io_ctx.is_index_data = false;
}
protected:
@@ -199,8 +197,9 @@ public:
const char* getDirectoryType() const override { return
DorisFSDirectory::getClassName(); }
const char* getObjectName() const override { return getClassName(); }
static const char* getClassName() { return "FSIndexInput"; }
-
- void setIdxFileCache(bool index) override { _io_ctx.is_index_data = index;
}
+ void setIoContext(const void* io_ctx) override;
+ const void* getIoContext() override;
+ void setIndexFile(bool isIndexFile) override;
std::mutex _this_lock;
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_reader.cpp
b/be/src/olap/rowset/segment_v2/inverted_index_reader.cpp
index 7b8504322d2..b31ba80ee46 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_reader.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_reader.cpp
@@ -102,7 +102,8 @@ std::string InvertedIndexReader::get_index_file_path() {
return _inverted_index_file_reader->get_index_file_path(&_index_meta);
}
-Status InvertedIndexReader::read_null_bitmap(OlapReaderStatistics* stats,
+Status InvertedIndexReader::read_null_bitmap(const io::IOContext* io_ctx,
+ OlapReaderStatistics* stats,
InvertedIndexQueryCacheHandle*
cache_handle,
lucene::store::Directory* dir) {
SCOPED_RAW_TIMER(&stats->inverted_index_query_null_bitmap_timer);
@@ -120,9 +121,7 @@ Status
InvertedIndexReader::read_null_bitmap(OlapReaderStatistics* stats,
if (!dir) {
// TODO: ugly code here, try to refact.
- bool open_idx_file_cache = true;
- auto st =
_inverted_index_file_reader->init(config::inverted_index_read_buffer_size,
- open_idx_file_cache);
+ auto st =
_inverted_index_file_reader->init(config::inverted_index_read_buffer_size);
if (!st.ok()) {
LOG(WARNING) << st;
return st;
@@ -138,6 +137,7 @@ Status
InvertedIndexReader::read_null_bitmap(OlapReaderStatistics* stats,
InvertedIndexDescriptor::get_temporary_null_bitmap_file_name();
if (dir->fileExists(null_bitmap_file_name)) {
null_bitmap_in = dir->openInput(null_bitmap_file_name);
+ null_bitmap_in->setIoContext(io_ctx);
size_t null_bitmap_size = null_bitmap_in->length();
faststring buf;
buf.resize(null_bitmap_size);
@@ -165,7 +165,8 @@ Status
InvertedIndexReader::read_null_bitmap(OlapReaderStatistics* stats,
}
Status InvertedIndexReader::handle_searcher_cache(
- InvertedIndexCacheHandle* inverted_index_cache_handle,
OlapReaderStatistics* stats) {
+ InvertedIndexCacheHandle* inverted_index_cache_handle, const
io::IOContext* io_ctx,
+ OlapReaderStatistics* stats) {
auto index_file_key =
_inverted_index_file_reader->get_index_file_cache_key(&_index_meta);
InvertedIndexSearcherCache::CacheKey searcher_cache_key(index_file_key);
if (InvertedIndexSearcherCache::instance()->lookup(searcher_cache_key,
@@ -179,9 +180,7 @@ Status InvertedIndexReader::handle_searcher_cache(
SCOPED_RAW_TIMER(&stats->inverted_index_searcher_open_timer);
IndexSearcherPtr searcher;
- bool open_idx_file_cache = true;
- auto st =
_inverted_index_file_reader->init(config::inverted_index_read_buffer_size,
- open_idx_file_cache);
+ auto st =
_inverted_index_file_reader->init(config::inverted_index_read_buffer_size);
if (!st.ok()) {
LOG(WARNING) << st;
return st;
@@ -191,7 +190,7 @@ Status InvertedIndexReader::handle_searcher_cache(
// to avoid open directory additionally for null_bitmap
// TODO: handle null bitmap procedure in new format.
InvertedIndexQueryCacheHandle null_bitmap_cache_handle;
- static_cast<void>(read_null_bitmap(stats, &null_bitmap_cache_handle,
dir.get()));
+ static_cast<void>(read_null_bitmap(io_ctx, stats,
&null_bitmap_cache_handle, dir.get()));
RETURN_IF_ERROR(create_index_searcher(dir.release(), &searcher,
mem_tracker.get(), type()));
auto* cache_value = new InvertedIndexSearcherCache::CacheValue(
std::move(searcher), mem_tracker->consumption(), UnixMillis());
@@ -211,22 +210,21 @@ Status
InvertedIndexReader::create_index_searcher(lucene::store::Directory* dir,
auto searcher_result =
DORIS_TRY(index_searcher_builder->get_index_searcher(dir));
*searcher = searcher_result;
- if (std::string(dir->getObjectName()) == "DorisCompoundReader") {
-
static_cast<DorisCompoundReader*>(dir)->getDorisIndexInput()->setIdxFileCache(false);
- }
+
// NOTE: before mem_tracker hook becomes active, we caculate reader memory
size by hand.
mem_tracker->consume(index_searcher_builder->get_reader_size());
return Status::OK();
};
Status InvertedIndexReader::match_index_search(
- OlapReaderStatistics* stats, RuntimeState* runtime_state,
InvertedIndexQueryType query_type,
- const InvertedIndexQueryInfo& query_info, const
FulltextIndexSearcherPtr& index_searcher,
+ const io::IOContext* io_ctx, OlapReaderStatistics* stats,
RuntimeState* runtime_state,
+ InvertedIndexQueryType query_type, const InvertedIndexQueryInfo&
query_info,
+ const FulltextIndexSearcherPtr& index_searcher,
const std::shared_ptr<roaring::Roaring>& term_match_bitmap) {
TQueryOptions queryOptions = runtime_state->query_options();
try {
SCOPED_RAW_TIMER(&stats->inverted_index_searcher_search_timer);
- auto query = QueryFactory::create(query_type, index_searcher,
queryOptions);
+ auto query = QueryFactory::create(query_type, index_searcher,
queryOptions, io_ctx);
if (!query) {
return Status::Error<ErrorCode::INVERTED_INDEX_INVALID_PARAMETERS>(
"query type " + query_type_to_string(query_type) + ",
query is nullptr");
@@ -240,15 +238,17 @@ Status InvertedIndexReader::match_index_search(
return Status::OK();
}
-Status FullTextIndexReader::new_iterator(OlapReaderStatistics* stats,
RuntimeState* runtime_state,
+Status FullTextIndexReader::new_iterator(const io::IOContext& io_ctx,
OlapReaderStatistics* stats,
+ RuntimeState* runtime_state,
std::unique_ptr<InvertedIndexIterator>* iterator) {
- *iterator = InvertedIndexIterator::create_unique(stats, runtime_state,
shared_from_this());
+ *iterator =
+ InvertedIndexIterator::create_unique(io_ctx, stats, runtime_state,
shared_from_this());
return Status::OK();
}
-Status FullTextIndexReader::query(OlapReaderStatistics* stats, RuntimeState*
runtime_state,
- const std::string& column_name, const void*
query_value,
- InvertedIndexQueryType query_type,
+Status FullTextIndexReader::query(const io::IOContext* io_ctx,
OlapReaderStatistics* stats,
+ RuntimeState* runtime_state, const
std::string& column_name,
+ const void* query_value,
InvertedIndexQueryType query_type,
std::shared_ptr<roaring::Roaring>& bit_map) {
SCOPED_RAW_TIMER(&stats->inverted_index_query_timer);
@@ -325,12 +325,12 @@ Status FullTextIndexReader::query(OlapReaderStatistics*
stats, RuntimeState* run
FulltextIndexSearcherPtr* searcher_ptr = nullptr;
InvertedIndexCacheHandle inverted_index_cache_handle;
- RETURN_IF_ERROR(handle_searcher_cache(&inverted_index_cache_handle,
stats));
+ RETURN_IF_ERROR(handle_searcher_cache(&inverted_index_cache_handle,
io_ctx, stats));
auto searcher_variant =
inverted_index_cache_handle.get_index_searcher();
searcher_ptr =
std::get_if<FulltextIndexSearcherPtr>(&searcher_variant);
if (searcher_ptr != nullptr) {
term_match_bitmap = std::make_shared<roaring::Roaring>();
- RETURN_IF_ERROR(match_index_search(stats, runtime_state,
query_type, query_info,
+ RETURN_IF_ERROR(match_index_search(io_ctx, stats, runtime_state,
query_type, query_info,
*searcher_ptr,
term_match_bitmap));
term_match_bitmap->runOptimize();
cache->insert(cache_key, term_match_bitmap, &cache_handler);
@@ -348,13 +348,15 @@ InvertedIndexReaderType FullTextIndexReader::type() {
}
Status StringTypeInvertedIndexReader::new_iterator(
- OlapReaderStatistics* stats, RuntimeState* runtime_state,
+ const io::IOContext& io_ctx, OlapReaderStatistics* stats,
RuntimeState* runtime_state,
std::unique_ptr<InvertedIndexIterator>* iterator) {
- *iterator = InvertedIndexIterator::create_unique(stats, runtime_state,
shared_from_this());
+ *iterator =
+ InvertedIndexIterator::create_unique(io_ctx, stats, runtime_state,
shared_from_this());
return Status::OK();
}
-Status StringTypeInvertedIndexReader::query(OlapReaderStatistics* stats,
+Status StringTypeInvertedIndexReader::query(const io::IOContext* io_ctx,
+ OlapReaderStatistics* stats,
RuntimeState* runtime_state,
const std::string& column_name,
const void* query_value,
InvertedIndexQueryType query_type,
@@ -398,7 +400,7 @@ Status
StringTypeInvertedIndexReader::query(OlapReaderStatistics* stats,
auto result = std::make_shared<roaring::Roaring>();
FulltextIndexSearcherPtr* searcher_ptr = nullptr;
InvertedIndexCacheHandle inverted_index_cache_handle;
- RETURN_IF_ERROR(handle_searcher_cache(&inverted_index_cache_handle,
stats));
+ RETURN_IF_ERROR(handle_searcher_cache(&inverted_index_cache_handle,
io_ctx, stats));
auto searcher_variant = inverted_index_cache_handle.get_index_searcher();
searcher_ptr = std::get_if<FulltextIndexSearcherPtr>(&searcher_variant);
if (searcher_ptr != nullptr) {
@@ -407,7 +409,7 @@ Status
StringTypeInvertedIndexReader::query(OlapReaderStatistics* stats,
case InvertedIndexQueryType::MATCH_ANY_QUERY:
case InvertedIndexQueryType::MATCH_ALL_QUERY:
case InvertedIndexQueryType::EQUAL_QUERY: {
- RETURN_IF_ERROR(match_index_search(stats, runtime_state,
+ RETURN_IF_ERROR(match_index_search(io_ctx, stats,
runtime_state,
InvertedIndexQueryType::MATCH_ANY_QUERY,
query_info, *searcher_ptr,
result));
break;
@@ -415,8 +417,8 @@ Status
StringTypeInvertedIndexReader::query(OlapReaderStatistics* stats,
case InvertedIndexQueryType::MATCH_PHRASE_QUERY:
case InvertedIndexQueryType::MATCH_PHRASE_PREFIX_QUERY:
case InvertedIndexQueryType::MATCH_REGEXP_QUERY: {
- RETURN_IF_ERROR(match_index_search(stats, runtime_state,
query_type, query_info,
- *searcher_ptr, result));
+ RETURN_IF_ERROR(match_index_search(io_ctx, stats,
runtime_state, query_type,
+ query_info, *searcher_ptr,
result));
break;
}
case InvertedIndexQueryType::LESS_THAN_QUERY:
@@ -481,9 +483,11 @@ InvertedIndexReaderType
StringTypeInvertedIndexReader::type() {
return InvertedIndexReaderType::STRING_TYPE;
}
-Status BkdIndexReader::new_iterator(OlapReaderStatistics* stats, RuntimeState*
runtime_state,
+Status BkdIndexReader::new_iterator(const io::IOContext& io_ctx,
OlapReaderStatistics* stats,
+ RuntimeState* runtime_state,
std::unique_ptr<InvertedIndexIterator>*
iterator) {
- *iterator = InvertedIndexIterator::create_unique(stats, runtime_state,
shared_from_this());
+ *iterator =
+ InvertedIndexIterator::create_unique(io_ctx, stats, runtime_state,
shared_from_this());
return Status::OK();
}
@@ -611,12 +615,12 @@ Status BkdIndexReader::invoke_bkd_query(const void*
query_value, InvertedIndexQu
return Status::OK();
}
-Status BkdIndexReader::try_query(OlapReaderStatistics* stats, const
std::string& column_name,
- const void* query_value,
InvertedIndexQueryType query_type,
- uint32_t* count) {
+Status BkdIndexReader::try_query(const io::IOContext* io_ctx,
OlapReaderStatistics* stats,
+ const std::string& column_name, const void*
query_value,
+ InvertedIndexQueryType query_type, uint32_t*
count) {
try {
std::shared_ptr<lucene::util::bkd::bkd_reader> r;
- auto st = get_bkd_reader(r, stats);
+ auto st = get_bkd_reader(r, io_ctx, stats);
if (!st.ok()) {
LOG(WARNING) << "get bkd reader for "
<<
_inverted_index_file_reader->get_index_file_path(&_index_meta)
@@ -648,15 +652,15 @@ Status BkdIndexReader::try_query(OlapReaderStatistics*
stats, const std::string&
return Status::OK();
}
-Status BkdIndexReader::query(OlapReaderStatistics* stats, RuntimeState*
runtime_state,
- const std::string& column_name, const void*
query_value,
- InvertedIndexQueryType query_type,
+Status BkdIndexReader::query(const io::IOContext* io_ctx,
OlapReaderStatistics* stats,
+ RuntimeState* runtime_state, const std::string&
column_name,
+ const void* query_value, InvertedIndexQueryType
query_type,
std::shared_ptr<roaring::Roaring>& bit_map) {
SCOPED_RAW_TIMER(&stats->inverted_index_query_timer);
try {
std::shared_ptr<lucene::util::bkd::bkd_reader> r;
- auto st = get_bkd_reader(r, stats);
+ auto st = get_bkd_reader(r, io_ctx, stats);
if (!st.ok()) {
LOG(WARNING) << "get bkd reader for "
<<
_inverted_index_file_reader->get_index_file_path(&_index_meta)
@@ -692,11 +696,11 @@ Status BkdIndexReader::query(OlapReaderStatistics* stats,
RuntimeState* runtime_
}
}
-Status BkdIndexReader::get_bkd_reader(BKDIndexSearcherPtr& bkd_reader,
+Status BkdIndexReader::get_bkd_reader(BKDIndexSearcherPtr& bkd_reader, const
io::IOContext* io_ctx,
OlapReaderStatistics* stats) {
BKDIndexSearcherPtr* bkd_searcher = nullptr;
InvertedIndexCacheHandle inverted_index_cache_handle;
- RETURN_IF_ERROR(handle_searcher_cache(&inverted_index_cache_handle,
stats));
+ RETURN_IF_ERROR(handle_searcher_cache(&inverted_index_cache_handle,
io_ctx, stats));
auto searcher_variant = inverted_index_cache_handle.get_index_searcher();
bkd_searcher = std::get_if<BKDIndexSearcherPtr>(&searcher_variant);
if (bkd_searcher) {
@@ -1126,8 +1130,8 @@ Status InvertedIndexIterator::read_from_inverted_index(
}
}
- RETURN_IF_ERROR(
- _reader->query(_stats, _runtime_state, column_name, query_value,
query_type, bit_map));
+ RETURN_IF_ERROR(_reader->query(&_io_ctx, _stats, _runtime_state,
column_name, query_value,
+ query_type, bit_map));
return Status::OK();
}
@@ -1141,7 +1145,8 @@ Status
InvertedIndexIterator::try_read_from_inverted_index(const std::string& co
query_type == InvertedIndexQueryType::LESS_EQUAL_QUERY ||
query_type == InvertedIndexQueryType::LESS_THAN_QUERY ||
query_type == InvertedIndexQueryType::EQUAL_QUERY) {
- RETURN_IF_ERROR(_reader->try_query(_stats, column_name, query_value,
query_type, count));
+ RETURN_IF_ERROR(
+ _reader->try_query(&_io_ctx, _stats, column_name, query_value,
query_type, count));
}
return Status::OK();
}
@@ -1159,4 +1164,5 @@ template class
InvertedIndexVisitor<InvertedIndexQueryType::EQUAL_QUERY>;
template class InvertedIndexVisitor<InvertedIndexQueryType::LESS_EQUAL_QUERY>;
template class
InvertedIndexVisitor<InvertedIndexQueryType::GREATER_THAN_QUERY>;
template class
InvertedIndexVisitor<InvertedIndexQueryType::GREATER_EQUAL_QUERY>;
+
} // namespace doris::segment_v2
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_reader.h
b/be/src/olap/rowset/segment_v2/inverted_index_reader.h
index f37516bc40f..57975ab7fec 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_reader.h
+++ b/be/src/olap/rowset/segment_v2/inverted_index_reader.h
@@ -181,17 +181,18 @@ public:
virtual ~InvertedIndexReader() = default;
// create a new column iterator. Client should delete returned iterator
- virtual Status new_iterator(OlapReaderStatistics* stats, RuntimeState*
runtime_state,
+ virtual Status new_iterator(const io::IOContext& io_ctx,
OlapReaderStatistics* stats,
+ RuntimeState* runtime_state,
std::unique_ptr<InvertedIndexIterator>*
iterator) = 0;
- virtual Status query(OlapReaderStatistics* stats, RuntimeState*
runtime_state,
- const std::string& column_name, const void*
query_value,
- InvertedIndexQueryType query_type,
+ virtual Status query(const io::IOContext* io_ctx, OlapReaderStatistics*
stats,
+ RuntimeState* runtime_state, const std::string&
column_name,
+ const void* query_value, InvertedIndexQueryType
query_type,
std::shared_ptr<roaring::Roaring>& bit_map) = 0;
- virtual Status try_query(OlapReaderStatistics* stats, const std::string&
column_name,
- const void* query_value, InvertedIndexQueryType
query_type,
- uint32_t* count) = 0;
+ virtual Status try_query(const io::IOContext* io_ctx,
OlapReaderStatistics* stats,
+ const std::string& column_name, const void*
query_value,
+ InvertedIndexQueryType query_type, uint32_t*
count) = 0;
- Status read_null_bitmap(OlapReaderStatistics* stats,
+ Status read_null_bitmap(const io::IOContext* io_ctx, OlapReaderStatistics*
stats,
InvertedIndexQueryCacheHandle* cache_handle,
lucene::store::Directory* dir = nullptr);
@@ -222,15 +223,15 @@ public:
}
virtual Status handle_searcher_cache(InvertedIndexCacheHandle*
inverted_index_cache_handle,
- OlapReaderStatistics* stats);
+ const io::IOContext* io_ctx,
OlapReaderStatistics* stats);
std::string get_index_file_path();
static Status create_index_searcher(lucene::store::Directory* dir,
IndexSearcherPtr* searcher,
MemTracker* mem_tracker,
InvertedIndexReaderType reader_type);
protected:
- Status match_index_search(OlapReaderStatistics* stats, RuntimeState*
runtime_state,
- InvertedIndexQueryType query_type,
+ Status match_index_search(const io::IOContext* io_ctx,
OlapReaderStatistics* stats,
+ RuntimeState* runtime_state,
InvertedIndexQueryType query_type,
const InvertedIndexQueryInfo& query_info,
const FulltextIndexSearcherPtr& index_searcher,
const std::shared_ptr<roaring::Roaring>&
term_match_bitmap);
@@ -252,15 +253,16 @@ public:
: InvertedIndexReader(index_meta, inverted_index_file_reader) {}
~FullTextIndexReader() override = default;
- Status new_iterator(OlapReaderStatistics* stats, RuntimeState*
runtime_state,
+ Status new_iterator(const io::IOContext& io_ctx, OlapReaderStatistics*
stats,
+ RuntimeState* runtime_state,
std::unique_ptr<InvertedIndexIterator>* iterator)
override;
- Status query(OlapReaderStatistics* stats, RuntimeState* runtime_state,
- const std::string& column_name, const void* query_value,
- InvertedIndexQueryType query_type,
+ Status query(const io::IOContext* io_ctx, OlapReaderStatistics* stats,
+ RuntimeState* runtime_state, const std::string& column_name,
+ const void* query_value, InvertedIndexQueryType query_type,
std::shared_ptr<roaring::Roaring>& bit_map) override;
- Status try_query(OlapReaderStatistics* stats, const std::string&
column_name,
- const void* query_value, InvertedIndexQueryType
query_type,
- uint32_t* count) override {
+ Status try_query(const io::IOContext* io_ctx, OlapReaderStatistics* stats,
+ const std::string& column_name, const void* query_value,
+ InvertedIndexQueryType query_type, uint32_t* count)
override {
return Status::Error<ErrorCode::NOT_IMPLEMENTED_ERROR>(
"FullTextIndexReader not support try_query");
}
@@ -278,15 +280,16 @@ public:
: InvertedIndexReader(index_meta, inverted_index_file_reader) {}
~StringTypeInvertedIndexReader() override = default;
- Status new_iterator(OlapReaderStatistics* stats, RuntimeState*
runtime_state,
+ Status new_iterator(const io::IOContext& io_ctx, OlapReaderStatistics*
stats,
+ RuntimeState* runtime_state,
std::unique_ptr<InvertedIndexIterator>* iterator)
override;
- Status query(OlapReaderStatistics* stats, RuntimeState* runtime_state,
- const std::string& column_name, const void* query_value,
- InvertedIndexQueryType query_type,
+ Status query(const io::IOContext* io_ctx, OlapReaderStatistics* stats,
+ RuntimeState* runtime_state, const std::string& column_name,
+ const void* query_value, InvertedIndexQueryType query_type,
std::shared_ptr<roaring::Roaring>& bit_map) override;
- Status try_query(OlapReaderStatistics* stats, const std::string&
column_name,
- const void* query_value, InvertedIndexQueryType
query_type,
- uint32_t* count) override {
+ Status try_query(const io::IOContext* io_ctx, OlapReaderStatistics* stats,
+ const std::string& column_name, const void* query_value,
+ InvertedIndexQueryType query_type, uint32_t* count)
override {
return Status::Error<ErrorCode::NOT_IMPLEMENTED_ERROR>(
"StringTypeInvertedIndexReader not support try_query");
}
@@ -337,16 +340,17 @@ public:
: InvertedIndexReader(index_meta, inverted_index_file_reader) {}
~BkdIndexReader() override = default;
- Status new_iterator(OlapReaderStatistics* stats, RuntimeState*
runtime_state,
+ Status new_iterator(const io::IOContext& io_ctx, OlapReaderStatistics*
stats,
+ RuntimeState* runtime_state,
std::unique_ptr<InvertedIndexIterator>* iterator)
override;
- Status query(OlapReaderStatistics* stats, RuntimeState* runtime_state,
- const std::string& column_name, const void* query_value,
- InvertedIndexQueryType query_type,
+ Status query(const io::IOContext* io_ctx, OlapReaderStatistics* stats,
+ RuntimeState* runtime_state, const std::string& column_name,
+ const void* query_value, InvertedIndexQueryType query_type,
std::shared_ptr<roaring::Roaring>& bit_map) override;
- Status try_query(OlapReaderStatistics* stats, const std::string&
column_name,
- const void* query_value, InvertedIndexQueryType
query_type,
- uint32_t* count) override;
+ Status try_query(const io::IOContext* io_ctx, OlapReaderStatistics* stats,
+ const std::string& column_name, const void* query_value,
+ InvertedIndexQueryType query_type, uint32_t* count)
override;
Status invoke_bkd_try_query(const void* query_value,
InvertedIndexQueryType query_type,
std::shared_ptr<lucene::util::bkd::bkd_reader>
r, uint32_t* count);
Status invoke_bkd_query(const void* query_value, InvertedIndexQueryType
query_type,
@@ -358,7 +362,8 @@ public:
InvertedIndexVisitor<QT>* visitor);
InvertedIndexReaderType type() override;
- Status get_bkd_reader(BKDIndexSearcherPtr& reader, OlapReaderStatistics*
stats);
+ Status get_bkd_reader(BKDIndexSearcherPtr& reader, const io::IOContext*
io_ctx,
+ OlapReaderStatistics* stats);
private:
const TypeInfo* _type_info {};
@@ -446,9 +451,12 @@ class InvertedIndexIterator {
ENABLE_FACTORY_CREATOR(InvertedIndexIterator);
public:
- InvertedIndexIterator(OlapReaderStatistics* stats, RuntimeState*
runtime_state,
- std::shared_ptr<InvertedIndexReader> reader)
- : _stats(stats), _runtime_state(runtime_state),
_reader(std::move(reader)) {}
+ InvertedIndexIterator(const io::IOContext& io_ctx, OlapReaderStatistics*
stats,
+ RuntimeState* runtime_state,
std::shared_ptr<InvertedIndexReader> reader)
+ : _io_ctx(io_ctx),
+ _stats(stats),
+ _runtime_state(runtime_state),
+ _reader(std::move(reader)) {}
Status read_from_inverted_index(const std::string& column_name, const
void* query_value,
InvertedIndexQueryType query_type,
uint32_t segment_num_rows,
@@ -459,7 +467,7 @@ public:
Status read_null_bitmap(InvertedIndexQueryCacheHandle* cache_handle,
lucene::store::Directory* dir = nullptr) {
- return _reader->read_null_bitmap(_stats, cache_handle, dir);
+ return _reader->read_null_bitmap(&_io_ctx, _stats, cache_handle, dir);
}
[[nodiscard]] InvertedIndexReaderType get_inverted_index_reader_type()
const;
@@ -469,6 +477,7 @@ public:
const InvertedIndexReaderPtr& reader() { return _reader; }
private:
+ io::IOContext _io_ctx;
OlapReaderStatistics* _stats = nullptr;
RuntimeState* _runtime_state = nullptr;
std::shared_ptr<InvertedIndexReader> _reader;
diff --git
a/be/test/olap/rowset/segment_v2/inverted_index/compaction/index_compaction_test.cpp
b/be/test/olap/rowset/segment_v2/inverted_index/compaction/index_compaction_test.cpp
index aed83201a63..0f1b27fd4fa 100644
---
a/be/test/olap/rowset/segment_v2/inverted_index/compaction/index_compaction_test.cpp
+++
b/be/test/olap/rowset/segment_v2/inverted_index/compaction/index_compaction_test.cpp
@@ -223,7 +223,7 @@ bool query_string(const TabletIndex* index,
for (int i = 0; i < query_data.size(); i++) {
TQueryOptions queryOptions;
auto query = QueryFactory::create(InvertedIndexQueryType::EQUAL_QUERY,
*string_searcher,
- queryOptions);
+ queryOptions, nullptr);
EXPECT_TRUE(query != nullptr);
InvertedIndexQueryInfo query_info;
query_info.field_name = column_name_ws;
@@ -253,7 +253,7 @@ bool query_fulltext(const TabletIndex* index,
for (int i = 0; i < query_data.size(); i++) {
TQueryOptions queryOptions;
auto query =
QueryFactory::create(InvertedIndexQueryType::MATCH_ANY_QUERY, *string_searcher,
- queryOptions);
+ queryOptions, nullptr);
EXPECT_TRUE(query != nullptr);
InvertedIndexQueryInfo query_info;
query_info.field_name = column_name_ws;
diff --git
a/be/test/olap/rowset/segment_v2/inverted_index/compaction/index_compaction_with_deleted_term.cpp
b/be/test/olap/rowset/segment_v2/inverted_index/compaction/index_compaction_with_deleted_term.cpp
index 8b5d403fca4..a46f5f210df 100644
---
a/be/test/olap/rowset/segment_v2/inverted_index/compaction/index_compaction_with_deleted_term.cpp
+++
b/be/test/olap/rowset/segment_v2/inverted_index/compaction/index_compaction_with_deleted_term.cpp
@@ -124,7 +124,7 @@ static bool query_string(const TabletIndex* index,
for (int i = 0; i < query_data.size(); i++) {
TQueryOptions queryOptions;
auto query = QueryFactory::create(InvertedIndexQueryType::EQUAL_QUERY,
*string_searcher,
- queryOptions);
+ queryOptions, nullptr);
EXPECT_TRUE(query != nullptr);
InvertedIndexQueryInfo query_info;
query_info.field_name = column_name_ws;
@@ -155,7 +155,7 @@ static bool query_fulltext(const TabletIndex* index,
for (int i = 0; i < query_data.size(); i++) {
TQueryOptions queryOptions;
auto query =
QueryFactory::create(InvertedIndexQueryType::MATCH_ANY_QUERY, *string_searcher,
- queryOptions);
+ queryOptions, nullptr);
EXPECT_TRUE(query != nullptr);
InvertedIndexQueryInfo query_info;
query_info.field_name = column_name_ws;
diff --git a/regression-test/data/fault_injection_p0/test_index_io_context.out
b/regression-test/data/fault_injection_p0/test_index_io_context.out
new file mode 100644
index 00000000000..3dc2880233e
--- /dev/null
+++ b/regression-test/data/fault_injection_p0/test_index_io_context.out
@@ -0,0 +1,73 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !sql --
+177
+
+-- !sql --
+177
+
+-- !sql --
+177
+
+-- !sql --
+177
+
+-- !sql --
+177
+
+-- !sql --
+177
+
+-- !sql --
+2
+
+-- !sql --
+2
+
+-- !sql --
+2
+
+-- !sql --
+2
+
+-- !sql --
+2
+
+-- !sql --
+2
+
+-- !sql --
+2
+
+-- !sql --
+2
+
+-- !sql --
+2
+
+-- !sql --
+2
+
+-- !sql --
+2
+
+-- !sql --
+2
+
+-- !sql --
+2
+
+-- !sql --
+2
+
+-- !sql --
+2
+
+-- !sql --
+2
+
+-- !sql --
+2
+
+-- !sql --
+2
+
diff --git
a/regression-test/suites/fault_injection_p0/test_index_io_context.groovy
b/regression-test/suites/fault_injection_p0/test_index_io_context.groovy
new file mode 100644
index 00000000000..9e9a2674897
--- /dev/null
+++ b/regression-test/suites/fault_injection_p0/test_index_io_context.groovy
@@ -0,0 +1,113 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_index_io_context", "nonConcurrent") {
+ def tableName1 = "test_index_io_context1"
+ def tableName2 = "test_index_io_context2"
+
+ def create_table = {table_name, index_format ->
+ sql """ DROP TABLE IF EXISTS ${table_name} """
+ sql """
+ CREATE TABLE IF NOT EXISTS ${table_name} (
+ `@timestamp` int(11) NULL COMMENT "",
+ `clientip` varchar(20) NULL COMMENT "",
+ `request` text NULL COMMENT "",
+ `status` int(11) NULL COMMENT "",
+ `size` int(11) NULL COMMENT "",
+ INDEX request_idx (`request`) USING INVERTED
PROPERTIES("parser" = "english", "support_phrase" = "true") COMMENT ''
+ )
+ DISTRIBUTED BY HASH(`@timestamp`) PROPERTIES(
+ "replication_num" = "1",
+ "disable_auto_compaction" = "true",
+ "inverted_index_storage_format" = "${index_format}"
+ );
+ """
+ }
+
+ def load_httplogs_data = {table_name, label, read_flag, format_flag,
file_name, ignore_failure=false,
+ expected_succ_rows = -1, load_to_single_tablet =
'true' ->
+
+ // load the json data
+ streamLoad {
+ table "${table_name}"
+
+ // set http request header params
+ set 'label', label + "_" + UUID.randomUUID().toString()
+ set 'read_json_by_line', read_flag
+ set 'format', format_flag
+ file file_name // import json file
+ time 10000 // limit inflight 10s
+ if (expected_succ_rows >= 0) {
+ set 'max_filter_ratio', '1'
+ }
+
+ // When a check callback is declared, the default check condition is ignored,
+ // so this callback must verify every condition itself.
+ check { result, exception, startTime, endTime ->
+ if (ignore_failure && expected_succ_rows < 0) { return }
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ }
+ }
+ }
+
+ try {
+ create_table(tableName1, "V1");
+ create_table(tableName2, "V2");
+
+ load_httplogs_data.call(tableName1, 'test_index_io_context1', 'true',
'json', 'documents-1000.json')
+ load_httplogs_data.call(tableName2, 'test_index_io_context2', 'true',
'json', 'documents-1000.json')
+
+ sql "sync"
+ sql """ set enable_common_expr_pushdown = true; """
+
+ try {
+
GetDebugPoint().enableDebugPointForAllBEs("DorisFSDirectory::FSIndexInput::readInternal")
+
+ qt_sql """ select count() from ${tableName1} where request
match_any 'ticket_quest_bg2.jpg'; """
+ qt_sql """ select count() from ${tableName1} where request
match_any 'ticket_quest_bg2.jpg'; """
+ qt_sql """ select count() from ${tableName1} where request
match_any 'ticket_quest_bg2.jpg'; """
+ qt_sql """ select count() from ${tableName2} where request
match_any 'ticket_quest_bg2.jpg'; """
+ qt_sql """ select count() from ${tableName2} where request
match_any 'ticket_quest_bg2.jpg'; """
+ qt_sql """ select count() from ${tableName2} where request
match_any 'ticket_quest_bg2.jpg'; """
+ qt_sql """ select count() from ${tableName1} where request
match_all 'ticket_quest_bg2.jpg'; """
+ qt_sql """ select count() from ${tableName1} where request
match_all 'ticket_quest_bg2.jpg'; """
+ qt_sql """ select count() from ${tableName1} where request
match_all 'ticket_quest_bg2.jpg'; """
+ qt_sql """ select count() from ${tableName2} where request
match_all 'ticket_quest_bg2.jpg'; """
+ qt_sql """ select count() from ${tableName2} where request
match_all 'ticket_quest_bg2.jpg'; """
+ qt_sql """ select count() from ${tableName2} where request
match_all 'ticket_quest_bg2.jpg'; """
+ qt_sql """ select count() from ${tableName1} where request
match_phrase 'ticket_quest_bg2.jpg'; """
+ qt_sql """ select count() from ${tableName1} where request
match_phrase 'ticket_quest_bg2.jpg'; """
+ qt_sql """ select count() from ${tableName1} where request
match_phrase 'ticket_quest_bg2.jpg'; """
+ qt_sql """ select count() from ${tableName2} where request
match_phrase 'ticket_quest_bg2.jpg'; """
+ qt_sql """ select count() from ${tableName2} where request
match_phrase 'ticket_quest_bg2.jpg'; """
+ qt_sql """ select count() from ${tableName2} where request
match_phrase 'ticket_quest_bg2.jpg'; """
+ qt_sql """ select count() from ${tableName1} where request
match_phrase 'ticket_quest_bg2.jpg ~10+'; """
+ qt_sql """ select count() from ${tableName1} where request
match_phrase 'ticket_quest_bg2.jpg ~10+'; """
+ qt_sql """ select count() from ${tableName1} where request
match_phrase 'ticket_quest_bg2.jpg ~10+'; """
+ qt_sql """ select count() from ${tableName2} where request
match_phrase 'ticket_quest_bg2.jpg ~10+'; """
+ qt_sql """ select count() from ${tableName2} where request
match_phrase 'ticket_quest_bg2.jpg ~10+'; """
+ qt_sql """ select count() from ${tableName2} where request
match_phrase 'ticket_quest_bg2.jpg ~10+'; """
+ } finally {
+
GetDebugPoint().disableDebugPointForAllBEs("DorisFSDirectory::FSIndexInput::readInternal")
+ }
+ } finally {
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]