This is an automated email from the ASF dual-hosted git repository.

HappenLee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 12f02ebf090 [opt](be) Batch row_id reads in seek_and_read_by_rowid to reduce column iterator overhead (#63436)
12f02ebf090 is described below

commit 12f02ebf090f707b3e745ba98563957b792f561a
Author: HappenLee <[email protected]>
AuthorDate: Thu May 21 12:00:36 2026 +0800

    [opt](be) Batch row_id reads in seek_and_read_by_rowid to reduce column iterator overhead (#63436)
    
    Change seek_and_read_by_rowid to accept a batch of row_ids instead of a
    single row_id, allowing the underlying column iterator's read_by_rowids
    to process all rows in one call. This eliminates per-row iterator
    re-initialization overhead in multi-row fetch paths (point query, batch
    index lookup).
    
    This yields about a 10% speedup.
---
 be/src/exec/rowid_fetcher.cpp           | 136 +++++++++++++++++++++-----------
 be/src/service/point_query_executor.cpp |   5 +-
 be/src/storage/segment/segment.cpp      |  18 +++--
 be/src/storage/segment/segment.h        |   5 +-
 4 files changed, 111 insertions(+), 53 deletions(-)

diff --git a/be/src/exec/rowid_fetcher.cpp b/be/src/exec/rowid_fetcher.cpp
index f97bce17a8c..27c66197541 100644
--- a/be/src/exec/rowid_fetcher.cpp
+++ b/be/src/exec/rowid_fetcher.cpp
@@ -357,6 +357,36 @@ struct SegItem {
     SegmentSharedPtr segment;
 };
 
+// Groups all row_ids belonging to the same segment for batched reading.
+// Position index tracks where each row_id originated in the original request,
+// so results can be scattered back to the correct output positions.
+struct DorisFormatReadBatch {
+    std::shared_ptr<FileMapping> file_mapping;
+    // (row_id, index_in_request) pairs for all rows in this segment.
+    std::vector<std::pair<segment_v2::rowid_t, size_t>> row_ids_with_positions;
+};
+
+static void scatter_scan_blocks_to_result_block(
+        const std::vector<std::pair<size_t, size_t>>& row_id_block_idx,
+        std::vector<Block>& scan_blocks, Block& result_block) {
+    for (size_t column_id = 0; column_id < result_block.columns(); 
++column_id) {
+        auto dst_col = 
const_cast<IColumn*>(result_block.get_by_position(column_id).column.get());
+
+        std::vector<const IColumn*> scan_src_columns;
+        scan_src_columns.reserve(row_id_block_idx.size());
+        std::vector<size_t> scan_positions;
+        scan_positions.reserve(row_id_block_idx.size());
+        for (const auto& [pos_block, block_idx] : row_id_block_idx) {
+            DCHECK(scan_blocks.size() > pos_block);
+            DCHECK(scan_blocks[pos_block].columns() > column_id);
+            scan_src_columns.emplace_back(
+                    
scan_blocks[pos_block].get_by_position(column_id).column.get());
+            scan_positions.emplace_back(block_idx);
+        }
+        dst_col->insert_from_multi_column(scan_src_columns, scan_positions);
+    }
+}
+
 Status RowIdStorageReader::read_by_rowids(const PMultiGetRequest& request,
                                           PMultiGetResponse* response) {
     // read from storage engine row id by row id
@@ -460,7 +490,8 @@ Status RowIdStorageReader::read_by_rowids(const 
PMultiGetRequest& request,
                                   row_location.row_location.segment_id,
                                   row_location.row_location.row_id);
         for (int x = 0; x < slots.size(); ++x) {
-            auto row_id = 
static_cast<segment_v2::rowid_t>(row_loc.ordinal_id());
+            std::vector<segment_v2::rowid_t> row_ids {
+                    static_cast<segment_v2::rowid_t>(row_loc.ordinal_id())};
             MutableColumnPtr column = 
result_block.get_by_position(x).column->assume_mutable();
             IteratorKey iterator_key {.tablet_id = tablet->tablet_id(),
                                       .rowset_id = rowset_id,
@@ -475,8 +506,8 @@ Status RowIdStorageReader::read_by_rowids(const 
PMultiGetRequest& request,
             }
             segment = iterator_item.segment;
             RETURN_IF_ERROR(segment->seek_and_read_by_rowid(
-                    full_read_schema, &slots[x], row_id, column, 
iterator_item.storage_read_options,
-                    iterator_item.iterator));
+                    full_read_schema, &slots[x], row_ids, column,
+                    iterator_item.storage_read_options, 
iterator_item.iterator));
         }
     }
     // serialize block if not empty
@@ -656,36 +687,72 @@ Status RowIdStorageReader::read_batch_doris_format_row(
         }
     }
 
