This is an automated email from the ASF dual-hosted git repository.
lihaopeng pushed a commit to branch topn-lazy-materialize-poc
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/topn-lazy-materialize-poc by
this push:
new 3cefeab849b BE code topn lazy materialize (#48735)
3cefeab849b is described below
commit 3cefeab849b769b7e26277c156e970b06b99973a
Author: HappenLee <[email protected]>
AuthorDate: Thu Mar 6 12:58:40 2025 +0800
BE code topn lazy materialize (#48735)
### What problem does this PR solve?
Issue Number: close #xxx
Related PR: #xxx
Problem Summary:
### Release note
None
### Check List (For Author)
- Test <!-- At least one of them must be included. -->
- [ ] Regression test
- [ ] Unit Test
- [ ] Manual test (add detailed scripts or steps below)
- [ ] No need to test or manual test. Explain why:
- [ ] This is a refactor/code format and no logic has been changed.
- [ ] Previous test can cover this change.
- [ ] No code files have been changed.
- [ ] Other reason <!-- Add your reason? -->
- Behavior changed:
- [ ] No.
- [ ] Yes. <!-- Explain the behavior change -->
- Does this need documentation?
- [ ] No.
- [ ] Yes. <!-- Add document PR link here. eg:
https://github.com/apache/doris-website/pull/1214 -->
### Check List (For Reviewer who merge this PR)
- [ ] Confirm the release note
- [ ] Confirm test cases
- [ ] Confirm document
- [ ] Add branch pick label <!-- Add branch pick label that this PR
should merge into -->
---
be/src/clucene | 2 +-
be/src/common/consts.h | 1 +
be/src/exec/rowid_fetcher.cpp | 203 +++++++++++++
be/src/exec/rowid_fetcher.h | 26 ++
be/src/olap/CMakeLists.txt | 3 +-
be/src/olap/id_manager.h | 195 ++++++++++++
be/src/olap/rowset/segment_v2/column_reader.cpp | 27 ++
be/src/olap/rowset/segment_v2/column_reader.h | 35 +++
be/src/olap/rowset/segment_v2/segment_iterator.cpp | 11 +
be/src/olap/schema.h | 6 +-
be/src/olap/storage_engine.cpp | 4 +
be/src/olap/tablet_schema.h | 9 +
be/src/olap/utils.h | 11 +
be/src/pipeline/dependency.cpp | 185 ++++++++++++
be/src/pipeline/dependency.h | 47 ++-
be/src/pipeline/exec/cache_sink_operator.cpp | 1 -
be/src/pipeline/exec/cache_sink_operator.h | 6 +-
be/src/pipeline/exec/cache_source_operator.cpp | 2 +-
be/src/pipeline/exec/cache_source_operator.h | 4 +-
.../exec/materialization_sink_operator.cpp | 155 ++++++++++
.../pipeline/exec/materialization_sink_operator.h | 71 +++++
.../exec/materialization_source_operator.cpp | 59 ++++
.../exec/materialization_source_operator.h | 72 +++++
be/src/pipeline/exec/operator.cpp | 8 +-
be/src/pipeline/exec/scan_operator.cpp | 10 +
be/src/pipeline/pipeline_fragment_context.cpp | 21 ++
be/src/runtime/exec_env.h | 3 +
be/src/runtime/exec_env_init.cpp | 3 +
be/src/runtime/query_context.h | 1 -
be/src/runtime/runtime_state.cpp | 5 +
be/src/runtime/runtime_state.h | 8 +
be/src/runtime/workload_group/workload_group.h | 5 +
be/src/service/backend_options.h | 1 +
be/src/service/internal_service.cpp | 38 +++
be/src/service/internal_service.h | 4 +
be/src/util/ref_count_closure.h | 7 +-
be/src/vec/columns/column.h | 1 -
be/src/vec/common/string_ref.cpp | 5 +-
be/src/vec/exec/scan/new_olap_scanner.cpp | 8 +
be/src/vec/exec/scan/vfile_scanner.cpp | 3 +-
be/test/exec/hash_map/hash_table_method_test.cpp | 5 +-
be/test/olap/id_manager_test.cpp | 107 +++++++
.../operator/materialization_shared_state_test.cpp | 336 +++++++++++++++++++++
gensrc/proto/internal_service.proto | 35 +++
tools/clickbench-tools/conf/doris-cluster.conf | 6 +-
45 files changed, 1724 insertions(+), 31 deletions(-)
diff --git a/be/src/clucene b/be/src/clucene
index 3236e18d93b..2204eaec46a 160000
--- a/be/src/clucene
+++ b/be/src/clucene
@@ -1 +1 @@
-Subproject commit 3236e18d93bf96481493d88c34b6c2515f3b0b75
+Subproject commit 2204eaec46a68e5e9a1876b7021f24839ecb2cf0
diff --git a/be/src/common/consts.h b/be/src/common/consts.h
index 2ec9ae12679..548d5a771a2 100644
--- a/be/src/common/consts.h
+++ b/be/src/common/consts.h
@@ -27,6 +27,7 @@ const std::string CSV_WITH_NAMES_AND_TYPES =
"csv_with_names_and_types";
const std::string BLOCK_TEMP_COLUMN_PREFIX = "__TEMP__";
const std::string BLOCK_TEMP_COLUMN_SCANNER_FILTERED =
"__TEMP__scanner_filtered";
const std::string ROWID_COL = "__DORIS_ROWID_COL__";
+const std::string GLOBAL_ROWID_COL = "__DORIS_GLOBAL_ROWID_COL__";
const std::string ROW_STORE_COL = "__DORIS_ROW_STORE_COL__";
const std::string DYNAMIC_COLUMN_NAME = "__DORIS_DYNAMIC_COL__";
const std::string PARTIAL_UPDATE_AUTO_INC_COL =
"__PARTIAL_UPDATE_AUTO_INC_COLUMN__";
diff --git a/be/src/exec/rowid_fetcher.cpp b/be/src/exec/rowid_fetcher.cpp
index fa9f9571409..ac4023601e7 100644
--- a/be/src/exec/rowid_fetcher.cpp
+++ b/be/src/exec/rowid_fetcher.cpp
@@ -469,4 +469,207 @@ Status RowIdStorageReader::read_by_rowids(const
PMultiGetRequest& request,
return Status::OK();
}
+// Serves a PMultiGetRequestV2. For every request_block_desc, each row named by a
+// (file_id, row_id) pair is read back from local storage and one serialized Block
+// is appended to `response`, in request order. File ids are resolved through the
+// IdFileMap registered for the query; only FileMappingType::DORIS_FORMAT files can
+// be read here. When the request sets `gc_id_map`, the query's IdFileMap is
+// removed once all blocks have been produced successfully.
+Status RowIdStorageReader::read_by_rowids(const PMultiGetRequestV2& request,
+                                          PMultiGetResponseV2* response) {
+    if (request.request_block_descs_size()) {
+        OlapReaderStatistics stats;
+        std::vector<vectorized::Block> result_blocks(request.request_block_descs_size());
+        int64_t acquire_tablet_ms = 0;
+        int64_t acquire_rowsets_ms = 0;
+        int64_t acquire_segments_ms = 0;
+        int64_t lookup_row_data_ms = 0;
+        // Scratch buffer for row-store lookups, shared by all blocks of this request.
+        std::string row_store_buffer;
+
+        // Number of rows read per file mapping type (reported in the stats log).
+        std::unordered_map<FileMappingType, int64_t> file_type_counts;
+
+        auto id_file_map =
+                ExecEnv::GetInstance()->get_id_manager()->get_id_file_map(request.query_id());
+        if (!id_file_map) {
+            return Status::InternalError("Backend:{} id_file_map is null, query_id: {}",
+                                         BackendOptions::get_localhost(),
+                                         print_id(request.query_id()));
+        }
+
+        for (int i = 0; i < request.request_block_descs_size(); ++i) {
+            const auto& request_block_desc = request.request_block_descs(i);
+            // file_id and row_id are parallel arrays; reject a malformed request
+            // instead of indexing file_id() out of range below.
+            if (request_block_desc.file_id_size() != request_block_desc.row_id_size()) {
+                return Status::InternalError(
+                        "Backend:{} file_id size {} != row_id size {}, query_id: {}",
+                        BackendOptions::get_localhost(), request_block_desc.file_id_size(),
+                        request_block_desc.row_id_size(), print_id(request.query_id()));
+            }
+
+            auto& result_block = result_blocks[i];
+            std::vector<SlotDescriptor> slots;
+            slots.reserve(request_block_desc.slots_size());
+            for (const auto& pslot : request_block_desc.slots()) {
+                slots.push_back(SlotDescriptor(pslot));
+            }
+            if (result_block.is_empty_column()) {
+                result_block = vectorized::Block(slots, request_block_desc.row_id_size());
+            }
+
+            TabletSchema full_read_schema;
+            for (const ColumnPB& column_pb : request_block_desc.column_descs()) {
+                full_read_schema.append_column(TabletColumn(column_pb));
+            }
+            std::unordered_map<IteratorKey, IteratorItem, HashOfIteratorKey> iterator_map;
+
+            RowStoreReadStruct row_store_read_struct(row_store_buffer);
+            if (request_block_desc.fetch_row_store()) {
+                // One entry per slot, indexed by the slot index `j` (not the block
+                // index `i`), so slot j's serde/default maps to output position j.
+                for (int j = 0; j < request_block_desc.slots_size(); ++j) {
+                    row_store_read_struct.serdes.emplace_back(
+                            slots[j].get_data_type_ptr()->get_serde());
+                    row_store_read_struct.col_uid_to_idx[slots[j].col_unique_id()] = j;
+                    row_store_read_struct.default_values.emplace_back(
+                            slots[j].col_default_value());
+                }
+            }
+
+            for (int j = 0; j < request_block_desc.row_id_size(); ++j) {
+                auto file_id = request_block_desc.file_id(j);
+                auto file_mapping = id_file_map->get_file_mapping(file_id);
+                if (!file_mapping) {
+                    return Status::InternalError(
+                            "Backend:{} file_mapping not found, query_id: {}, file_id: {}",
+                            BackendOptions::get_localhost(), print_id(request.query_id()),
+                            file_id);
+                }
+
+                // Count file mapping types
+                file_type_counts[file_mapping->type]++;
+
+                if (file_mapping->type == FileMappingType::DORIS_FORMAT) {
+                    RETURN_IF_ERROR(read_doris_format_row(
+                            id_file_map, file_mapping, request_block_desc.row_id(j), slots,
+                            full_read_schema, row_store_read_struct, stats, &acquire_tablet_ms,
+                            &acquire_rowsets_ms, &acquire_segments_ms, &lookup_row_data_ms,
+                            iterator_map, result_block));
+                } else {
+                    // Skipping the row would leave result_block one row short and
+                    // shift every following row relative to the requested order.
+                    return Status::InternalError(
+                            "Backend:{} unsupported file mapping type: {}, query_id: {}, "
+                            "file_id: {}",
+                            BackendOptions::get_localhost(),
+                            static_cast<int>(file_mapping->type), print_id(request.query_id()),
+                            file_id);
+                }
+            }
+
+            [[maybe_unused]] size_t compressed_size = 0;
+            [[maybe_unused]] size_t uncompressed_size = 0;
+            int be_exec_version = request.has_be_exec_version() ? request.be_exec_version() : 0;
+            RETURN_IF_ERROR(result_block.serialize(
+                    be_exec_version, response->add_blocks()->mutable_block(), &uncompressed_size,
+                    &compressed_size, segment_v2::CompressionTypePB::LZ4));
+        }
+
+        // Build file type statistics string
+        std::string file_type_stats;
+        for (const auto& [type, count] : file_type_counts) {
+            if (!file_type_stats.empty()) {
+                file_type_stats += ", ";
+            }
+            // Format the scoped enum via its underlying value explicitly: newer fmt
+            // releases no longer convert enum classes implicitly.
+            file_type_stats += fmt::format("{}:{}", static_cast<int>(type), count);
+        }
+
+        LOG(INFO) << "Query stats: "
+                  << fmt::format(
+                             "hit_cached_pages:{}, total_pages_read:{}, compressed_bytes_read:{}, "
+                             "io_latency:{}ns, uncompressed_bytes_read:{}, bytes_read:{}, "
+                             "acquire_tablet_ms:{}, acquire_rowsets_ms:{}, acquire_segments_ms:{}, "
+                             "lookup_row_data_ms:{}, file_types:[{}]",
+                             stats.cached_pages_num, stats.total_pages_num,
+                             stats.compressed_bytes_read, stats.io_ns,
+                             stats.uncompressed_bytes_read, stats.bytes_read, acquire_tablet_ms,
+                             acquire_rowsets_ms, acquire_segments_ms, lookup_row_data_ms,
+                             file_type_stats);
+    }
+
+    if (request.has_gc_id_map() && request.gc_id_map()) {
+        ExecEnv::GetInstance()->get_id_manager()->remove_id_file_map(request.query_id());
+    }
+
+    return Status::OK();
+}
+
+// Appends row `row_id` of the DORIS_FORMAT file described by `file_mapping`
+// (decoded into tablet_id / rowset_id / segment_id) to `result_block`.
+// Time spent resolving the tablet, rowset, segments and row-store data is
+// accumulated into the corresponding *_ms counters through scope_timer_run.
+// Two read paths:
+//   - row store: taken when row_store_read_struct.default_values is non-empty;
+//     the whole row is decoded from the tablet's row store;
+//   - column-wise: otherwise, each slot is read separately through an
+//     IteratorItem cached in `iterator_map` per (tablet, rowset, segment, slot).
+// Returns InternalError if the tablet, rowset or segment cannot be found.
+Status RowIdStorageReader::read_doris_format_row(
+ const std::shared_ptr<IdFileMap>& id_file_map,
+ const std::shared_ptr<FileMapping>& file_mapping, int64_t row_id,
+ std::vector<SlotDescriptor>& slots, const TabletSchema&
full_read_schema,
+ RowStoreReadStruct& row_store_read_struct, OlapReaderStatistics& stats,
+ int64_t* acquire_tablet_ms, int64_t* acquire_rowsets_ms, int64_t*
acquire_segments_ms,
+ int64_t* lookup_row_data_ms,
+ std::unordered_map<IteratorKey, IteratorItem, HashOfIteratorKey>&
iterator_map,
+ vectorized::Block& result_block) {
+ auto [tablet_id, rowset_id, segment_id] =
file_mapping->get_doris_format_info();
+    // Resolve the tablet (timed into *acquire_tablet_ms); a missing tablet maps to nullptr.
+ BaseTabletSPtr tablet = scope_timer_run(
+ [&]() {
+ auto res = ExecEnv::get_tablet(tablet_id);
+ return !res.has_value() ? nullptr
+ :
std::dynamic_pointer_cast<BaseTablet>(res.value());
+ },
+ acquire_tablet_ms);
+ if (!tablet) {
+ return Status::InternalError(
+ "Backend:{} tablet not found, tablet_id: {}, rowset_id: {},
segment_id: {}, "
+ "row_id: {}",
+ BackendOptions::get_localhost(), tablet_id,
rowset_id.to_string(), segment_id,
+ row_id);
+ }
+
+    // The rowset is taken from the query's IdFileMap (pinned there via
+    // add_temp_rowset), not from the tablet's current rowset list.
+ BetaRowsetSharedPtr rowset = std::static_pointer_cast<BetaRowset>(
+ scope_timer_run([&]() { return
id_file_map->get_temp_rowset(tablet_id, rowset_id); },
+ acquire_rowsets_ms));
+ if (!rowset) {
+ return Status::InternalError(
+ "Backend:{} rowset_id not found, tablet_id: {}, rowset_id: {},
segment_id: {}, "
+ "row_id: {}",
+ BackendOptions::get_localhost(), tablet_id,
rowset_id.to_string(), segment_id,
+ row_id);
+ }
+
+    // Load the rowset's segments through SegmentLoader, then pick the one whose
+    // id equals segment_id.
+ SegmentCacheHandle segment_cache;
+ RETURN_IF_ERROR(scope_timer_run(
+ [&]() {
+ return SegmentLoader::instance()->load_segments(rowset,
&segment_cache, true);
+ },
+ acquire_segments_ms));
+
+ auto it =
+ std::find_if(segment_cache.get_segments().cbegin(),
segment_cache.get_segments().cend(),
+ [segment_id](const segment_v2::SegmentSharedPtr& seg)
{
+ return seg->id() == segment_id;
+ });
+ if (it == segment_cache.get_segments().end()) {
+ return Status::InternalError(
+ "Backend:{} segment not found, tablet_id: {}, rowset_id: {},
segment_id: {}, "
+ "row_id: {}",
+ BackendOptions::get_localhost(), tablet_id,
rowset_id.to_string(), segment_id,
+ row_id);
+ }
+ segment_v2::SegmentSharedPtr segment = *it;
+
+    // A non-empty default_values means the caller requested the row store
+    // (fetch_row_store): decode the whole row from it into result_block.
+ if (!row_store_read_struct.default_values.empty()) {
+        // NOTE(review): CHECK aborts the whole backend if the tablet lacks a full
+        // row store; returning an error Status may be preferable — confirm intent.
+ CHECK(tablet->tablet_schema()->has_row_store_for_all_columns());
+ RowLocation loc(rowset_id, segment->id(), row_id);
+ row_store_read_struct.row_store_buffer.clear();
+ RETURN_IF_ERROR(scope_timer_run(
+ [&]() {
+ return tablet->lookup_row_data({}, loc, rowset, stats,
+
row_store_read_struct.row_store_buffer);
+ },
+ lookup_row_data_ms));
+
+ vectorized::JsonbSerializeUtil::jsonb_to_block(
+ row_store_read_struct.serdes,
row_store_read_struct.row_store_buffer.data(),
+ row_store_read_struct.row_store_buffer.size(),
row_store_read_struct.col_uid_to_idx,
+ result_block, row_store_read_struct.default_values, {});
+ } else {
+        // Column-wise path: slot x is read into result_block column x through the
+        // IteratorItem cached for (tablet, rowset, segment, slot), so later rows of
+        // the same segment can reuse the same iterator object.
+ for (int x = 0; x < slots.size(); ++x) {
+ vectorized::MutableColumnPtr column =
+ result_block.get_by_position(x).column->assume_mutable();
+ IteratorKey iterator_key {.tablet_id = tablet_id,
+ .rowset_id = rowset_id,
+ .segment_id = segment_id,
+ .slot_id = slots[x].id()};
+ IteratorItem& iterator_item = iterator_map[iterator_key];
+ if (iterator_item.segment == nullptr) {
+ iterator_map[iterator_key].segment = segment;
+ }
+            // Read through the segment recorded in the cached item (set on first use).
+ segment = iterator_item.segment;
+ RETURN_IF_ERROR(segment->seek_and_read_by_rowid(full_read_schema,
&slots[x], row_id,
+ column, stats,
iterator_item.iterator));
+ }
+ }
+
+ return Status::OK();
+}
+
} // namespace doris
diff --git a/be/src/exec/rowid_fetcher.h b/be/src/exec/rowid_fetcher.h
index 1fc8b02a679..a43d76988d6 100644
--- a/be/src/exec/rowid_fetcher.h
+++ b/be/src/exec/rowid_fetcher.h
@@ -27,6 +27,7 @@
#include "common/status.h"
#include "exec/tablet_info.h" // DorisNodesInfo
+#include "olap/id_manager.h"
#include "vec/core/block.h"
#include "vec/data_types/data_type.h"
@@ -36,6 +37,11 @@ class DorisNodesInfo;
class RuntimeState;
class TupleDescriptor;
+struct FileMapping;
+struct IteratorKey;
+struct IteratorItem;
+struct HashOfIteratorKey;
+
namespace vectorized {
template <typename T>
class ColumnStr;
@@ -70,9 +76,29 @@ private:
FetchOption _fetch_option;
};
+// Per-request state used to decode rows from a tablet's row store when serving
+// RowIdStorageReader::read_by_rowids(PMultiGetRequestV2).
+// `row_store_buffer` is borrowed: the referenced string must outlive this struct.
+struct RowStoreReadStruct {
+    // explicit: a bare std::string must not silently convert into this struct.
+    explicit RowStoreReadStruct(std::string& buffer) : row_store_buffer(buffer) {}
+
+    // Scratch buffer; the reader clears and refills it for every row it decodes.
+    std::string& row_store_buffer;
+    // One serde appended per requested slot.
+    vectorized::DataTypeSerDeSPtrs serdes;
+    // Column unique id -> position of that column in the output block.
+    std::unordered_map<uint32_t, uint32_t> col_uid_to_idx;
+    // One default value appended per requested slot. Left empty when the row
+    // store was not requested; the reader uses non-emptiness to choose the
+    // row-store read path.
+    std::vector<std::string> default_values;
+};
+
+// Static entry points that read rows back by row id for multi-get requests
+// (PMultiGetRequest / PMultiGetRequestV2).
class RowIdStorageReader {
public:
static Status read_by_rowids(const PMultiGetRequest& request,
PMultiGetResponse* response);
+    // V2: rows are addressed by (file_id, row_id) pairs resolved through the
+    // query's IdFileMap; one serialized block is returned per request_block_desc.
+ static Status read_by_rowids(const PMultiGetRequestV2& request,
PMultiGetResponseV2* response);
+
+private:
+    // Appends row `row_id` of the DORIS_FORMAT file described by `file_mapping`
+    // to `result_block`, accumulating lookup timings into the *_ms counters and
+    // reusing per-(tablet, rowset, segment, slot) iterators from `iterator_map`.
+ static Status read_doris_format_row(
+ const std::shared_ptr<IdFileMap>& id_file_map,
+ const std::shared_ptr<FileMapping>& file_mapping, int64_t row_id,
+ std::vector<SlotDescriptor>& slots, const TabletSchema&
full_read_schema,
+ RowStoreReadStruct& row_store_read_struct, OlapReaderStatistics&
stats,
+ int64_t* acquire_tablet_ms, int64_t* acquire_rowsets_ms, int64_t*
acquire_segments_ms,
+ int64_t* lookup_row_data_ms,
+ std::unordered_map<IteratorKey, IteratorItem, HashOfIteratorKey>&
iterator_map,
+ vectorized::Block& result_block);
};
} // namespace doris
diff --git a/be/src/olap/CMakeLists.txt b/be/src/olap/CMakeLists.txt
index bf19ef26764..6aa10435524 100644
--- a/be/src/olap/CMakeLists.txt
+++ b/be/src/olap/CMakeLists.txt
@@ -22,7 +22,8 @@ set(LIBRARY_OUTPUT_PATH "${BUILD_DIR}/src/olap")
set(EXECUTABLE_OUTPUT_PATH "${BUILD_DIR}/src/olap")
file(GLOB_RECURSE SRC_FILES CONFIGURE_DEPENDS *.cpp)
add_library(Olap STATIC ${SRC_FILES})
if (NOT USE_MEM_TRACKER)
target_compile_options(Olap PRIVATE -Wno-unused-lambda-capture)
diff --git a/be/src/olap/id_manager.h b/be/src/olap/id_manager.h
new file mode 100644
index 00000000000..cb056b792cb
--- /dev/null
+++ b/be/src/olap/id_manager.h
@@ -0,0 +1,195 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <butil/macros.h>
+#include <gen_cpp/BackendService_types.h>
+#include <gen_cpp/Types_types.h>
+#include <stddef.h>
+#include <stdint.h>
+
+#include <functional>
+#include <map>
+#include <memory>
+#include <mutex>
+#include <set>
+#include <shared_mutex>
+#include <string>
+#include <string_view>
+#include <unordered_map>
+#include <unordered_set>
+#include <utility>
+#include <vector>
+
+#include "common/status.h"
+#include "olap/olap_common.h"
+#include "olap/tablet.h"
+#include "olap/tablet_meta.h"
+
+namespace doris {
+
+// Kind of data file a FileMapping refers to. Only DORIS_FORMAT is currently read
+// back by RowIdStorageReader; other kinds are placeholders.
+enum class FileMappingType {
+ DORIS_FORMAT, // for doris format file {tablet_id}{rowset_id}{segment_id}
+ ORC,
+ PARQUET
+};
+
+// Identifies one data file touched by a query. `value` is an opaque byte string
+// whose encoding depends on `type`; IdFileMap uses `value` as the key that
+// deduplicates mappings when assigning file ids.
+struct FileMapping {
+ FileMappingType type;
+ std::string value;
+
+ FileMapping(FileMappingType t, std::string v) : type(t),
value(std::move(v)) {};
+
+    // DORIS_FORMAT mapping: `value` holds the raw in-memory bytes of tablet_id,
+    // rowset_id and segment_id, concatenated in that order.
+    // NOTE(review): memcpy of RowsetId also copies any padding bytes it has; if
+    // RowsetId is padded, equal ids could produce different `value` strings and
+    // thus distinct file ids — confirm RowsetId's layout.
+ FileMapping(int64_t tablet_id, RowsetId rowset_id, uint32_t segment_id)
+ : type(FileMappingType::DORIS_FORMAT) {
+ value.resize(sizeof(tablet_id) + sizeof(rowset_id) +
sizeof(segment_id));
+ auto* ptr = value.data();
+
+ memcpy(ptr, &tablet_id, sizeof(tablet_id));
+ ptr += sizeof(tablet_id);
+ memcpy(ptr, &rowset_id, sizeof(rowset_id));
+ ptr += sizeof(rowset_id);
+ memcpy(ptr, &segment_id, sizeof(segment_id));
+ }
+
+    // Decodes the bytes written by the DORIS_FORMAT constructor. Only meaningful
+    // for DORIS_FORMAT mappings; the DCHECKs guard this in debug builds only.
+ std::tuple<int64_t, RowsetId, uint32_t> get_doris_format_info() const {
+ DCHECK(type == FileMappingType::DORIS_FORMAT);
+ DCHECK(value.size() == sizeof(int64_t) + sizeof(RowsetId) +
sizeof(uint32_t));
+
+ auto* ptr = value.data();
+ int64_t tablet_id;
+ memcpy(&tablet_id, ptr, sizeof(tablet_id));
+ ptr += sizeof(tablet_id);
+ RowsetId rowset_id;
+ memcpy(&rowset_id, ptr, sizeof(rowset_id));
+ ptr += sizeof(rowset_id);
+ uint32_t segment_id;
+ memcpy(&segment_id, ptr, sizeof(segment_id));
+
+ return std::make_tuple(tablet_id, rowset_id, segment_id);
+ }
+};
+
+// Per-query registry of the data files exposed through global row ids.
+//
+// Each distinct FileMapping (keyed by its `value` bytes) is assigned a dense
+// uint32_t file id starting at 0. The map also keeps the rowsets registered via
+// add_temp_rowset alive. All members are guarded by `_mtx`; read-only lookups
+// take a shared lock, mutations take an exclusive lock.
+class IdFileMap {
+public:
+    // `expired_timestamp` is the time (in UnixSeconds() units) after which
+    // IdManager::gc_expired_id_file_map may drop this map.
+    explicit IdFileMap(uint64_t expired_timestamp)
+            : _delayed_expired_timestamp(expired_timestamp) {}
+
+    // Returns the mapping registered under `id`, or nullptr if the id is unknown.
+    std::shared_ptr<FileMapping> get_file_mapping(uint32_t id) {
+        std::shared_lock lock(_mtx);
+        auto it = _id_map.find(id);
+        if (it == _id_map.end()) {
+            return nullptr;
+        }
+        return it->second;
+    }
+
+    // Returns the id of `mapping`, assigning the next free id the first time a
+    // given `value` is seen; mappings with equal `value` share one id.
+    uint32_t get_file_mapping_id(const std::shared_ptr<FileMapping>& mapping) {
+        DCHECK(mapping.get() != nullptr);
+        std::unique_lock lock(_mtx);
+        auto it = _mapping_to_id.find(mapping->value);
+        if (it != _mapping_to_id.end()) {
+            return it->second;
+        }
+        const uint32_t id = _init_id++;
+        _id_map[id] = mapping;
+        // The string_view key points into mapping->value. It stays valid because
+        // _id_map holds `mapping` and entries are never erased from this map.
+        _mapping_to_id[mapping->value] = id;
+        return id;
+    }
+
+    // Keeps `rowset` referenced by this map, keyed by (tablet id, rowset id).
+    void add_temp_rowset(const RowsetSharedPtr& rowset) {
+        std::unique_lock lock(_mtx);
+        _temp_rowset_maps[{rowset->rowset_meta()->tablet_id(), rowset->rowset_id()}] = rowset;
+    }
+
+    // Returns the rowset registered by add_temp_rowset, or nullptr if absent.
+    RowsetSharedPtr get_temp_rowset(const int64_t tablet_id, const RowsetId& rowset_id) {
+        std::shared_lock lock(_mtx);
+        auto it = _temp_rowset_maps.find({tablet_id, rowset_id});
+        if (it == _temp_rowset_maps.end()) {
+            return nullptr;
+        }
+        return it->second;
+    }
+
+    int64_t get_delayed_expired_timestamp() const {
+        return static_cast<int64_t>(_delayed_expired_timestamp);
+    }
+
+private:
+    std::shared_mutex _mtx;
+    // Next file id to hand out.
+    uint32_t _init_id = 0;
+    std::unordered_map<std::string_view, uint32_t> _mapping_to_id;
+    std::unordered_map<uint32_t, std::shared_ptr<FileMapping>> _id_map;
+
+    // use in Doris Format to keep temp rowsets, preventing them from being deleted by compaction
+    std::unordered_map<std::pair<int64_t, RowsetId>, RowsetSharedPtr> _temp_rowset_maps;
+    uint64_t _delayed_expired_timestamp = 0;
+};
+
+// Registry mapping each query id to its IdFileMap.
+//
+// A map is created on demand by add_id_file_map and dropped either explicitly
+// (remove_id_file_map) or by gc_expired_id_file_map once its delayed expiration
+// timestamp has passed. All access is serialized by `_query_to_id_file_map_mtx`;
+// lookups take a shared lock, mutations an exclusive one.
+class IdManager {
+public:
+    // Version tag written into the global row ids produced with this manager.
+    static constexpr uint8_t ID_VERSION = 0;
+
+    IdManager() = default;
+
+    ~IdManager() {
+        std::unique_lock guard(_query_to_id_file_map_mtx);
+        _query_to_id_file_map.clear();
+    }
+
+    // Returns the IdFileMap of `query_id`. If none exists yet, creates one whose
+    // expiration is UnixSeconds() + timeout. An existing map is returned as-is;
+    // its expiration is not extended.
+    std::shared_ptr<IdFileMap> add_id_file_map(const UniqueId& query_id, int timeout) {
+        std::unique_lock guard(_query_to_id_file_map_mtx);
+        if (auto found = _query_to_id_file_map.find(query_id);
+            found != _query_to_id_file_map.end()) {
+            return found->second;
+        }
+        auto created = std::make_shared<IdFileMap>(UnixSeconds() + timeout);
+        _query_to_id_file_map[query_id] = created;
+        return created;
+    }
+
+    // Drops every map whose delayed expiration timestamp is not after `now`.
+    void gc_expired_id_file_map(int64_t now) {
+        std::unique_lock guard(_query_to_id_file_map_mtx);
+        auto cursor = _query_to_id_file_map.begin();
+        while (cursor != _query_to_id_file_map.end()) {
+            if (cursor->second->get_delayed_expired_timestamp() > now) {
+                ++cursor;
+                continue;
+            }
+            cursor = _query_to_id_file_map.erase(cursor);
+        }
+    }
+
+    // Removes the map registered for `query_id`, if any.
+    void remove_id_file_map(const UniqueId& query_id) {
+        std::unique_lock guard(_query_to_id_file_map_mtx);
+        _query_to_id_file_map.erase(query_id);
+    }
+
+    // Returns the map registered for `query_id`, or nullptr when there is none.
+    std::shared_ptr<IdFileMap> get_id_file_map(const UniqueId& query_id) {
+        std::shared_lock guard(_query_to_id_file_map_mtx);
+        auto found = _query_to_id_file_map.find(query_id);
+        if (found != _query_to_id_file_map.end()) {
+            return found->second;
+        }
+        return nullptr;
+    }
+
+private:
+    DISALLOW_COPY_AND_ASSIGN(IdManager);
+
+    phmap::flat_hash_map<UniqueId, std::shared_ptr<IdFileMap>> _query_to_id_file_map;
+    std::shared_mutex _query_to_id_file_map_mtx;
+};
+
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/olap/rowset/segment_v2/column_reader.cpp
b/be/src/olap/rowset/segment_v2/column_reader.cpp
index 9948e7fd8cf..7f88970eb4a 100644
--- a/be/src/olap/rowset/segment_v2/column_reader.cpp
+++ b/be/src/olap/rowset/segment_v2/column_reader.cpp
@@ -1753,4 +1753,31 @@ Status DefaultNestedColumnIterator::read_by_rowids(const
rowid_t* rowids, const
return Status::OK();
}
+// Appends one raw GlobalRowLoacationV2 value for each of the `*n` rows starting
+// at the current row position, then advances the position by `*n`.
+// `has_null` is left untouched.
+Status RowIdColumnIteratorV2::next_batch(size_t* n, vectorized::MutableColumnPtr& dst,
+                                         bool* has_null) {
+    auto* out = assert_cast<vectorized::ColumnString*>(dst.get());
+    const size_t rows = *n;
+    for (size_t offset = 0; offset < rows; ++offset) {
+        const GlobalRowLoacationV2 location(_version, _backend_id, _file_id,
+                                            static_cast<uint32_t>(_current_rowid + offset));
+        out->insert_data(reinterpret_cast<const char*>(&location), sizeof(location));
+    }
+    _current_rowid += rows;
+    return Status::OK();
+}
+
+// Appends one raw GlobalRowLoacationV2 value for each of the `count` row ids in
+// `rowids`, in the given order. The iterator's row position is not changed.
+Status RowIdColumnIteratorV2::read_by_rowids(const rowid_t* rowids, const size_t count,
+                                             vectorized::MutableColumnPtr& dst) {
+    auto* out = assert_cast<vectorized::ColumnString*>(dst.get());
+    for (const rowid_t* cur = rowids; cur != rowids + count; ++cur) {
+        const GlobalRowLoacationV2 location(_version, _backend_id, _file_id,
+                                            static_cast<uint32_t>(*cur));
+        out->insert_data(reinterpret_cast<const char*>(&location), sizeof(location));
+    }
+    return Status::OK();
+}
+
} // namespace doris::segment_v2
diff --git a/be/src/olap/rowset/segment_v2/column_reader.h
b/be/src/olap/rowset/segment_v2/column_reader.h
index 2afc269a86c..91866ce97ad 100644
--- a/be/src/olap/rowset/segment_v2/column_reader.h
+++ b/be/src/olap/rowset/segment_v2/column_reader.h
@@ -634,6 +634,41 @@ private:
int32_t _segment_id = 0;
};
+// Column iterator that produces the global row id column instead of reading
+// stored data: every row it emits is a GlobalRowLoacationV2 (version, backend
+// id, file id, row ordinal) stored as raw bytes in a string column.
+class RowIdColumnIteratorV2 : public ColumnIterator {
+public:
+    RowIdColumnIteratorV2(uint8_t version, int64_t backend_id, uint32_t file_id)
+            : _version(version), _backend_id(backend_id), _file_id(file_id) {}
+
+    // Positioning only moves the row cursor; there is no stored data to seek.
+    Status seek_to_first() override {
+        _current_rowid = 0;
+        return Status::OK();
+    }
+
+    Status seek_to_ordinal(ordinal_t ord_idx) override {
+        _current_rowid = ord_idx;
+        return Status::OK();
+    }
+
+    ordinal_t get_current_ordinal() const override { return _current_rowid; }
+
+    // Convenience overload that discards the has_null flag.
+    Status next_batch(size_t* n, vectorized::MutableColumnPtr& dst) {
+        bool has_null = false;
+        return next_batch(n, dst, &has_null);
+    }
+
+    Status next_batch(size_t* n, vectorized::MutableColumnPtr& dst, bool* has_null) override;
+
+    Status read_by_rowids(const rowid_t* rowids, const size_t count,
+                          vectorized::MutableColumnPtr& dst) override;
+
+private:
+    uint32_t _current_rowid = 0;
+    uint8_t _version;
+    int64_t _backend_id;
+    uint32_t _file_id;
+};
+
class VariantRootColumnIterator : public ColumnIterator {
public:
VariantRootColumnIterator() = delete;
diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.cpp
b/be/src/olap/rowset/segment_v2/segment_iterator.cpp
index 5d2a3d8b60e..391e06d2e15 100644
--- a/be/src/olap/rowset/segment_v2/segment_iterator.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_iterator.cpp
@@ -43,6 +43,7 @@
#include "olap/bloom_filter_predicate.h"
#include "olap/column_predicate.h"
#include "olap/field.h"
+#include "olap/id_manager.h"
#include "olap/iterators.h"
#include "olap/like_column_predicate.h"
#include "olap/match_predicate.h"
@@ -1024,6 +1025,16 @@ Status SegmentIterator::_init_return_column_iterators() {
new RowIdColumnIterator(_opts.tablet_id, _opts.rowset_id,
_segment->id()));
continue;
}
+
+ if
(_schema->column(cid)->name().starts_with(BeConsts::GLOBAL_ROWID_COL)) {
+ auto& id_file_map = _opts.runtime_state->get_id_file_map();
+ uint32_t file_id =
id_file_map->get_file_mapping_id(std::make_shared<FileMapping>(
+ _opts.tablet_id, _opts.rowset_id, _segment->id()));
+ _column_iterators[cid].reset(new RowIdColumnIteratorV2(
+ IdManager::ID_VERSION, BackendOptions::get_backend_id(),
file_id));
+ continue;
+ }
+
std::set<ColumnId> del_cond_id_set;
_opts.delete_condition_predicates->get_all_column_ids(del_cond_id_set);
std::vector<bool> tmp_is_pred_column;
diff --git a/be/src/olap/schema.h b/be/src/olap/schema.h
index 6414db4153a..b7198511120 100644
--- a/be/src/olap/schema.h
+++ b/be/src/olap/schema.h
@@ -68,7 +68,8 @@ public:
if (column.is_key()) {
++num_key_columns;
}
- if (column.name() == BeConsts::ROWID_COL) {
+ if (column.name() == BeConsts::ROWID_COL ||
+ column.name().starts_with(BeConsts::GLOBAL_ROWID_COL)) {
_rowid_col_idx = cid;
}
if (column.name() == VERSION_COL) {
@@ -94,7 +95,8 @@ public:
if (columns[i]->name() == DELETE_SIGN) {
_delete_sign_idx = i;
}
- if (columns[i]->name() == BeConsts::ROWID_COL) {
+ if (columns[i]->name() == BeConsts::ROWID_COL ||
+ columns[i]->name().starts_with(BeConsts::GLOBAL_ROWID_COL)) {
_rowid_col_idx = i;
}
if (columns[i]->name() == VERSION_COL) {
diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp
index 3ba65813492..21ad2fa051d 100644
--- a/be/src/olap/storage_engine.cpp
+++ b/be/src/olap/storage_engine.cpp
@@ -53,6 +53,7 @@
#include "io/fs/local_file_system.h"
#include "olap/binlog.h"
#include "olap/data_dir.h"
+#include "olap/id_manager.h"
#include "olap/memtable_flush_executor.h"
#include "olap/olap_common.h"
#include "olap/olap_define.h"
@@ -1491,6 +1492,9 @@ void BaseStorageEngine::_evict_querying_rowset() {
}
}
}
+
+ uint64_t now = UnixSeconds();
+ ExecEnv::GetInstance()->get_id_manager()->gc_expired_id_file_map(now);
}
bool StorageEngine::add_broken_path(std::string path) {
diff --git a/be/src/olap/tablet_schema.h b/be/src/olap/tablet_schema.h
index 957b9adb2b9..14202758f5b 100644
--- a/be/src/olap/tablet_schema.h
+++ b/be/src/olap/tablet_schema.h
@@ -398,6 +398,15 @@ public:
long row_store_page_size() const { return _row_store_page_size; }
void set_storage_page_size(long storage_page_size) { _storage_page_size =
storage_page_size; }
long storage_page_size() const { return _storage_page_size; }
+    // Returns true if any column name in this schema starts with
+    // BeConsts::GLOBAL_ROWID_COL.
+    bool has_global_row_id() const {
+        const StringRef prefix(BeConsts::GLOBAL_ROWID_COL.data(),
+                               BeConsts::GLOBAL_ROWID_COL.size());
+        for (const auto& name_and_index : _field_name_to_index) {
+            if (name_and_index.first.start_with(prefix)) {
+                return true;
+            }
+        }
+        return false;
+    }
const std::vector<const TabletIndex*> inverted_indexes() const {
std::vector<const TabletIndex*> inverted_indexes;
diff --git a/be/src/olap/utils.h b/be/src/olap/utils.h
index c163aad1148..242e39c2c54 100644
--- a/be/src/olap/utils.h
+++ b/be/src/olap/utils.h
@@ -263,4 +263,15 @@ struct GlobalRowLoacation {
}
};
+// Global row id, version 2: locates one row by the backend that produced it, a
+// per-query file id (assigned by IdFileMap) and the row ordinal within that file.
+// Instances are copied byte-for-byte (sizeof(GlobalRowLoacationV2)) into a string
+// column, so member order and types define the encoding.
+// NOTE(review): on typical ABIs there are padding bytes after `version`; the
+// constructor leaves them uninitialized and they are copied into the column too —
+// consider zero-filling if the raw bytes are ever compared or hashed.
+struct GlobalRowLoacationV2 {
+    // NOTE(review): `bid` is taken as uint64_t but stored in the int64_t backend_id.
+ GlobalRowLoacationV2(uint8_t ver, uint64_t bid, uint32_t fid, uint32_t rid)
+ : version(ver), backend_id(bid), file_id(fid), row_id(rid) {}
+    // Format version (IdManager::ID_VERSION when produced by SegmentIterator).
+ uint8_t version;
+ int64_t backend_id;
+    // Id of the file in the query's IdFileMap.
+ uint32_t file_id;
+    // Row ordinal inside that file.
+ uint32_t row_id;
+
+    // Defaulted memberwise comparison, in declaration order.
+ auto operator<=>(const GlobalRowLoacationV2&) const = default;
+};
+
} // namespace doris
diff --git a/be/src/pipeline/dependency.cpp b/be/src/pipeline/dependency.cpp
index 46e1a366b7b..603d77e93c9 100644
--- a/be/src/pipeline/dependency.cpp
+++ b/be/src/pipeline/dependency.cpp
@@ -27,12 +27,14 @@
#include "pipeline/pipeline_task.h"
#include "runtime/exec_env.h"
#include "runtime/memory/mem_tracker.h"
+#include "util/brpc_client_cache.h"
#include "vec/exprs/vectorized_agg_fn.h"
#include "vec/exprs/vslot_ref.h"
#include "vec/spill/spill_stream_manager.h"
+// Must stay outside `namespace doris::pipeline`: a header included inside the
+// namespace would have its declarations nested in it.
+#include "vec/utils/util.hpp"
namespace doris::pipeline {
#include "common/compile_check_begin.h"
Dependency* BasicSharedState::create_source_dependency(int operator_id, int
node_id,
const std::string&
name) {
source_deps.push_back(std::make_shared<Dependency>(operator_id, node_id,
name + "_DEPENDENCY"));
@@ -456,4 +458,187 @@ void AggSharedState::refresh_top_limit(size_t row_id,
limit_columns_min = limit_heap.top()._row_id;
}
+// Assembles the output block of one sink round.
+//
+// For every rowid column i (one per request_block_desc), the partial blocks returned by the
+// backends are re-interleaved into response_blocks[i] following block_order_results[i], which
+// records per input row the backend owning it (0 = NULL rowid -> a default value is inserted).
+// Then every rowid column of origin_block is replaced, at its position, by all columns fetched
+// for it; the remaining columns of origin_block are passed through unchanged.
+// Returns an error when a response cannot be deserialized or a backend sent no data for a row
+// it was asked for.
+Status MaterializationSharedState::merge_multi_response(vectorized::Block* block) {
+    // init the response_blocks
+    if (response_blocks.empty()) {
+        response_blocks = std::vector<vectorized::MutableBlock>(block_order_results.size());
+    }
+
+    for (int i = 0; i < block_order_results.size(); ++i) {
+        // backend_id -> (partial block for rowid column i, number of its rows already consumed).
+        // Scoped to this iteration so a backend that returned nothing for column i can never be
+        // served from a block belonging to a previous rowid column.
+        std::map<int64_t, std::pair<vectorized::Block, int>> block_maps;
+        for (auto& [backend_id, rpc_struct] : rpc_struct_map) {
+            vectorized::Block partial_block;
+            RETURN_IF_ERROR(
+                    partial_block.deserialize(rpc_struct.callback->response_->blocks(i).block()));
+
+            if (!partial_block.is_empty_column()) {
+                if (!response_blocks[i].columns()) {
+                    response_blocks[i] = vectorized::MutableBlock(partial_block.clone_empty());
+                }
+                block_maps[backend_id] = std::make_pair(std::move(partial_block), 0);
+            }
+        }
+
+        for (int j = 0; j < block_order_results[i].size(); ++j) {
+            auto backend_id = block_order_results[i][j];
+            if (backend_id) {
+                // find() instead of operator[]: a missing entry must not be served from a
+                // default-constructed (column-less) block.
+                auto source_it = block_maps.find(backend_id);
+                if (UNLIKELY(source_it == block_maps.end())) {
+                    return Status::InternalError(
+                            "MaterializationSharedState missing response block, backend_id={}",
+                            backend_id);
+                }
+                auto& source_block_rows = source_it->second;
+                DCHECK(source_block_rows.second < source_block_rows.first.rows());
+                for (int k = 0; k < response_blocks[i].columns(); ++k) {
+                    response_blocks[i].get_column_by_position(k)->insert_from(
+                            *source_block_rows.first.get_by_position(k).column,
+                            source_block_rows.second);
+                }
+                source_block_rows.second++;
+            } else {
+                for (int k = 0; k < response_blocks[i].columns(); ++k) {
+                    response_blocks[i].get_column_by_position(k)->insert_default();
+                }
+            }
+        }
+    }
+
+    // clear request/response
+    for (auto& [_, rpc_struct] : rpc_struct_map) {
+        for (int i = 0; i < rpc_struct.request.request_block_descs_size(); ++i) {
+            rpc_struct.request.mutable_request_block_descs(i)->clear_row_id();
+            rpc_struct.request.mutable_request_block_descs(i)->clear_file_id();
+        }
+    }
+
+    // Look every column position up in rowid_locs instead of walking rowid_locs with a cursor:
+    // this never reads rowid_locs[0] of an empty vector and does not require rowid_locs to be
+    // sorted ascending (a cursor silently skips replacements for out-of-order positions).
+    for (int i = 0; i < origin_block.columns(); ++i) {
+        int response_idx = -1;
+        for (int j = 0; j < rowid_locs.size(); ++j) {
+            if (rowid_locs[j] == i) {
+                response_idx = j;
+                break;
+            }
+        }
+        if (response_idx < 0) {
+            block->insert(origin_block.get_by_position(i));
+        } else {
+            auto response_block = response_blocks[response_idx].to_block();
+            for (auto& data : response_block) {
+                block->insert(data);
+            }
+        }
+    }
+    origin_block.clear();
+    response_blocks.clear();
+
+    return Status::OK();
+}
+
+// Creates the source dependency as a CountedFinishDependency so the source can wait for a
+// number of outstanding events. add(0) does not register any event, but because the counter is
+// zero it calls block(), so the source stays blocked until the sink registers its RPCs.
+Dependency* MaterializationSharedState::create_source_dependency(int operator_id, int node_id,
+                                                                 const std::string& name) {
+    auto counted_dep = std::make_shared<CountedFinishDependency>(operator_id, node_id,
+                                                                 name + "_DEPENDENCY");
+    counted_dep->set_shared_state(this);
+    counted_dep->add(0);
+    source_deps.push_back(counted_dep);
+    return counted_dep.get();
+}
+
+// Turns the rowid columns of one sink block into the per-backend multiget requests.
+//
+// For rowid column i and row j, a non-NULL value is decoded as a GlobalRowLoacationV2 and its
+// (row_id, file_id) pair is appended to request_block_descs(i) of the owning backend's request.
+// block_order_results[i][j] records that backend id (0 for NULL) so merge_multi_response() can
+// restore the original row order. Returns an error for a backend id with no rpc_struct.
+Status MaterializationSharedState::create_muiltget_result(const vectorized::Columns& columns,
+                                                          bool eos, bool gc_id_map) {
+    const auto rows = columns.empty() ? 0 : columns[0]->size();
+    block_order_results.resize(columns.size());
+
+    for (int col_idx = 0; col_idx < columns.size(); ++col_idx) {
+        const auto& rowid_column = columns[col_idx];
+        const uint8_t* nulls = nullptr;
+        const vectorized::ColumnString* rowid_strings = nullptr;
+        if (auto nullable = check_and_get_column<vectorized::ColumnNullable>(*rowid_column)) {
+            nulls = nullable->get_null_map_data().data();
+            rowid_strings = assert_cast<const vectorized::ColumnString*>(
+                    nullable->get_nested_column_ptr().get());
+        } else {
+            rowid_strings = assert_cast<const vectorized::ColumnString*>(rowid_column.get());
+        }
+
+        auto& order = block_order_results[col_idx];
+        order.resize(rows);
+
+        for (int row = 0; row < rows; ++row) {
+            const bool is_null = nulls != nullptr && nulls[row];
+            if (is_null) {
+                order[row] = 0;
+                continue;
+            }
+            DCHECK(rowid_strings->get_data_at(row).size == sizeof(GlobalRowLoacationV2));
+            GlobalRowLoacationV2 row_location =
+                    *((GlobalRowLoacationV2*)rowid_strings->get_data_at(row).data);
+            auto target = rpc_struct_map.find(row_location.backend_id);
+            if (UNLIKELY(target == rpc_struct_map.end())) {
+                return Status::InternalError(
+                        "MaterializationSinkOperatorX failed to find rpc_struct, backend_id={}",
+                        row_location.backend_id);
+            }
+            auto* block_desc = target->second.request.mutable_request_block_descs(col_idx);
+            block_desc->add_row_id(row_location.row_id);
+            block_desc->add_file_id(row_location.file_id);
+            order[row] = row_location.backend_id;
+        }
+    }
+
+    if (eos && gc_id_map) {
+        for (auto& [_, rpc_struct] : rpc_struct_map) {
+            rpc_struct.request.set_gc_id_map(true);
+        }
+    }
+    last_block = eos;
+    need_merge_block = rows > 0;
+
+    return Status::OK();
+}
+
+// Builds the PMultiGetRequestV2 template and the per-backend RPC state.
+//
+// The template carries exec version, workload group, query id and one request_block_desc per
+// rowid column: its fetch_row_store flag, column schemas and slot descriptors. One
+// FetchRpcStruct is created per backend in nodes_info, keyed by backend id. One pending RPC per
+// backend is then registered on the source dependency for the first round.
+//
+// Sets rpc_struct_inited so the sink calls this only once. Re-running it would add
+// rpc_struct_map.size() to the source dependency counter on every block, while only that many
+// RPC callbacks decrement it, leaving the source blocked forever from the second round on.
+// Returns an error if a backend's brpc stub cannot be created.
+Status MaterializationSharedState::init_multi_requests(
+        const TMaterializationNode& materialization_node, RuntimeState* state) {
+    PMultiGetRequestV2 multi_get_request;
+    // Initialize the base struct of PMultiGetRequestV2
+    multi_get_request.set_be_exec_version(state->be_exec_version());
+    multi_get_request.set_wg_id(state->get_query_ctx()->workload_group()->id());
+    auto query_id = multi_get_request.mutable_query_id();
+    query_id->set_hi(state->query_id().hi);
+    query_id->set_lo(state->query_id().lo);
+    DCHECK_EQ(materialization_node.column_descs_lists.size(),
+              materialization_node.slot_locs_lists.size());
+    DCHECK_EQ(materialization_node.column_descs_lists.size(),
+              materialization_node.fetch_row_stores.size());
+
+    const auto& slots = state->desc_tbl()
+                                .get_tuple_descriptor(materialization_node.intermediate_tuple_id)
+                                ->slots();
+    for (int i = 0; i < materialization_node.column_descs_lists.size(); ++i) {
+        auto request_block_desc = multi_get_request.add_request_block_descs();
+        request_block_desc->set_fetch_row_store(materialization_node.fetch_row_stores[i]);
+        // Initialize the column_descs and slot_locs
+        auto& column_descs = materialization_node.column_descs_lists[i];
+        for (auto& column_desc_item : column_descs) {
+            TabletColumn(column_desc_item).to_schema_pb(request_block_desc->add_column_descs());
+        }
+
+        auto& slot_locs = materialization_node.slot_locs_lists[i];
+        for (auto& slot_loc_item : slot_locs) {
+            slots[slot_loc_item]->to_protobuf(request_block_desc->add_slots());
+        }
+    }
+
+    // Initialize the stubs and requests for each BE
+    for (const auto& node_info : materialization_node.nodes_info.nodes) {
+        auto client = ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client(
+                node_info.host, node_info.async_internal_port);
+        if (!client) {
+            LOG(WARNING) << "Get rpc stub failed, host=" << node_info.host
+                         << ", port=" << node_info.async_internal_port;
+            return Status::InternalError("RowIDFetcher failed to init rpc client, host={}, port={}",
+                                         node_info.host, node_info.async_internal_port);
+        }
+        rpc_struct_map.emplace(node_info.id, FetchRpcStruct {.stub = std::move(client),
+                                                             .request = multi_get_request,
+                                                             .callback = nullptr});
+    }
+    // Register one pending RPC per backend on the source dependency counter.
+    static_cast<CountedFinishDependency*>(source_deps.back().get())
+            ->add(static_cast<uint32_t>(rpc_struct_map.size()));
+    rpc_struct_inited = true;
+
+    return Status::OK();
+}
+
} // namespace doris::pipeline
diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h
index ea1928215b0..22f52f1ea77 100644
--- a/be/src/pipeline/dependency.h
+++ b/be/src/pipeline/dependency.h
@@ -29,12 +29,14 @@
#include "common/config.h"
#include "common/logging.h"
+#include "gen_cpp/internal_service.pb.h"
#include "gutil/integral_types.h"
#include "pipeline/common/agg_utils.h"
#include "pipeline/common/join_utils.h"
#include "pipeline/common/set_utils.h"
#include "pipeline/exec/data_queue.h"
#include "pipeline/exec/join/process_hash_table_probe.h"
+#include "util/ref_count_closure.h"
#include "util/stack_util.h"
#include "vec/common/sort/partition_sorter.h"
#include "vec/common/sort/sorter.h"
@@ -83,7 +85,8 @@ struct BasicSharedState {
virtual ~BasicSharedState() = default;
- Dependency* create_source_dependency(int operator_id, int node_id, const
std::string& name);
+ // Virtual so a derived shared state can create a specialised source dependency;
+ // MaterializationSharedState overrides it to return a CountedFinishDependency.
+ virtual Dependency* create_source_dependency(int operator_id, int node_id,
+ const std::string& name);
Dependency* create_sink_dependency(int dest_id, int node_id, const
std::string& name);
};
@@ -173,12 +176,12 @@ public:
CountedFinishDependency(int id, int node_id, std::string name)
: Dependency(id, node_id, std::move(name), true) {}
- void add() {
+ // Adds `count` outstanding events under _mtx. If the counter is currently zero, block() is
+ // called first; add(0) therefore only blocks the dependency without registering an event.
+ void add(uint32_t count = 1) {
std::unique_lock<std::mutex> l(_mtx);
if (!_counter) {
block();
}
- _counter++;
+ _counter += count;
}
void sub() {
@@ -551,8 +554,8 @@ public:
const int _child_count;
};
-struct CacheSharedState : public BasicSharedState {
- ENABLE_FACTORY_CREATOR(CacheSharedState)
+// Shared state that carries only a DataQueue (renamed from CacheSharedState); used by the
+// cache sink and cache source operators.
+struct DataQueueSharedState : public BasicSharedState {
+ ENABLE_FACTORY_CREATOR(DataQueueSharedState)
public:
DataQueue data_queue;
};
@@ -872,5 +875,39 @@ class QueryGlobalDependency final : public Dependency {
~QueryGlobalDependency() override = default;
Dependency* is_blocked_by(PipelineTask* task = nullptr) override;
};
+
+// Per-backend state of the multiget RPCs issued by the materialization operator.
+// Keep the field order: init_multi_requests() builds it with designated initializers.
+struct FetchRpcStruct {
+ // brpc stub of the target backend.
+ std::shared_ptr<PBackendService_Stub> stub;
+ // Request for this backend; row/file ids are appended per round and cleared after merging.
+ PMultiGetRequestV2 request;
+ // Callback of the most recently sent RPC; its response_ is read by merge_multi_response().
+ std::shared_ptr<doris::DummyBrpcCallback<PMultiGetResponseV2>> callback;
+ // Started when the RPC is sent, stopped by the completion callback.
+ MonotonicStopWatch rpc_timer;
+};
+
+// State shared by MaterializationSinkOperatorX and MaterializationSourceOperatorX.
+// The sink turns the rowid columns of each input block into one multiget RPC per backend; the
+// source waits on a CountedFinishDependency until they have completed and then merges the
+// fetched columns back into the original block.
+struct MaterializationSharedState : public BasicSharedState {
+ ENABLE_FACTORY_CREATOR(MaterializationSharedState)
+public:
+ MaterializationSharedState() = default;
+
+ // Builds the per-backend request templates and stubs; the sink calls it while
+ // rpc_struct_inited is false.
+ Status init_multi_requests(const TMaterializationNode& tnode,
RuntimeState* state);
+ // Fills the per-backend requests from the rowid columns of one block and records row order.
+ Status create_muiltget_result(const vectorized::Columns& columns, bool
eos, bool gc_id_map);
+ // Merges backend responses into `block`, replacing the rowid columns of origin_block.
+ Status merge_multi_response(vectorized::Block* block);
+
+ // Creates a CountedFinishDependency so the source can wait for all in-flight RPCs.
+ Dependency* create_source_dependency(int operator_id, int node_id,
+ const std::string& name) override;
+
+ // Must become true once init_multi_requests() has run; the sink calls it while false.
+ bool rpc_struct_inited = false;
+ // Written by the RPC callbacks; the source returns it when it is not OK.
+ Status rpc_status = Status::OK();
+ // Set from the sink's eos flag; the source reports eos once it is true.
+ bool last_block = false;
+ // False when the last sink call carried no rows; the source then skips merging.
+ bool need_merge_block = true;
+ // Input block of the current round, swapped in by the sink.
+ vectorized::Block origin_block;
+ // Positions of the rowid columns in origin_block; each is replaced by the columns fetched
+ // for it.
+ std::vector<int> rowid_locs;
+ // One accumulating block per rowid column, filled by merge_multi_response().
+ std::vector<vectorized::MutableBlock> response_blocks;
+ // Per-backend RPC state, keyed by backend id.
+ std::map<int64_t, FetchRpcStruct> rpc_struct_map;
+ // For every rowid column and input row, the backend id owning that row, used to restore
+ // the original row order when merging. Zero means a NULL rowid.
+ std::vector<std::vector<int64_t>> block_order_results;
+};
#include "common/compile_check_end.h"
} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/cache_sink_operator.cpp
b/be/src/pipeline/exec/cache_sink_operator.cpp
index f1a8a2c888e..93d50212875 100644
--- a/be/src/pipeline/exec/cache_sink_operator.cpp
+++ b/be/src/pipeline/exec/cache_sink_operator.cpp
@@ -40,7 +40,6 @@ Status CacheSinkLocalState::open(RuntimeState* state) {
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_open_timer);
RETURN_IF_ERROR(Base::open(state));
- // auto& p = _parent->cast<Parent>();
_shared_state->data_queue.set_max_blocks_in_sub_queue(state->data_queue_max_blocks());
return Status::OK();
diff --git a/be/src/pipeline/exec/cache_sink_operator.h
b/be/src/pipeline/exec/cache_sink_operator.h
index d4f2113ed06..c67ada5735c 100644
--- a/be/src/pipeline/exec/cache_sink_operator.h
+++ b/be/src/pipeline/exec/cache_sink_operator.h
@@ -33,14 +33,14 @@ namespace pipeline {
class DataQueue;
class CacheSinkOperatorX;
-class CacheSinkLocalState final : public
PipelineXSinkLocalState<CacheSharedState> {
+// Sink-side local state of CacheSinkOperatorX; its shared state is a DataQueueSharedState
+// (formerly CacheSharedState).
+class CacheSinkLocalState final : public
PipelineXSinkLocalState<DataQueueSharedState> {
public:
ENABLE_FACTORY_CREATOR(CacheSinkLocalState);
CacheSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state) :
Base(parent, state) {}
Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
Status open(RuntimeState* state) override;
friend class CacheSinkOperatorX;
- using Base = PipelineXSinkLocalState<CacheSharedState>;
+ using Base = PipelineXSinkLocalState<DataQueueSharedState>;
using Parent = CacheSinkOperatorX;
};
@@ -59,7 +59,7 @@ public:
Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos)
override;
std::shared_ptr<BasicSharedState> create_shared_state() const override {
- std::shared_ptr<BasicSharedState> ss =
std::make_shared<CacheSharedState>();
+ std::shared_ptr<BasicSharedState> ss =
std::make_shared<DataQueueSharedState>();
ss->id = operator_id();
for (auto& dest : dests_id()) {
ss->related_op_ids.insert(dest);
diff --git a/be/src/pipeline/exec/cache_source_operator.cpp
b/be/src/pipeline/exec/cache_source_operator.cpp
index ec9f9ecc572..9db41c4cd85 100644
--- a/be/src/pipeline/exec/cache_source_operator.cpp
+++ b/be/src/pipeline/exec/cache_source_operator.cpp
@@ -34,7 +34,7 @@ Status CacheSourceLocalState::init(RuntimeState* state,
LocalStateInfo& info) {
RETURN_IF_ERROR(Base::init(state, info));
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_init_timer);
- ((CacheSharedState*)_dependency->shared_state())
+ ((DataQueueSharedState*)_dependency->shared_state())
->data_queue.set_source_dependency(_shared_state->source_deps.front());
const auto& scan_ranges = info.scan_ranges;
bool hit_cache = false;
diff --git a/be/src/pipeline/exec/cache_source_operator.h
b/be/src/pipeline/exec/cache_source_operator.h
index 651f9ff5596..ec95301cbea 100644
--- a/be/src/pipeline/exec/cache_source_operator.h
+++ b/be/src/pipeline/exec/cache_source_operator.h
@@ -36,10 +36,10 @@ namespace pipeline {
class DataQueue;
class CacheSourceOperatorX;
-class CacheSourceLocalState final : public
PipelineXLocalState<CacheSharedState> {
+class CacheSourceLocalState final : public
PipelineXLocalState<DataQueueSharedState> {
public:
ENABLE_FACTORY_CREATOR(CacheSourceLocalState);
- using Base = PipelineXLocalState<CacheSharedState>;
+ using Base = PipelineXLocalState<DataQueueSharedState>;
using Parent = CacheSourceOperatorX;
CacheSourceLocalState(RuntimeState* state, OperatorXBase* parent) :
Base(state, parent) {};
diff --git a/be/src/pipeline/exec/materialization_sink_operator.cpp
b/be/src/pipeline/exec/materialization_sink_operator.cpp
new file mode 100644
index 00000000000..acdae4c508b
--- /dev/null
+++ b/be/src/pipeline/exec/materialization_sink_operator.cpp
@@ -0,0 +1,155 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "pipeline/exec/materialization_sink_operator.h"
+
+#include <bthread/countdown_event.h>
+#include <fmt/format.h>
+#include <gen_cpp/data.pb.h>
+#include <gen_cpp/internal_service.pb.h>
+#include <gen_cpp/olap_file.pb.h>
+#include <gen_cpp/types.pb.h>
+
+#include <utility>
+
+#include "common/status.h"
+#include "pipeline/exec/operator.h"
+#include "util/brpc_client_cache.h"
+#include "util/ref_count_closure.h"
+#include "vec/columns/column.h"
+#include "vec/core/block.h"
+
+namespace doris {
+namespace pipeline {
+
+// Copies the thrift materialization node and builds one rowid expr per fetch expr list.
+Status MaterializationSinkOperatorX::init(const doris::TPlanNode& tnode,
+                                          doris::RuntimeState* state) {
+    RETURN_IF_ERROR(DataSinkOperatorX::init(tnode, state));
+    DCHECK(tnode.__isset.materialization_node);
+    const auto& node = tnode.materialization_node;
+    _materialization_node = node;
+    _gc_id_map = node.gc_id_map;
+    return vectorized::VExpr::create_expr_trees(node.fetch_expr_lists, _rowid_exprs);
+}
+
+// Prepares the rowid exprs against the child's row descriptor, then opens them.
+Status MaterializationSinkOperatorX::prepare(RuntimeState* state) {
+    RETURN_IF_ERROR(vectorized::VExpr::prepare(_rowid_exprs, state, _child->row_desc()));
+    return vectorized::VExpr::open(_rowid_exprs, state);
+}
+
+// Completion callback for one multiget_data_v2 RPC issued by the materialization sink.
+// Stops the per-backend RPC timer and records a failure (transport error or a non-OK status in
+// the response) into the shared state. It then decrements the source dependency counter, so the
+// source runs once all RPCs of the round have completed. Does nothing if the task is gone.
+template <typename Response>
+class MaterializationCallback : public ::doris::DummyBrpcCallback<Response> {
+    ENABLE_FACTORY_CREATOR(MaterializationCallback);
+
+public:
+    MaterializationCallback(std::weak_ptr<TaskExecutionContext> tast_exec_ctx,
+                            MaterializationSharedState* shared_state, MonotonicStopWatch& rpc_timer)
+            : _tast_exec_ctx(std::move(tast_exec_ctx)),
+              _shared_state(shared_state),
+              _rpc_timer(rpc_timer) {}
+
+    ~MaterializationCallback() override = default;
+    MaterializationCallback(const MaterializationCallback& other) = delete;
+    MaterializationCallback& operator=(const MaterializationCallback& other) = delete;
+
+    void call() noexcept override {
+        auto tast_exec_ctx = _tast_exec_ctx.lock();
+        if (!tast_exec_ctx) {
+            return;
+        }
+
+        _rpc_timer.stop();
+        Status rpc_result = Status::OK();
+        if (::doris::DummyBrpcCallback<Response>::cntl_->Failed()) {
+            std::string err = fmt::format(
+                    "failed to send brpc in materialization, error={}, error_text={}, client: {}, "
+                    "latency = {}",
+                    berror(::doris::DummyBrpcCallback<Response>::cntl_->ErrorCode()),
+                    ::doris::DummyBrpcCallback<Response>::cntl_->ErrorText(),
+                    BackendOptions::get_localhost(),
+                    ::doris::DummyBrpcCallback<Response>::cntl_->latency_us());
+            rpc_result = Status::InternalError(err);
+        } else {
+            rpc_result = Status::create(doris::DummyBrpcCallback<Response>::response_->status());
+        }
+        // Callbacks of the backends of one round complete independently. Record only failures:
+        // assigning unconditionally lets a later successful response overwrite, and thereby
+        // hide, an earlier failure of another backend.
+        // NOTE(review): concurrently failing callbacks still write rpc_status without a lock;
+        // consider guarding it with a mutex in MaterializationSharedState.
+        if (!rpc_result.ok()) {
+            _shared_state->rpc_status = rpc_result;
+        }
+        ((CountedFinishDependency*)_shared_state->source_deps.back().get())->sub();
+    }
+
+private:
+    std::weak_ptr<TaskExecutionContext> _tast_exec_ctx;
+    MaterializationSharedState* _shared_state;
+    MonotonicStopWatch& _rpc_timer;
+};
+
+// Sink half of the materialization node. For every non-empty input block, and once at eos:
+// - evaluates the rowid exprs;
+// - parks the input block in the shared state;
+// - fills the per-backend multiget requests and sends them asynchronously.
+// Except on the eos call, the sink blocks itself; the source readies it after merging.
+Status MaterializationSinkOperatorX::sink(RuntimeState* state, vectorized::Block* in_block,
+                                          bool eos) {
+    auto& local_state = get_local_state(state);
+    SCOPED_TIMER(local_state.exec_time_counter());
+    COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows());
+    auto* shared_state = local_state._shared_state;
+    if (!shared_state->rpc_struct_inited) {
+        RETURN_IF_ERROR(shared_state->init_multi_requests(_materialization_node, state));
+    }
+
+    const bool has_rows = in_block->rows() != 0;
+    if (!has_rows && !eos) {
+        return Status::OK();
+    }
+
+    // Hold this pipeline until the source has consumed the responses of this round.
+    if (!eos) {
+        shared_state->sink_deps.back()->block();
+    }
+
+    vectorized::Columns rowid_columns;
+    if (has_rows) {
+        auto& rowid_locs = shared_state->rowid_locs;
+        rowid_locs.resize(_rowid_exprs.size());
+        for (int expr_idx = 0; expr_idx < _rowid_exprs.size(); ++expr_idx) {
+            RETURN_IF_ERROR(_rowid_exprs[expr_idx]->execute(in_block, &rowid_locs[expr_idx]));
+            rowid_columns.emplace_back(in_block->get_by_position(rowid_locs[expr_idx]).column);
+        }
+        shared_state->origin_block.swap(*in_block);
+    }
+    RETURN_IF_ERROR(shared_state->create_muiltget_result(rowid_columns, eos, _gc_id_map));
+
+    for (auto& [backend_id, rpc_struct] : shared_state->rpc_struct_map) {
+        auto callback = MaterializationCallback<PMultiGetResponseV2>::create_shared(
+                state->get_task_execution_context(), shared_state, rpc_struct.rpc_timer);
+        callback->cntl_->set_timeout_ms(config::fetch_rpc_timeout_seconds * 1000);
+        auto closure = AutoReleaseClosure<int, ::doris::DummyBrpcCallback<PMultiGetResponseV2>>::
+                create_unique(std::make_shared<int>(), callback, state->get_query_ctx_weak(),
+                              "Materialization Sink node id:" + std::to_string(node_id()) +
+                                      " target_backend_id:" + std::to_string(backend_id));
+        // The source later reads the response through this stored callback.
+        rpc_struct.callback = callback;
+        rpc_struct.rpc_timer.start();
+        rpc_struct.stub->multiget_data_v2(callback->cntl_.get(), &rpc_struct.request,
+                                          callback->response_.get(), closure.release());
+    }
+
+    return Status::OK();
+}
+
+} // namespace pipeline
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/pipeline/exec/materialization_sink_operator.h
b/be/src/pipeline/exec/materialization_sink_operator.h
new file mode 100644
index 00000000000..813d12e017d
--- /dev/null
+++ b/be/src/pipeline/exec/materialization_sink_operator.h
@@ -0,0 +1,71 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+
+#include "common/status.h"
+#include "operator.h"
+#include "vec/core/block.h"
+
+namespace doris {
+#include "common/compile_check_begin.h"
+class RuntimeState;
+
+namespace pipeline {
+
+class MaterializationSinkOperatorX;
+// Sink-side local state of the materialization node. It declares no data members of its own;
+// sink() works through the MaterializationSharedState it shares with the source.
+class MaterializationSinkLocalState final
+ : public PipelineXSinkLocalState<MaterializationSharedState> {
+public:
+ ENABLE_FACTORY_CREATOR(MaterializationSinkLocalState);
+ MaterializationSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState*
state)
+ : Base(parent, state) {}
+
+private:
+ friend class MaterializationSinkOperatorX;
+ using Base = PipelineXSinkLocalState<MaterializationSharedState>;
+ using Parent = MaterializationSinkOperatorX;
+};
+
+// Sink half of the materialization node: sends multiget RPCs for the rowid columns of each
+// input block; MaterializationSourceOperatorX merges the results.
+class MaterializationSinkOperatorX final : public
DataSinkOperatorX<MaterializationSinkLocalState> {
+public:
+ using Base = DataSinkOperatorX<MaterializationSinkLocalState>;
+
+ friend class MaterializationSinkLocalState;
+ // NOTE(review): `pool` is unused. The MATERIALIZATION_NODE case of
+ // PipelineFragmentContext::_create_operator passes the paired source operator's id as
+ // `child_id`.
+ MaterializationSinkOperatorX(int child_id, int sink_id, ObjectPool* pool,
+ const TPlanNode& tnode)
+ : Base(sink_id, tnode.node_id, child_id) {
+ _name = "MATERIALIZATION_SINK_OPERATOR";
+ }
+ ~MaterializationSinkOperatorX() override = default;
+
+ Status init(const TPlanNode& tnode, RuntimeState* state) override;
+ Status prepare(RuntimeState* state) override;
+ Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos)
override;
+
+private:
+ // Thrift description of this node; used to build the multiget requests.
+ TMaterializationNode _materialization_node;
+ // One expr per rowid column; evaluated in sink() to locate the rowid columns of a block.
+ vectorized::VExprContextSPtrs _rowid_exprs;
+ // Copied from materialization_node.gc_id_map; set on the requests sent at eos.
+ bool _gc_id_map = false;
+};
+
+} // namespace pipeline
+#include "common/compile_check_end.h"
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/pipeline/exec/materialization_source_operator.cpp
b/be/src/pipeline/exec/materialization_source_operator.cpp
new file mode 100644
index 00000000000..20e7c6351d9
--- /dev/null
+++ b/be/src/pipeline/exec/materialization_source_operator.cpp
@@ -0,0 +1,59 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "pipeline/exec/materialization_source_operator.h"
+
+#include <utility>
+
+#include "common/status.h"
+#include "vec/core/block.h"
+
+namespace doris::pipeline {
+
+// Source half of the materialization node. Runs when the source dependency counter has
+// dropped to zero, i.e. all multiget RPCs of the current round have completed. It:
+// - returns any recorded RPC failure;
+// - merges the responses into `block`;
+// - re-arms the next round, or at eos publishes the maximum per-backend RPC time.
+Status MaterializationSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* block,
+                                                 bool* eos) {
+    auto& local_state = get_local_state(state);
+    SCOPED_TIMER(local_state.exec_time_counter());
+
+    if (!local_state._shared_state->rpc_status.ok()) {
+        return local_state._shared_state->rpc_status;
+    }
+
+    // clear origin block, do merge response to build a ret block
+    block->clear();
+    if (local_state._shared_state->need_merge_block) {
+        SCOPED_TIMER(local_state._merge_response_timer);
+        RETURN_IF_ERROR(local_state._shared_state->merge_multi_response(block));
+    }
+
+    *eos = local_state._shared_state->last_block;
+    if (!*eos) {
+        // Re-arm the source dependency BEFORE waking the sink. The sink immediately sends one
+        // RPC per backend and every completion calls sub(). If completions ran before this
+        // add(), the counter would underflow, add() would see a non-zero counter and never
+        // block, and the source could run again before the next round's responses arrived.
+        static_cast<CountedFinishDependency*>(local_state._shared_state->source_deps.back().get())
+                ->add(static_cast<uint32_t>(local_state._shared_state->rpc_struct_map.size()));
+        local_state._shared_state->sink_deps.back()->ready();
+    } else {
+        uint64_t max_rpc_time = 0;
+        for (auto& [_, rpc_struct] : local_state._shared_state->rpc_struct_map) {
+            max_rpc_time = std::max(max_rpc_time, rpc_struct.rpc_timer.elapsed_time());
+        }
+        COUNTER_SET(local_state._max_rpc_timer, (int64_t)max_rpc_time);
+    }
+
+    return Status::OK();
+}
+
+} // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/src/pipeline/exec/materialization_source_operator.h
b/be/src/pipeline/exec/materialization_source_operator.h
new file mode 100644
index 00000000000..0c6a8b91047
--- /dev/null
+++ b/be/src/pipeline/exec/materialization_source_operator.h
@@ -0,0 +1,72 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+
+#include "common/status.h"
+#include "operator.h"
+#include "vec/core/block.h"
+
+namespace doris {
+#include "common/compile_check_begin.h"
+class RuntimeState;
+
+namespace pipeline {
+
+class MaterializationSourceOperatorX;
+// Source-side local state of the materialization node; adds two profile timers on top of
+// the shared MaterializationSharedState.
+class MaterializationSourceLocalState final
+ : public PipelineXLocalState<MaterializationSharedState> {
+public:
+ ENABLE_FACTORY_CREATOR(MaterializationSourceLocalState);
+ using Base = PipelineXLocalState<MaterializationSharedState>;
+ using Parent = MaterializationSourceOperatorX;
+ MaterializationSourceLocalState(RuntimeState* state, OperatorXBase* parent)
+ : Base(state, parent) {};
+
+ // Registers the MaxRpcTime and MergeResponseTime timers (profile level 2).
+ Status init(doris::RuntimeState* state, doris::pipeline::LocalStateInfo&
info) override {
+ RETURN_IF_ERROR(Base::init(state, info));
+ _max_rpc_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, "MaxRpcTime",
2);
+ _merge_response_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile,
"MergeResponseTime", 2);
+ return Status::OK();
+ }
+
+private:
+ // Set at eos to the largest elapsed time among the per-backend RPC timers.
+ RuntimeProfile::Counter* _max_rpc_timer = nullptr;
+ // Time spent in MaterializationSharedState::merge_multi_response().
+ RuntimeProfile::Counter* _merge_response_timer = nullptr;
+
+ friend class MaterializationSourceOperatorX;
+ friend class OperatorX<MaterializationSourceLocalState>;
+};
+
+// Source half of the materialization node: emits the parked input blocks with their rowid
+// columns replaced by the columns fetched from the backends.
+class MaterializationSourceOperatorX final : public
OperatorX<MaterializationSourceLocalState> {
+public:
+ using Base = OperatorX<MaterializationSourceLocalState>;
+ MaterializationSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode,
const int operator_id,
+ const DescriptorTbl& descs)
+ : Base(pool, tnode, operator_id, descs) {};
+ ~MaterializationSourceOperatorX() override = default;
+
+ // Merges the responses of the current round into `block`; sets *eos after the sink's eos round.
+ Status get_block(doris::RuntimeState* state, vectorized::Block* block,
bool* eos) override;
+
+ bool is_source() const override { return true; }
+};
+
+} // namespace pipeline
+#include "common/compile_check_end.h"
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/pipeline/exec/operator.cpp
b/be/src/pipeline/exec/operator.cpp
index d2d0d6a6827..a2f708392af 100644
--- a/be/src/pipeline/exec/operator.cpp
+++ b/be/src/pipeline/exec/operator.cpp
@@ -41,6 +41,8 @@
#include "pipeline/exec/iceberg_table_sink_operator.h"
#include "pipeline/exec/jdbc_scan_operator.h"
#include "pipeline/exec/jdbc_table_sink_operator.h"
+#include "pipeline/exec/materialization_sink_operator.h"
+#include "pipeline/exec/materialization_source_operator.h"
#include "pipeline/exec/memory_scratch_sink_operator.h"
#include "pipeline/exec/meta_scan_operator.h"
#include "pipeline/exec/mock_operator.h"
@@ -706,6 +708,7 @@ DECLARE_OPERATOR(SetSinkLocalState<false>)
DECLARE_OPERATOR(PartitionedHashJoinSinkLocalState)
DECLARE_OPERATOR(GroupCommitBlockSinkLocalState)
DECLARE_OPERATOR(CacheSinkLocalState)
+DECLARE_OPERATOR(MaterializationSinkLocalState)
#undef DECLARE_OPERATOR
@@ -738,6 +741,7 @@ DECLARE_OPERATOR(MetaScanLocalState)
DECLARE_OPERATOR(LocalExchangeSourceLocalState)
DECLARE_OPERATOR(PartitionedHashJoinProbeLocalState)
DECLARE_OPERATOR(CacheSourceLocalState)
+DECLARE_OPERATOR(MaterializationSourceLocalState)
#ifdef BE_TEST
DECLARE_OPERATOR(MockLocalState)
@@ -770,7 +774,7 @@ template class
PipelineXSinkLocalState<MultiCastSharedState>;
template class PipelineXSinkLocalState<SetSharedState>;
template class PipelineXSinkLocalState<LocalExchangeSharedState>;
template class PipelineXSinkLocalState<BasicSharedState>;
-template class PipelineXSinkLocalState<CacheSharedState>;
+template class PipelineXSinkLocalState<DataQueueSharedState>;
template class PipelineXLocalState<HashJoinSharedState>;
template class PipelineXLocalState<PartitionedHashJoinSharedState>;
@@ -782,7 +786,7 @@ template class PipelineXLocalState<AggSharedState>;
template class PipelineXLocalState<PartitionedAggSharedState>;
template class PipelineXLocalState<FakeSharedState>;
template class PipelineXLocalState<UnionSharedState>;
-template class PipelineXLocalState<CacheSharedState>;
+template class PipelineXLocalState<DataQueueSharedState>;
template class PipelineXLocalState<MultiCastSharedState>;
template class PipelineXLocalState<PartitionSortNodeSharedState>;
template class PipelineXLocalState<SetSharedState>;
diff --git a/be/src/pipeline/exec/scan_operator.cpp
b/be/src/pipeline/exec/scan_operator.cpp
index ec283f46683..c8b19e3e1fc 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -102,6 +102,15 @@ Status ScanLocalState<Derived>::open(RuntimeState* state) {
}
RETURN_IF_ERROR(PipelineXLocalState<>::open(state));
auto& p = _parent->cast<typename Derived::Parent>();
+
+ // init id_file_map() for runtime state
+ std::vector<SlotDescriptor*> slots = p._output_tuple_desc->slots();
+ for (auto slot : slots) {
+ if (slot->col_name().starts_with(BeConsts::GLOBAL_ROWID_COL)) {
+ state->set_id_file_map();
+ }
+ }
+
_common_expr_ctxs_push_down.resize(p._common_expr_ctxs_push_down.size());
for (size_t i = 0; i < _common_expr_ctxs_push_down.size(); i++) {
RETURN_IF_ERROR(
@@ -1002,6 +1011,7 @@ Status ScanLocalState<Derived>::_start_scanners(
// https://github.com/apache/doris/pull/44635
const int parallism_of_scan_operator =
p.is_serial_operator() ? 1 : p.query_parallel_instance_num();
+
_scanner_ctx = vectorized::ScannerContext::create_shared(
state(), this, p._output_tuple_desc, p.output_row_descriptor(),
scanners, p.limit(),
_scan_dependency, parallism_of_scan_operator);
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp
b/be/src/pipeline/pipeline_fragment_context.cpp
index 388b3102a21..2d8160d3e1e 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -63,6 +63,8 @@
#include "pipeline/exec/iceberg_table_sink_operator.h"
#include "pipeline/exec/jdbc_scan_operator.h"
#include "pipeline/exec/jdbc_table_sink_operator.h"
+#include "pipeline/exec/materialization_sink_operator.h"
+#include "pipeline/exec/materialization_source_operator.h"
#include "pipeline/exec/memory_scratch_sink_operator.h"
#include "pipeline/exec/meta_scan_operator.h"
#include "pipeline/exec/multi_cast_data_stream_sink.h"
@@ -1584,6 +1586,25 @@ Status
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
break;
}
+ case TPlanNodeType::MATERIALIZATION_NODE: {
+ op.reset(new MaterializationSourceOperatorX(pool, tnode,
next_operator_id(), descs));
+ RETURN_IF_ERROR(cur_pipe->add_operator(
+ op, request.__isset.parallel_instances ?
request.parallel_instances : 0));
+
+ const auto downstream_pipeline_id = cur_pipe->id();
+ if (_dag.find(downstream_pipeline_id) == _dag.end()) {
+ _dag.insert({downstream_pipeline_id, {}});
+ }
+ auto new_pipe = add_pipeline(cur_pipe);
+ _dag[downstream_pipeline_id].push_back(new_pipe->id());
+
+ DataSinkOperatorPtr sink(new MaterializationSinkOperatorX(
+ op->operator_id(), next_sink_operator_id(), pool, tnode));
+ RETURN_IF_ERROR(new_pipe->set_sink(sink));
+ RETURN_IF_ERROR(new_pipe->sink()->init(tnode, _runtime_state.get()));
+ cur_pipe = new_pipe;
+ break;
+ }
case TPlanNodeType::INTERSECT_NODE: {
RETURN_IF_ERROR(_build_operators_for_set_operation_node<true>(
pool, tnode, descs, op, cur_pipe, parent_idx, child_idx,
request));
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index 1fefd9de642..b760e13f7c3 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -115,6 +115,7 @@ class LookupConnectionCache;
class RowCache;
class DummyLRUCache;
class CacheManager;
+class IdManager;
class ProcessProfile;
class HeapProfiler;
class WalManager;
@@ -330,6 +331,7 @@ public:
LookupConnectionCache* get_lookup_connection_cache() { return
_lookup_connection_cache; }
RowCache* get_row_cache() { return _row_cache; }
CacheManager* get_cache_manager() { return _cache_manager; }
+ IdManager* get_id_manager() { return _id_manager; }
ProcessProfile* get_process_profile() { return _process_profile; }
HeapProfiler* get_heap_profiler() { return _heap_profiler; }
segment_v2::InvertedIndexSearcherCache*
get_inverted_index_searcher_cache() {
@@ -476,6 +478,7 @@ private:
LookupConnectionCache* _lookup_connection_cache = nullptr;
RowCache* _row_cache = nullptr;
CacheManager* _cache_manager = nullptr;
+ IdManager* _id_manager = nullptr;
ProcessProfile* _process_profile = nullptr;
HeapProfiler* _heap_profiler = nullptr;
segment_v2::InvertedIndexSearcherCache* _inverted_index_searcher_cache =
nullptr;
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index 44aee2fde91..1f1ca3284ff 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -48,6 +48,7 @@
#include "io/cache/fs_file_cache_storage.h"
#include "io/fs/file_meta_cache.h"
#include "io/fs/local_file_reader.h"
+#include "olap/id_manager.h"
#include "olap/memtable_memory_limiter.h"
#include "olap/olap_define.h"
#include "olap/options.h"
@@ -457,6 +458,7 @@ Status ExecEnv::_init_mem_env() {
return Status::InternalError(ss.str());
}
+ _id_manager = new IdManager();
_cache_manager = CacheManager::create_global_instance();
int64_t storage_cache_limit =
@@ -801,6 +803,7 @@ void ExecEnv::destroy() {
// cache_manager must be destoried after all cache.
// https://github.com/apache/doris/issues/24082#issuecomment-1712544039
SAFE_DELETE(_cache_manager);
+ SAFE_DELETE(_id_manager);
// _heartbeat_flags must be destoried after staroge engine
SAFE_DELETE(_heartbeat_flags);
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index fc3d80cdbe0..14d2c5c26f2 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -32,7 +32,6 @@
#include "common/config.h"
#include "common/factory_creator.h"
#include "common/object_pool.h"
-#include "pipeline/dependency.h"
#include "runtime/exec_env.h"
#include "runtime/memory/mem_tracker_limiter.h"
#include "runtime/runtime_filter_mgr.h"
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index 712e18159fd..d3b42f77d7d 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -36,6 +36,7 @@
#include "common/object_pool.h"
#include "common/status.h"
#include "io/fs/s3_file_system.h"
+#include "olap/id_manager.h"
#include "olap/storage_engine.h"
#include "pipeline/exec/operator.h"
#include "pipeline/pipeline_task.h"
@@ -553,4 +554,8 @@ bool RuntimeState::low_memory_mode() const {
return _query_ctx->low_memory_mode();
}
+// Fetch (or create) the IdFileMap for _query_id from the global IdManager and keep it here.
+// NOTE(review): execution_timeout() is presumably the map's lifetime in IdManager — confirm.
+void RuntimeState::set_id_file_map() {
+ _id_file_map = _exec_env->get_id_manager()->add_id_file_map(_query_id,
execution_timeout());
+}
+
} // end namespace doris
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index aa486d3b8b6..17d4126702c 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -70,6 +70,7 @@ class Dependency;
class DescriptorTbl;
class ObjectPool;
class ExecEnv;
+class IdFileMap;
class RuntimeFilterMgr;
class MemTrackerLimiter;
class QueryContext;
@@ -661,6 +662,10 @@ public:
int profile_level() const { return _profile_level; }
+ std::shared_ptr<IdFileMap>& get_id_file_map() { return _id_file_map; }
+
+ void set_id_file_map();
+
private:
Status create_error_log_file();
@@ -784,6 +789,9 @@ private:
// error file path on s3,
${bucket}/${prefix}/error_log/${label}_${fragment_instance_id}
std::string _s3_error_log_file_path;
std::mutex _s3_error_log_file_lock;
+
+ // used for encoding the global lazy materialize
+ std::shared_ptr<IdFileMap> _id_file_map = nullptr;
};
#define RETURN_IF_CANCELLED(state) \
diff --git a/be/src/runtime/workload_group/workload_group.h
b/be/src/runtime/workload_group/workload_group.h
index 5f5b944f8e6..9d9e509c5cc 100644
--- a/be/src/runtime/workload_group/workload_group.h
+++ b/be/src/runtime/workload_group/workload_group.h
@@ -227,6 +227,11 @@ public:
int64_t free_overcommited_memory(int64_t need_free_mem, RuntimeProfile*
profile);
+ vectorized::SimplifiedScanScheduler* get_remote_scan_task_scheduler() {
+ std::shared_lock<std::shared_mutex> r_lock(_mutex);
+ return _remote_scan_task_sched.get();
+ }
+
private:
void create_cgroup_cpu_ctl_no_lock();
void upsert_cgroup_cpu_ctl_no_lock(WorkloadGroupInfo* wg_info);
diff --git a/be/src/service/backend_options.h b/be/src/service/backend_options.h
index 0052eb41530..543863a6309 100644
--- a/be/src/service/backend_options.h
+++ b/be/src/service/backend_options.h
@@ -37,6 +37,7 @@ public:
static std::string get_be_endpoint();
static TBackend get_local_backend();
static void set_backend_id(int64_t backend_id);
+ static int64_t get_backend_id() { return _s_backend_id; }
static void set_localhost(const std::string& host);
static bool is_bind_ipv6();
static const char* get_service_bind_address();
diff --git a/be/src/service/internal_service.cpp
b/be/src/service/internal_service.cpp
index 19a0e88fe8f..b2029f0870d 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -105,6 +105,8 @@
#include "runtime/stream_load/stream_load_context.h"
#include "runtime/thread_context.h"
#include "runtime/types.h"
+#include "runtime/workload_group/workload_group.h"
+#include "runtime/workload_group/workload_group_manager.h"
#include "service/backend_options.h"
#include "service/point_query_executor.h"
#include "util/arrow/row_batch.h"
@@ -2056,6 +2058,42 @@ void
PInternalService::multiget_data(google::protobuf::RpcController* controller
}
}
+// Serves PMultiGetRequestV2 (row fetch for lazy materialization). The read itself runs on the
+// remote scan task scheduler of the request's workload group; `done` is run exactly once,
+// either by that task or by one of the error paths below.
+void PInternalService::multiget_data_v2(google::protobuf::RpcController* controller,
+                                        const PMultiGetRequestV2* request,
+                                        PMultiGetResponseV2* response,
+                                        google::protobuf::Closure* done) {
+    auto wg = ExecEnv::GetInstance()->workload_group_mgr()->get_group(request->wg_id());
+    if (!wg) [[unlikely]] {
+        brpc::ClosureGuard closure_guard(done);
+        Status st = Status::Error<TStatusCode::CANCELLED>("fail to find wg: wg id:" +
+                                                          std::to_string(request->wg_id()));
+        st.to_protobuf(response->mutable_status());
+        return;
+    }
+
+    // The scheduler is exposed as a raw pointer that may be null; fail the RPC instead of
+    // dereferencing it.
+    auto* scheduler = wg->get_remote_scan_task_scheduler();
+    if (scheduler == nullptr) [[unlikely]] {
+        brpc::ClosureGuard closure_guard(done);
+        Status st = Status::InternalError("remote scan task scheduler is not ready: wg id: {}",
+                                          request->wg_id());
+        st.to_protobuf(response->mutable_status());
+        return;
+    }
+
+    Status st = scheduler->submit_scan_task(vectorized::SimplifiedScanTask(
+            [request, response, done]() {
+                signal::set_signal_task_id(request->query_id());
+                // multi get data by rowid
+                MonotonicStopWatch watch;
+                watch.start();
+                brpc::ClosureGuard closure_guard(done);
+                response->mutable_status()->set_status_code(0);
+                SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->rowid_storage_reader_tracker());
+                Status read_st = RowIdStorageReader::read_by_rowids(*request, response);
+                read_st.to_protobuf(response->mutable_status());
+                LOG(INFO) << "multiget_data_v2 finished, cost(us):" << watch.elapsed_time() / 1000;
+            },
+            nullptr));
+
+    // Assumes a failed submit never runs the task, so the RPC must be completed here.
+    if (!st.ok()) {
+        brpc::ClosureGuard closure_guard(done);
+        st.to_protobuf(response->mutable_status());
+    }
+}
+
void
PInternalServiceImpl::get_tablet_rowset_versions(google::protobuf::RpcController*
cntl_base,
const
PGetTabletVersionsRequest* request,
PGetTabletVersionsResponse* response,
diff --git a/be/src/service/internal_service.h
b/be/src/service/internal_service.h
index e3d03a6a449..6ce0ae868c1 100644
--- a/be/src/service/internal_service.h
+++ b/be/src/service/internal_service.h
@@ -211,6 +211,10 @@ public:
void multiget_data(google::protobuf::RpcController* controller, const
PMultiGetRequest* request,
PMultiGetResponse* response, google::protobuf::Closure*
done) override;
+ void multiget_data_v2(google::protobuf::RpcController* controller,
+ const PMultiGetRequestV2* request,
PMultiGetResponseV2* response,
+ google::protobuf::Closure* done) override;
+
void tablet_fetch_data(google::protobuf::RpcController* controller,
const PTabletKeyLookupRequest* request,
PTabletKeyLookupResponse* response,
diff --git a/be/src/util/ref_count_closure.h b/be/src/util/ref_count_closure.h
index 560aebb98ee..c1be06c691d 100644
--- a/be/src/util/ref_count_closure.h
+++ b/be/src/util/ref_count_closure.h
@@ -25,7 +25,6 @@
#include "runtime/query_context.h"
#include "runtime/thread_context.h"
#include "service/brpc.h"
-#include "util/ref_count_closure.h"
namespace doris {
@@ -82,7 +81,7 @@ class AutoReleaseClosure : public google::protobuf::Closure {
public:
AutoReleaseClosure(std::shared_ptr<Request> req, std::shared_ptr<Callback>
callback,
- std::weak_ptr<QueryContext> context = {})
+                       std::weak_ptr<QueryContext> context = {}, std::string_view error_msg = {})
             : request_(req), callback_(callback), context_(std::move(context)) {
         this->cntl_ = callback->cntl_;
         this->response_ = callback->response_;
+        // Keep the caller-supplied context: _process_if_rpc_failed() appends error_msg_ to the
+        // RPC failure message, so leaving it unset silently drops that information.
+        this->error_msg_ = error_msg;
@@ -113,10 +112,12 @@ public:
// at any stage.
std::shared_ptr<Request> request_;
std::shared_ptr<ResponseType> response_;
+ std::string error_msg_;
protected:
virtual void _process_if_rpc_failed() {
- std::string error_msg = "RPC meet failed: " + cntl_->ErrorText();
+ std::string error_msg =
+ fmt::format("RPC meet failed: {} {}", cntl_->ErrorText(),
error_msg_);
if (auto ctx = context_.lock(); ctx) {
ctx->cancel(Status::NetworkError(error_msg));
} else {
diff --git a/be/src/vec/columns/column.h b/be/src/vec/columns/column.h
index 50a780e4355..9eb38675bb5 100644
--- a/be/src/vec/columns/column.h
+++ b/be/src/vec/columns/column.h
@@ -718,7 +718,6 @@ using ColumnPtr = IColumn::Ptr;
using MutableColumnPtr = IColumn::MutablePtr;
using Columns = std::vector<ColumnPtr>;
using MutableColumns = std::vector<MutableColumnPtr>;
-using ColumnPtrs = std::vector<ColumnPtr>;
using ColumnRawPtrs = std::vector<const IColumn*>;
template <typename... Args>
diff --git a/be/src/vec/common/string_ref.cpp b/be/src/vec/common/string_ref.cpp
index 413c0338c10..e113694f34c 100644
--- a/be/src/vec/common/string_ref.cpp
+++ b/be/src/vec/common/string_ref.cpp
@@ -69,11 +69,14 @@ bool StringRef::end_with(char ch) const {
}
bool StringRef::start_with(const StringRef& search_string) const {
- DCHECK(size >= search_string.size);
if (search_string.size == 0) {
return true;
}
+ if (UNLIKELY(size < search_string.size)) {
+ return false;
+ }
+
#if defined(__SSE2__) || defined(__aarch64__)
return memequalSSE2Wide(data, search_string.data, search_string.size);
#else
diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp
b/be/src/vec/exec/scan/new_olap_scanner.cpp
index 6bae4b3319e..344136bbc28 100644
--- a/be/src/vec/exec/scan/new_olap_scanner.cpp
+++ b/be/src/vec/exec/scan/new_olap_scanner.cpp
@@ -41,6 +41,7 @@
#include "exprs/function_filter.h"
#include "io/cache/block_file_cache_profile.h"
#include "io/io_common.h"
+#include "olap/id_manager.h"
#include "olap/inverted_index_profile.h"
#include "olap/olap_common.h"
#include "olap/olap_tuple.h"
@@ -410,6 +411,13 @@ Status NewOlapScanner::_init_tablet_reader_params(
}
}
+    if (tablet_schema->has_global_row_id()) {
+        // Register every rowset this scanner reads in the query's id_file_map (presumably so
+        // rows can still be fetched by global row id after the scan finishes).
+        auto& id_file_map = _state->get_id_file_map();
+        if (id_file_map == nullptr) [[unlikely]] {
+            return Status::InternalError(
+                    "id_file_map is not initialized while tablet schema has global row id");
+        }
+        for (const auto& rs_split : _tablet_reader_params.rs_splits) {
+            id_file_map->add_temp_rowset(rs_split.rs_reader->rowset());
+        }
+    }
+
return Status::OK();
}
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp
b/be/src/vec/exec/scan/vfile_scanner.cpp
index fe0c7315c5d..11819d1d75d 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -345,8 +345,9 @@ void VFileScanner::_get_slot_ids(VExpr* expr,
std::vector<int>* slot_ids) {
if (child_expr->is_slot_ref()) {
VSlotRef* slot_ref = reinterpret_cast<VSlotRef*>(child_expr.get());
slot_ids->emplace_back(slot_ref->slot_id());
+ } else {
+ _get_slot_ids(child_expr.get(), slot_ids);
}
- _get_slot_ids(child_expr.get(), slot_ids);
}
}
diff --git a/be/test/exec/hash_map/hash_table_method_test.cpp
b/be/test/exec/hash_map/hash_table_method_test.cpp
index 31a18421123..5ca6a6cca36 100644
--- a/be/test/exec/hash_map/hash_table_method_test.cpp
+++ b/be/test/exec/hash_map/hash_table_method_test.cpp
@@ -27,7 +27,7 @@
namespace doris::vectorized {
template <typename HashMethodType>
-void test_insert(HashMethodType& method, ColumnPtrs column) {
+void test_insert(HashMethodType& method, Columns column) {
using State = typename HashMethodType::State;
ColumnRawPtrs key_raw_columns;
for (auto column : column) {
@@ -49,8 +49,7 @@ void test_insert(HashMethodType& method, ColumnPtrs column) {
}
template <typename HashMethodType>
-void test_find(HashMethodType& method, ColumnPtrs column,
- const std::vector<int64_t>& except_result) {
+void test_find(HashMethodType& method, Columns column, const
std::vector<int64_t>& except_result) {
using State = typename HashMethodType::State;
ColumnRawPtrs key_raw_columns;
for (auto column : column) {
diff --git a/be/test/olap/id_manager_test.cpp b/be/test/olap/id_manager_test.cpp
new file mode 100644
index 00000000000..12488b163f7
--- /dev/null
+++ b/be/test/olap/id_manager_test.cpp
@@ -0,0 +1,127 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/id_manager.h"
+
+#include <gtest/gtest.h>
+
+#include <atomic>
+#include <memory>
+#include <thread>
+#include <vector>
+
+#include "olap/olap_common.h"
+
+using namespace doris;
+
+TEST(IdFileMapTest, BasicOperations) {
+    IdFileMap id_file_map(1024);
+
+    // Test adding a file mapping
+    auto mapping1 =
+            std::make_shared<FileMapping>(FileMapping {FileMappingType::DORIS_FORMAT, "file1"});
+    uint32_t id1 = id_file_map.get_file_mapping_id(mapping1);
+    EXPECT_EQ(id1, 0U);
+
+    auto mapping2 = std::make_shared<FileMapping>(FileMapping {FileMappingType::ORC, "file2"});
+    uint32_t id2 = id_file_map.get_file_mapping_id(mapping2);
+    EXPECT_EQ(id2, 1U);
+
+    // Test getting a file mapping
+    auto retrieved_mapping1 = id_file_map.get_file_mapping(id1);
+    ASSERT_NE(retrieved_mapping1, nullptr);
+    EXPECT_EQ(retrieved_mapping1->type, FileMappingType::DORIS_FORMAT);
+    EXPECT_EQ(retrieved_mapping1->value, "file1");
+
+    auto retrieved_mapping2 = id_file_map.get_file_mapping(id2);
+    ASSERT_NE(retrieved_mapping2, nullptr);
+    EXPECT_EQ(retrieved_mapping2->type, FileMappingType::ORC);
+    EXPECT_EQ(retrieved_mapping2->value, "file2");
+
+    // Test getting a non-existent file mapping
+    auto retrieved_mapping3 = id_file_map.get_file_mapping(999);
+    EXPECT_EQ(retrieved_mapping3, nullptr);
+}
+
+TEST(IdFileMapTest, ConcurrentAddAndGet) {
+    IdFileMap id_file_map(1024);
+    std::vector<std::thread> threads;
+    std::atomic<int> counter(0);
+
+    for (int i = 0; i < 10; ++i) {
+        threads.emplace_back([&]() {
+            for (int j = 0; j < 100; ++j) {
+                auto mapping = std::make_shared<FileMapping>(FileMapping {
+                        FileMappingType::DORIS_FORMAT, "file" + std::to_string(counter++)});
+                uint32_t id = id_file_map.get_file_mapping_id(mapping);
+                auto retrieved_mapping = id_file_map.get_file_mapping(id);
+                ASSERT_NE(retrieved_mapping, nullptr);
+                EXPECT_EQ(retrieved_mapping->type, mapping->type);
+                EXPECT_EQ(retrieved_mapping->value, mapping->value);
+            }
+        });
+    }
+
+    for (auto& thread : threads) {
+        thread.join();
+    }
+}
+
+TEST(IdManagerTest, BasicOperations) {
+    IdManager id_manager;
+
+    // Test adding an IdFileMap
+    UniqueId query_id1 = UniqueId::gen_uid();
+    auto id_file_map1 = id_manager.add_id_file_map(query_id1, 1024);
+    EXPECT_NE(id_file_map1, nullptr);
+
+    UniqueId query_id2 = UniqueId::gen_uid();
+    auto id_file_map2 = id_manager.add_id_file_map(query_id2, 1024);
+    EXPECT_NE(id_file_map2, nullptr);
+
+    // Test getting an existing IdFileMap
+    auto retrieved_id_file_map1 = id_manager.add_id_file_map(query_id1, 1024);
+    EXPECT_EQ(retrieved_id_file_map1, id_file_map1);
+
+    // Test removing an IdFileMap
+    id_manager.remove_id_file_map(query_id1);
+    auto retrieved_id_file_map2 = id_manager.add_id_file_map(query_id1, 1024);
+    EXPECT_NE(retrieved_id_file_map2, id_file_map1);
+}
+
+TEST(IdManagerTest, ConcurrentAddAndRemove) {
+    IdManager id_manager;
+    std::vector<std::thread> threads;
+
+    for (int i = 0; i < 10; ++i) {
+        threads.emplace_back([&]() {
+            for (int j = 0; j < 10; ++j) {
+                UniqueId query_id = UniqueId::gen_uid();
+                auto id_file_map = id_manager.add_id_file_map(query_id, 1024);
+                EXPECT_NE(id_file_map, nullptr);
+
+                id_manager.remove_id_file_map(query_id);
+                auto retrieved_id_file_map = id_manager.add_id_file_map(query_id, 1024);
+                EXPECT_NE(retrieved_id_file_map, id_file_map);
+            }
+        });
+    }
+
+    for (auto& thread : threads) {
+        thread.join();
+    }
+}
diff --git a/be/test/pipeline/operator/materialization_shared_state_test.cpp
b/be/test/pipeline/operator/materialization_shared_state_test.cpp
new file mode 100644
index 00000000000..6d3a6f28ffa
--- /dev/null
+++ b/be/test/pipeline/operator/materialization_shared_state_test.cpp
@@ -0,0 +1,336 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+
+#include "pipeline/dependency.h"
+#include "vec/columns/column_vector.h"
+#include "vec/data_types/data_type_number.h"
+#include "vec/data_types/data_type_string.h"
+
+namespace doris::pipeline {
+
+class MaterializationSharedStateTest : public testing::Test {
+protected:
+ void SetUp() override {
+ _shared_state = std::make_shared<MaterializationSharedState>();
+
+ // Data types shared by all tests: String for rowid columns, Int32 for values
+ _string_type = std::make_shared<vectorized::DataTypeString>();
+ _int_type = std::make_shared<vectorized::DataTypeInt32>();
+
+ // Default origin block: column 0 "rowid" (ColumnString), column 1 "value" (Int32)
+ _shared_state->origin_block = vectorized::Block();
+ _shared_state->origin_block.insert({_string_type->create_column(),
_string_type, "rowid"});
+ _shared_state->origin_block.insert({_int_type->create_column(),
_int_type, "value"});
+
+ // Tell the shared state where the rowid column lives
+ _shared_state->rowid_locs = {0}; // First column is rowid
+
+ // One FetchRpcStruct per backend, each with a single request block desc
+ _backend_id1 = 1001;
+ _backend_id2 = 1002;
+ _shared_state->rpc_struct_map[_backend_id1] = FetchRpcStruct();
+ _shared_state->rpc_struct_map[_backend_id2] = FetchRpcStruct();
+
_shared_state->rpc_struct_map[_backend_id1].request.add_request_block_descs();
+
_shared_state->rpc_struct_map[_backend_id2].request.add_request_block_descs();
+ }
+
+ std::shared_ptr<MaterializationSharedState> _shared_state;
+ std::shared_ptr<vectorized::DataTypeString> _string_type;
+ std::shared_ptr<vectorized::DataTypeInt32> _int_type;
+ int64_t _backend_id1;
+ int64_t _backend_id2;
+};
+
+TEST_F(MaterializationSharedStateTest, TestCreateSourceDependency) {
+ // create_source_dependency() must build a dependency and register it in source_deps
+ int test_op_id = 100;
+ int test_node_id = 200;
+ std::string test_name = "TEST";
+
+ auto* dep = _shared_state->create_source_dependency(test_op_id,
test_node_id, test_name);
+
+ // id comes from the operator id; name is the given name plus "_DEPENDENCY"
+ ASSERT_NE(dep, nullptr);
+ EXPECT_EQ(dep->id(), test_op_id);
+ EXPECT_EQ(dep->name(), test_name + "_DEPENDENCY");
+
+ // The shared state holds the new dependency as its only source dep
+ EXPECT_EQ(_shared_state->source_deps.size(), 1);
+ EXPECT_EQ(_shared_state->source_deps[0].get(), dep);
+}
+
+TEST_F(MaterializationSharedStateTest, TestCreateMultiGetResult) {
+ // One rowid column (ColumnString) whose rows hold raw GlobalRowLoacationV2 bytes
+ vectorized::Columns columns;
+ auto rowid_col = _string_type->create_column();
+ auto* col_data =
reinterpret_cast<vectorized::ColumnString*>(rowid_col.get());
+
+ // Two locations; the 2nd ctor arg is presumably the backend id (1001 / 1002) — confirm
+ GlobalRowLoacationV2 loc1(0, _backend_id1, 1, 1);
+ GlobalRowLoacationV2 loc2(0, _backend_id2, 2, 2);
+
+ col_data->insert_data(reinterpret_cast<const char*>(&loc1),
sizeof(GlobalRowLoacationV2));
+ col_data->insert_data(reinterpret_cast<const char*>(&loc2),
sizeof(GlobalRowLoacationV2));
+ columns.push_back(std::move(rowid_col));
+
+ // NOTE(review): one of the two bool flags presumably marks the last block — confirm
+ Status st = _shared_state->create_muiltget_result(columns, true, true);
+ EXPECT_TRUE(st.ok());
+
+ // Expect one block_order_results entry per rowid column, and last_block set
+ EXPECT_EQ(_shared_state->block_order_results.size(), columns.size());
+ EXPECT_EQ(_shared_state->last_block, true);
+}
+
+TEST_F(MaterializationSharedStateTest, TestMergeMultiResponse) {
+    // 1. Setup origin block with nullable rowid column: rows 0 and 2 non-null, row 1 null
+    auto nullable_rowid_col = vectorized::ColumnNullable::create(_string_type->create_column(),
+                                                                 vectorized::ColumnUInt8::create());
+    nullable_rowid_col->insert_data((char*)&nullable_rowid_col, 4);
+    nullable_rowid_col->insert_data(nullptr, 4);
+    nullable_rowid_col->insert_data((char*)&nullable_rowid_col, 4);
+
+    auto value_col = _int_type->create_column();
+    value_col->insert(100);
+    value_col->insert(101);
+    value_col->insert(200);
+
+    // Add test data to origin block
+    _shared_state->origin_block = vectorized::Block(
+            {{std::move(nullable_rowid_col), vectorized::make_nullable(_string_type), "rowid"},
+             {std::move(value_col), _int_type, "value"}});
+
+    // Set rowid column location
+    _shared_state->rowid_locs = {0};
+
+    // 2. Setup response blocks from multiple backends
+    // Backend 1's response
+    {
+        vectorized::Block resp_block1;
+        auto resp_value_col1 = _int_type->create_column();
+        auto* value_col_data1 =
+                reinterpret_cast<vectorized::ColumnVector<int32_t>*>(resp_value_col1.get());
+        value_col_data1->insert(100);
+        value_col_data1->insert(101);
+        resp_block1.insert(
+                {make_nullable(std::move(resp_value_col1)), make_nullable(_int_type), "value"});
+
+        auto callback1 = std::make_shared<doris::DummyBrpcCallback<PMultiGetResponseV2>>();
+        callback1->response_.reset(new PMultiGetResponseV2());
+        auto serialized_block = callback1->response_->add_blocks()->mutable_block();
+        size_t uncompressed_size = 0;
+        size_t compressed_size = 0;
+        auto s = resp_block1.serialize(0, serialized_block, &uncompressed_size, &compressed_size,
+                                       CompressionTypePB::LZ4);
+        EXPECT_TRUE(s.ok());
+
+        _shared_state->rpc_struct_map[_backend_id1].callback = callback1;
+    }
+
+    // Backend 2's response
+    {
+        vectorized::Block resp_block2;
+        auto resp_value_col2 = _int_type->create_column();
+        auto* value_col_data2 =
+                reinterpret_cast<vectorized::ColumnVector<int32_t>*>(resp_value_col2.get());
+        value_col_data2->insert(200);
+        resp_block2.insert(
+                {make_nullable(std::move(resp_value_col2)), make_nullable(_int_type), "value"});
+
+        auto callback2 = std::make_shared<doris::DummyBrpcCallback<PMultiGetResponseV2>>();
+        callback2->response_.reset(new PMultiGetResponseV2());
+        auto serialized_block = callback2->response_->add_blocks()->mutable_block();
+
+        size_t uncompressed_size = 0;
+        size_t compressed_size = 0;
+        auto s = resp_block2.serialize(0, serialized_block, &uncompressed_size, &compressed_size,
+                                       CompressionTypePB::LZ4);
+        EXPECT_TRUE(s.ok());
+
+        _shared_state->rpc_struct_map[_backend_id2].callback = callback2;
+    }
+
+    // 3. Setup block order results to control merge order
+    _shared_state->block_order_results = {
+            {_backend_id1, 0, _backend_id2} // First block order: BE1,null,BE2 (0 = null rowid)
+    };
+
+    // 4. Test merging responses
+    vectorized::Block result_block;
+    Status st = _shared_state->merge_multi_response(&result_block);
+    EXPECT_TRUE(st.ok());
+
+    // 5. Verify merged result
+    EXPECT_EQ(result_block.columns(), 2); // Should have original rowid column and value column
+    EXPECT_EQ(result_block.rows(), 3);    // Total 3 rows from both backends
+
+    // Verify the value column data is merged in correct order
+    auto* merged_value_col = result_block.get_by_position(0).column.get();
+    EXPECT_EQ(*((int*)merged_value_col->get_data_at(0).data), 100); // First value from BE1
+    EXPECT_EQ(merged_value_col->get_data_at(1).data,
+              nullptr); // Second row has a null rowid, so its value is null
+    EXPECT_EQ(*((int*)merged_value_col->get_data_at(2).data), 200); // Third value from BE2
+}
+
+TEST_F(MaterializationSharedStateTest, TestMergeMultiResponseMultiBlocks) {
+ // 1. Setup origin block with multiple nullable rowid columns
+ auto nullable_rowid_col1 = vectorized::ColumnNullable::create(
+ _string_type->create_column(), vectorized::ColumnUInt8::create());
+ nullable_rowid_col1->insert_data((char*)&nullable_rowid_col1, 4);
+ nullable_rowid_col1->insert_data(nullptr, 4);
+ nullable_rowid_col1->insert_data((char*)&nullable_rowid_col1, 4);
+
+ auto nullable_rowid_col2 = vectorized::ColumnNullable::create(
+ _string_type->create_column(), vectorized::ColumnUInt8::create());
+ nullable_rowid_col2->insert_data((char*)&nullable_rowid_col2, 4);
+ nullable_rowid_col2->insert_data((char*)&nullable_rowid_col2, 4);
+ nullable_rowid_col2->insert_data(nullptr, 4);
+
+ auto value_col1 = _int_type->create_column();
+ value_col1->insert(100);
+ value_col1->insert(101);
+ value_col1->insert(102);
+
+ auto value_col2 = _int_type->create_column();
+ value_col2->insert(200);
+ value_col2->insert(201);
+ value_col2->insert(202);
+
+ // Add test data to origin block with multiple columns
+ _shared_state->origin_block = vectorized::Block(
+ {{std::move(nullable_rowid_col1),
vectorized::make_nullable(_string_type), "rowid1"},
+ {std::move(nullable_rowid_col2),
vectorized::make_nullable(_string_type), "rowid2"},
+ {std::move(value_col1), _int_type, "value1"},
+ {std::move(value_col2), _int_type, "value2"}});
+
+ // Set multiple rowid column locations
+ _shared_state->rowid_locs = {0, 1};
+
+ // 2. Setup response blocks from multiple backends for first rowid
+ {
+ vectorized::Block resp_block1;
+ auto resp_value_col1 = _int_type->create_column();
+ auto* value_col_data1 =
+
reinterpret_cast<vectorized::ColumnVector<int32_t>*>(resp_value_col1.get());
+ value_col_data1->insert(100);
+ resp_block1.insert(
+ {make_nullable(std::move(resp_value_col1)),
make_nullable(_int_type), "value1"});
+
+ auto callback1 =
std::make_shared<doris::DummyBrpcCallback<PMultiGetResponseV2>>();
+ callback1->response_.reset(new PMultiGetResponseV2());
+ auto serialized_block =
callback1->response_->add_blocks()->mutable_block();
+ size_t uncompressed_size = 0;
+ size_t compressed_size = 0;
+ auto s = resp_block1.serialize(0, serialized_block,
&uncompressed_size, &compressed_size,
+ CompressionTypePB::LZ4);
+ EXPECT_TRUE(s.ok());
+
+ _shared_state->rpc_struct_map[_backend_id1].callback = callback1;
+ }
+
+ // Backend 2's response for first rowid
+ {
+ vectorized::Block resp_block2;
+ auto resp_value_col2 = _int_type->create_column();
+ auto* value_col_data2 =
+
reinterpret_cast<vectorized::ColumnVector<int32_t>*>(resp_value_col2.get());
+ value_col_data2->insert(102);
+ resp_block2.insert(
+ {make_nullable(std::move(resp_value_col2)),
make_nullable(_int_type), "value1"});
+
+ auto callback2 =
std::make_shared<doris::DummyBrpcCallback<PMultiGetResponseV2>>();
+ callback2->response_.reset(new PMultiGetResponseV2());
+ auto serialized_block =
callback2->response_->add_blocks()->mutable_block();
+ size_t uncompressed_size = 0;
+ size_t compressed_size = 0;
+ auto s = resp_block2.serialize(0, serialized_block,
&uncompressed_size, &compressed_size,
+ CompressionTypePB::LZ4);
+ EXPECT_TRUE(s.ok());
+
+ _shared_state->rpc_struct_map[_backend_id2].callback = callback2;
+ }
+
+ // Add second block responses for second rowid
+ {
+ vectorized::Block resp_block1;
+ auto resp_value_col1 = _int_type->create_column();
+ auto* value_col_data1 =
+
reinterpret_cast<vectorized::ColumnVector<int32_t>*>(resp_value_col1.get());
+ value_col_data1->insert(200);
+ resp_block1.insert(
+ {make_nullable(std::move(resp_value_col1)),
make_nullable(_int_type), "value2"});
+
+ auto serialized_block = _shared_state->rpc_struct_map[_backend_id1]
+ .callback->response_->add_blocks()
+ ->mutable_block();
+ size_t uncompressed_size = 0;
+ size_t compressed_size = 0;
+ auto s = resp_block1.serialize(0, serialized_block,
&uncompressed_size, &compressed_size,
+ CompressionTypePB::LZ4);
+ EXPECT_TRUE(s.ok());
+ }
+
+ {
+ vectorized::Block resp_block2;
+ auto resp_value_col2 = _int_type->create_column();
+ auto* value_col_data2 =
+
reinterpret_cast<vectorized::ColumnVector<int32_t>*>(resp_value_col2.get());
+ value_col_data2->insert(201);
+ resp_block2.insert(
+ {make_nullable(std::move(resp_value_col2)),
make_nullable(_int_type), "value2"});
+
+ auto serialized_block = _shared_state->rpc_struct_map[_backend_id2]
+ .callback->response_->add_blocks()
+ ->mutable_block();
+ size_t uncompressed_size = 0;
+ size_t compressed_size = 0;
+ auto s = resp_block2.serialize(0, serialized_block,
&uncompressed_size, &compressed_size,
+ CompressionTypePB::LZ4);
+ EXPECT_TRUE(s.ok());
+ }
+
+ // 3. Setup block order results for both rowids
+ _shared_state->block_order_results = {
+ {_backend_id1, 0, _backend_id2}, // First block order: BE1,null,BE2
+ {_backend_id1, _backend_id2, 0} // Second block order:
BE1,BE2,null
+ };
+
+ // 4. Test merging responses
+ vectorized::Block result_block;
+ Status st = _shared_state->merge_multi_response(&result_block);
+ EXPECT_TRUE(st.ok());
+
+ // 5. Verify merged result
+ EXPECT_EQ(result_block.columns(), 4); // Should have two rowid columns and
two value columns
+ EXPECT_EQ(result_block.rows(), 3); // Total 3 rows from both backends
+
+ // Verify the first value column data is merged in correct order
+ auto* merged_value_col1 = result_block.get_by_position(0).column.get();
+ EXPECT_EQ(*((int*)merged_value_col1->get_data_at(0).data), 100);
+ EXPECT_EQ(merged_value_col1->get_data_at(1).data, nullptr);
+ EXPECT_EQ(*((int*)merged_value_col1->get_data_at(2).data), 102);
+
+ // Verify the second value column data is merged in correct order
+ auto* merged_value_col2 = result_block.get_by_position(1).column.get();
+ EXPECT_EQ(*((int*)merged_value_col2->get_data_at(0).data), 200);
+ EXPECT_EQ(*((int*)merged_value_col2->get_data_at(1).data), 201);
+ EXPECT_EQ(merged_value_col2->get_data_at(2).data, nullptr);
+}
+
+} // namespace doris::pipeline
\ No newline at end of file
diff --git a/gensrc/proto/internal_service.proto
b/gensrc/proto/internal_service.proto
index 837d3f4a941..df406115c14 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -781,6 +781,40 @@ message PMultiGetResponse {
repeated PRowLocation row_locs = 5;
};
+// Describes one block to read; each block carries its own read schema (slots, column_descs).
+message PRequestBlockDesc {
+ optional bool fetch_row_store = 1; // NOTE(review): presumably selects reading via the row store instead of column readers — confirm
+ repeated PSlotDescriptor slots = 2; // output slots of this block's schema
+ repeated ColumnPB column_descs = 3; // column metadata matching this block's schema
+ repeated uint32 file_id = 4; // NOTE(review): presumably parallel to row_id (file_id[i], row_id[i] locate one row) — confirm
+ repeated uint32 row_id = 5; // see file_id
+}
+
+message PMultiGetRequestV2 { // request type of rpc multiget_data_v2
+ repeated PRequestBlockDesc request_block_descs = 1; // one descriptor per block to read
+
+ // for compatibility
+ optional int32 be_exec_version = 2;
+ optional PUniqueId query_id = 3;
+ optional bool gc_id_map = 4; // NOTE(review): name suggests the callee may release its id map after serving — confirm
+ optional uint64 wg_id = 5; // NOTE(review): presumably the workload group id — confirm
+};
+
+message PMultiGetBlockV2 {
+ optional PBlock block = 1; // serialized vectorized::Block (produced by Block::serialize)
+ // more efficient serialization fields for row store
+ enum RowFormat {
+ JSONB = 0;
+ };
+ optional RowFormat format = 2; // encoding used by binary_row_data
+ repeated bytes binary_row_data = 3; // NOTE(review): presumably one encoded row per entry, used instead of `block` — confirm
+}
+
+message PMultiGetResponseV2 { // response type of rpc multiget_data_v2
+ optional PStatus status = 1;
+ repeated PMultiGetBlockV2 blocks = 2; // NOTE(review): presumably one entry per request_block_descs entry, same order — confirm
+};
+
message PFetchColIdsRequest {
message PFetchColIdParam {
required int64 indexId = 1;
@@ -1026,6 +1060,7 @@ service PBackendService {
rpc outfile_write_success(POutfileWriteSuccessRequest) returns
(POutfileWriteSuccessResult);
rpc fetch_table_schema(PFetchTableSchemaRequest) returns
(PFetchTableSchemaResult);
rpc multiget_data(PMultiGetRequest) returns (PMultiGetResponse);
+ rpc multiget_data_v2(PMultiGetRequestV2) returns (PMultiGetResponseV2);
rpc get_file_cache_meta_by_tablet_id(PGetFileCacheMetaRequest) returns
(PGetFileCacheMetaResponse);
rpc tablet_fetch_data(PTabletKeyLookupRequest) returns
(PTabletKeyLookupResponse);
rpc get_column_ids_by_tablet_ids(PFetchColIdsRequest) returns
(PFetchColIdsResponse);
diff --git a/tools/clickbench-tools/conf/doris-cluster.conf
b/tools/clickbench-tools/conf/doris-cluster.conf
index cc7d8a2602e..7d70684941e 100644
--- a/tools/clickbench-tools/conf/doris-cluster.conf
+++ b/tools/clickbench-tools/conf/doris-cluster.conf
@@ -20,11 +20,11 @@ export FE_HOST='127.0.0.1'
# BE host
export BE_HOST='127.0.0.1'
# http_port in fe.conf
-export FE_HTTP_PORT=8030
+export FE_HTTP_PORT=8137
# webserver_port in be.conf
-export BE_WEBSERVER_PORT=8040
+export BE_WEBSERVER_PORT=8147
# query_port in fe.conf
-export FE_QUERY_PORT=9030
+export FE_QUERY_PORT=9137
# Doris username
export USER='root'
# Doris password
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]