This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch vector-index-dev in repository https://gitbox.apache.org/repos/asf/doris.git
commit d4a9d1f99b37afccb2829934021757c587a45ae9 Author: hezhiqiang <hezhiqi...@selectdb.com> AuthorDate: Thu Jun 5 16:56:09 2025 +0800 Refactor: remove useless tools --- be/src/index-tools/ann_tool.cpp | 357 --------------------- be/src/olap/compaction.cpp | 1 + be/src/olap/rowset/segment_v2/ann_index_writer.cpp | 8 +- be/src/olap/rowset/segment_v2/ann_index_writer.h | 1 - be/src/olap/rowset/segment_v2/column_writer.cpp | 17 +- be/src/olap/rowset/segment_v2/column_writer.h | 4 - .../olap/rowset/segment_v2/index_file_writer.cpp | 1 + be/src/olap/rowset/segment_v2/index_writer.cpp | 1 - be/src/olap/rowset/segment_v2/index_writer.h | 46 +-- be/src/olap/rowset/segment_v2/segment_iterator.cpp | 74 ++--- be/src/olap/rowset/segment_v2/segment_iterator.h | 2 + .../segment_v2/{index_writer.h => tmp_file_dirs.h} | 63 +--- be/src/olap/tablet_manager.cpp | 2 - be/src/olap/tablet_meta.cpp | 1 + be/src/pipeline/exec/scan_operator.cpp | 4 +- be/src/runtime/exec_env_init.cpp | 1 + be/src/vec/core/block.cpp | 5 - be/src/vec/exec/scan/scanner_scheduler.cpp | 71 ++-- be/src/vec/exec/scan/scanner_scheduler.h | 4 +- be/src/vec/olap/vcollect_iterator.cpp | 29 -- be/src/vector/CMakeLists.txt | 3 +- be/src/vector/diskann_vector_index.cpp | 212 ------------ be/src/vector/diskann_vector_index.h | 284 ---------------- be/src/vector/faiss_vector_index.h | 9 +- be/src/vector/stream_wrapper.h | 146 --------- be/src/vector/vector_index.h | 65 ++-- .../antlr4/org/apache/doris/nereids/DorisParser.g4 | 2 +- .../apache/doris/alter/SchemaChangeHandler.java | 5 + .../doris/analysis/AnnIndexPropertiesChecker.java | 79 +++++ .../java/org/apache/doris/analysis/IndexDef.java | 26 +- .../doris/nereids/parser/LogicalPlanBuilder.java | 2 + .../trees/plans/commands/info/IndexDefinition.java | 8 + gensrc/thrift/Constant.thrift | 33 ++ .../ddl_p0/ann_index/create_ann_index_test.groovy | 205 ++++++++++++ .../create_tbl_with_ann_index_test.groovy | 279 ++++++++++++++++ 35 files changed, 783 insertions(+), 1267 
deletions(-) diff --git a/be/src/index-tools/ann_tool.cpp b/be/src/index-tools/ann_tool.cpp deleted file mode 100644 index 0df4835f798..00000000000 --- a/be/src/index-tools/ann_tool.cpp +++ /dev/null @@ -1,357 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -#include <CLucene.h> -#include <CLucene/config/repl_wchar.h> -#include <gen_cpp/PaloInternalService_types.h> -#include <gen_cpp/olap_file.pb.h> -#include <gflags/gflags.h> - -#include <filesystem> -#include <fstream> -#include <iostream> -#include <memory> -#include <nlohmann/json.hpp> -#include <roaring/roaring.hh> -#include <sstream> -#include <string> -#include <vector> - -#include "io/fs/file_reader.h" -#ifdef __clang__ -#pragma clang diagnostic push -#pragma clang diagnostic ignored "-Wshadow-field" -#endif -#include "CLucene/analysis/standard95/StandardAnalyzer.h" -#ifdef __clang__ -#pragma clang diagnostic pop -#endif -#include "common/signal_handler.h" -#include "io/fs/file_system.h" -#include "io/fs/file_writer.h" -#include "io/fs/local_file_system.h" -#include "olap/options.h" -#include "olap/rowset/segment_v2/ann_index_writer.h" -#include "olap/rowset/segment_v2/index_file_reader.h" -#include "olap/rowset/segment_v2/index_file_writer.h" -#include "olap/rowset/segment_v2/inverted_index/query/conjunction_query.h" -#include "olap/rowset/segment_v2/inverted_index_compound_reader.h" -#include "olap/rowset/segment_v2/inverted_index_desc.h" -#include "olap/rowset/segment_v2/inverted_index_fs_directory.h" -#include "olap/tablet_schema.h" -#include "util/disk_info.h" -#include "util/mem_info.h" -#include "vector/diskann_vector_index.h" -#include "vector/vector_index.h" - -using doris::segment_v2::DorisCompoundReader; -using doris::segment_v2::DorisFSDirectoryFactory; -using doris::segment_v2::IndexFileWriter; -using doris::segment_v2::InvertedIndexDescriptor; -using doris::segment_v2::IndexFileReader; -using doris::io::FileInfo; -using doris::TabletIndex; -using namespace doris::segment_v2; -using namespace lucene::analysis; -using namespace lucene::index; -using namespace lucene::util; -using namespace lucene::search; -using doris::io::FileSystem; - -#include "io/fs/path.h" - -const std::string& file_dir = "/home/users/clz/run/test_diskann/123456_0.idx"; 
-std::filesystem::path index_data_path(file_dir); - -int index_id = 100; -std::string rowset_id = "rowset_id"; -int seg_id = 0; - -std::shared_ptr<FileSystem> get_local_file_filesystem() { - return doris::io::global_local_filesystem(); -} - -void test_add() { - auto fs = get_local_file_filesystem(); - int index_id = 100; - - doris::io::FileWriterPtr file_writer; - - auto st = doris::io::global_local_filesystem()->create_file(file_dir, &file_writer); - if (!st.ok()) { - std::cerr << "failed create_file" << file_dir << std::endl; - } - std::unique_ptr<IndexFileWriter> index_file_writer = std::make_unique<IndexFileWriter>( - fs, index_data_path.parent_path(), rowset_id, seg_id, - doris::InvertedIndexStorageFormatPB::V2, std::move(file_writer)); - - doris::TabletIndexPB index_pb; - - index_pb.set_index_id(index_id); - TabletIndex index_meta; - index_meta.init_from_pb(index_pb); - index_meta._index_type = doris::IndexType::ANN; - index_meta._properties["index_type"] = "diskann"; - index_meta._properties["metric_type"] = "l2"; - index_meta._properties["dim"] = "7"; - index_meta._properties["max_degree"] = "32"; - index_meta._properties["search_list"] = "100"; - - std::string field_name = "word_embeding"; - std::unique_ptr<AnnIndexColumnWriter> ann_writer = std::make_unique<AnnIndexColumnWriter>( - field_name, index_file_writer.get(), &index_meta, true); - st = ann_writer->init(); - if (!st.ok()) { - std::cout << "failed to ann_writer->init()" << std::endl; - return; - } - - //writer - int field_size = 4; - float value[14] = {1, 2, 3, 4, 5, 6, 7, 7, 6, 5, 4, 3, 2, 1}; - int64_t offsets[3] = {0, 7, 14}; - st = ann_writer->add_array_values(field_size, (const void*)value, nullptr, - (const uint8_t*)offsets, 2); - if (!st.ok()) { - std::cout << "failed to ann_writer->add_array_values" << std::endl; - return; - } - - //finish - st = ann_writer->finish(); - if (!st.ok()) { - std::cout << "failed to ann_writer->finish" << std::endl; - return; - } - - //save to disk - st = 
index_file_writer->close(); - if (!st.ok()) { - std::cout << "failed to indexwriter->close" << std::endl; - return; - } -} - -void init_env() { - doris::CpuInfo::init(); - doris::DiskInfo::init(); - doris::MemInfo::init(); - - string custom_conffile = "/home/users/clz/run/be_1.conf"; - if (!doris::config::init(custom_conffile.c_str(), true, false, true)) { - fprintf(stderr, "error read custom config file. \n"); - return; - } - - std::vector<doris::StorePath> paths; - std::string storage = "/home/users/clz/run/storage"; - std::string spill = "/home/users/clz/run/splill"; - std::string broken_storage_path = "/home/users/clz/run/broken"; - - auto olap_res = doris::parse_conf_store_paths(storage, &paths); - if (!olap_res) { - LOG(ERROR) << "parse config storage path failed, path=" << storage; - exit(-1); - } - - std::vector<doris::StorePath> spill_paths; - olap_res = doris::parse_conf_store_paths(spill, &spill_paths); - if (!olap_res) { - LOG(ERROR) << "parse config spill storage path failed, path=" << spill; - exit(-1); - } - std::set<std::string> broken_paths; - doris::parse_conf_broken_store_paths(broken_storage_path, &broken_paths); - - // auto it = paths.begin(); - // for (; it != paths.end();) { - // if (broken_paths.count(it->path) > 0) { - // if (doris::config::ignore_broken_disk) { - // LOG(WARNING) << "ignore broken disk, path = " << it->path; - // it = paths.erase(it); - // } else { - // LOG(ERROR) << "a broken disk is found " << it->path; - // exit(-1); - // } - // } else if (!doris::check_datapath_rw(it->path)) { - // if (doris::config::ignore_broken_disk) { - // LOG(WARNING) << "read write test file failed, path=" << it->path; - // it = paths.erase(it); - // } else { - // LOG(ERROR) << "read write test file failed, path=" << it->path; - // // if only one disk and the disk is full, also need exit because rocksdb will open failed - // exit(-1); - // } - // } else { - // ++it; - // } - // } - - // if (paths.empty()) { - // LOG(ERROR) << "All disks are 
broken, exit."; - // exit(-1); - // } - - // it = spill_paths.begin(); - // for (; it != spill_paths.end();) { - // if (!doris::check_datapath_rw(it->path)) { - // if (doris::config::ignore_broken_disk) { - // LOG(WARNING) << "read write test file failed, path=" << it->path; - // it = spill_paths.erase(it); - // } else { - // LOG(ERROR) << "read write test file failed, path=" << it->path; - // exit(-1); - // } - // } else { - // ++it; - // } - // } - // if (spill_paths.empty()) { - // LOG(ERROR) << "All spill disks are broken, exit."; - // exit(-1); - // } - - // // initialize libcurl here to avoid concurrent initialization - // auto curl_ret = curl_global_init(CURL_GLOBAL_ALL); - // if (curl_ret != 0) { - // LOG(ERROR) << "fail to initialize libcurl, curl_ret=" << curl_ret; - // exit(-1); - // } - // // add logger for thrift internal - // apache::thrift::GlobalOutput.setOutputFunction(doris::thrift_output); - - // Status status = Status::OK(); - // if (doris::config::enable_java_support) { - // // Init jni - // status = doris::JniUtil::Init(); - // if (!status.ok()) { - // LOG(WARNING) << "Failed to initialize JNI: " << status; - // exit(1); - // } else { - // LOG(INFO) << "Doris backend JNI is initialized."; - // } - // } - - // // Doris own signal handler must be register after jvm is init. - // // Or our own sig-handler for SIGINT & SIGTERM will not be chained ... 
- // // https://www.oracle.com/java/technologies/javase/signals.html - // doris::init_signals(); - // // ATTN: MUST init before `ExecEnv`, `StorageEngine` and other daemon services - // // - // // Daemon ───┬──► StorageEngine ──► ExecEnv ──► Disk/Mem/CpuInfo - // // │ - // // │ - // // BackendService ─┘ - // doris::CpuInfo::init(); - // doris::DiskInfo::init(); - // doris::MemInfo::init(); - - // LOG(INFO) << doris::CpuInfo::debug_string(); - // LOG(INFO) << doris::DiskInfo::debug_string(); - // LOG(INFO) << doris::MemInfo::debug_string(); - - // // PHDR speed up exception handling, but exceptions from dynamically loaded libraries (dlopen) - // // will work only after additional call of this function. - // // rewrites dl_iterate_phdr will cause Jemalloc to fail to run after enable profile. see # - // // updatePHDRCache(); - // if (!doris::BackendOptions::init()) { - // exit(-1); - // } - - // doris::ThreadLocalHandle::create_thread_local_if_not_exits(); - - // init exec env - //auto* exec_env(doris::ExecEnv::GetInstance()); - doris::Status status = - doris::ExecEnv::init(doris::ExecEnv::GetInstance(), paths, spill_paths, broken_paths); - if (!status.ok()) { - std::cout << "init fail" << std::endl; - } -} - -void test_search() { - auto fs = get_local_file_filesystem(); - auto index_file_reader = - std::make_unique<IndexFileReader>(fs, "/home/users/clz/run/test_diskann/123456_0", - doris::InvertedIndexStorageFormatPB::V2); - auto st = index_file_reader->init(4096); - if (!st.ok()) { - std::cout << "failed to index_file_reader->init" << st << std::endl; - return; - } - doris::TabletIndexPB index_pb; - index_pb.set_index_id(index_id); - TabletIndex index_meta; - index_meta.init_from_pb(index_pb); - - auto ret = index_file_reader->open(&index_meta); - if (!ret.has_value()) { - std::cerr << "IndexFileReader open error:" << ret.error() << std::endl; - return; - } - using T = std::decay_t<decltype(ret)>; - std::shared_ptr<DorisCompoundReader> dir = 
std::forward<T>(ret).value(); - - std::shared_ptr<DiskannVectorIndex> vindex = std::make_shared<DiskannVectorIndex>(dir); - st = vindex->load(VectorIndex::Metric::L2); - if (!st.ok()) { - std::cout << "failed to vindex->load" << std::endl; - return; - } - float query_vec[7] = {1, 2, 3, 4, 5, 6, 7}; - SearchResult result; - std::shared_ptr<DiskannSearchParameter> searchParams = - std::make_shared<DiskannSearchParameter>(); - searchParams->with_search_list(100); - searchParams->with_beam_width(2); - - //设置过滤条件 - std::shared_ptr<IDFilter> filter = nullptr; - std::shared_ptr<roaring::Roaring> bitmap = std::make_shared<roaring::Roaring>(); - // bitmap->add(1); - // filter.reset(new IDFilter(bitmap)); - // searchParams->set_filter(filter); - st = vindex->search(query_vec, 5, &result, searchParams.get()); - if (!st.ok()) { - std::cout << "failed to vindex->search" << std::endl; - return; - } - if (result.has_rows()) { - for (int i = 0; i < result.row_count(); i++) { - std::cout << "idx:" << result.get_id(i) << ", distance:" << result.get_distance(i) - << std::endl; - } - } -} - -int main(int argc, char** argv) { - if (argc < 2) { - std::cerr << "Usage: " << argv[0] << " <test_add|test_search>" << std::endl; - return 1; - } - doris::signal::InstallFailureSignalHandler(); - init_env(); - std::string command = argv[1]; - if (command == "add") { - test_add(); - } else if (command == "search") { - test_search(); - } else { - std::cout << "unkonw command" << std::endl; - } - return 0; -} diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp index 88c352b4283..9f087273188 100644 --- a/be/src/olap/compaction.cpp +++ b/be/src/olap/compaction.cpp @@ -65,6 +65,7 @@ #include "olap/rowset/segment_v2/inverted_index_compaction.h" #include "olap/rowset/segment_v2/inverted_index_desc.h" #include "olap/rowset/segment_v2/inverted_index_fs_directory.h" +#include "olap/rowset/segment_v2/tmp_file_dirs.h" #include "olap/storage_engine.h" #include "olap/storage_policy.h" #include 
"olap/tablet.h" diff --git a/be/src/olap/rowset/segment_v2/ann_index_writer.cpp b/be/src/olap/rowset/segment_v2/ann_index_writer.cpp index ffc3b9bb1fd..1995b94b395 100644 --- a/be/src/olap/rowset/segment_v2/ann_index_writer.cpp +++ b/be/src/olap/rowset/segment_v2/ann_index_writer.cpp @@ -110,17 +110,15 @@ Status AnnIndexColumnWriter::add_array_values(size_t field_size, const void* val Status AnnIndexColumnWriter::add_array_values(size_t field_size, const CollectionValue* values, size_t count) { - return Status::OK(); + return Status::InternalError("Ann index should not be used on nullable column"); } Status AnnIndexColumnWriter::add_nulls(uint32_t count) { - // 实现逻辑 - return Status::OK(); + return Status::InternalError("Ann index should not be used on nullable column"); } Status AnnIndexColumnWriter::add_array_nulls(const uint8_t* null_map, size_t row_id) { - // 实现逻辑 - return Status::OK(); + return Status::InternalError("Ann index should not be used on nullable column"); } int64_t AnnIndexColumnWriter::size() const { diff --git a/be/src/olap/rowset/segment_v2/ann_index_writer.h b/be/src/olap/rowset/segment_v2/ann_index_writer.h index d674fb12648..081b737e23a 100644 --- a/be/src/olap/rowset/segment_v2/ann_index_writer.h +++ b/be/src/olap/rowset/segment_v2/ann_index_writer.h @@ -34,7 +34,6 @@ #include "olap/rowset/segment_v2/inverted_index_fs_directory.h" #include "olap/tablet_schema.h" #include "runtime/collection_value.h" -#include "vector/diskann_vector_index.h" #include "vector/vector_index.h" namespace doris::segment_v2 { diff --git a/be/src/olap/rowset/segment_v2/column_writer.cpp b/be/src/olap/rowset/segment_v2/column_writer.cpp index ed6f407b1ae..3d7d4244bcc 100644 --- a/be/src/olap/rowset/segment_v2/column_writer.cpp +++ b/be/src/olap/rowset/segment_v2/column_writer.cpp @@ -20,6 +20,7 @@ #include <gen_cpp/segment_v2.pb.h> #include <algorithm> +#include <cstdint> #include <filesystem> #include <memory> @@ -534,6 +535,14 @@ Status 
ScalarColumnWriter::append_data(const uint8_t** ptr, size_t num_rows) { return Status::OK(); } +Status ScalarColumnWriter::append_data_in_current_page(const uint8_t** data, size_t* num_written) { + RETURN_IF_CATCH_EXCEPTION( + { return _internal_append_data_in_current_page(*data, num_written); }); + + *data += get_field()->size() * (*num_written); + return Status::OK(); +} + Status ScalarColumnWriter::_internal_append_data_in_current_page(const uint8_t* data, size_t* num_written) { RETURN_IF_ERROR(_page_builder->add(data, num_written)); @@ -561,12 +570,6 @@ Status ScalarColumnWriter::_internal_append_data_in_current_page(const uint8_t* return Status::OK(); } -Status ScalarColumnWriter::append_data_in_current_page(const uint8_t** data, size_t* num_written) { - RETURN_IF_ERROR(append_data_in_current_page(*data, num_written)); - *data += get_field()->size() * (*num_written); - return Status::OK(); -} - uint64_t ScalarColumnWriter::estimate_buffer_size() { uint64_t size = _data_size; size += _page_builder->size(); @@ -930,7 +933,7 @@ Status ArrayColumnWriter::append_data(const uint8_t** ptr, size_t num_rows) { // [size, offset_ptr, item_data_ptr, item_nullmap_ptr] auto data_ptr = reinterpret_cast<const uint64_t*>(*ptr); // total number length - size_t element_cnt = size_t((unsigned long)(*data_ptr)); + size_t element_cnt = (*data_ptr); auto offset_data = *(data_ptr + 1); const uint8_t* offsets_ptr = (const uint8_t*)offset_data; auto data = *(data_ptr + 2); diff --git a/be/src/olap/rowset/segment_v2/column_writer.h b/be/src/olap/rowset/segment_v2/column_writer.h index 7c251d9ce6f..d88c9bfcb3c 100644 --- a/be/src/olap/rowset/segment_v2/column_writer.h +++ b/be/src/olap/rowset/segment_v2/column_writer.h @@ -222,10 +222,6 @@ public: // used for append not null data. When page is full, will append data not reach num_rows. 
Status append_data_in_current_page(const uint8_t** ptr, size_t* num_written); - Status append_data_in_current_page(const uint8_t* ptr, size_t* num_written) { - RETURN_IF_CATCH_EXCEPTION( - { return _internal_append_data_in_current_page(ptr, num_written); }); - } friend class ArrayColumnWriter; friend class OffsetColumnWriter; diff --git a/be/src/olap/rowset/segment_v2/index_file_writer.cpp b/be/src/olap/rowset/segment_v2/index_file_writer.cpp index 20ee08d10e5..10468ac3794 100644 --- a/be/src/olap/rowset/segment_v2/index_file_writer.cpp +++ b/be/src/olap/rowset/segment_v2/index_file_writer.cpp @@ -31,6 +31,7 @@ #include "olap/rowset/segment_v2/inverted_index_desc.h" #include "olap/rowset/segment_v2/inverted_index_fs_directory.h" #include "olap/rowset/segment_v2/inverted_index_reader.h" +#include "olap/rowset/segment_v2/tmp_file_dirs.h" #include "olap/tablet_schema.h" namespace doris::segment_v2 { diff --git a/be/src/olap/rowset/segment_v2/index_writer.cpp b/be/src/olap/rowset/segment_v2/index_writer.cpp index 5912479fc03..b94c504e1db 100644 --- a/be/src/olap/rowset/segment_v2/index_writer.cpp +++ b/be/src/olap/rowset/segment_v2/index_writer.cpp @@ -64,7 +64,6 @@ #include "util/faststring.h" #include "util/slice.h" #include "util/string_util.h" -#include "vector/diskann_vector_index.h" #include "vector/vector_index.h" namespace doris { diff --git a/be/src/olap/rowset/segment_v2/index_writer.h b/be/src/olap/rowset/segment_v2/index_writer.h index d56f1881246..38a4907bc19 100644 --- a/be/src/olap/rowset/segment_v2/index_writer.h +++ b/be/src/olap/rowset/segment_v2/index_writer.h @@ -48,11 +48,17 @@ class IndexColumnWriter { public: static Status create(const Field* field, std::unique_ptr<IndexColumnWriter>* res, IndexFileWriter* index_file_writer, const TabletIndex* inverted_index); - virtual Status init() = 0; + // check if the column is valid for inverted index, some columns + // are generated from variant, but not all of them are supported + static bool 
check_support_inverted_index(const TabletColumn& column); + static bool check_support_ann_index(const TabletColumn& column); IndexColumnWriter() = default; virtual ~IndexColumnWriter() = default; + virtual Status init() = 0; + virtual int64_t size() const = 0; + virtual Status add_values(const std::string name, const void* values, size_t count) = 0; virtual Status add_array_values(size_t field_size, const CollectionValue* values, size_t count) = 0; @@ -66,48 +72,10 @@ public: virtual Status finish() = 0; - virtual int64_t size() const = 0; - virtual void close_on_error() = 0; - // check if the column is valid for inverted index, some columns - // are generated from variant, but not all of them are supported - static bool check_support_inverted_index(const TabletColumn& column); - static bool check_support_ann_index(const TabletColumn& column); - private: DISALLOW_COPY_AND_ASSIGN(IndexColumnWriter); }; - -class TmpFileDirs { -public: - TmpFileDirs(const std::vector<doris::StorePath>& store_paths) { - for (const auto& store_path : store_paths) { - _tmp_file_dirs.emplace_back(store_path.path + "/" + config::tmp_file_dir); - } - }; - - Status init() { - for (auto& tmp_file_dir : _tmp_file_dirs) { - // delete the tmp dir to avoid the tmp files left by last crash - RETURN_IF_ERROR(io::global_local_filesystem()->delete_directory(tmp_file_dir)); - RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(tmp_file_dir)); - } - return Status::OK(); - }; - - io::Path get_tmp_file_dir() { - std::cout << "TmpFileDirs size: " << _tmp_file_dirs.size() << std::endl; - size_t cur_index = _next_index.fetch_add(1); - return _tmp_file_dirs[cur_index % _tmp_file_dirs.size()]; - }; - - ~TmpFileDirs() { std::cout << "TmpFileDirs destroyed!" 
<< std::endl; } - -private: - std::vector<io::Path> _tmp_file_dirs; - std::atomic_size_t _next_index {0}; // use for round-robin -}; - } // namespace segment_v2 } // namespace doris diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.cpp b/be/src/olap/rowset/segment_v2/segment_iterator.cpp index 726b88f95ac..e4b59f28875 100644 --- a/be/src/olap/rowset/segment_v2/segment_iterator.cpp +++ b/be/src/olap/rowset/segment_v2/segment_iterator.cpp @@ -2160,62 +2160,23 @@ Status SegmentIterator::_read_columns_by_rowids(std::vector<ColumnId>& read_colu } Status SegmentIterator::next_batch(vectorized::Block* block) { - // Append virtual columns to block - if (block->columns() < _schema->num_column_ids() + _virtual_column_exprs.size()) { - std::vector<size_t> vir_col_pos; - for (auto pair1 : _vir_cid_to_idx_in_block) { - vir_col_pos.push_back(pair1.second); - } - std::sort(vir_col_pos.begin(), vir_col_pos.end()); - for (size_t i = 0; i < vir_col_pos.size(); ++i) { - DCHECK(_opts.vir_col_idx_to_type.find(vir_col_pos[i]) != - _opts.vir_col_idx_to_type.end()); - block->insert({vectorized::ColumnNothing::create(0), - _opts.vir_col_idx_to_type[vir_col_pos[i]], - fmt::format("_VIR_COL_{}", i)}); - } - } else { - // Before get next batch. make sure all virtual columns has type ColumnNothing. - std::vector<size_t> vir_col_pos; - for (const auto& pair : _vir_cid_to_idx_in_block) { - vir_col_pos.push_back(pair.second); - block->replace_by_position(pair.second, vectorized::ColumnNothing::create(0)); - } - } + // Append virtual columns to the end of block before getting each batch. + _init_virtual_columns(block); auto status = [&]() { RETURN_IF_CATCH_EXCEPTION({ auto res = _next_batch_internal(block); if (res.is<END_OF_FILE>() && block->rows() == 0) { - // replace column nothing with real column - const auto idx_to_datatype = _opts.vir_col_idx_to_type; + // Since we have a type check at the caller. + // So a replacement of nothing column with real column is needed. 
+ const auto& idx_to_datatype = _opts.vir_col_idx_to_type; for (const auto& pair : _vir_cid_to_idx_in_block) { size_t idx = pair.second; auto type = idx_to_datatype.find(idx)->second; block->replace_by_position(idx, type->create_column()); - LOG_INFO( - "SegmentIterator next block replace column nothing with real column " - "idx {}, type {}", - idx, type->get_name()); - } - - size_t idx = 0; - for (const auto& entry : *block) { - if (vectorized::check_and_get_column<vectorized::ColumnNothing>( - entry.column.get())) { - LOG_ERROR( - "Column in idx {} is nothing, block columns {}, normal_columns " - "{}, ", - idx, block->columns(), _schema->column_ids().size()); - - throw doris::Exception(ErrorCode::INTERNAL_ERROR, - "Column in idx {} is nothing", idx); - } - idx++; } - return res; } @@ -2935,6 +2896,31 @@ bool SegmentIterator::_can_opt_topn_reads() { return all_true; } +void SegmentIterator::_init_virtual_columns(vectorized::Block* block) { + const size_t num_virtual_columns = _virtual_column_exprs.size(); + if (block->columns() < _schema->num_column_ids() + num_virtual_columns) { + std::vector<size_t> vir_col_idx; + for (const auto& pair : _vir_cid_to_idx_in_block) { + vir_col_idx.push_back(pair.second); + } + std::sort(vir_col_idx.begin(), vir_col_idx.end()); + for (size_t i = 0; i < num_virtual_columns; ++i) { + auto iter = _opts.vir_col_idx_to_type.find(vir_col_idx[i]); + DCHECK(iter != _opts.vir_col_idx_to_type.end()); + // Name of virtual currently is not used, so we just use a dummy name. + block->insert({vectorized::ColumnNothing::create(0), iter->second, + fmt::format("VIRTUAL_COLUMN_{}", i)}); + } + } else { + // Before get next batch. make sure all virtual columns has type ColumnNothing. 
+ std::vector<size_t> vir_col_pos; + for (const auto& pair : _vir_cid_to_idx_in_block) { + vir_col_pos.push_back(pair.second); + block->replace_by_position(pair.second, vectorized::ColumnNothing::create(0)); + } + } +} + Status SegmentIterator::_materialization_of_virtual_column(vectorized::Block* block) { size_t prev_block_columns = block->columns(); for (const auto& cid_and_expr : _virtual_column_exprs) { diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.h b/be/src/olap/rowset/segment_v2/segment_iterator.h index c97eec51e5f..8d9336cbf18 100644 --- a/be/src/olap/rowset/segment_v2/segment_iterator.h +++ b/be/src/olap/rowset/segment_v2/segment_iterator.h @@ -365,6 +365,8 @@ private: void _clear_iterators(); + void _init_virtual_columns(vectorized::Block* block); + Status _materialization_of_virtual_column(vectorized::Block* block); class BitmapRangeIterator; diff --git a/be/src/olap/rowset/segment_v2/index_writer.h b/be/src/olap/rowset/segment_v2/tmp_file_dirs.h similarity index 52% copy from be/src/olap/rowset/segment_v2/index_writer.h copy to be/src/olap/rowset/segment_v2/tmp_file_dirs.h index d56f1881246..e77badb6a6b 100644 --- a/be/src/olap/rowset/segment_v2/index_writer.h +++ b/be/src/olap/rowset/segment_v2/tmp_file_dirs.h @@ -1,3 +1,4 @@ + // Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. 
See the NOTICE file // distributed with this work for additional information @@ -17,68 +18,13 @@ #pragma once -#include <butil/macros.h> -#include <stddef.h> -#include <stdint.h> - -#include <atomic> -#include <memory> -#include <string> #include <vector> -#include "common/config.h" -#include "common/status.h" -#include "io/fs/file_system.h" #include "io/fs/local_file_system.h" -#include "olap/olap_common.h" +#include "io/fs/path.h" #include "olap/options.h" -namespace doris { -class CollectionValue; - -class Field; - -class TabletIndex; -class TabletColumn; - -namespace segment_v2 { -class IndexFileWriter; - -class IndexColumnWriter { -public: - static Status create(const Field* field, std::unique_ptr<IndexColumnWriter>* res, - IndexFileWriter* index_file_writer, const TabletIndex* inverted_index); - virtual Status init() = 0; - - IndexColumnWriter() = default; - virtual ~IndexColumnWriter() = default; - - virtual Status add_values(const std::string name, const void* values, size_t count) = 0; - virtual Status add_array_values(size_t field_size, const CollectionValue* values, - size_t count) = 0; - - virtual Status add_array_values(size_t field_size, const void* value_ptr, - const uint8_t* null_map, const uint8_t* offsets_ptr, - size_t count) = 0; - - virtual Status add_nulls(uint32_t count) = 0; - virtual Status add_array_nulls(const uint8_t* null_map, size_t num_rows) = 0; - - virtual Status finish() = 0; - - virtual int64_t size() const = 0; - - virtual void close_on_error() = 0; - - // check if the column is valid for inverted index, some columns - // are generated from variant, but not all of them are supported - static bool check_support_inverted_index(const TabletColumn& column); - static bool check_support_ann_index(const TabletColumn& column); - -private: - DISALLOW_COPY_AND_ASSIGN(IndexColumnWriter); -}; - +namespace doris::segment_v2 { class TmpFileDirs { public: TmpFileDirs(const std::vector<doris::StorePath>& store_paths) { @@ -109,5 +55,4 @@ private: 
std::atomic_size_t _next_index {0}; // use for round-robin }; -} // namespace segment_v2 -} // namespace doris +} // namespace doris::segment_v2 diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp index e255ef2a058..165c6c1afa0 100644 --- a/be/src/olap/tablet_manager.cpp +++ b/be/src/olap/tablet_manager.cpp @@ -250,8 +250,6 @@ bool TabletManager::_check_tablet_id_exist_unlocked(TTabletId tablet_id) { Status TabletManager::create_tablet(const TCreateTabletReq& request, std::vector<DataDir*> stores, RuntimeProfile* profile) { - DorisMetrics::instance()->create_tablet_requests_total->increment(1); - int64_t tablet_id = request.tablet_id; LOG(INFO) << "begin to create tablet. tablet_id=" << tablet_id << ", table_id=" << request.table_id << ", partition_id=" << request.partition_id diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp index 2123c2a2dbc..e35d3687975 100644 --- a/be/src/olap/tablet_meta.cpp +++ b/be/src/olap/tablet_meta.cpp @@ -268,6 +268,7 @@ TabletMeta::TabletMeta(int64_t table_id, int64_t partition_id, int64_t tablet_id break; } } + // TODO: Why not add a flag for inverted index? } } } diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp index 4dfb066ae52..4354d4e8717 100644 --- a/be/src/pipeline/exec/scan_operator.cpp +++ b/be/src/pipeline/exec/scan_operator.cpp @@ -274,7 +274,7 @@ Status ScanLocalState<Derived>::_normalize_predicate( if (conjunct_expr_root != nullptr) { if (is_leaf(conjunct_expr_root)) { auto impl = conjunct_expr_root->get_impl(); - // If impl is not null, which means this a conjuncts from runtime filter. + // If impl is not null, which means this is a conjunct from runtime filter. vectorized::VExpr* cur_expr = impl ? impl.get() : conjunct_expr_root.get(); if (dynamic_cast<vectorized::VirtualSlotRef*>(cur_expr)) { // If the expr has virtual slot ref, we need to keep it in the tree. 
@@ -282,8 +282,6 @@ Status ScanLocalState<Derived>::_normalize_predicate( return Status::OK(); } - // select funcA(col) from t where - SlotDescriptor* slot = nullptr; ColumnValueRangeType* range = nullptr; PushDownType pdt = PushDownType::UNACCEPTABLE; diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index a3c2149d1ad..8b32883ba7f 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -53,6 +53,7 @@ #include "olap/options.h" #include "olap/page_cache.h" #include "olap/rowset/segment_v2/inverted_index_cache.h" +#include "olap/rowset/segment_v2/tmp_file_dirs.h" #include "olap/schema_cache.h" #include "olap/segment_loader.h" #include "olap/storage_engine.h" diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp index 786aa977ad8..84754800ad5 100644 --- a/be/src/vec/core/block.cpp +++ b/be/src/vec/core/block.cpp @@ -97,11 +97,6 @@ Block::Block(const std::vector<SlotDescriptor*>& slots, size_t block_size, continue; } auto column_ptr = slot_desc->get_empty_mutable_column(); - if (slot_desc->get_virtual_column_expr() != nullptr) { - // Make sure virtual column is assigend with a ColumnNothing - std::ignore = assert_cast<const ColumnNothing*>(column_ptr.get()); - } - column_ptr->reserve(block_size); insert(ColumnWithTypeAndName(std::move(column_ptr), slot_desc->get_data_type_ptr(), slot_desc->col_name())); diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp index 7050538c4c2..5f13ebdf06e 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.cpp +++ b/be/src/vec/exec/scan/scanner_scheduler.cpp @@ -315,36 +315,9 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx, } // We got a new created block or a reused block. 
status = scanner->get_block_after_projects(state, free_block.get(), &eos); - size_t idx = 0; - for (const auto& entry : *free_block) { - if (vectorized::check_and_get_column<vectorized::ColumnNothing>( - entry.column.get())) { - std::shared_ptr<OlapScanner> ol_sca = - std::dynamic_pointer_cast<OlapScanner>(scanner); - std::vector<std::string> vcid_to_idx; - - for (const auto& pair : ol_sca->_vir_cid_to_idx_in_block) { - vcid_to_idx.push_back(fmt::format("{}-{}", pair.first, pair.second)); - } - std::string vir_cid_to_idx_in_block_msg = fmt::format( - "_vir_cid_to_idx_in_block:[{}]", fmt::join(vcid_to_idx, ",")); - LOG_ERROR( - "Column in idx {} is nothing, block columns {}, normal_columns " - "{}, " - "vir_cid_to_idx_in_block_msg {}", - idx, free_block->columns(), ol_sca->_return_columns.size(), - vir_cid_to_idx_in_block_msg); - - throw doris::Exception(ErrorCode::INTERNAL_ERROR, - "Column in idx {} is nothing, block columns {}, " - "normal_columns {}, " - "virtual_columns {}", - idx, free_block->columns(), - ol_sca->_return_columns.size(), - ol_sca->_vir_cid_to_idx_in_block.size()); - } - idx++; - } + + _make_sure_virtual_col_is_materialized(scanner, free_block.get()); + first_read = false; if (!status.ok()) { LOG(WARNING) << "Scan thread read Scanner failed: " << status.to_string(); @@ -447,4 +420,42 @@ int ScannerScheduler::get_remote_scan_thread_queue_size() { return config::doris_remote_scanner_thread_pool_queue_size; } +void ScannerScheduler::_make_sure_virtual_col_is_materialized( + const std::shared_ptr<Scanner>& scanner, vectorized::Block* free_block) { +#ifndef NDEBUG + // Currently, virtual column can only be used on olap table. + std::shared_ptr<OlapScanner> olap_scanner = std::dynamic_pointer_cast<OlapScanner>(scanner); + if (olap_scanner == nullptr) { + return; + } + + size_t idx = 0; + for (const auto& entry : *free_block) { + // Virtual column must be materialized on the end of SegmentIterator's next batch method. 
+ const vectorized::ColumnNothing* column_nothing = + vectorized::check_and_get_column<vectorized::ColumnNothing>(entry.column.get()); + if (column_nothing == nullptr) { + idx++; + continue; + } + + std::vector<std::string> vcid_to_idx; + + for (const auto& pair : olap_scanner->_vir_cid_to_idx_in_block) { + vcid_to_idx.push_back(fmt::format("{}-{}", pair.first, pair.second)); + } + + std::string error_msg = fmt::format( + "Column in idx {} is nothing, block columns {}, normal_columns " + "{}, " + "vir_cid_to_idx_in_block_msg {}", + idx, free_block->columns(), olap_scanner->_return_columns.size(), + fmt::format("_vir_cid_to_idx_in_block:[{}]", fmt::join(vcid_to_idx, ","))); + + LOG_ERROR(error_msg); + + throw doris::Exception(ErrorCode::INTERNAL_ERROR, error_msg); + } +#endif +} } // namespace doris::vectorized diff --git a/be/src/vec/exec/scan/scanner_scheduler.h b/be/src/vec/exec/scan/scanner_scheduler.h index 19f97d27642..25318e705c7 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.h +++ b/be/src/vec/exec/scan/scanner_scheduler.h @@ -18,7 +18,6 @@ #pragma once #include <atomic> -#include <cstdint> #include <memory> #include "common/be_mock_util.h" @@ -30,6 +29,7 @@ class ExecEnv; namespace vectorized { class Scanner; +class Block; } // namespace vectorized template <typename T> @@ -82,6 +82,8 @@ private: static void _scanner_scan(std::shared_ptr<ScannerContext> ctx, std::shared_ptr<ScanTask> scan_task); + static void _make_sure_virtual_col_is_materialized(const std::shared_ptr<Scanner>& scanner, + vectorized::Block* block); // execution thread pool // _local_scan_thread_pool is for local scan task(typically, olap scanner) // _remote_scan_thread_pool is for remote scan task(cold data on s3, hdfs, etc.) 
diff --git a/be/src/vec/olap/vcollect_iterator.cpp b/be/src/vec/olap/vcollect_iterator.cpp index 2b4d6650824..b89ad9109e1 100644 --- a/be/src/vec/olap/vcollect_iterator.cpp +++ b/be/src/vec/olap/vcollect_iterator.cpp @@ -496,7 +496,6 @@ int64_t VCollectIterator::Level0Iterator::version() const { } Status VCollectIterator::Level0Iterator::refresh_current_row() { - LOG_INFO("Level0Iterator refresh current row begin"); RuntimeState* runtime_state = nullptr; if (_reader != nullptr) { runtime_state = _reader->_reader_context.runtime_state; @@ -534,7 +533,6 @@ Status VCollectIterator::Level0Iterator::refresh_current_row() { _ref.row_pos = -1; _current = -1; _rs_reader = nullptr; - LOG_INFO("Level0Iterator refresh current row end"); return Status::Error<END_OF_FILE>(""); } @@ -568,33 +566,6 @@ Status VCollectIterator::Level0Iterator::next(Block* block) { } auto res = _rs_reader->next_block(block); if (!res.ok() && !res.is<END_OF_FILE>()) { - // // replace column nothing with real column - // const auto idx_to_datatype = _reader->_reader_context.vir_col_idx_to_type; - // for (const auto& pair : _reader->_reader_context.vir_cid_to_idx_in_block) { - // size_t idx = pair.second; - // auto type = idx_to_datatype.find(idx)->second; - - // block->replace_by_position(idx, type->create_column()); - // LOG_INFO( - // "Level0Iterator next block replace column nothing with real column " - // "idx {}, type {}", - // idx, type->get_name()); - // } - - // size_t idx = 0; - // for (const auto& entry : *block) { - // if (vectorized::check_and_get_column<vectorized::ColumnNothing>( - // entry.column.get())) { - // LOG_ERROR( - // "Column in idx {} is nothing, block columns {}, normal_columns " - // "{}, ", - // idx, block->columns(), _reader->_return_columns.size()); - - // throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Column in idx {} is nothing", - // idx); - // } - // idx++; - // } return res; } if (res.is<END_OF_FILE>() && block->rows() == 0) { diff --git 
a/be/src/vector/CMakeLists.txt b/be/src/vector/CMakeLists.txt index e5eb05ee40e..646cc874d9a 100644 --- a/be/src/vector/CMakeLists.txt +++ b/be/src/vector/CMakeLists.txt @@ -19,8 +19,7 @@ set(LIBRARY_OUTPUT_PATH "${BUILD_DIR}/src/vector") set(VECTOR_LIB_SRC - vector_index.h - stream_wrapper.h + vector_index.h ) set(VECTOR_LIB_DEPENDENCIES) diff --git a/be/src/vector/diskann_vector_index.cpp b/be/src/vector/diskann_vector_index.cpp deleted file mode 100644 index dffd42b924a..00000000000 --- a/be/src/vector/diskann_vector_index.cpp +++ /dev/null @@ -1,212 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -#include "diskann_vector_index.h" - -#include <fcntl.h> -#include <omp.h> -#include <sys/mman.h> -#include <sys/stat.h> -#include <unistd.h> - -#include <boost/program_options.hpp> - -#include "ann_parse.h" -#include "extern/diskann/include/combined_file.h" -#include "extern/diskann/include/disk_utils.h" -#include "extern/diskann/include/index.h" -#include "extern/diskann/include/linux_aligned_file_reader.h" -#include "extern/diskann/include/math_utils.h" -#include "extern/diskann/include/partition.h" -#include "extern/diskann/include/pq_flash_index.h" -#include "extern/diskann/include/timer.h" -#include "extern/diskann/include/utils.h" - -#define FINALLY_CLOSE(x) \ - try { \ - if (x != nullptr) { \ - x->close(); \ - delete x; \ - } \ - } catch (...) { \ - } - -doris::Status DiskannVectorIndex::add(int n, const float* vec) { - // 追加数据 - _data_stream.write(reinterpret_cast<const char*>(vec), n * ndim * sizeof(float)); - npt_num += n; - // 保存当前位置 - std::streampos current_pos = _data_stream.tellp(); - // 回到 npt_num 位置更新它 - _data_stream.seekp(npt_num_pos); - _data_stream.write(reinterpret_cast<const char*>(&npt_num), sizeof(npt_num)); - // 恢复写入指针 - _data_stream.seekp(current_pos); - if (_data_stream.fail()) { - return doris::Status::IOError("Failed to write vector data"); - } - return doris::Status::OK(); -} - -int DiskannVectorIndex::calculate_num_pq_chunks() { - double final_index_ram_limit = - diskann::get_memory_budget(builderParameterPtr->get_search_ram_budget()); - int num_pq_chunks = diskann::calculate_num_pq_chunks(final_index_ram_limit, npt_num, - builderParameterPtr->dim); - return num_pq_chunks; -} - -doris::Status DiskannVectorIndex::build() { - try { - diskann::generate_quantized_data<float>( - _data_stream, _pq_pivots_stream, _pq_compressed_stream, - builderParameterPtr->get_metric_type(), builderParameterPtr->get_sample_rate(), - calculate_num_pq_chunks(), false, ""); - _data_stream.seekg(0, _data_stream.beg); - 
diskann::build_merged_vamana_index<float>( - _data_stream, builderParameterPtr->get_metric_type(), builderParameterPtr->get_l(), - builderParameterPtr->get_r(), 1, builderParameterPtr->get_indexing_ram_budget(), - _vamana_index_stream, "", "", 0, false, builderParameterPtr->get_num_threads(), - false, "", "", "", 0); - _data_stream.seekg(0, _data_stream.beg); - diskann::create_disk_layout<float>(_data_stream, _vamana_index_stream, _disk_layout_stream); - return doris::Status::OK(); - } catch (const std::exception& e) { - return doris::Status::InternalError<true>(std::string(e.what())); - } -} - -doris::Status DiskannVectorIndex::save() { - try { - //构建索引到临时stringstream中 - RETURN_IF_ERROR(build()); - //把stream刷到存储层 - lucene::store::IndexOutput* pq_pivots_output = - _dir->createOutput(DiskannFileDesc::get_pq_pivots_file_name()); - lucene::store::IndexOutput* pq_compressed_output = - _dir->createOutput(DiskannFileDesc::get_pq_compressed_file_name()); - lucene::store::IndexOutput* vamana_index_output = - _dir->createOutput(DiskannFileDesc::get_vamana_index_file_name()); - lucene::store::IndexOutput* disk_layout_output = - _dir->createOutput(DiskannFileDesc::get_disklayout_file_name()); - lucene::store::IndexOutput* tag_output = - _dir->createOutput(DiskannFileDesc::get_tag_file_name()); - RETURN_IF_ERROR(stream_write_to_output(_pq_pivots_stream, pq_pivots_output)); - RETURN_IF_ERROR(stream_write_to_output(_pq_compressed_stream, pq_compressed_output)); - RETURN_IF_ERROR(stream_write_to_output(_vamana_index_stream, vamana_index_output)); - RETURN_IF_ERROR(stream_write_to_output(_disk_layout_stream, disk_layout_output)); - RETURN_IF_ERROR(stream_write_to_output(_tag_stream, tag_output)); - FINALLY_CLOSE(pq_pivots_output); - FINALLY_CLOSE(pq_compressed_output); - FINALLY_CLOSE(vamana_index_output); - FINALLY_CLOSE(disk_layout_output); - FINALLY_CLOSE(tag_output); - } catch (const std::exception& e) { - return doris::Status::InternalError(e.what()); - } - return 
doris::Status::OK(); -} - -doris::Status DiskannVectorIndex::stream_write_to_output(std::stringstream& stream, - lucene::store::IndexOutput* output) { - try { - stream.seekg(0, std::ios::beg); // 确保从头开始读取 - if (!stream.good()) { - return doris::Status::Corruption("stream seekg failed"); - } - const size_t buffer_size = 4096; // 4KB 缓冲区 - std::vector<char> buffer(buffer_size); - while (stream) { // 只要 stream 仍然有效,就继续读取 - stream.read(buffer.data(), buffer_size); - std::streamsize bytes_read = stream.gcount(); // 获取实际读取的字节数 - if (bytes_read > 0) { - output->writeBytes(reinterpret_cast<const uint8_t*>(buffer.data()), - static_cast<int32_t>(bytes_read)); - } - } - return doris::Status::OK(); - } catch (const std::exception& e) { - return doris::Status::Corruption(std::string("failed stream write to output, message=") + - e.what()); - } -} - -doris::Status DiskannVectorIndex::load(VectorIndex::Metric dist_fn) { - diskann::Metric metric; - if (dist_fn == VectorIndex::Metric::L2) { - metric = diskann::Metric::L2; - } else if (dist_fn == VectorIndex::Metric::INNER_PRODUCT) { - metric = diskann::Metric::INNER_PRODUCT; - } else if (dist_fn == VectorIndex::Metric::COSINE) { - metric = diskann::Metric::COSINE; - } else { - std::cout << "Error. Only l2 and mips distance functions are supported" << std::endl; - return doris::Status::InternalError( - "Error. 
Only l2 and mips distance functions are supported"); - } - lucene::store::IndexInput* pq_pivots_input = - _dir->openInput(DiskannFileDesc::get_pq_pivots_file_name()); - std::cout << "Actual type: " << typeid(*pq_pivots_input).name() << std::endl; - - lucene::store::IndexInput* pq_compressed_input = - _dir->openInput(DiskannFileDesc::get_pq_compressed_file_name()); - lucene::store::IndexInput* vamana_index_input = - _dir->openInput(DiskannFileDesc::get_vamana_index_file_name()); - lucene::store::IndexInput* disk_layout_input = - _dir->openInput(DiskannFileDesc::get_disklayout_file_name()); - lucene::store::IndexInput* tag_input = _dir->openInput(DiskannFileDesc::get_tag_file_name()); - //Try to minimize the intrusion of Lucene code into the source code of Diskann, so we will split it into a layer here - std::shared_ptr<IndexInputReaderWrapper> pq_pivots_reader( - new IndexInputReaderWrapper(pq_pivots_input)); - std::shared_ptr<IndexInputReaderWrapper> pq_compressed_reader( - new IndexInputReaderWrapper(pq_compressed_input)); - std::shared_ptr<IndexInputReaderWrapper> vamana_index_reader( - new IndexInputReaderWrapper(vamana_index_input)); - std::shared_ptr<IndexInputReaderWrapper> disk_layout_reader( - new IndexInputReaderWrapper(disk_layout_input)); - std::shared_ptr<IndexInputReaderWrapper> tag_reader(new IndexInputReaderWrapper(tag_input)); - - _pFlashIndex = - std::make_shared<diskann::PQFlashIndex<float, uint16_t>>(disk_layout_reader, metric); - _pFlashIndex->load(8, pq_pivots_reader, pq_compressed_reader, vamana_index_reader, - disk_layout_reader, tag_reader); - return doris::Status::OK(); -} - -doris::Status DiskannVectorIndex::search(const float* query_vec, int topk, SearchResult* result, - const SearchParameters* params) { - try { - DiskannSearchParameter* searchParam = (DiskannSearchParameter*)params; - IDFilter* filter = searchParam->get_filter(); - int optimized_beamwidth = searchParam->get_beam_width(); - int search_list = 
searchParam->get_search_list(); - std::vector<uint64_t> query_result_ids_64(topk); - std::vector<float> query_result_dists(topk); - diskann::QueryStats* stats = static_cast<diskann::QueryStats*>(result->stat); - uint32_t k = _pFlashIndex->cached_beam_search( - query_vec, topk, search_list, query_result_ids_64.data(), query_result_dists.data(), - optimized_beamwidth, filter, stats); - result->rows = k; - for (int i = 0; i < k; i++) { - result->distances.push_back(query_result_dists[i]); - result->ids.push_back(query_result_ids_64[i]); - } - return doris::Status::OK(); - } catch (const std::exception& e) { - return doris::Status::InternalError(e.what()); - } -} diff --git a/be/src/vector/diskann_vector_index.h b/be/src/vector/diskann_vector_index.h deleted file mode 100644 index ff15d786249..00000000000 --- a/be/src/vector/diskann_vector_index.h +++ /dev/null @@ -1,284 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -// #pragma once - -// #include <CLucene.h> -// #include <CLucene/store/IndexInput.h> -// #include <CLucene/store/IndexOutput.h> - -// #include "extern/diskann/include/distance.h" -// #include "extern/diskann/include/pq_flash_index.h" -// #include "vector_index.h" -// #include <roaring/roaring.hh> - -// struct DiskannBuilderParameter : public BuilderParameter{ -// diskann::Metric metric_type; -// int L; -// int R; -// int num_threads; -// double sample_rate; -// float indexing_ram_budget; //单位GB -// float search_ram_budget; //单位GB -// int dim; - -// DiskannBuilderParameter& with_mertic_type(VectorIndex::Metric metric){ -// metric_type = convert_to_diskann_metric(metric); -// return *this; -// } - -// diskann::Metric convert_to_diskann_metric(VectorIndex::Metric metric) { -// switch (metric) { -// case VectorIndex::Metric::L2: -// return diskann::Metric::L2; -// case VectorIndex::Metric::COSINE: -// return diskann::Metric::COSINE; -// case VectorIndex::Metric::INNER_PRODUCT: -// return diskann::Metric::INNER_PRODUCT; -// default: -// throw std::invalid_argument("Unknown metric type"); -// } -// } - -// std::string metric_to_string(diskann::Metric metric) { -// switch (metric) { -// case diskann::Metric::L2: -// return "L2"; -// case diskann::Metric::INNER_PRODUCT: -// return "INNER_PRODUCT"; -// case diskann::Metric::COSINE: -// return "COSINE"; -// case diskann::Metric::FAST_L2: -// return "FAST_L2"; -// default: -// return "UNKNOWN"; -// } -// } - -// DiskannBuilderParameter& with_dim(int d){ -// dim = d; -// return *this; -// } - -// DiskannBuilderParameter& with_indexing_ram_budget_mb(float ram){ -// indexing_ram_budget = ram / 1024; -// return *this; -// } - -// DiskannBuilderParameter& with_search_ram_budget_mb(float ram){ -// search_ram_budget = ram / 1024; -// return *this; -// } - -// DiskannBuilderParameter& with_sample_rate(float rate){ -// sample_rate = rate; -// return *this; -// } - -// DiskannBuilderParameter& with_build_num_threads(int threads){ 
-// num_threads = threads; -// return *this; -// } - -// DiskannBuilderParameter& with_L(int l){ -// L = l; -// return *this; -// } - -// DiskannBuilderParameter& with_R(int r){ -// R = r; -// return *this; -// } - -// std::string to_string(){ -// std::ostringstream oss; -// oss << "metric_type:" << metric_to_string(metric_type) -// << ", L:" << L -// << ", R:" << R -// << ", num_threads:" << num_threads -// << ", sample_rate:" << sample_rate -// << ", indexing_ram_budget:" << indexing_ram_budget <<"G" -// << ", search_ram_budget:" << search_ram_budget <<"G" -// << ", dim:" << dim; -// return oss.str(); -// } -// diskann::Metric get_metric_type(){ -// return metric_type; -// } -// int get_l(){ -// return L; -// } -// int get_r(){ -// return R; -// } -// int get_num_threads(){ -// return num_threads; -// } -// double get_sample_rate(){ -// return sample_rate; -// } -// float get_indexing_ram_budget(){ -// return indexing_ram_budget; -// } -// int get_dim(){ -// return dim; -// } -// float get_search_ram_budget(){ -// return search_ram_budget; -// } -// }; - -// struct IDFilter : public diskann::Filter { -// private: -// std::shared_ptr<roaring::Roaring> _bitmap; -// public: -// IDFilter(std::shared_ptr<roaring::Roaring> bitmap){ -// _bitmap = bitmap; -// } -// bool is_member(uint32_t idx){ -// return _bitmap->contains(idx); -// } -// }; - -// struct DiskannSearchParameter : public SearchParameters{ -// int search_list; -// int beam_width; -// std::shared_ptr<IDFilter> filter; -// DiskannSearchParameter(){ -// filter = nullptr; -// search_list = 100; -// beam_width = 2; -// } -// DiskannSearchParameter& with_search_list(int l){ -// search_list = l; -// return *this; -// } -// DiskannSearchParameter& with_beam_width(int width){ -// beam_width = width; -// return *this; -// } - -// DiskannSearchParameter& set_filter(std::shared_ptr<IDFilter> f){ -// filter = f; -// return *this; -// } - -// IDFilter *get_filter(){ -// return filter.get(); -// } -// int 
get_search_list(){ -// return search_list; -// } -// int get_beam_width(){ -// return beam_width; -// } -// }; - -// class WriterWrapper { -// private: -// std::shared_ptr<lucene::store::IndexOutput> _out; -// public: -// WriterWrapper(std::shared_ptr<lucene::store::IndexOutput> out); -// void write(uint8_t* b, uint64_t len); -// }; - -// //diskann的索引文件合到到1个文件,重新定义下每个部分代表什么含义 -// class DiskannFileDesc { -// public: -// static constexpr const char* PQ_PIVOTS_FILE_NAME = "pq_pivots_file"; -// static constexpr const char* PQ_COMPRESSED_FILE_NAME = "pq_compressed_file"; -// static constexpr const char* VAMANA_INDEX_FILE_NAME = "vamana_index_file"; -// static constexpr const char* DISK_LAYOUT_FILE_NAME = "disk_layout_file"; -// static constexpr const char* TAG_FILE_NAME = "tag_file"; -// static const char* get_pq_pivots_file_name(){ -// return PQ_PIVOTS_FILE_NAME; -// } -// static const char* get_pq_compressed_file_name(){ -// return PQ_COMPRESSED_FILE_NAME; -// } -// static const char* get_vamana_index_file_name(){ -// return VAMANA_INDEX_FILE_NAME; -// } -// static const char* get_disklayout_file_name(){ -// return DISK_LAYOUT_FILE_NAME; -// } -// static const char* get_tag_file_name(){ -// return TAG_FILE_NAME; -// } -// }; - -// class DiskannVectorIndex : public VectorIndex{ -// private: -// std::shared_ptr<diskann::PQFlashIndex<float,uint16_t> > _pFlashIndex; -// std::shared_ptr<DiskannBuilderParameter> builderParameterPtr; -// //适配doris的存储介质 -// std::shared_ptr<lucene::store::Directory> _dir; - -// //原始向量 -// std::stringstream _data_stream; -// //codebook临时缓存 -// std::stringstream _pq_pivots_stream; -// //量化向量临时缓存 -// std::stringstream _pq_compressed_stream; -// //图索引 -// std::stringstream _vamana_index_stream; -// //最终的磁盘布局 -// std::stringstream _disk_layout_stream; -// std::stringstream _tag_stream; - -// int npt_num = 0 ; // 向量总数 -// int ndim = 0; // 向量维度 -// std::streampos npt_num_pos; // 记录 npt_num 在流中的位置 - -// std::mutex _data_stream_mutex; - -// private: 
-// int calculate_num_pq_chunks(); - -// public: -// DiskannVectorIndex(std::shared_ptr<lucene::store::Directory> dir){ -// builderParameterPtr = nullptr; -// _dir = dir; -// // 先写入占位的 npt_num 和 ndim -// npt_num_pos = _data_stream.tellp(); // 记录 npt_num 的偏移 -// _data_stream.seekp(static_cast<std::streampos>(npt_num_pos)); -// _data_stream.write(reinterpret_cast<const char*>(&npt_num), sizeof(npt_num)); -// _data_stream.write(reinterpret_cast<const char*>(&ndim), sizeof(ndim)); -// } -// doris::Status add(int n, const float *vec); -// doris::Status build(); -// void set_build_params(std::shared_ptr<BuilderParameter> params){ -// builderParameterPtr = std::static_pointer_cast<DiskannBuilderParameter>(params); - -// //设置dim -// ndim = builderParameterPtr->get_dim(); -// std::streamoff pos = static_cast<std::streamoff>(npt_num_pos + std::streampos(sizeof(npt_num))); -// _data_stream.seekp(pos, std::ios::beg); -// _data_stream.write(reinterpret_cast<const char*>(&ndim), sizeof(ndim)); -// } -// doris::Status search( -// const float * query_vec, -// int k, -// SearchResult *result, -// const SearchParameters* params = nullptr); -// //把std::string的内容刷到_dir中 -// doris::Status save(); -// //负责从dir中解析内容 -// doris::Status load(VectorIndex::Metric dist_fn); -// private: -// doris::Status stream_write_to_output(std::stringstream &stream, lucene::store::IndexOutput *output); -// }; diff --git a/be/src/vector/faiss_vector_index.h b/be/src/vector/faiss_vector_index.h index 129a6b26ccb..e7251cf52c3 100644 --- a/be/src/vector/faiss_vector_index.h +++ b/be/src/vector/faiss_vector_index.h @@ -26,7 +26,6 @@ #include <string> #include "common/status.h" -#include "util/metrics.h" #include "vector_index.h" namespace doris::segment_v2 { @@ -36,9 +35,8 @@ struct FaissBuildParameter { enum class Quantilizer { FLAT, PQ }; enum class MetricType { - L2, // Euclidean distance - IP, // Inner product - COSINE // Cosine similarity + L2, // Euclidean distance + IP, // Inner product }; static 
IndexType string_to_index_type(const std::string& type) { @@ -66,9 +64,8 @@ struct FaissBuildParameter { return MetricType::L2; } else if (type == "ip") { return MetricType::IP; - } else if (type == "cosine") { - return MetricType::COSINE; } + return MetricType::L2; // default } diff --git a/be/src/vector/stream_wrapper.h b/be/src/vector/stream_wrapper.h deleted file mode 100644 index 5f31e92814a..00000000000 --- a/be/src/vector/stream_wrapper.h +++ /dev/null @@ -1,146 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -#pragma once - -#include <CLucene.h> -#include <CLucene/store/IndexInput.h> -#include <CLucene/store/IndexOutput.h> - -#include <mutex> - -class IReaderWrapper { -public: - virtual ~IReaderWrapper() = default; - virtual void seek(uint64_t pos) = 0; - virtual void read(char* s, uint64_t n, uint64_t offset) = 0; - virtual void read(char* s, uint64_t n) = 0; -}; - -class ShareStringStreamReaderWrapper : public IReaderWrapper { -private: - std::stringstream* ss; // 共享的底层stringstream - std::mutex* mtx; // 保护共享资源的互斥锁 - uint64_t _offset; // 当前读取器的偏移量 - std::streamsize last_read_count; // 最后一次读取的字节数 - -public: - // 构造函数,接收共享的stringstream和互斥锁 - ShareStringStreamReaderWrapper(std::stringstream& stream, std::mutex& mutex) - : ss(&stream), mtx(&mutex), _offset(0), last_read_count(0) {} - - // 与std::stringstream兼容的read方法 - void read(char* s, uint64_t n) { - std::lock_guard<std::mutex> lock(*mtx); - - // 移动到当前偏移位置 - ss->seekg(_offset, ss->beg); - - // 读取指定数量的字节 - ss->read(s, n); - - // 获取实际读取的字节数 - last_read_count = ss->gcount(); - - // 更新偏移量 - _offset += static_cast<size_t>(last_read_count); - } - - void read(char* s, uint64_t n, uint64_t offset) { - std::lock_guard<std::mutex> lock(*mtx); - // 移动到当前偏移位置 - ss->seekg(offset, ss->beg); - // 读取指定数量的字节 - ss->read(s, n); - } - - // 与std::stringstream兼容的seekp方法,设置偏移量到指定位置 - void seek(uint64_t pos) { _offset = pos; } - - ~ShareStringStreamReaderWrapper() {} -}; - -//简单把std::stringstream封装下,为了diskann::load_bin有个统一的接口 -class SampleStringStreamReaderWrapper : public IReaderWrapper { -private: - std::stringstream* ss; - size_t _offset; - std::streamsize last_read_count; - -public: - // 构造函数,接收共享的stringstream和互斥锁 - SampleStringStreamReaderWrapper(std::stringstream& stream) - : ss(&stream), _offset(0), last_read_count(0) {} - - // 与std::stringstream兼容的read方法 - void read(char* s, uint64_t n) { ss->read(s, n); } - - void read(char* s, uint64_t n, uint64_t offset) { - // 移动到当前偏移位置 - ss->seekg(offset, ss->beg); - // 读取指定数量的字节 - ss->read(s, 
n); - } - - void seek(uint64_t pos) { ss->seekg(pos, ss->beg); } - ~SampleStringStreamReaderWrapper() {} -}; - -class IndexInputReaderWrapper : public IReaderWrapper { -private: - lucene::store::IndexInput* _input; - std::mutex mtx; - -public: - IndexInputReaderWrapper(lucene::store::IndexInput* input) { _input = input; } - void seek(uint64_t offset) { _input->seek(offset); } - //Note that the offset here and the offset in the inputindex do not have the same meaning - void read(char* s, uint64_t n, uint64_t offset) { - std::lock_guard<std::mutex> lock(mtx); - _input->seek(offset); - _input->readBytes(reinterpret_cast<uint8_t*>(s), static_cast<int32_t>(n)); - } - - void read(char* s, uint64_t n) { - _input->readBytes(reinterpret_cast<uint8_t*>(s), static_cast<int32_t>(n)); - } - ~IndexInputReaderWrapper() { - if (_input != nullptr) { - _input->close(); - delete _input; - _input = nullptr; - } - } - - //used for debug - std::stringstream readAll() { - std::stringstream buffer; - uint64_t len = _input->length(); - _input->seek(0); // 确保从头开始读取 - - std::vector<char> data(len); // 创建缓冲区 - _input->readBytes(reinterpret_cast<uint8_t*>(data.data()), static_cast<int32_t>(len)); - - buffer.write(data.data(), len); - return buffer; - } -}; - -using IReaderWrapperSPtr = std::shared_ptr<IReaderWrapper>; -using ShareStringStreamReaderWrapperSPtr = std::shared_ptr<ShareStringStreamReaderWrapper>; -using IndexInputReaderWrapperSPtr = std::shared_ptr<IndexInputReaderWrapper>; -using SampleStringStreamReaderWrapperSPtr = std::shared_ptr<SampleStringStreamReaderWrapper>; diff --git a/be/src/vector/vector_index.h b/be/src/vector/vector_index.h index e344f1c06c3..cc2a2c3b565 100644 --- a/be/src/vector/vector_index.h +++ b/be/src/vector/vector_index.h @@ -58,34 +58,6 @@ class VectorIndex { public: enum class Metric { L2, INNER_PRODUCT, UNKNOWN }; - /** Add n vectors of dimension d to the index. - * - * Vectors are implicitly assigned labels ntotal .. 
ntotal + n - 1 - * This function slices the input vectors in chunks smaller than - * blocksize_add and calls add_core. - * @param n number of vectors - * @param x input matrix, size n * d - */ - virtual doris::Status add(int n, const float* x) = 0; - - virtual doris::Status ann_topn_search(const float* query_vec, int k, - const IndexSearchParameters& params, - IndexSearchResult& result) = 0; - /** - * Search for the nearest neighbors of a query vector within a given radius. - * @param query_vec input vector, size d - * @param radius search radius - * @param result output search result - * @return status of the operation - */ - virtual doris::Status range_search(const float* query_vec, const float& radius, - const IndexSearchParameters& params, - IndexSearchResult& result) = 0; - - virtual doris::Status save(lucene::store::Directory*) = 0; - - virtual doris::Status load(lucene::store::Directory*) = 0; - static std::string metric_to_string(Metric metric) { switch (metric) { case Metric::L2: @@ -105,8 +77,45 @@ public: return Metric::UNKNOWN; } } + virtual ~VectorIndex() = default; + /** Add n vectors of dimension d vectors to the index. + * + * Vectors are implicitly assigned labels ntotal .. ntotal + n - 1 + * This function slices the input vectors in chunks smaller than + * blocksize_add and calls add_core. + * @param n number of vectors + * @param x input matrix, size n * d + */ + virtual doris::Status add(int n, const float* x) = 0; + + /** Return approximate nearest neighbors of a query vector. + * The result is stored in the result object. 
+ * @param query_vec input vector, size d + * @param k number of nearest neighbors to return + * @param params search parameters + * @param result output search result + * @return status of the operation + */ + virtual doris::Status ann_topn_search(const float* query_vec, int k, + const IndexSearchParameters& params, + IndexSearchResult& result) = 0; + /** + * Search for the nearest neighbors of a query vector within a given radius. + * @param query_vec input vector, size d + * @param radius search radius + * @param result output search result + * @return status of the operation + */ + virtual doris::Status range_search(const float* query_vec, const float& radius, + const IndexSearchParameters& params, + IndexSearchResult& result) = 0; + + virtual doris::Status save(lucene::store::Directory*) = 0; + + virtual doris::Status load(lucene::store::Directory*) = 0; + size_t get_dimension() const { return _dimension; } protected: diff --git a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 index 65e5b577236..bdc77f6a07d 100644 --- a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 +++ b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 @@ -202,7 +202,7 @@ supportedCreateStatement partitionSpec? #buildIndex | CREATE INDEX (IF NOT EXISTS)? name=identifier ON tableName=multipartIdentifier identifierList - (USING (BITMAP | NGRAM_BF | INVERTED))? + (USING (BITMAP | NGRAM_BF | INVERTED | ANN))? properties=propertyClause? (COMMENT STRING_LITERAL)? #createIndex | CREATE WORKLOAD POLICY (IF NOT EXISTS)? name=identifierOrText (CONDITIONS LEFT_PAREN workloadPolicyConditions RIGHT_PAREN)? 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java index 220095b36cb..cd7141682ab 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java @@ -20,6 +20,7 @@ package org.apache.doris.alter; import org.apache.doris.analysis.AddColumnClause; import org.apache.doris.analysis.AddColumnsClause; import org.apache.doris.analysis.AlterClause; +import org.apache.doris.analysis.AnnIndexPropertiesChecker; import org.apache.doris.analysis.BuildIndexClause; import org.apache.doris.analysis.CancelAlterTableStmt; import org.apache.doris.analysis.CancelStmt; @@ -2831,6 +2832,10 @@ public class SchemaChangeHandler extends AlterHandler { alterIndex.setIndexId(Env.getCurrentEnv().getNextId()); } + if (indexDef.isAnnIndex()) { + AnnIndexPropertiesChecker.checkProperties(indexDef.getProperties()); + } + for (String col : indexDef.getColumns()) { Column column = olapTable.getColumn(col); if (column != null) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/AnnIndexPropertiesChecker.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/AnnIndexPropertiesChecker.java new file mode 100644 index 00000000000..d0d5648a0a9 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AnnIndexPropertiesChecker.java @@ -0,0 +1,79 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.analysis; + +import org.apache.doris.nereids.exceptions.AnalysisException; + +import java.util.Map; + +public class AnnIndexPropertiesChecker { + public static void checkProperties(Map<String, String> properties) { + String type = null; + String metric = null; + String dim = null; + String quantization = null; + for (String key : properties.keySet()) { + switch (key) { + case "index_type": + type = properties.get(key); + if (!type.equals("hnsw")) { + throw new AnalysisException("only support ann index with type hnsw, got: " + type); + } + break; + case "metric_type": + metric = properties.get(key); + if (!metric.equals("l2_distance") && !metric.equals("inner_product")) { + throw new AnalysisException("only support ann index with metric l2_distance or inner_product, got: " + metric); + } + break; + case "dim": + dim = properties.get(key); + try { + int dimension = Integer.parseInt(dim); + if (dimension <= 0) { + throw new AnalysisException("dim of ann index must be a positive integer, got: " + dim); + } + } catch (NumberFormatException e) { + throw new AnalysisException("dim of ann index must be a positive integer, got: " + dim); + } + break; + case "quantization": + quantization = properties.get(key); + break; + default: + throw new AnalysisException("unknown ann index property: " + key); + } + } + + if (type == null) { + throw new AnalysisException("index_type of ann index be specified."); + } + if (metric == null) { + throw new AnalysisException("metric_type of ann index must be specified."); + } + if (dim == null) { + 
throw new AnalysisException("dim of ann index must be specified"); + } + if (quantization != null) { + if (!quantization.equals("flat") && !quantization.equals("pq")) { + throw new AnalysisException("only support ann index with quantization flat or pq"); + } + } + + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/IndexDef.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/IndexDef.java index 80f80bcfbae..ecc385d0266 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/IndexDef.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/IndexDef.java @@ -24,6 +24,7 @@ import org.apache.doris.catalog.PrimitiveType; import org.apache.doris.catalog.Type; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; +import org.apache.doris.nereids.types.DataType; import org.apache.doris.thrift.TInvertedIndexFileStorageFormat; import com.google.common.base.Strings; @@ -217,6 +218,10 @@ public class IndexDef { return (this.indexType == IndexType.INVERTED); } + public boolean isAnnIndex() { + return (this.indexType == IndexType.ANN); + } + // Check if the column type is supported for inverted index public boolean isSupportIdxType(Type colType) { if (colType.isArrayType()) { @@ -235,8 +240,27 @@ public class IndexDef { public void checkColumn(Column column, KeysType keysType, boolean enableUniqueKeyMergeOnWrite, TInvertedIndexFileStorageFormat invertedIndexFileStorageFormat) throws AnalysisException { + if (indexType == IndexType.ANN) { + if (column.isAllowNull()) { + throw new org.apache.doris.nereids.exceptions.AnalysisException("ANN index must be built on a column that is not nullable"); + } + + String indexColName = column.getName(); + caseSensitivityColumns.add(indexColName); + PrimitiveType primitiveType = column.getDataType(); + if (!primitiveType.isArrayType()) { + throw new org.apache.doris.nereids.exceptions.AnalysisException("ANN index column must be array type"); + } + Type columnType = 
column.getType(); + Type itemType = ((ArrayType) columnType).getItemType(); + if (!itemType.isFloatingPointType()) { + throw new org.apache.doris.nereids.exceptions.AnalysisException("ANN index column item type must be float type"); + } + return; + } + if (indexType == IndexType.BITMAP || indexType == IndexType.INVERTED || indexType == IndexType.BLOOMFILTER - || indexType == IndexType.NGRAM_BF || indexType == IndexType.ANN) { + || indexType == IndexType.NGRAM_BF) { String indexColName = column.getName(); caseSensitivityColumns.add(indexColName); PrimitiveType colType = column.getDataType(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java index 516c1b1c8d1..0de78ec2596 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java @@ -5241,6 +5241,8 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor<Object> { indexType = "NGRAM_BF"; } else if (ctx.INVERTED() != null) { indexType = "INVERTED"; + } else if (ctx.ANN() != null) { + indexType = "ANN"; } String comment = ctx.STRING_LITERAL() == null ? 
"" : stripQuotes(ctx.STRING_LITERAL().getText()); // change BITMAP index to INVERTED index diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/IndexDefinition.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/IndexDefinition.java index 1aeb0770be2..c7b0ad0aecb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/IndexDefinition.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/IndexDefinition.java @@ -17,6 +17,7 @@ package org.apache.doris.nereids.trees.plans.commands.info; +import org.apache.doris.analysis.AnnIndexPropertiesChecker; import org.apache.doris.analysis.IndexDef; import org.apache.doris.analysis.IndexDef.IndexType; import org.apache.doris.analysis.InvertedIndexUtil; @@ -137,6 +138,9 @@ public class IndexDefinition { boolean enableUniqueKeyMergeOnWrite, TInvertedIndexFileStorageFormat invertedIndexFileStorageFormat) throws AnalysisException { if (indexType == IndexType.ANN) { + if (column.isNullable()) { + throw new AnalysisException("ANN index must be built on a column that is not nullable"); + } String indexColName = column.getName(); caseSensitivityCols.add(indexColName); DataType colType = column.getType(); @@ -237,6 +241,10 @@ public class IndexDefinition { return; } + if (indexType == IndexDef.IndexType.ANN) { + AnnIndexPropertiesChecker.checkProperties(this.properties); + } + if (indexType == IndexDef.IndexType.BITMAP || indexType == IndexDef.IndexType.INVERTED) { if (cols == null || cols.size() != 1) { throw new AnalysisException( diff --git a/gensrc/thrift/Constant.thrift b/gensrc/thrift/Constant.thrift new file mode 100644 index 00000000000..45ec4fae4e4 --- /dev/null +++ b/gensrc/thrift/Constant.thrift @@ -0,0 +1,33 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +namespace cpp doris +namespace java org.apache.doris.thrift + +enum AnnMetricType { + l2_distance, + inner_product +} + +enum AnnIndexType { + hnsw +} + +enum AnnQuantizationType { + flat, + pq +} diff --git a/regression-test/suites/ddl_p0/ann_index/create_ann_index_test.groovy b/regression-test/suites/ddl_p0/ann_index/create_ann_index_test.groovy new file mode 100644 index 00000000000..e1b42d87200 --- /dev/null +++ b/regression-test/suites/ddl_p0/ann_index/create_ann_index_test.groovy @@ -0,0 +1,205 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("create_ann_index_test") { + sql "drop table if exists tbl_not_null" + sql """ + CREATE TABLE `tbl_not_null` ( + `id` int NOT NULL COMMENT "", + `embedding` array<float> NOT NULL COMMENT "" + ) ENGINE=OLAP + DUPLICATE KEY(`id`) COMMENT "OLAP" + DISTRIBUTED BY HASH(`id`) BUCKETS 2 + PROPERTIES ( + "replication_num" = "1" + ); + """ + + sql "drop table if exists tbl_nullable" + sql """ + CREATE TABLE tbl_nullable ( + id INT NOT NULL COMMENT "", + embedding ARRAY<FLOAT> NULL COMMENT "" + ) ENGINE=OLAP + DUPLICATE KEY(id) COMMENT "OLAP" + DISTRIBUTED BY HASH(id) BUCKETS 2 + PROPERTIES ( + "replication_num" = "1" + ); + """ + sql "drop index if exists idx_nullable_ann ON tbl_nullable" + // 1. Case for nullable column + test { + sql """ + CREATE INDEX idx_nullable_ann ON tbl_nullable(`embedding`) USING ANN PROPERTIES( + "index_type"="hnsw", + "metric_type"="l2_distance", + "dim"="1" + ); + """ + exception "ANN index must be built on a column that is not nullable" + } + + // 2. 
Invalid properties cases + // dim is not a positive integer + sql "drop index if exists idx_test_ann2 ON tbl_not_null" + test { + sql """ + CREATE INDEX idx_test_ann2 ON tbl_not_null(`embedding`) USING ANN PROPERTIES( + "index_type"="hnsw", + "metric_type"="l2_distance", + "dim"="-1" + ); + """ + exception "dim of ann index must be a positive integer" + } + + // dim is not a number + sql "drop index if exists idx_test_ann3 ON tbl_not_null" + test { + sql """ + CREATE INDEX idx_test_ann3 ON tbl_not_null(`embedding`) USING ANN PROPERTIES( + "index_type"="hnsw", + "metric_type"="l2_distance", + "dim"="abc" + ); + """ + exception "dim of ann index must be a positive integer" + } + + // dim is missing + test { + sql """ + CREATE INDEX idx_test_ann4 ON tbl_not_null(`embedding`) USING ANN PROPERTIES( + "index_type"="hnsw", + "metric_type"="l2_distance" + ); + """ + exception "dim of ann index must be specified" + } + + // index_type is missing + test { + sql """ + CREATE INDEX idx_test_ann5 ON tbl_not_null(`embedding`) USING ANN PROPERTIES( + "metric_type"="l2_distance", + "dim"="1" + ); + """ + exception "index_type of ann index be specified." + } + + // metric_type is missing + test { + sql """ + CREATE INDEX idx_test_ann6 ON tbl_not_null(`embedding`) USING ANN PROPERTIES( + "index_type"="hnsw", + "dim"="1" + ); + """ + exception "metric_type of ann index must be specified." 
+ } + + // index_type is incorrect + test { + sql """ + CREATE INDEX idx_test_ann7 ON tbl_not_null(`embedding`) USING ANN PROPERTIES( + "index_type"="ivf", + "metric_type"="l2_distance", + "dim"="1" + ); + """ + exception "only support ann index with type hnsw" + } + + // metric_type is incorrect + test { + sql """ + CREATE INDEX idx_test_ann8 ON tbl_not_null(`embedding`) USING ANN PROPERTIES( + "index_type"="hnsw", + "metric_type"="cosine", + "dim"="1" + ); + """ + exception "only support ann index with metric l2_distance or inner_product" + } + + // quantization is incorrect + test { + sql """ + CREATE INDEX idx_test_ann9 ON tbl_not_null(`embedding`) USING ANN PROPERTIES( + "index_type"="hnsw", + "metric_type"="l2_distance", + "dim"="1", + "quantization"="bad" + ); + """ + exception "only support ann index with quantization flat or pq" + } + + // Unknown property + test { + sql """ + CREATE INDEX idx_test_ann12 ON tbl_not_null(`embedding`) USING ANN PROPERTIES( + "index_type"="hnsw", + "metric_type"="l2_distance", + "dim"="1", + "unknown"="xxx" + ); + """ + exception "unknown ann index property: unknown" + } + + // DROP INDEX cannot interrupt a running CREATE INDEX, so the cases below are disabled. + + // // quantization = flat + // test { + // sql """ + // CREATE INDEX idx_test_ann10 ON tbl_not_null(`embedding`) USING ANN PROPERTIES( + // "index_type"="hnsw", + // "metric_type"="l2_distance", + // "dim"="1", + // "quantization"="flat" + // ); + // """ + // } + + // sql "drop index idx_test_ann10 ON tbl_not_null" + + // // quantization = pq + // test { + // sql """ + // CREATE INDEX idx_test_ann11 ON tbl_not_null(`embedding`) USING ANN PROPERTIES( + // "index_type"="hnsw", + // "metric_type"="inner_product", + // "dim"="1", + // "quantization"="pq" + // ); + // """ + // } + + // 3.
Valid CREATE INDEX syntax (execute last) + test { + sql """ + CREATE INDEX idx_test_ann ON tbl_not_null(`embedding`) USING ANN PROPERTIES( + "index_type"="hnsw", + "metric_type"="l2_distance", + "dim"="1" + ); + """ + } +} \ No newline at end of file diff --git a/regression-test/suites/ddl_p0/ann_index/create_tbl_with_ann_index_test.groovy b/regression-test/suites/ddl_p0/ann_index/create_tbl_with_ann_index_test.groovy new file mode 100644 index 00000000000..53b6e0085b2 --- /dev/null +++ b/regression-test/suites/ddl_p0/ann_index/create_tbl_with_ann_index_test.groovy @@ -0,0 +1,279 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +suite("create_tbl_with_ann_index_test") { + sql "drop table if exists ann_tbl1" + test { + sql """ + CREATE TABLE ann_tbl1 ( + id INT NOT NULL COMMENT "", + vec ARRAY<FLOAT> NOT NULL COMMENT "", + INDEX ann_idx1 (vec) USING ANN PROPERTIES( + "index_type" = "hnsw", + "metric_type" = "l2_distance", + "dim" = "128" + ) + ) ENGINE=OLAP + DUPLICATE KEY(id) COMMENT "OLAP" + DISTRIBUTED BY HASH(id) BUCKETS 2 + PROPERTIES ( + "replication_num" = "1" + ); + """ + } + + sql "drop table if exists ann_tbl2" + test { + sql """ + CREATE TABLE ann_tbl2 ( + id INT NOT NULL COMMENT "", + vec ARRAY<FLOAT> NOT NULL COMMENT "", + INDEX ann_idx2 (vec) USING ANN PROPERTIES( + "index_type" = "ivf", + "metric_type" = "l2_distance", + "dim" = "128" + ) + ) ENGINE=OLAP + DUPLICATE KEY(id) COMMENT "OLAP" + DISTRIBUTED BY HASH(id) BUCKETS 2 + PROPERTIES ( + "replication_num" = "1" + ); + """ + exception "only support ann index with type hnsw" + } + + // metric_type is incorrect + sql "drop table if exists ann_tbl3" + test { + sql """ + CREATE TABLE ann_tbl3 ( + id INT NOT NULL COMMENT "", + vec ARRAY<FLOAT> NOT NULL COMMENT "", + INDEX ann_idx3 (vec) USING ANN PROPERTIES( + "index_type" = "hnsw", + "metric_type" = "cosine", + "dim" = "128" + ) + ) ENGINE=OLAP + DUPLICATE KEY(id) COMMENT "OLAP" + DISTRIBUTED BY HASH(id) BUCKETS 2 + PROPERTIES ( + "replication_num" = "1" + ); + """ + exception "only support ann index with metric l2_distance or inner_product" + } + + // dim is not a positive integer + sql "drop table if exists ann_tbl4" + test { + sql """ + CREATE TABLE ann_tbl4 ( + id INT NOT NULL COMMENT "", + vec ARRAY<FLOAT> NOT NULL COMMENT "", + INDEX ann_idx4 (vec) USING ANN PROPERTIES( + "index_type" = "hnsw", + "metric_type" = "l2_distance", + "dim" = "-1" + ) + ) ENGINE=OLAP + DUPLICATE KEY(id) COMMENT "OLAP" + DISTRIBUTED BY HASH(id) BUCKETS 2 + PROPERTIES ( + "replication_num" = "1" + ); + """ + exception "dim of ann index must be a positive integer" + } + + // dim is not a number + sql "drop table if exists ann_tbl5" +
test { + sql """ + CREATE TABLE ann_tbl5 ( + id INT NOT NULL COMMENT "", + vec ARRAY<FLOAT> NOT NULL COMMENT "", + INDEX ann_idx5 (vec) USING ANN PROPERTIES( + "index_type" = "hnsw", + "metric_type" = "l2_distance", + "dim" = "abc" + ) + ) ENGINE=OLAP + DUPLICATE KEY(id) COMMENT "OLAP" + DISTRIBUTED BY HASH(id) BUCKETS 2 + PROPERTIES ( + "replication_num" = "1" + ); + """ + exception "dim of ann index must be a positive integer" + } + + // quantization is incorrect + sql "drop table if exists ann_tbl6" + test { + sql """ + CREATE TABLE ann_tbl6 ( + id INT NOT NULL COMMENT "", + vec ARRAY<FLOAT> NOT NULL COMMENT "", + INDEX ann_idx6 (vec) USING ANN PROPERTIES( + "index_type" = "hnsw", + "metric_type" = "l2_distance", + "dim" = "128", + "quantization" = "bad" + ) + ) ENGINE=OLAP + DUPLICATE KEY(id) COMMENT "OLAP" + DISTRIBUTED BY HASH(id) BUCKETS 2 + PROPERTIES ( + "replication_num" = "1" + ); + """ + exception "only support ann index with quantization flat or pq" + } + + // index_type is missing + sql "drop table if exists ann_tbl7" + test { + sql """ + CREATE TABLE ann_tbl7 ( + id INT NOT NULL COMMENT "", + vec ARRAY<FLOAT> NOT NULL COMMENT "", + INDEX ann_idx7 (vec) USING ANN PROPERTIES( + "metric_type" = "l2_distance", + "dim" = "128" + ) + ) ENGINE=OLAP + DUPLICATE KEY(id) COMMENT "OLAP" + DISTRIBUTED BY HASH(id) BUCKETS 2 + PROPERTIES ( + "replication_num" = "1" + ); + """ + exception "index_type of ann index be specified." + } + + // metric_type is missing + sql "drop table if exists ann_tbl8" + test { + sql """ + CREATE TABLE ann_tbl8 ( + id INT NOT NULL COMMENT "", + vec ARRAY<FLOAT> NOT NULL COMMENT "", + INDEX ann_idx8 (vec) USING ANN PROPERTIES( + "index_type" = "hnsw", + "dim" = "128" + ) + ) ENGINE=OLAP + DUPLICATE KEY(id) COMMENT "OLAP" + DISTRIBUTED BY HASH(id) BUCKETS 2 + PROPERTIES ( + "replication_num" = "1" + ); + """ + exception "metric_type of ann index must be specified."
+ } + + // dim is missing + sql "drop table if exists ann_tbl9" + test { + sql """ + CREATE TABLE ann_tbl9 ( + id INT NOT NULL COMMENT "", + vec ARRAY<FLOAT> NOT NULL COMMENT "", + INDEX ann_idx9 (vec) USING ANN PROPERTIES( + "index_type" = "hnsw", + "metric_type" = "l2_distance" + ) + ) ENGINE=OLAP + DUPLICATE KEY(id) COMMENT "OLAP" + DISTRIBUTED BY HASH(id) BUCKETS 2 + PROPERTIES ( + "replication_num" = "1" + ); + """ + exception "dim of ann index must be specified" + } + + // Unknown property + sql "drop table if exists ann_tbl10" + test { + sql """ + CREATE TABLE ann_tbl10 ( + id INT NOT NULL COMMENT "", + vec ARRAY<FLOAT> NOT NULL COMMENT "", + INDEX ann_idx10 (vec) USING ANN PROPERTIES( + "index_type" = "hnsw", + "metric_type" = "l2_distance", + "dim" = "128", + "unknown" = "xxx" + ) + ) ENGINE=OLAP + DUPLICATE KEY(id) COMMENT "OLAP" + DISTRIBUTED BY HASH(id) BUCKETS 2 + PROPERTIES ( + "replication_num" = "1" + ); + """ + exception "unknown ann index property" + } + + // quantization = flat + sql "drop table if exists ann_tbl11" + test { + sql """ + CREATE TABLE ann_tbl11 ( + id INT NOT NULL COMMENT "", + vec ARRAY<FLOAT> NOT NULL COMMENT "", + INDEX ann_idx11 (vec) USING ANN PROPERTIES( + "index_type" = "hnsw", + "metric_type" = "l2_distance", + "dim" = "128", + "quantization" = "flat" + ) + ) ENGINE=OLAP + DUPLICATE KEY(id) COMMENT "OLAP" + DISTRIBUTED BY HASH(id) BUCKETS 2 + PROPERTIES ( + "replication_num" = "1" + ); + """ + } + + // quantization = pq + sql "drop table if exists ann_tbl12" + test { + sql """ + CREATE TABLE ann_tbl12 ( + id INT NOT NULL COMMENT "", + vec ARRAY<FLOAT> NOT NULL COMMENT "", + INDEX ann_idx12 (vec) USING ANN PROPERTIES( + "index_type" = "hnsw", + "metric_type" = "inner_product", + "dim" = "128", + "quantization" = "pq" + ) + ) ENGINE=OLAP + DUPLICATE KEY(id) COMMENT "OLAP" + DISTRIBUTED BY HASH(id) BUCKETS 2 + PROPERTIES ( + "replication_num" = "1" + ); + """ + } + +} \ No newline at end of file
--------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org