-    std::vector<uint32_t> row_ids;
-    int k = 1;
-    auto max_k = 0;
-    for (int j = 0; j < request_block_desc.row_id_size();) {
+    // Phase 1: Group all row_ids by their (tablet_id, rowset_id, segment_id) 
key.
+    // Unlike the old code which only batched adjacent rows with the same 
file_id,
+    // this merges non-contiguous same-segment requests into a single batch,
+    // maximizing the number of rows read per seek_and_read_by_rowid call.
+    std::vector<DorisFormatReadBatch> scan_batches;
+    std::unordered_map<SegKey, size_t, HashOfSegKey> batch_idx_by_seg;
+    // (batch_idx, position_in_batch) for each row in the original request.
+    std::vector<std::pair<size_t, size_t>> 
row_id_block_idx(request_block_desc.row_id_size());
+    for (int j = 0; j < request_block_desc.row_id_size(); ++j) {
         auto file_id = request_block_desc.file_id(j);
-        row_ids.emplace_back(request_block_desc.row_id(j));
         auto file_mapping = id_file_map->get_file_mapping(file_id);
         if (!file_mapping) {
             return Status::InternalError(
                     "Backend:{} file_mapping not found, query_id: {}, file_id: 
{}",
                     BackendOptions::get_localhost(), print_id(query_id), 
file_id);
         }
-        for (k = 1; j + k < request_block_desc.row_id_size(); ++k) {
-            if (request_block_desc.file_id(j + k) == file_id) {
-                row_ids.emplace_back(request_block_desc.row_id(j + k));
-            } else {
-                break;
-            }
+
+        // Derive segment key and group by it — rows from the same segment are 
batched together
+        // even if they are interleaved with rows from other segments in the 
request.
+        auto [tablet_id, rowset_id, segment_id] = 
file_mapping->get_doris_format_info();
+        SegKey seg_key {.tablet_id = tablet_id, .rowset_id = rowset_id, 
.segment_id = segment_id};
+        auto [it, inserted] = batch_idx_by_seg.emplace(seg_key, 
scan_batches.size());
+        if (inserted) {
+            // First time seeing this segment, create a new batch for it.
+            scan_batches.emplace_back();
+            scan_batches.back().file_mapping = file_mapping;
         }
+        // Record (row_id, original_request_index) for later sorting and 
scattering.
+        
scan_batches[it->second].row_ids_with_positions.emplace_back(request_block_desc.row_id(j),
+                                                                     j);
+    }
 
-        RETURN_IF_ERROR(read_doris_format_row(
-                id_file_map, file_mapping, row_ids, slots, full_read_schema, 
row_store_read_struct,
-                stats, acquire_tablet_ms, acquire_rowsets_ms, 
acquire_segments_ms,
-                lookup_row_data_ms, seg_map, iterator_map, result_block));
+    // Phase 2: For each segment, sort row_ids ascending (required by 
ColumnIterator),
+    // deduplicate, then read all rows in a single batch call.
+    std::vector<Block> scan_blocks(scan_batches.size());
+    for (size_t batch_idx = 0; batch_idx < scan_batches.size(); ++batch_idx) {
+        auto& scan_batch = scan_batches[batch_idx];
+        auto& row_ids_with_positions = scan_batch.row_ids_with_positions;
+        std::sort(row_ids_with_positions.begin(), row_ids_with_positions.end(),
+                  [](const auto& lhs, const auto& rhs) { return lhs.first < 
rhs.first; });
+
+        // Column iterators read rowids monotonically. Deduplicate consecutive 
identical row_ids
+        // (different file_ids may map to the same row), then scatter rows 
back to their original
+        // request positions.
+        std::vector<uint32_t> row_ids;
+        row_ids.reserve(row_ids_with_positions.size());
+
+        // Also builds the scatter map: row_id_block_idx[original_request_idx] 
->
+        // (batch_idx, deduplicated_position_in_batch).
+        for (const auto& [row_id, result_idx] : row_ids_with_positions) {
+            if (row_ids.empty() || row_ids.back() != row_id) {
+                row_ids.emplace_back(row_id);
+            }
+            row_id_block_idx[result_idx] = std::make_pair(batch_idx, 
row_ids.size() - 1);
+        }
 
-        j += k;
-        max_k = std::max(max_k, k);
-        row_ids.clear();
+        scan_blocks[batch_idx] = Block(slots, row_ids.size());
+        RETURN_IF_ERROR(read_doris_format_row(id_file_map, 
scan_batch.file_mapping, row_ids, slots,
+                                              full_read_schema, 
row_store_read_struct, stats,
+                                              acquire_tablet_ms, 
acquire_rowsets_ms,
+                                              acquire_segments_ms, 
lookup_row_data_ms, seg_map,
+                                              iterator_map, 
scan_blocks[batch_idx]));
     }
 
+    scatter_scan_blocks_to_result_block(row_id_block_idx, scan_blocks, 
result_block);
+
     return Status::OK();
 }
 
