This is an automated email from the ASF dual-hosted git repository.
airborne pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new f2c3201217e [Refactor](inverted index) refactor inverted index file
writer for v1/v2 index write #42328 (#43993)
f2c3201217e is described below
commit f2c3201217ec63b5aa985128ea067f8f0421de6d
Author: airborne12 <[email protected]>
AuthorDate: Fri Nov 15 15:45:08 2024 +0800
[Refactor](inverted index) refactor inverted index file writer for v1/v2
index write #42328 (#43993)
cherry pick from #42328
---
be/src/olap/compaction.cpp | 6 +-
.../segment_v2/inverted_index_compaction.cpp | 7 -
.../segment_v2/inverted_index_file_writer.cpp | 521 ++++++++++++---------
.../rowset/segment_v2/inverted_index_file_writer.h | 68 ++-
.../rowset/segment_v2/inverted_index_writer.cpp | 4 +-
.../segment_v2/inverted_index_file_writer_test.cpp | 515 ++++++++++++++++++++
6 files changed, 876 insertions(+), 245 deletions(-)
diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index a92fe28abf5..abbd84001c8 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -660,9 +660,11 @@ Status Compaction::do_inverted_index_compaction() {
DORIS_TRY(inverted_index_file_readers[src_segment_id]->open(index_meta));
}
for (int dest_segment_id = 0; dest_segment_id < dest_segment_num;
dest_segment_id++) {
- auto* dest_dir =
+ auto dest_dir =
DORIS_TRY(inverted_index_file_writers[dest_segment_id]->open(index_meta));
- dest_index_dirs[dest_segment_id] = dest_dir;
+ // dest_index_dirs only holds non-owning raw pointers: the destination
+ // directories must not be destroyed through it, because their lifetime
+ // is managed by inverted_index_file_writers.
+ dest_index_dirs[dest_segment_id] = dest_dir.get();
}
auto st = compact_column(index_meta->index_id(), src_idx_dirs,
dest_index_dirs,
index_tmp_path.native(), trans_vec,
dest_segment_num_rows);
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_compaction.cpp
b/be/src/olap/rowset/segment_v2/inverted_index_compaction.cpp
index 88a8f241722..f988c46c027 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_compaction.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_compaction.cpp
@@ -76,13 +76,6 @@ Status compact_column(int64_t index_id,
// when index_writer is destroyed, if closeDir is set, dir will be close
// _CLDECDELETE(dir) will try to ref_cnt--, when it decreases to 1, dir
will be destroyed.
_CLDECDELETE(dir)
- for (auto* d : dest_index_dirs) {
- if (d != nullptr) {
- // NOTE: DO NOT close dest dir here, because it will be closed
when dest index writer finalize.
- //d->close();
- //_CLDELETE(d);
- }
- }
// delete temporary segment_path, only when inverted_index_ram_dir_enable
is false
if (!config::inverted_index_ram_dir_enable) {
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp
b/be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp
index 0e2dbe7d6bd..74f7398ea4a 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp
@@ -19,17 +19,14 @@
#include <glog/logging.h>
+#include <algorithm>
#include <filesystem>
#include "common/status.h"
-#include "io/fs/file_writer.h"
-#include "io/fs/local_file_system.h"
-#include "olap/rowset/segment_v2/inverted_index_cache.h"
#include "olap/rowset/segment_v2/inverted_index_desc.h"
#include "olap/rowset/segment_v2/inverted_index_fs_directory.h"
#include "olap/rowset/segment_v2/inverted_index_reader.h"
#include "olap/tablet_schema.h"
-#include "runtime/exec_env.h"
namespace doris::segment_v2 {
@@ -38,32 +35,11 @@ Status
InvertedIndexFileWriter::initialize(InvertedIndexDirectoryMap& indices_di
return Status::OK();
}
-Result<DorisFSDirectory*> InvertedIndexFileWriter::open(const TabletIndex*
index_meta) {
- auto tmp_file_dir =
ExecEnv::GetInstance()->get_tmp_file_dirs()->get_tmp_file_dir();
- const auto& local_fs = io::global_local_filesystem();
- auto local_fs_index_path =
InvertedIndexDescriptor::get_temporary_index_path(
- tmp_file_dir.native(), _rowset_id, _seg_id, index_meta->index_id(),
- index_meta->get_index_suffix());
- bool exists = false;
- auto st = local_fs->exists(local_fs_index_path, &exists);
- DBUG_EXECUTE_IF("InvertedIndexFileWriter::open_local_fs_exists_error",
- { st = Status::Error<ErrorCode::IO_ERROR>("debug point: no
such file error"); })
- if (!st.ok()) {
- LOG(ERROR) << "index_path:" << local_fs_index_path << " exists error:"
<< st;
- return ResultError(st);
- }
- DBUG_EXECUTE_IF("InvertedIndexFileWriter::open_local_fs_exists_true", {
exists = true; })
- if (exists) {
- LOG(ERROR) << "try to init a directory:" << local_fs_index_path << "
already exists";
- return ResultError(
- Status::InternalError("InvertedIndexFileWriter::open directory
already exists"));
- }
-
- bool can_use_ram_dir = true;
- auto* dir = DorisFSDirectoryFactory::getDirectory(local_fs,
local_fs_index_path.c_str(),
- can_use_ram_dir);
- auto key = std::make_pair(index_meta->index_id(),
index_meta->get_index_suffix());
- auto [it, inserted] = _indices_dirs.emplace(key,
std::unique_ptr<DorisFSDirectory>(dir));
+Status InvertedIndexFileWriter::_insert_directory_into_map(int64_t index_id,
+ const std::string&
index_suffix,
+
std::shared_ptr<DorisFSDirectory> dir) {
+ auto key = std::make_pair(index_id, index_suffix);
+ auto [it, inserted] = _indices_dirs.emplace(key, std::move(dir));
if (!inserted) {
LOG(ERROR) << "InvertedIndexFileWriter::open attempted to insert a
duplicate key: ("
<< key.first << ", " << key.second << ")";
@@ -71,8 +47,23 @@ Result<DorisFSDirectory*>
InvertedIndexFileWriter::open(const TabletIndex* index
for (const auto& entry : _indices_dirs) {
LOG(ERROR) << "Key: (" << entry.first.first << ", " <<
entry.first.second << ")";
}
- return ResultError(Status::InternalError(
- "InvertedIndexFileWriter::open attempted to insert a duplicate
dir"));
+ return Status::InternalError(
+ "InvertedIndexFileWriter::open attempted to insert a duplicate
dir");
+ }
+ return Status::OK();
+}
+
+Result<std::shared_ptr<DorisFSDirectory>> InvertedIndexFileWriter::open(
+ const TabletIndex* index_meta) {
+ auto local_fs_index_path =
InvertedIndexDescriptor::get_temporary_index_path(
+ _tmp_dir, _rowset_id, _seg_id, index_meta->index_id(),
index_meta->get_index_suffix());
+ bool can_use_ram_dir = true;
+ auto dir =
std::shared_ptr<DorisFSDirectory>(DorisFSDirectoryFactory::getDirectory(
+ _local_fs, local_fs_index_path.c_str(), can_use_ram_dir));
+ auto st =
+ _insert_directory_into_map(index_meta->index_id(),
index_meta->get_index_suffix(), dir);
+ if (!st.ok()) {
+ return ResultError(st);
}
return dir;
@@ -222,7 +213,7 @@ void InvertedIndexFileWriter::copyFile(const char*
fileName, lucene::store::Dire
int64_t chunk = bufferLength;
while (remainder > 0) {
- int64_t len = std::min(std::min(chunk, length), remainder);
+ int64_t len = std::min({chunk, length, remainder});
input->readBytes(buffer, len);
output->writeBytes(buffer, len);
remainder -= len;
@@ -252,125 +243,46 @@ void InvertedIndexFileWriter::copyFile(const char*
fileName, lucene::store::Dire
Status InvertedIndexFileWriter::write_v1() {
int64_t total_size = 0;
+ lucene::store::Directory* out_dir = nullptr;
+ std::unique_ptr<lucene::store::IndexOutput> output = nullptr;
for (const auto& entry : _indices_dirs) {
const int64_t index_id = entry.first.first;
const auto& index_suffix = entry.first.second;
try {
- const auto& directory = entry.second;
- std::vector<std::string> files;
- directory->list(&files);
- // remove write.lock file
- auto it = std::find(files.begin(), files.end(),
DorisFSDirectory::WRITE_LOCK_FILE);
- if (it != files.end()) {
- files.erase(it);
- }
+ const auto& directory = entry.second.get();
- std::vector<FileInfo> sorted_files;
- for (auto file : files) {
- FileInfo file_info;
- file_info.filename = file;
- file_info.filesize = directory->fileLength(file.c_str());
- sorted_files.emplace_back(std::move(file_info));
- }
- sort_files(sorted_files);
-
- int32_t file_count = sorted_files.size();
-
- io::Path cfs_path(InvertedIndexDescriptor::get_index_file_path_v1(
- _index_path_prefix, index_id, index_suffix));
- auto idx_path = cfs_path.parent_path();
- std::string idx_name = cfs_path.filename();
- // write file entries to ram directory to get header length
- lucene::store::RAMDirectory ram_dir;
- auto* out_idx = ram_dir.createOutput(idx_name.c_str());
-
DBUG_EXECUTE_IF("InvertedIndexFileWriter::write_v1_ram_output_is_nullptr",
- { out_idx = nullptr; })
- if (out_idx == nullptr) {
- LOG(WARNING) << "Write compound file error: RAMDirectory
output is nullptr.";
- _CLTHROWA(CL_ERR_IO, "Create RAMDirectory output error");
- }
+ // Prepare sorted file list
+ auto sorted_files = prepare_sorted_files(directory);
+
+ // Calculate header length
+ auto [header_length, header_file_count] =
+ calculate_header_length(sorted_files, directory);
+
+ // Create output stream
+ auto result = create_output_stream_v1(index_id, index_suffix);
+ out_dir = result.first;
+ output = std::move(result.second);
- std::unique_ptr<lucene::store::IndexOutput> ram_output(out_idx);
- ram_output->writeVInt(file_count);
- // write file entries in ram directory
- // number of files, which data are in header
- int header_file_count = 0;
- int64_t header_file_length = 0;
- const int64_t buffer_length = 16384;
- uint8_t ram_buffer[buffer_length];
- for (auto file : sorted_files) {
- ram_output->writeString(file.filename); // file name
- ram_output->writeLong(0); // data offset
- ram_output->writeLong(file.filesize); // file length
- header_file_length += file.filesize;
- if (header_file_length <=
DorisFSDirectory::MAX_HEADER_DATA_SIZE) {
- copyFile(file.filename.c_str(), directory.get(),
ram_output.get(), ram_buffer,
- buffer_length);
- header_file_count++;
- }
- }
- auto header_len = ram_output->getFilePointer();
- ram_output->close();
- ram_dir.deleteFile(idx_name.c_str());
- ram_dir.close();
-
- auto* out_dir = DorisFSDirectoryFactory::getDirectory(_fs,
idx_path.c_str());
- out_dir->set_file_writer_opts(_opts);
-
- auto* out = out_dir->createOutput(idx_name.c_str());
-
DBUG_EXECUTE_IF("InvertedIndexFileWriter::write_v1_out_dir_createOutput_nullptr",
- { out = nullptr; });
- if (out == nullptr) {
- LOG(WARNING) << "Write compound file error: CompoundDirectory
output is nullptr.";
- _CLTHROWA(CL_ERR_IO, "Create CompoundDirectory output error");
- }
- std::unique_ptr<lucene::store::IndexOutput> output(out);
size_t start = output->getFilePointer();
- output->writeVInt(file_count);
- // write file entries
- int64_t data_offset = header_len;
- uint8_t header_buffer[buffer_length];
- for (int i = 0; i < sorted_files.size(); ++i) {
- auto file = sorted_files[i];
- output->writeString(file.filename); // FileName
- // DataOffset
- if (i < header_file_count) {
- // file data write in header, so we set its offset to -1.
- output->writeLong(-1);
- } else {
- output->writeLong(data_offset);
- }
- output->writeLong(file.filesize); // FileLength
- if (i < header_file_count) {
- // append data
- copyFile(file.filename.c_str(), directory.get(),
output.get(), header_buffer,
- buffer_length);
- } else {
- data_offset += file.filesize;
- }
- }
- // write rest files' data
- uint8_t data_buffer[buffer_length];
- for (int i = header_file_count; i < sorted_files.size(); ++i) {
- auto file = sorted_files[i];
- copyFile(file.filename.c_str(), directory.get(), output.get(),
data_buffer,
- buffer_length);
- }
- out_dir->close();
- // NOTE: need to decrease ref count, but not to delete here,
- // because index cache may get the same directory from DIRECTORIES
- _CLDECDELETE(out_dir)
+ // Write header and data
+ write_header_and_data_v1(output.get(), sorted_files, directory,
header_length,
+ header_file_count);
+
+ // Close and clean up
+ finalize_output_dir(out_dir);
+
+ // Collect file information
auto compound_file_size = output->getFilePointer() - start;
output->close();
- //LOG(INFO) << (idx_path / idx_name).c_str() << " size:" <<
compound_file_size;
total_size += compound_file_size;
- InvertedIndexFileInfo_IndexInfo index_info;
- index_info.set_index_id(index_id);
- index_info.set_index_suffix(index_suffix);
- index_info.set_index_file_size(compound_file_size);
- auto* new_index_info = _file_info.add_index_info();
- *new_index_info = index_info;
+ add_index_info(index_id, index_suffix, compound_file_size);
+
} catch (CLuceneError& err) {
+ finalize_output_dir(out_dir);
+ if (output != nullptr) {
+ output->close();
+ output.reset();
+ }
auto index_path = InvertedIndexDescriptor::get_index_file_path_v1(
_index_path_prefix, index_id, index_suffix);
LOG(ERROR) << "CLuceneError occur when write_v1 idx file " <<
index_path
@@ -386,108 +298,267 @@ Status InvertedIndexFileWriter::write_v1() {
}
Status InvertedIndexFileWriter::write_v2() {
- io::Path index_path
{InvertedIndexDescriptor::get_index_file_path_v2(_index_path_prefix)};
- std::unique_ptr<lucene::store::IndexOutput> compound_file_output;
+ lucene::store::Directory* out_dir = nullptr;
+ std::unique_ptr<lucene::store::IndexOutput> compound_file_output = nullptr;
try {
- // Create the output stream to write the compound file
+ // Calculate header length and initialize offset
int64_t current_offset = headerLength();
+ // Prepare file metadata
+ auto file_metadata = prepare_file_metadata_v2(current_offset);
- io::Path index_path
{InvertedIndexDescriptor::get_index_file_path_v2(_index_path_prefix)};
-
- auto* out_dir =
- DorisFSDirectoryFactory::getDirectory(_fs,
index_path.parent_path().c_str());
- out_dir->set_file_writer_opts(_opts);
-
- DCHECK(_idx_v2_writer != nullptr) << "inverted index file writer v2 is
nullptr";
- compound_file_output = std::unique_ptr<lucene::store::IndexOutput>(
- out_dir->createOutputV2(_idx_v2_writer.get()));
-
- // Write the version number
- compound_file_output->writeInt(InvertedIndexStorageFormatPB::V2);
-
- // Write the number of indices
- const auto numIndices = static_cast<uint32_t>(_indices_dirs.size());
- compound_file_output->writeInt(numIndices);
-
- std::vector<std::tuple<std::string, int64_t, int64_t,
CL_NS(store)::Directory*>>
- file_metadata; // Store file name, offset, file length, and
corresponding directory
-
- // First, write all index information and file metadata
- for (const auto& entry : _indices_dirs) {
- const int64_t index_id = entry.first.first;
- const auto& index_suffix = entry.first.second;
- const auto& dir = entry.second;
- std::vector<std::string> files;
- dir->list(&files);
-
- auto it = std::find(files.begin(), files.end(),
DorisFSDirectory::WRITE_LOCK_FILE);
- if (it != files.end()) {
- files.erase(it);
- }
- // sort file list by file length
- std::vector<std::pair<std::string, int64_t>> sorted_files;
- for (const auto& file : files) {
- sorted_files.emplace_back(file, dir->fileLength(file.c_str()));
- }
-
- std::sort(
- sorted_files.begin(), sorted_files.end(),
- [](const std::pair<std::string, int64_t>& a,
- const std::pair<std::string, int64_t>& b) { return
(a.second < b.second); });
-
- int32_t file_count = sorted_files.size();
-
- // Write the index ID and the number of files
- compound_file_output->writeLong(index_id);
-
compound_file_output->writeInt(static_cast<int32_t>(index_suffix.length()));
- compound_file_output->writeBytes(reinterpret_cast<const
uint8_t*>(index_suffix.data()),
- index_suffix.length());
- compound_file_output->writeInt(file_count);
-
- // Calculate the offset for each file and write the file metadata
- for (const auto& file : sorted_files) {
- int64_t file_length = dir->fileLength(file.first.c_str());
-
compound_file_output->writeInt(static_cast<int32_t>(file.first.length()));
- compound_file_output->writeBytes(
- reinterpret_cast<const uint8_t*>(file.first.data()),
file.first.length());
- compound_file_output->writeLong(current_offset);
- compound_file_output->writeLong(file_length);
-
- file_metadata.emplace_back(file.first, current_offset,
file_length, dir.get());
- current_offset += file_length; // Update the data offset
- }
- }
+ // Create output stream
+ auto result = create_output_stream_v2();
+ out_dir = result.first;
+ compound_file_output = std::move(result.second);
- const int64_t buffer_length = 16384;
- uint8_t header_buffer[buffer_length];
+ // Write version and number of indices
+ write_version_and_indices_count(compound_file_output.get());
- // Next, write the file data
- for (const auto& info : file_metadata) {
- const std::string& file = std::get<0>(info);
- auto* dir = std::get<3>(info);
+ // Write index headers and file metadata
+ write_index_headers_and_metadata(compound_file_output.get(),
file_metadata);
- // Write the actual file data
- copyFile(file.c_str(), dir, compound_file_output.get(),
header_buffer, buffer_length);
- }
+ // Copy file data
+ copy_files_data_v2(compound_file_output.get(), file_metadata);
- out_dir->close();
- // NOTE: need to decrease ref count, but not to delete here,
- // because index cache may get the same directory from DIRECTORIES
- _CLDECDELETE(out_dir)
+ // Close and clean up
+ finalize_output_dir(out_dir);
_total_file_size = compound_file_output->getFilePointer();
- compound_file_output->close();
_file_info.set_index_size(_total_file_size);
+ compound_file_output->close();
+ return Status::OK();
} catch (CLuceneError& err) {
+ io::Path index_path
{InvertedIndexDescriptor::get_index_file_path_v2(_index_path_prefix)};
LOG(ERROR) << "CLuceneError occur when close idx file " << index_path
<< " error msg: " << err.what();
- if (compound_file_output) {
+ if (compound_file_output != nullptr) {
compound_file_output->close();
compound_file_output.reset();
}
+ finalize_output_dir(out_dir);
return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(
"CLuceneError occur when close idx file: {}, error msg: {}",
index_path.c_str(),
err.what());
}
- return Status::OK();
+}
+
+// Helper function implementations
+
+std::vector<FileInfo> InvertedIndexFileWriter::prepare_sorted_files(
+ lucene::store::Directory* directory) {
+ std::vector<std::string> files;
+ directory->list(&files);
+
+ // Remove write.lock file
+ files.erase(std::remove(files.begin(), files.end(),
DorisFSDirectory::WRITE_LOCK_FILE),
+ files.end());
+
+ std::vector<FileInfo> sorted_files;
+ for (const auto& file : files) {
+ FileInfo file_info;
+ file_info.filename = file;
+ file_info.filesize = directory->fileLength(file.c_str());
+ sorted_files.push_back(std::move(file_info));
+ }
+
+ // Sort the files
+ sort_files(sorted_files);
+ return sorted_files;
+}
+
+void InvertedIndexFileWriter::finalize_output_dir(lucene::store::Directory*
out_dir) {
+ if (out_dir != nullptr) {
+ out_dir->close();
+ _CLDECDELETE(out_dir)
+ }
+}
+
+void InvertedIndexFileWriter::add_index_info(int64_t index_id, const
std::string& index_suffix,
+ int64_t compound_file_size) {
+ InvertedIndexFileInfo_IndexInfo index_info;
+ index_info.set_index_id(index_id);
+ index_info.set_index_suffix(index_suffix);
+ index_info.set_index_file_size(compound_file_size);
+ auto* new_index_info = _file_info.add_index_info();
+ *new_index_info = index_info;
+}
+
+std::pair<int64_t, int32_t> InvertedIndexFileWriter::calculate_header_length(
+ const std::vector<FileInfo>& sorted_files, lucene::store::Directory*
directory) {
+ // Use RAMDirectory to calculate header length
+ lucene::store::RAMDirectory ram_dir;
+ auto* out_idx = ram_dir.createOutput("temp_idx");
+
DBUG_EXECUTE_IF("InvertedIndexFileWriter::calculate_header_length_ram_output_is_nullptr",
+ { out_idx = nullptr; })
+ if (out_idx == nullptr) {
+ LOG(WARNING) << "InvertedIndexFileWriter::calculate_header_length
error: RAMDirectory "
+ "output is nullptr.";
+ _CLTHROWA(CL_ERR_IO, "Create RAMDirectory output error");
+ }
+ std::unique_ptr<lucene::store::IndexOutput> ram_output(out_idx);
+ int32_t file_count = sorted_files.size();
+ ram_output->writeVInt(file_count);
+
+ int64_t header_file_length = 0;
+ const int64_t buffer_length = 16384;
+ uint8_t ram_buffer[buffer_length];
+ int32_t header_file_count = 0;
+ for (const auto& file : sorted_files) {
+ ram_output->writeString(file.filename);
+ ram_output->writeLong(0);
+ ram_output->writeLong(file.filesize);
+ header_file_length += file.filesize;
+
+ if (header_file_length <= DorisFSDirectory::MAX_HEADER_DATA_SIZE) {
+ copyFile(file.filename.c_str(), directory, ram_output.get(),
ram_buffer, buffer_length);
+ header_file_count++;
+ }
+ }
+
+ int64_t header_length = ram_output->getFilePointer();
+ ram_output->close();
+ ram_dir.close();
+ return {header_length, header_file_count};
+}
+
+std::pair<lucene::store::Directory*,
std::unique_ptr<lucene::store::IndexOutput>>
+InvertedIndexFileWriter::create_output_stream_v1(int64_t index_id,
+ const std::string&
index_suffix) {
+ io::Path
cfs_path(InvertedIndexDescriptor::get_index_file_path_v1(_index_path_prefix,
index_id,
+
index_suffix));
+ auto idx_path = cfs_path.parent_path();
+ std::string idx_name = cfs_path.filename();
+
+ auto* out_dir = DorisFSDirectoryFactory::getDirectory(_fs,
idx_path.c_str());
+ out_dir->set_file_writer_opts(_opts);
+
+ auto* out = out_dir->createOutput(idx_name.c_str());
+
DBUG_EXECUTE_IF("InvertedIndexFileWriter::write_v1_out_dir_createOutput_nullptr",
+ { out = nullptr; });
+ if (out == nullptr) {
+ LOG(WARNING) << "InvertedIndexFileWriter::create_output_stream_v1
error: CompoundDirectory "
+ "output is nullptr.";
+ _CLTHROWA(CL_ERR_IO, "Create CompoundDirectory output error");
+ }
+
+ std::unique_ptr<lucene::store::IndexOutput> output(out);
+ return {out_dir, std::move(output)};
+}
+
+void
InvertedIndexFileWriter::write_header_and_data_v1(lucene::store::IndexOutput*
output,
+ const
std::vector<FileInfo>& sorted_files,
+
lucene::store::Directory* directory,
+ int64_t header_length,
+ int32_t
header_file_count) {
+ output->writeVInt(sorted_files.size());
+ int64_t data_offset = header_length;
+ const int64_t buffer_length = 16384;
+ uint8_t buffer[buffer_length];
+
+ for (int i = 0; i < sorted_files.size(); ++i) {
+ auto file = sorted_files[i];
+ output->writeString(file.filename);
+
+ // DataOffset
+ if (i < header_file_count) {
+ // This file's data is written inline in the header, so its offset is recorded as -1.
+ output->writeLong(-1);
+ } else {
+ output->writeLong(data_offset);
+ }
+ output->writeLong(file.filesize); // FileLength
+ if (i < header_file_count) {
+ // append data
+ copyFile(file.filename.c_str(), directory, output, buffer,
buffer_length);
+ } else {
+ data_offset += file.filesize;
+ }
+ }
+
+ for (size_t i = header_file_count; i < sorted_files.size(); ++i) {
+ copyFile(sorted_files[i].filename.c_str(), directory, output, buffer,
buffer_length);
+ }
+}
+
+std::pair<lucene::store::Directory*,
std::unique_ptr<lucene::store::IndexOutput>>
+InvertedIndexFileWriter::create_output_stream_v2() {
+ io::Path index_path
{InvertedIndexDescriptor::get_index_file_path_v2(_index_path_prefix)};
+ auto* out_dir = DorisFSDirectoryFactory::getDirectory(_fs,
index_path.parent_path().c_str());
+ out_dir->set_file_writer_opts(_opts);
+ DCHECK(_idx_v2_writer != nullptr) << "inverted index file writer v2 is
nullptr";
+ auto compound_file_output = std::unique_ptr<lucene::store::IndexOutput>(
+ out_dir->createOutputV2(_idx_v2_writer.get()));
+ return std::make_pair(out_dir, std::move(compound_file_output));
+}
+
+void
InvertedIndexFileWriter::write_version_and_indices_count(lucene::store::IndexOutput*
output) {
+ // Write the version number
+ output->writeInt(InvertedIndexStorageFormatPB::V2);
+
+ // Write the number of indices
+ const auto num_indices = static_cast<uint32_t>(_indices_dirs.size());
+ output->writeInt(num_indices);
+}
+
+std::vector<InvertedIndexFileWriter::FileMetadata>
+InvertedIndexFileWriter::prepare_file_metadata_v2(int64_t& current_offset) {
+ std::vector<FileMetadata> file_metadata;
+
+ for (const auto& entry : _indices_dirs) {
+ const int64_t index_id = entry.first.first;
+ const auto& index_suffix = entry.first.second;
+ auto* dir = entry.second.get();
+
+ // Get sorted files
+ auto sorted_files = prepare_sorted_files(dir);
+
+ for (const auto& file : sorted_files) {
+ file_metadata.emplace_back(index_id, index_suffix, file.filename,
current_offset,
+ file.filesize, dir);
+ current_offset += file.filesize; // Update the data offset
+ }
+ }
+ return file_metadata;
+}
+
+void InvertedIndexFileWriter::write_index_headers_and_metadata(
+ lucene::store::IndexOutput* output, const std::vector<FileMetadata>&
file_metadata) {
+ // Group files by index_id and index_suffix
+ std::map<std::pair<int64_t, std::string>, std::vector<FileMetadata>>
indices;
+
+ for (const auto& meta : file_metadata) {
+ indices[{meta.index_id, meta.index_suffix}].push_back(meta);
+ }
+
+ for (const auto& index_entry : indices) {
+ int64_t index_id = index_entry.first.first;
+ const std::string& index_suffix = index_entry.first.second;
+ const auto& files = index_entry.second;
+
+ // Write the index ID and the number of files
+ output->writeLong(index_id);
+ output->writeInt(static_cast<int32_t>(index_suffix.length()));
+ output->writeBytes(reinterpret_cast<const
uint8_t*>(index_suffix.data()),
+ index_suffix.length());
+ output->writeInt(static_cast<int32_t>(files.size()));
+
+ // Write file metadata
+ for (const auto& file : files) {
+ output->writeInt(static_cast<int32_t>(file.filename.length()));
+ output->writeBytes(reinterpret_cast<const
uint8_t*>(file.filename.data()),
+ file.filename.length());
+ output->writeLong(file.offset);
+ output->writeLong(file.length);
+ }
+ }
+}
+
+void InvertedIndexFileWriter::copy_files_data_v2(lucene::store::IndexOutput*
output,
+ const
std::vector<FileMetadata>& file_metadata) {
+ const int64_t buffer_length = 16384;
+ uint8_t buffer[buffer_length];
+
+ for (const auto& meta : file_metadata) {
+ copyFile(meta.filename.c_str(), meta.directory, output, buffer,
buffer_length);
+ }
}
} // namespace doris::segment_v2
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_file_writer.h
b/be/src/olap/rowset/segment_v2/inverted_index_file_writer.h
index 31e287d6dd3..3a2fcc1e6ac 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_file_writer.h
+++ b/be/src/olap/rowset/segment_v2/inverted_index_file_writer.h
@@ -28,7 +28,9 @@
#include "io/fs/file_system.h"
#include "io/fs/file_writer.h"
+#include "io/fs/local_file_system.h"
#include "olap/rowset/segment_v2/inverted_index_desc.h"
+#include "runtime/exec_env.h"
namespace doris {
class TabletIndex;
@@ -36,7 +38,7 @@ class TabletIndex;
namespace segment_v2 {
class DorisFSDirectory;
using InvertedIndexDirectoryMap =
- std::map<std::pair<int64_t, std::string>,
std::unique_ptr<lucene::store::Directory>>;
+ std::map<std::pair<int64_t, std::string>,
std::shared_ptr<lucene::store::Directory>>;
class InvertedIndexFileWriter;
using InvertedIndexFileWriterPtr = std::unique_ptr<InvertedIndexFileWriter>;
@@ -58,16 +60,19 @@ public:
_rowset_id(std::move(rowset_id)),
_seg_id(seg_id),
_storage_format(storage_format),
- _idx_v2_writer(std::move(file_writer)) {}
+ _local_fs(io::global_local_filesystem()),
+ _idx_v2_writer(std::move(file_writer)) {
+ auto tmp_file_dir =
ExecEnv::GetInstance()->get_tmp_file_dirs()->get_tmp_file_dir();
+ _tmp_dir = tmp_file_dir.native();
+ }
- Result<DorisFSDirectory*> open(const TabletIndex* index_meta);
+ Result<std::shared_ptr<DorisFSDirectory>> open(const TabletIndex*
index_meta);
Status delete_index(const TabletIndex* index_meta);
Status initialize(InvertedIndexDirectoryMap& indices_dirs);
- ~InvertedIndexFileWriter() = default;
+ virtual ~InvertedIndexFileWriter() = default;
Status write_v2();
Status write_v1();
Status close();
- int64_t headerLength();
const InvertedIndexFileInfo* get_index_file_info() const {
DCHECK(_closed) << debug_string();
return &_file_info;
@@ -77,11 +82,7 @@ public:
return _total_file_size;
}
const io::FileSystemSPtr& get_fs() const { return _fs; }
- void sort_files(std::vector<FileInfo>& file_infos);
- void copyFile(const char* fileName, lucene::store::Directory* dir,
- lucene::store::IndexOutput* output, uint8_t* buffer, int64_t
bufferLength);
InvertedIndexStorageFormatPB get_storage_format() const { return
_storage_format; }
-
void set_file_writer_opts(const io::FileWriterOptions& opts) { _opts =
opts; }
std::string debug_string() const {
@@ -99,12 +100,61 @@ public:
}
private:
+ // Helper functions shared between write_v1 and write_v2
+ std::vector<FileInfo> prepare_sorted_files(lucene::store::Directory*
directory);
+ void sort_files(std::vector<FileInfo>& file_infos);
+ void copyFile(const char* fileName, lucene::store::Directory* dir,
+ lucene::store::IndexOutput* output, uint8_t* buffer, int64_t
bufferLength);
+ void finalize_output_dir(lucene::store::Directory* out_dir);
+ void add_index_info(int64_t index_id, const std::string& index_suffix,
+ int64_t compound_file_size);
+ int64_t headerLength();
+ // Helper functions specific to write_v1
+ std::pair<int64_t, int32_t> calculate_header_length(const
std::vector<FileInfo>& sorted_files,
+
lucene::store::Directory* directory);
+ std::pair<lucene::store::Directory*,
std::unique_ptr<lucene::store::IndexOutput>>
+ create_output_stream_v1(int64_t index_id, const std::string& index_suffix);
+ virtual void write_header_and_data_v1(lucene::store::IndexOutput* output,
+ const std::vector<FileInfo>&
sorted_files,
+ lucene::store::Directory* directory,
+ int64_t header_length, int32_t
header_file_count);
+ // Helper functions specific to write_v2
+ std::pair<lucene::store::Directory*,
std::unique_ptr<lucene::store::IndexOutput>>
+ create_output_stream_v2();
+ void write_version_and_indices_count(lucene::store::IndexOutput* output);
+ struct FileMetadata {
+ int64_t index_id;
+ std::string index_suffix;
+ std::string filename;
+ int64_t offset;
+ int64_t length;
+ lucene::store::Directory* directory;
+
+ FileMetadata(int64_t id, const std::string& suffix, const std::string&
file, int64_t off,
+ int64_t len, lucene::store::Directory* dir)
+ : index_id(id),
+ index_suffix(suffix),
+ filename(file),
+ offset(off),
+ length(len),
+ directory(dir) {}
+ };
+ std::vector<FileMetadata> prepare_file_metadata_v2(int64_t&
current_offset);
+ virtual void write_index_headers_and_metadata(lucene::store::IndexOutput*
output,
+ const
std::vector<FileMetadata>& file_metadata);
+ void copy_files_data_v2(lucene::store::IndexOutput* output,
+ const std::vector<FileMetadata>& file_metadata);
+ Status _insert_directory_into_map(int64_t index_id, const std::string&
index_suffix,
+ std::shared_ptr<DorisFSDirectory> dir);
+ // Member variables...
InvertedIndexDirectoryMap _indices_dirs;
const io::FileSystemSPtr _fs;
std::string _index_path_prefix;
std::string _rowset_id;
int64_t _seg_id;
InvertedIndexStorageFormatPB _storage_format;
+ std::string _tmp_dir;
+ const std::shared_ptr<io::LocalFileSystem>& _local_fs;
// write to disk or stream
io::FileWriterPtr _idx_v2_writer = nullptr;
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_writer.cpp
b/be/src/olap/rowset/segment_v2/inverted_index_writer.cpp
index 29fe4609e59..a4f3ca55dd1 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_writer.cpp
@@ -197,7 +197,7 @@ public:
bool create_index = true;
bool close_dir_on_shutdown = true;
auto index_writer = std::make_unique<lucene::index::IndexWriter>(
- _dir, _analyzer.get(), create_index, close_dir_on_shutdown);
+ _dir.get(), _analyzer.get(), create_index,
close_dir_on_shutdown);
DBUG_EXECUTE_IF("InvertedIndexColumnWriter::create_index_writer_setRAMBufferSizeMB_error",
{ index_writer->setRAMBufferSizeMB(-100); })
DBUG_EXECUTE_IF("InvertedIndexColumnWriter::create_index_writer_setMaxBufferedDocs_error",
@@ -708,7 +708,7 @@ private:
std::unique_ptr<lucene::util::Reader> _char_string_reader = nullptr;
std::shared_ptr<lucene::util::bkd::bkd_writer> _bkd_writer = nullptr;
InvertedIndexCtxSPtr _inverted_index_ctx = nullptr;
- DorisFSDirectory* _dir = nullptr;
+ std::shared_ptr<DorisFSDirectory> _dir = nullptr;
const KeyCoder* _value_key_coder;
const TabletIndex* _index_meta;
InvertedIndexParserType _parser_type;
diff --git a/be/test/olap/rowset/segment_v2/inverted_index_file_writer_test.cpp
b/be/test/olap/rowset/segment_v2/inverted_index_file_writer_test.cpp
new file mode 100644
index 00000000000..dd3b4195c14
--- /dev/null
+++ b/be/test/olap/rowset/segment_v2/inverted_index_file_writer_test.cpp
@@ -0,0 +1,515 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/rowset/segment_v2/inverted_index_file_writer.h"
+
+#include <gmock/gmock.h>
+#include <gtest/gtest-message.h>
+#include <gtest/gtest-test-part.h>
+#include <gtest/gtest.h>
+
+#include "olap/rowset/rowset_factory.h"
+#include "olap/rowset/segment_v2/inverted_index_fs_directory.h"
+#include "olap/storage_engine.h"
+
+namespace doris {
+namespace segment_v2 {
+
+using namespace doris::vectorized;
+
+// Test fixture for InvertedIndexFileWriter.
+//
+// SetUp() prepares two scratch directories (dest_dir under the current working
+// directory and tmp_dir), then installs a TmpFileDirs, an inverted index searcher
+// cache and a StorageEngine into the process-wide ExecEnv. TearDown() removes the
+// scratch directories and clears the ExecEnv entries that refer to objects whose
+// lifetime ends with this fixture.
+class InvertedIndexFileWriterTest : public ::testing::Test {
+protected:
+    // DorisFSDirectory with fileLength() mocked, so a test can dictate file sizes.
+    class MockDorisFSDirectoryFileLength : public DorisFSDirectory {
+    public:
+        //MOCK_METHOD(lucene::store::IndexOutput*, createOutput, (const char* name), (override));
+        MOCK_METHOD(int64_t, fileLength, (const char* name), (const, override));
+        //MOCK_METHOD(void, close, (), (override));
+        //MOCK_METHOD(const char*, getObjectName, (), (const, override));
+    };
+
+    // DorisFSDirectory with openInput() mocked, so a test can simulate open failures.
+    class MockDorisFSDirectoryOpenInput : public DorisFSDirectory {
+    public:
+        MOCK_METHOD(bool, openInput,
+                    (const char* name, lucene::store::IndexInput*& ret, CLuceneError& err,
+                     int32_t bufferSize),
+                    (override));
+    };
+
+    void SetUp() override {
+        char buffer[MAX_PATH_LEN];
+        ASSERT_NE(getcwd(buffer, MAX_PATH_LEN), nullptr);
+        _current_dir = std::string(buffer);
+        _absolute_dir = _current_dir + "/" + std::string(dest_dir);
+        // Start from an empty destination directory.
+        ASSERT_TRUE(io::global_local_filesystem()->delete_directory(_absolute_dir).ok());
+        ASSERT_TRUE(io::global_local_filesystem()->create_directory(_absolute_dir).ok());
+        // tmp dir
+        EXPECT_TRUE(io::global_local_filesystem()->delete_directory(tmp_dir).ok());
+        EXPECT_TRUE(io::global_local_filesystem()->create_directory(tmp_dir).ok());
+        std::vector<StorePath> paths;
+        paths.emplace_back(std::string(tmp_dir), -1);
+        auto tmp_file_dirs = std::make_unique<segment_v2::TmpFileDirs>(paths);
+        EXPECT_TRUE(tmp_file_dirs->init().ok());
+        ExecEnv::GetInstance()->set_tmp_file_dir(std::move(tmp_file_dirs));
+
+        // The searcher cache is owned by this fixture; ExecEnv only receives the raw pointer.
+        int64_t inverted_index_cache_limit = 0;
+        _inverted_index_searcher_cache = std::unique_ptr<segment_v2::InvertedIndexSearcherCache>(
+                InvertedIndexSearcherCache::create_global_instance(inverted_index_cache_limit,
+                                                                   256));
+
+        ExecEnv::GetInstance()->set_inverted_index_searcher_cache(
+                _inverted_index_searcher_cache.get());
+        doris::EngineOptions options;
+        auto engine = std::make_unique<StorageEngine>(options);
+        _engine_ref = engine.get();
+        _data_dir = std::make_unique<DataDir>(*_engine_ref, _absolute_dir);
+        ASSERT_TRUE(_data_dir->update_capacity().ok());
+        // Ownership of the engine moves to ExecEnv; _engine_ref stays as a borrowed pointer.
+        ExecEnv::GetInstance()->set_storage_engine(std::move(engine));
+
+        _fs = io::global_local_filesystem();
+        _index_path_prefix = _absolute_dir + "/index_test";
+        _rowset_id = "test_rowset";
+        _seg_id = 1;
+    }
+
+    void TearDown() override {
+        // Non-fatal checks: a failed cleanup must not skip the global-state resets below.
+        EXPECT_TRUE(io::global_local_filesystem()->delete_directory(_absolute_dir).ok());
+        EXPECT_TRUE(io::global_local_filesystem()->delete_directory(tmp_dir).ok());
+        // _data_dir was constructed from a reference to the engine, so drop it before
+        // the engine itself is released.
+        _data_dir.reset();
+        _engine_ref = nullptr;
+        ExecEnv::GetInstance()->set_storage_engine(nullptr);
+        // The cache is destroyed together with this fixture; do not leave ExecEnv
+        // holding a pointer to it.
+        ExecEnv::GetInstance()->set_inverted_index_searcher_cache(nullptr);
+    }
+
+    // Builds a TabletIndex of type INVERTED carrying the given id and suffix name.
+    std::unique_ptr<TabletIndex> create_mock_tablet_index(int64_t index_id,
+                                                          const std::string& index_suffix) {
+        TabletIndexPB index_pb;
+        index_pb.set_index_id(index_id);
+        index_pb.set_index_suffix_name(index_suffix);
+        index_pb.set_index_type(IndexType::INVERTED);
+        auto index = std::make_unique<TabletIndex>();
+        index->init_from_pb(index_pb);
+        return index;
+    }
+
+    std::string _current_dir;
+    std::string _absolute_dir;
+    io::FileSystemSPtr _fs;
+    std::string _index_path_prefix;
+    std::string _rowset_id;
+    int64_t _seg_id;
+    StorageEngine* _engine_ref = nullptr;
+    std::unique_ptr<DataDir> _data_dir = nullptr;
+    std::unique_ptr<InvertedIndexSearcherCache> _inverted_index_searcher_cache;
+
+    constexpr static uint32_t MAX_PATH_LEN = 1024;
+    constexpr static std::string_view dest_dir = "./ut_dir/inverted_index_file_writer_test";
+    constexpr static std::string_view tmp_dir = "./ut_dir/tmp";
+};
+
+// initialize() must succeed on a map holding two directories, and the writer
+// must report the storage format it was constructed with.
+TEST_F(InvertedIndexFileWriterTest, InitializeTest) {
+    InvertedIndexFileWriter file_writer(_fs, _index_path_prefix, _rowset_id, _seg_id,
+                                        InvertedIndexStorageFormatPB::V2);
+
+    InvertedIndexDirectoryMap dir_map;
+    dir_map.emplace(std::make_pair(1, "suffix1"), std::make_unique<DorisFSDirectory>());
+    dir_map.emplace(std::make_pair(2, "suffix2"), std::make_unique<DorisFSDirectory>());
+
+    Status init_status = file_writer.initialize(dir_map);
+    ASSERT_TRUE(init_status.ok());
+
+    ASSERT_EQ(file_writer.get_storage_format(), InvertedIndexStorageFormatPB::V2);
+}
+
+// open() must return a non-null directory and register that same object in the
+// writer's directory map under (index id, index suffix).
+TEST_F(InvertedIndexFileWriterTest, OpenTest) {
+    InvertedIndexFileWriter file_writer(_fs, _index_path_prefix, _rowset_id, _seg_id,
+                                        InvertedIndexStorageFormatPB::V2);
+
+    int64_t id = 1;
+    std::string suffix = "suffix1";
+    auto tablet_index = create_mock_tablet_index(id, suffix);
+    ASSERT_NE(tablet_index, nullptr);
+
+    auto opened = file_writer.open(tablet_index.get());
+    ASSERT_TRUE(opened.has_value());
+    auto directory = opened.value();
+    ASSERT_NE(directory, nullptr);
+
+    // Look the entry up once and check both presence and identity.
+    auto entry = file_writer._indices_dirs.find(std::make_pair(id, suffix));
+    ASSERT_TRUE(entry != file_writer._indices_dirs.end());
+    ASSERT_TRUE(entry->second.get() == directory.get());
+}
+
+// delete_index() must remove a registered directory from the writer's map, and
+// deleting an index that is no longer present must still return OK.
+TEST_F(InvertedIndexFileWriterTest, DeleteIndexTest) {
+    InvertedIndexFileWriter writer(_fs, _index_path_prefix, _rowset_id, _seg_id,
+                                   InvertedIndexStorageFormatPB::V2);
+
+    int64_t index_id = 1;
+    std::string index_suffix = "suffix1";
+    auto st = writer._insert_directory_into_map(index_id, index_suffix,
+                                                std::make_shared<DorisFSDirectory>());
+    // Fatal on failure; the status message is attached to the gtest failure report.
+    ASSERT_TRUE(st.ok()) << "_insert_directory_into_map error in DeleteIndexTest: " << st.msg();
+
+    auto key = std::make_pair(index_id, index_suffix);
+    ASSERT_TRUE(writer._indices_dirs.find(key) != writer._indices_dirs.end());
+
+    auto index_meta = create_mock_tablet_index(index_id, index_suffix);
+    ASSERT_NE(index_meta, nullptr);
+    Status del_status = writer.delete_index(index_meta.get());
+    ASSERT_TRUE(del_status.ok());
+    ASSERT_TRUE(writer._indices_dirs.find(key) == writer._indices_dirs.end());
+
+    // Second delete targets an entry that has already been removed.
+    Status del_nonexist_status = writer.delete_index(index_meta.get());
+    ASSERT_TRUE(del_nonexist_status.ok());
+}
+
+// Writes one small file through a V1 writer, closes it, and checks that the
+// reported per-index size is positive and equals the writer's total size.
+TEST_F(InvertedIndexFileWriterTest, WriteV1Test) {
+    InvertedIndexFileWriter writer(_fs, _index_path_prefix, _rowset_id, _seg_id,
+                                   InvertedIndexStorageFormatPB::V1);
+
+    int64_t index_id = 1;
+    std::string index_suffix = "suffix1";
+    auto index_meta = create_mock_tablet_index(index_id, index_suffix);
+    ASSERT_NE(index_meta, nullptr);
+
+    auto open_result = writer.open(index_meta.get());
+    ASSERT_TRUE(open_result.has_value());
+    auto dir = open_result.value();
+    auto out_file =
+            std::unique_ptr<lucene::store::IndexOutput>(dir->createOutput("write_v1_test"));
+    out_file->writeString("test1");
+    out_file->close();
+    dir->close();
+
+    Status close_status = writer.close();
+    ASSERT_TRUE(close_status.ok()) << "close error:" << close_status.msg();
+
+    const InvertedIndexFileInfo* file_info = writer.get_index_file_info();
+    ASSERT_NE(file_info, nullptr);
+    // Guard the element access below: index_info(0) is only valid on a non-empty field.
+    ASSERT_GT(file_info->index_info_size(), 0);
+    const auto& index_info = file_info->index_info(0);
+    ASSERT_GT(index_info.index_file_size(), 0);
+
+    int64_t total_size = writer.get_index_file_total_size();
+    ASSERT_GT(total_size, 0);
+    ASSERT_EQ(total_size, index_info.index_file_size());
+    std::cout << "total_size:" << total_size << std::endl;
+}
+
+// Writes one file into each of two indexes through a V2 writer backed by an
+// explicit file writer, closes it, and checks that the reported index size is
+// positive and equals the writer's total size.
+TEST_F(InvertedIndexFileWriterTest, WriteV2Test) {
+    io::FileWriterPtr file_writer;
+    std::string index_path = InvertedIndexDescriptor::get_index_file_path_v2(_index_path_prefix);
+    io::FileWriterOptions opts;
+    Status st = _fs->create_file(index_path, &file_writer, &opts);
+    ASSERT_TRUE(st.ok());
+    InvertedIndexFileWriter writer(_fs, _index_path_prefix, _rowset_id, _seg_id,
+                                   InvertedIndexStorageFormatPB::V2, std::move(file_writer));
+
+    // First index: (1, "suffix1").
+    int64_t first_id = 1;
+    std::string first_suffix = "suffix1";
+    auto first_meta = create_mock_tablet_index(first_id, first_suffix);
+    ASSERT_NE(first_meta, nullptr);
+    auto first_open = writer.open(first_meta.get());
+    ASSERT_TRUE(first_open.has_value());
+    auto first_dir = first_open.value();
+    auto first_out = std::unique_ptr<lucene::store::IndexOutput>(
+            first_dir->createOutput("write_v2_test_index_1"));
+    first_out->writeString("test1");
+    first_out->close();
+    first_dir->close();
+
+    // Second index: (2, "suffix2").
+    int64_t second_id = 2;
+    std::string second_suffix = "suffix2";
+    auto second_meta = create_mock_tablet_index(second_id, second_suffix);
+    ASSERT_NE(second_meta, nullptr);
+    auto second_open = writer.open(second_meta.get());
+    ASSERT_TRUE(second_open.has_value());
+    auto second_dir = second_open.value();
+    auto second_out = std::unique_ptr<lucene::store::IndexOutput>(
+            second_dir->createOutput("write_v2_test_index_2"));
+    second_out->writeString("test2");
+    second_out->close();
+    second_dir->close();
+
+    Status close_status = writer.close();
+    ASSERT_TRUE(close_status.ok());
+
+    const InvertedIndexFileInfo* info = writer.get_index_file_info();
+    ASSERT_NE(info, nullptr);
+    ASSERT_GT(info->index_size(), 0);
+
+    int64_t total_size = writer.get_index_file_total_size();
+    ASSERT_GT(total_size, 0);
+    ASSERT_EQ(total_size, info->index_size());
+    std::cout << "total_size:" << total_size << std::endl;
+}
+
+// Registers two directories (two files and one file respectively) with a V2
+// writer and checks headerLength() against a size computed from the layout
+// spelled out below.
+TEST_F(InvertedIndexFileWriterTest, HeaderLengthTest) {
+    auto mock_dir1 = std::make_shared<DorisFSDirectory>();
+    auto mock_dir2 = std::make_shared<DorisFSDirectory>();
+    std::string local_fs_index_path_1 = InvertedIndexDescriptor::get_temporary_index_path(
+            ExecEnv::GetInstance()->get_tmp_file_dirs()->get_tmp_file_dir().native(), _rowset_id,
+            _seg_id, 1, "suffix1");
+    std::string local_fs_index_path_2 = InvertedIndexDescriptor::get_temporary_index_path(
+            ExecEnv::GetInstance()->get_tmp_file_dirs()->get_tmp_file_dir().native(), _rowset_id,
+            _seg_id, 2, "suffix2");
+    EXPECT_TRUE(io::global_local_filesystem()->delete_directory(local_fs_index_path_1).ok());
+    EXPECT_TRUE(io::global_local_filesystem()->create_directory(local_fs_index_path_1).ok());
+    EXPECT_TRUE(io::global_local_filesystem()->delete_directory(local_fs_index_path_2).ok());
+    EXPECT_TRUE(io::global_local_filesystem()->create_directory(local_fs_index_path_2).ok());
+    mock_dir1->init(_fs, local_fs_index_path_1.c_str());
+    mock_dir2->init(_fs, local_fs_index_path_2.c_str());
+    std::vector<std::string> files1 = {"file1.dat", "file2.dat"};
+    std::vector<std::string> files2 = {"file3.dat"};
+    for (auto& file : files1) {
+        auto out_file_1 =
+                std::unique_ptr<lucene::store::IndexOutput>(mock_dir1->createOutput(file.c_str()));
+        out_file_1->writeString("test1");
+        out_file_1->close();
+    }
+    for (auto& file : files2) {
+        auto out_file_2 =
+                std::unique_ptr<lucene::store::IndexOutput>(mock_dir2->createOutput(file.c_str()));
+        out_file_2->writeString("test2");
+        out_file_2->close();
+    }
+
+    InvertedIndexFileWriter writer(_fs, _index_path_prefix, _rowset_id, _seg_id,
+                                   InvertedIndexStorageFormatPB::V2);
+    // Use gtest assertions rather than assert(): assert() is a no-op when NDEBUG is
+    // defined, which would let a failed insertion go unnoticed.
+    Status st = writer._insert_directory_into_map(1, "suffix1", mock_dir1);
+    ASSERT_TRUE(st.ok()) << "_insert_directory_into_map error in HeaderLengthTest: " << st.msg();
+    st = writer._insert_directory_into_map(2, "suffix2", mock_dir2);
+    ASSERT_TRUE(st.ok()) << "_insert_directory_into_map error in HeaderLengthTest: " << st.msg();
+
+    int64_t header_length = writer.headerLength();
+
+    // sizeof(int32_t) * 2
+    // + (sizeof(int64_t) + sizeof(int32_t) + suffix.length() + sizeof(int32_t)) * num_indices
+    // + (sizeof(int32_t) + filename.length() + sizeof(int64_t) + sizeof(int64_t)) * num_files
+    const std::vector<std::pair<std::string, std::vector<std::string>>> layout = {
+            {"suffix1", files1}, {"suffix2", files2}};
+    int64_t expected_header_length = 0;
+    expected_header_length += sizeof(int32_t) * 2; // version and num_indices
+    for (const auto& [suffix, files] : layout) {
+        expected_header_length += sizeof(int64_t); // index_id
+        expected_header_length += sizeof(int32_t); // suffix size
+        expected_header_length += suffix.length(); // suffix bytes
+        expected_header_length += sizeof(int32_t); // file_count
+        for (const auto& file : files) {
+            // per file: one int32_t, the file name bytes, two int64_t
+            expected_header_length +=
+                    sizeof(int32_t) + file.length() + sizeof(int64_t) + sizeof(int64_t);
+        }
+    }
+
+    ASSERT_EQ(header_length, expected_header_length);
+}
+
+// Creates five files in a directory whose fileLength() is mocked, then checks
+// that prepare_sorted_files() returns the four expected entries in the expected
+// order with the mocked sizes ("write.lock" is not expected in the result).
+TEST_F(InvertedIndexFileWriterTest, PrepareSortedFilesTest) {
+    auto mock_dir = std::make_shared<MockDorisFSDirectoryFileLength>();
+    std::string local_fs_index_path = InvertedIndexDescriptor::get_temporary_index_path(
+            ExecEnv::GetInstance()->get_tmp_file_dirs()->get_tmp_file_dir().native(), _rowset_id,
+            _seg_id, 1, "suffix1");
+    EXPECT_TRUE(io::global_local_filesystem()->delete_directory(local_fs_index_path).ok());
+    EXPECT_TRUE(io::global_local_filesystem()->create_directory(local_fs_index_path).ok());
+    mock_dir->init(_fs, local_fs_index_path.c_str());
+    std::vector<std::string> files = {"0.segments", "0.fnm", "0.tii", "nullbitmap", "write.lock"};
+    for (auto& file : files) {
+        auto out_file_1 =
+                std::unique_ptr<lucene::store::IndexOutput>(mock_dir->createOutput(file.c_str()));
+        out_file_1->writeString("test1");
+        out_file_1->close();
+    }
+
+    // Each expected file is queried for its length exactly once.
+    EXPECT_CALL(*mock_dir, fileLength(testing::StrEq("0.segments")))
+            .WillOnce(testing::Return(1000));
+    EXPECT_CALL(*mock_dir, fileLength(testing::StrEq("0.fnm"))).WillOnce(testing::Return(2000));
+    EXPECT_CALL(*mock_dir, fileLength(testing::StrEq("0.tii"))).WillOnce(testing::Return(1500));
+    EXPECT_CALL(*mock_dir, fileLength(testing::StrEq("nullbitmap"))).WillOnce(testing::Return(500));
+
+    InvertedIndexFileWriter writer(_fs, _index_path_prefix, _rowset_id, _seg_id,
+                                   InvertedIndexStorageFormatPB::V2);
+    auto st = writer._insert_directory_into_map(1, "suffix1", mock_dir);
+    ASSERT_TRUE(st.ok()) << "_insert_directory_into_map error in PrepareSortedFilesTest: "
+                         << st.msg();
+
+    std::vector<FileInfo> sorted_files =
+            writer.prepare_sorted_files(writer._indices_dirs[std::make_pair(1, "suffix1")].get());
+
+    // 1. 0.segments (priority 1, size 1000)
+    // 2. 0.fnm (priority 2, size 2000)
+    // 3. 0.tii (priority 3, size 1500)
+    // 4. nullbitmap (priority 4, size 500)
+    const std::vector<std::pair<std::string, int64_t>> expected = {
+            {"0.segments", 1000}, {"0.fnm", 2000}, {"0.tii", 1500}, {"nullbitmap", 500}};
+    ASSERT_EQ(sorted_files.size(), expected.size());
+
+    for (size_t i = 0; i < expected.size(); ++i) {
+        EXPECT_EQ(sorted_files[i].filename, expected[i].first);
+        EXPECT_EQ(sorted_files[i].filesize, expected[i].second);
+    }
+}
+/*TEST_F(InvertedIndexFileWriterTest, CopyFileTest_OpenInputFailure) {
+ auto mock_dir = std::make_shared<MockDorisFSDirectoryOpenInput>();
+ std::string local_fs_index_path =
InvertedIndexDescriptor::get_temporary_index_path(
+
ExecEnv::GetInstance()->get_tmp_file_dirs()->get_tmp_file_dir().native(),
_rowset_id,
+ _seg_id, 1, "suffix1");
+
EXPECT_TRUE(io::global_local_filesystem()->delete_directory(local_fs_index_path).ok());
+
EXPECT_TRUE(io::global_local_filesystem()->create_directory(local_fs_index_path).ok());
+ mock_dir->init(_fs, local_fs_index_path.c_str());
+ std::vector<std::string> files = {"0.segments", "0.fnm", "0.tii",
"nullbitmap", "write.lock"};
+ for (auto& file : files) {
+ auto out_file_1 =
+
std::unique_ptr<lucene::store::IndexOutput>(mock_dir->createOutput(file.c_str()));
+ out_file_1->writeString("test1");
+ out_file_1->close();
+ }
+ InvertedIndexFileWriter writer(_fs, _index_path_prefix, _rowset_id,
_seg_id,
+ InvertedIndexStorageFormatPB::V2);
+ auto st = writer._insert_directory_into_map(1, "suffix1", mock_dir);
+ if (!st.ok()) {
+ std::cerr << "_insert_directory_into_map error in
CopyFileTest_OpenInputFailure: "
+ << st.msg() << std::endl;
+ ASSERT_TRUE(false);
+ return;
+ }
+
+ EXPECT_CALL(*mock_dir,
+ openInput(::testing::StrEq("0.segments"), ::testing::_,
::testing::_, ::testing::_))
+ .WillOnce(::testing::Invoke([&](const char* name,
lucene::store::IndexInput*& ret,
+ CLuceneError& err_ref, int
bufferSize) {
+ err_ref.set(CL_ERR_IO, fmt::format("Could not open file, file
is {}", name).data());
+ return false;
+ }));
+
+ uint8_t buffer[16384];
+ std::string error_message;
+ try {
+ writer.copyFile("0.segments", mock_dir.get(), nullptr, buffer,
sizeof(buffer));
+ } catch (CLuceneError& err) {
+ error_message = err.what();
+ }
+ ASSERT_EQ(error_message, "Could not open file, file is 0.segments");
+}*/
+// InvertedIndexFileWriter with write_header_and_data_v1() mocked, so a test can
+// inject a failure into the V1 write path.
+class InvertedIndexFileWriterMock : public InvertedIndexFileWriter {
+public:
+    // segment_id is int64_t to match the int64_t segment id the test fixture passes
+    // in, so the value is not narrowed on the way to the base class.
+    InvertedIndexFileWriterMock(const io::FileSystemSPtr& fs, const std::string& index_path_prefix,
+                                const std::string& rowset_id, int64_t segment_id,
+                                InvertedIndexStorageFormatPB storage_format)
+            : InvertedIndexFileWriter(fs, index_path_prefix, rowset_id, segment_id,
+                                      storage_format) {}
+
+    MOCK_METHOD(void, write_header_and_data_v1,
+                (lucene::store::IndexOutput * output, const std::vector<FileInfo>& files,
+                 lucene::store::Directory* dir, int64_t header_length, int32_t file_count),
+                (override));
+};
+// write_header_and_data_v1() is mocked to throw a CLuceneError; write_v1() must
+// return a non-OK status with code INVERTED_INDEX_CLUCENE_ERROR.
+TEST_F(InvertedIndexFileWriterTest, WriteV1ExceptionHandlingTest) {
+    InvertedIndexFileWriterMock writer_mock(_fs, _index_path_prefix, _rowset_id, _seg_id,
+                                            InvertedIndexStorageFormatPB::V1);
+
+    int64_t index_id = 1;
+    std::string index_suffix = "suffix1";
+    auto index_meta = create_mock_tablet_index(index_id, index_suffix);
+    ASSERT_NE(index_meta, nullptr);
+
+    auto open_result = writer_mock.open(index_meta.get());
+    ASSERT_TRUE(open_result.has_value());
+    auto dir = open_result.value();
+
+    // Put one file into the directory before write_v1() is invoked.
+    auto out_file = std::unique_ptr<lucene::store::IndexOutput>(dir->createOutput("test_file"));
+    out_file->writeString("test data");
+    out_file->close();
+    dir->close();
+
+    EXPECT_CALL(writer_mock, write_header_and_data_v1(::testing::_, ::testing::_, ::testing::_,
+                                                      ::testing::_, ::testing::_))
+            .WillOnce(::testing::Throw(CLuceneError(CL_ERR_IO, "Simulated exception", false)));
+
+    Status status = writer_mock.write_v1();
+    ASSERT_FALSE(status.ok());
+    ASSERT_EQ(status.code(), ErrorCode::INVERTED_INDEX_CLUCENE_ERROR);
+}
+// InvertedIndexFileWriter with write_index_headers_and_metadata() mocked, so a
+// test can inject a failure into the V2 write path.
+class InvertedIndexFileWriterMockV2 : public InvertedIndexFileWriter {
+public:
+    // segment_id is int64_t to match the int64_t segment id the test fixture passes
+    // in, so the value is not narrowed on the way to the base class.
+    InvertedIndexFileWriterMockV2(const io::FileSystemSPtr& fs,
+                                  const std::string& index_path_prefix,
+                                  const std::string& rowset_id, int64_t segment_id,
+                                  InvertedIndexStorageFormatPB storage_format,
+                                  io::FileWriterPtr file_writer)
+            : InvertedIndexFileWriter(fs, index_path_prefix, rowset_id, segment_id, storage_format,
+                                      std::move(file_writer)) {}
+
+    MOCK_METHOD(void, write_index_headers_and_metadata,
+                (lucene::store::IndexOutput * compound_file_output,
+                 const std::vector<FileMetadata>& file_metadata),
+                (override));
+};
+
+// write_index_headers_and_metadata() is mocked to throw a CLuceneError;
+// write_v2() must return a non-OK status with code INVERTED_INDEX_CLUCENE_ERROR.
+TEST_F(InvertedIndexFileWriterTest, WriteV2ExceptionHandlingTest) {
+    io::FileWriterPtr file_writer;
+    std::string index_path = InvertedIndexDescriptor::get_index_file_path_v2(_index_path_prefix);
+    io::FileWriterOptions opts;
+    Status st = _fs->create_file(index_path, &file_writer, &opts);
+    ASSERT_TRUE(st.ok());
+    InvertedIndexFileWriterMockV2 writer_mock(_fs, _index_path_prefix, _rowset_id, _seg_id,
+                                              InvertedIndexStorageFormatPB::V2,
+                                              std::move(file_writer));
+
+    int64_t index_id = 1;
+    std::string index_suffix = "suffix1";
+    auto index_meta = create_mock_tablet_index(index_id, index_suffix);
+    ASSERT_NE(index_meta, nullptr);
+
+    auto open_result = writer_mock.open(index_meta.get());
+    ASSERT_TRUE(open_result.has_value());
+    auto dir = open_result.value();
+
+    // Put one file into the directory before write_v2() is invoked.
+    auto out_file = std::unique_ptr<lucene::store::IndexOutput>(dir->createOutput("test_file"));
+    out_file->writeString("test data");
+    out_file->close();
+    dir->close();
+
+    EXPECT_CALL(writer_mock, write_index_headers_and_metadata(::testing::_, ::testing::_))
+            .WillOnce(::testing::Throw(CLuceneError(CL_ERR_IO, "Simulated exception", false)));
+
+    Status status = writer_mock.write_v2();
+    ASSERT_FALSE(status.ok());
+    ASSERT_EQ(status.code(), ErrorCode::INVERTED_INDEX_CLUCENE_ERROR);
+}
+
+} // namespace segment_v2
+} // namespace doris
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]