@@ -929,24 +996,7 @@ Status RowIdStorageReader::read_batch_external_row(
             },
             &scan_running_time));
 
-    // Insert the read data into result_block.
-    for (size_t column_id = 0; column_id < result_block.get_columns().size(); 
column_id++) {
-        // The non-const Block(result_block) is passed in read_by_rowids, but 
columns[i] in get_columns
-        // is at bottom an immutable_ptr of Cow<IColumn>, so use const_cast
-        auto dst_col = 
const_cast<IColumn*>(result_block.get_columns()[column_id].get());
-
-        std::vector<const IColumn*> scan_src_columns;
-        scan_src_columns.reserve(row_id_block_idx.size());
-        std::vector<size_t> scan_positions;
-        scan_positions.reserve(row_id_block_idx.size());
-        for (const auto& [pos_block, block_idx] : row_id_block_idx) {
-            DCHECK(scan_blocks.size() > pos_block);
-            DCHECK(scan_blocks[pos_block].get_columns().size() > column_id);
-            
scan_src_columns.emplace_back(scan_blocks[pos_block].get_columns()[column_id].get());
-            scan_positions.emplace_back(block_idx);
-        }
-        dst_col->insert_from_multi_column(scan_src_columns, scan_positions);
-    }
+    scatter_scan_blocks_to_result_block(row_id_block_idx, scan_blocks, 
result_block);
 
     // Statistical runtime profile information.
     std::unique_ptr<RuntimeProfile> runtime_profile =
@@ -1101,11 +1151,9 @@ Status RowIdStorageReader::read_doris_format_row(
                 iterator_item.storage_read_options.stats = &stats;
                 iterator_item.storage_read_options.io_ctx.reader_type = 
ReaderType::READER_QUERY;
             }
-            for (auto row_id : row_ids) {
-                RETURN_IF_ERROR(segment->seek_and_read_by_rowid(
-                        full_read_schema, &slots[x], row_id, column,
-                        iterator_item.storage_read_options, 
iterator_item.iterator));
-            }
+            RETURN_IF_ERROR(segment->seek_and_read_by_rowid(
+                    full_read_schema, &slots[x], row_ids, column,
+                    iterator_item.storage_read_options, 
iterator_item.iterator));
         }
     }
     return Status::OK();
diff --git a/be/src/service/point_query_executor.cpp 
b/be/src/service/point_query_executor.cpp
index 441284a251b..af34a3fe1d4 100644
--- a/be/src/service/point_query_executor.cpp
+++ b/be/src/service/point_query_executor.cpp
@@ -556,7 +556,8 @@ Status PointQueryExecutor::_lookup_row_data() {
             const auto& segment = *it;
             for (int cid : _reusable->missing_col_uids()) {
                 int pos = _reusable->get_col_uid_to_idx().at(cid);
-                auto row_id = static_cast<segment_v2::rowid_t>(row_loc.row_id);
+                std::vector<segment_v2::rowid_t> row_ids {
+                        static_cast<segment_v2::rowid_t>(row_loc.row_id)};
                 MutableColumnPtr column =
                         
_result_block->get_by_position(pos).column->assume_mutable();
                 std::unique_ptr<ColumnIterator> iter;
@@ -565,7 +566,7 @@ Status PointQueryExecutor::_lookup_row_data() {
                 storage_read_options.stats = &_read_stats;
                 storage_read_options.io_ctx.reader_type = 
ReaderType::READER_QUERY;
                 
RETURN_IF_ERROR(segment->seek_and_read_by_rowid(*_tablet->tablet_schema(), slot,
-                                                                row_id, column,
+                                                                row_ids, 
column,
                                                                 
storage_read_options, iter));
                 if (_tablet->tablet_schema()
                             ->column_by_uid(slot->col_unique_id())
diff --git a/be/src/storage/segment/segment.cpp 
b/be/src/storage/segment/segment.cpp
index 1c5578c1388..5c3c3f74202 100644
--- a/be/src/storage/segment/segment.cpp
+++ b/be/src/storage/segment/segment.cpp
@@ -23,6 +23,7 @@
 #include <gen_cpp/olap_file.pb.h>
 #include <gen_cpp/segment_v2.pb.h>
 
+#include <algorithm>
 #include <cstring>
 #include <memory>
 #include <sstream>
@@ -944,9 +945,17 @@ Status Segment::read_key_by_rowid(uint32_t row_id, 
std::string* key) {
 }
 
 Status Segment::seek_and_read_by_rowid(const TabletSchema& schema, 
SlotDescriptor* slot,
-                                       uint32_t row_id, MutableColumnPtr& 
result,
+                                       const std::vector<uint32_t>& row_ids,
+                                       MutableColumnPtr& result,
                                        StorageReadOptions& 
storage_read_options,
                                        std::unique_ptr<ColumnIterator>& 
iterator_hint) {
+    if (row_ids.empty()) {
+        return Status::OK();
+    }
+    DORIS_CHECK(std::is_sorted(row_ids.begin(), row_ids.end()));
+    DORIS_CHECK(std::adjacent_find(row_ids.begin(), row_ids.end()) == 
row_ids.end());
+    // ColumnIterator::seek_and_read expects monotonically increasing row_ids 
without
+    // duplicates for correct ordinal scanning. Enforce this contract at the 
entry point.
     segment_v2::ColumnIteratorOptions opt {
             .use_page_cache = !config::disable_storage_page_cache,
             .file_reader = file_reader().get(),
@@ -956,7 +965,6 @@ Status Segment::seek_and_read_by_rowid(const TabletSchema& 
schema, SlotDescripto
                                              
&storage_read_options.stats->file_cache_stats},
     };
 
-    std::vector<segment_v2::rowid_t> single_row_loc {row_id};
     if (!slot->column_paths().empty()) {
         // here need create column readers to make sure column reader is 
created before seek_and_read_by_rowid
         // if segment cache miss, column reader will be created to make sure 
the variant column result not coredump
@@ -977,13 +985,13 @@ Status Segment::seek_and_read_by_rowid(const 
TabletSchema& schema, SlotDescripto
             RETURN_IF_ERROR(iterator_hint->init(opt));
         }
         RETURN_IF_ERROR(
-                iterator_hint->read_by_rowids(single_row_loc.data(), 1, 
file_storage_column));
+                iterator_hint->read_by_rowids(row_ids.data(), row_ids.size(), 
file_storage_column));
         ColumnPtr source_ptr;
         // storage may have different type with schema, so we need to cast the 
column
         RETURN_IF_ERROR(variant_util::cast_column(
                 ColumnWithTypeAndName(file_storage_column->get_ptr(), 
storage_type, column.name()),
                 slot->type(), &source_ptr));
-        RETURN_IF_CATCH_EXCEPTION(result->insert_range_from(*source_ptr, 0, 
1));
+        RETURN_IF_CATCH_EXCEPTION(result->insert_range_from(*source_ptr, 0, 
row_ids.size()));
     } else {
         int index = (slot->col_unique_id() >= 0) ? 
schema.field_index(slot->col_unique_id())
                                                  : 
schema.field_index(slot->col_name());
@@ -998,7 +1006,7 @@ Status Segment::seek_and_read_by_rowid(const TabletSchema& 
schema, SlotDescripto
                                                 &storage_read_options));
             RETURN_IF_ERROR(iterator_hint->init(opt));
         }
-        RETURN_IF_ERROR(iterator_hint->read_by_rowids(single_row_loc.data(), 
1, result));
+        RETURN_IF_ERROR(iterator_hint->read_by_rowids(row_ids.data(), 
row_ids.size(), result));
     }
     return Status::OK();
 }
diff --git a/be/src/storage/segment/segment.h b/be/src/storage/segment/segment.h
index f18be6c093a..465d20343bf 100644
--- a/be/src/storage/segment/segment.h
+++ b/be/src/storage/segment/segment.h
@@ -144,8 +144,9 @@ public:
 
     Status read_key_by_rowid(uint32_t row_id, std::string* key);
 
-    Status seek_and_read_by_rowid(const TabletSchema& schema, SlotDescriptor* 
slot, uint32_t row_id,
-                                  MutableColumnPtr& result,
+    // row_ids must be strictly increasing.
+    Status seek_and_read_by_rowid(const TabletSchema& schema, SlotDescriptor* 
slot,
+                                  const std::vector<uint32_t>& row_ids, 
MutableColumnPtr& result,
                                   StorageReadOptions& storage_read_options,
                                   std::unique_ptr<ColumnIterator>& 
iterator_hint);
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to