This is an automated email from the ASF dual-hosted git repository.
BiteTheDDDDt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 2f83e5d3df6 [improvement](query) Align olap scan schema with storage
keys (#64413)
2f83e5d3df6 is described below
commit 2f83e5d3df65d2eaae3265034053df3f363f554c
Author: Pxl <[email protected]>
AuthorDate: Wed Jul 1 19:13:33 2026 +0800
[improvement](query) Align olap scan schema with storage keys (#64413)
This change is an alternative to maintaining separate storage and execution
schemas in the BE segment iterator to support storage expression pushdown.
For AGG key tables and non-merge-on-write (non-MOW) unique key tables, the
planner now expands the OLAP scan tuple so that the scan schema starts with
the storage key columns in storage order. Key slots that the query did not
request are added by the FE and listed in the thrift field
`extra_key_column_slot_ids`. After storage filters and runtime filters are
translated, the scan node projects back to the original execution output,
dropping those extra key slots.
On the BE side, the listed extra key slots are mapped to tablet column ids.
Direct reader paths can fill these extra key columns with placeholder values
instead of reading their data pages, as long as no predicate, delete condition,
or pushed-down expression needs the real values. Merge and aggregation reader
paths still read the real key values to preserve storage semantics.
storage-aligned key prefix, the non-direct scanner no longer rebuilds
the return column list as "all keys plus value outputs"; it only checks
the key prefix and keeps the existing sequence-column expansion.
---
be/src/exec/operator/olap_scan_operator.cpp | 10 -
be/src/exec/operator/olap_scan_operator.h | 3 -
be/src/exec/scan/olap_scanner.cpp | 38 +-
be/src/exec/scan/olap_scanner.h | 10 +-
be/src/exec/scan/scanner_scheduler.cpp | 11 +-
be/src/storage/iterator/vcollect_iterator.cpp | 24 +-
be/src/storage/iterators.h | 8 +-
be/src/storage/rowset/beta_rowset_reader.cpp | 3 +-
be/src/storage/rowset/rowset_reader_context.h | 7 +-
be/src/storage/schema.cpp | 6 +
be/src/storage/schema.h | 4 +
be/src/storage/segment/segment_iterator.cpp | 394 +++++++--------------
be/src/storage/segment/segment_iterator.h | 26 +-
be/src/storage/tablet/tablet_reader.cpp | 7 +-
be/src/storage/tablet/tablet_reader.h | 8 +-
.../glue/translator/PhysicalPlanTranslator.java | 33 ++
.../trees/plans/physical/PhysicalOlapScan.java | 5 +-
.../org/apache/doris/planner/OlapScanNode.java | 25 ++
.../translator/PhysicalPlanTranslatorTest.java | 52 ++-
gensrc/thrift/PlanNodes.thrift | 2 +
20 files changed, 301 insertions(+), 375 deletions(-)
diff --git a/be/src/exec/operator/olap_scan_operator.cpp
b/be/src/exec/operator/olap_scan_operator.cpp
index 8c103a7072c..5fda916c405 100644
--- a/be/src/exec/operator/olap_scan_operator.cpp
+++ b/be/src/exec/operator/olap_scan_operator.cpp
@@ -1006,16 +1006,6 @@ Status OlapScanLocalState::open(RuntimeState* state) {
RETURN_IF_ERROR(virtual_column_expr_ctx->open(state));
_slot_id_to_virtual_column_expr[slot_desc->id()] =
virtual_column_expr_ctx;
- _slot_id_to_col_type[slot_desc->id()] =
slot_desc->get_data_type_ptr();
- int col_pos =
p.intermediate_row_desc().get_column_id(slot_desc->id());
- if (col_pos < 0) {
- return Status::InternalError(
- "Invalid virtual slot, can not find its information.
Slot desc:\n{}\nRow "
- "desc:\n{}",
- slot_desc->debug_string(),
p.row_desc().debug_string());
- } else {
- _slot_id_to_index_in_block[slot_desc->id()] = col_pos;
- }
}
}
diff --git a/be/src/exec/operator/olap_scan_operator.h
b/be/src/exec/operator/olap_scan_operator.h
index c30aad5cf29..309c99191bd 100644
--- a/be/src/exec/operator/olap_scan_operator.h
+++ b/be/src/exec/operator/olap_scan_operator.h
@@ -339,9 +339,6 @@ private:
std::vector<TabletReadSource> _read_sources;
std::map<SlotId, VExprContextSPtr> _slot_id_to_virtual_column_expr;
- std::map<SlotId, size_t> _slot_id_to_index_in_block;
- // this map is needed for scanner opening.
- std::map<SlotId, DataTypePtr> _slot_id_to_col_type;
// ---- Runtime-filter partition pruning ----
// Attaches this per-instance pruner to the shared parse result owned by
diff --git a/be/src/exec/scan/olap_scanner.cpp
b/be/src/exec/scan/olap_scanner.cpp
index 9f6fc8ef2b6..b5ec0443045 100644
--- a/be/src/exec/scan/olap_scanner.cpp
+++ b/be/src/exec/scan/olap_scanner.cpp
@@ -92,12 +92,11 @@ OlapScanner::OlapScanner(ScanLocalStateBase* parent,
OlapScanner::Params&& param
.rs_splits {},
.return_columns {},
.output_columns {},
+ .extra_columns {},
.common_expr_ctxs_push_down {},
.topn_filter_source_node_ids {},
.key_group_cluster_key_idxes {},
.virtual_column_exprs {},
- .vir_cid_to_idx_in_block {},
- .vir_col_idx_to_type {},
.score_runtime {},
.collection_statistics {},
.ann_topn_runtime {},
@@ -179,8 +178,6 @@ Status OlapScanner::_prepare_impl() {
_slot_id_to_virtual_column_expr[pair.first] = context;
}
- _slot_id_to_index_in_block = local_state->_slot_id_to_index_in_block;
- _slot_id_to_col_type = local_state->_slot_id_to_col_type;
_score_runtime = local_state->_score_runtime;
// All scanners share the same ann_topn_runtime.
_ann_topn_runtime = local_state->_ann_topn_runtime;
@@ -399,8 +396,6 @@ Status OlapScanner::_init_tablet_reader_params(
_tablet_reader_params.common_expr_ctxs_push_down =
_common_expr_ctxs_push_down;
_tablet_reader_params.virtual_column_exprs = _virtual_column_exprs;
- _tablet_reader_params.vir_cid_to_idx_in_block = _vir_cid_to_idx_in_block;
- _tablet_reader_params.vir_col_idx_to_type = _vir_col_idx_to_type;
_tablet_reader_params.score_runtime = _score_runtime;
_tablet_reader_params.output_columns =
((OlapScanLocalState*)_local_state)->_output_column_ids;
_tablet_reader_params.ann_topn_runtime = _ann_topn_runtime;
@@ -696,6 +691,11 @@ Status OlapScanner::_init_variant_columns() {
}
Status OlapScanner::_init_return_columns() {
+ // For OLAP scan, _output_tuple_desc is the storage-aligned scan tuple
+ // descriptor. extra_key_column_slot_ids marks extra key slots that are
+ // present only for scan-schema alignment. For example, on an AGG table
with
+ // keys (k1, k2), a query returning only k2 may still scan (k1, k2); k1 is
+ // an extra column and can be removed by the projection output tuple.
for (auto* slot : _output_tuple_desc->slots()) {
// variant column using path to index a column
int32_t index = 0;
@@ -716,19 +716,25 @@ Status OlapScanner::_init_return_columns() {
}
if (slot->get_virtual_column_expr()) {
- ColumnId virtual_column_cid = index;
- _virtual_column_exprs[virtual_column_cid] =
_slot_id_to_virtual_column_expr[slot->id()];
- size_t idx_in_block = _slot_id_to_index_in_block[slot->id()];
- _vir_cid_to_idx_in_block[virtual_column_cid] = idx_in_block;
- _vir_col_idx_to_type[idx_in_block] =
_slot_id_to_col_type[slot->id()];
-
- VLOG_DEBUG << fmt::format(
- "Virtual column, slot id: {}, cid {}, column index: {},
type: {}", slot->id(),
- virtual_column_cid,
_vir_cid_to_idx_in_block[virtual_column_cid],
- _vir_col_idx_to_type[idx_in_block]->get_name());
+ _virtual_column_exprs[index] =
_slot_id_to_virtual_column_expr[slot->id()];
+
+ VLOG_DEBUG << fmt::format("Virtual column, slot id: {}, cid {},
type: {}", slot->id(),
+ index,
slot->get_data_type_ptr()->get_name());
}
const auto& column = tablet_schema->column(index);
+ auto* olap_local_state =
static_cast<OlapScanLocalState*>(_local_state);
+ const auto& olap_scan_node = olap_local_state->olap_scan_node();
+ if (olap_scan_node.__isset.extra_key_column_slot_ids &&
+ olap_scan_node.extra_key_column_slot_ids.contains(slot->id())) {
+ DORIS_CHECK(column.is_key());
+ if (_tablet_reader_params.direct_mode) {
+ // Direct readers can synthesize extra storage keys because
they are only
+ // placeholders before the scan projection removes them.
Merge/aggregation
+ // readers must still read real key values to preserve storage
semantics.
+ _tablet_reader_params.extra_columns.insert(index);
+ }
+ }
int32_t unique_id =
column.unique_id() >= 0 ? column.unique_id() :
column.parent_unique_id();
if (!slot->all_access_paths().empty()) {
diff --git a/be/src/exec/scan/olap_scanner.h b/be/src/exec/scan/olap_scanner.h
index 4a09b6e3948..dc308d34713 100644
--- a/be/src/exec/scan/olap_scanner.h
+++ b/be/src/exec/scan/olap_scanner.h
@@ -18,9 +18,9 @@
#pragma once
#include <gen_cpp/PaloInternalService_types.h>
-#include <stdint.h>
#include <cstddef>
+#include <cstdint>
#include <map>
#include <memory>
#include <optional>
@@ -121,17 +121,11 @@ public:
std::unordered_set<uint32_t> _tablet_columns_convert_to_null_set;
- // This three fields are copied from OlapScanLocalState.
+ // This field is copied from OlapScanLocalState.
std::map<SlotId, VExprContextSPtr> _slot_id_to_virtual_column_expr;
- std::map<SlotId, size_t> _slot_id_to_index_in_block;
- std::map<SlotId, DataTypePtr> _slot_id_to_col_type;
// ColumnId of virtual column to its expr context
std::map<ColumnId, VExprContextSPtr> _virtual_column_exprs;
- // ColumnId of virtual column to its index in block
- std::map<ColumnId, size_t> _vir_cid_to_idx_in_block;
- // The idx of vir_col in block to its data type.
- std::map<size_t, DataTypePtr> _vir_col_idx_to_type;
std::shared_ptr<ScoreRuntime> _score_runtime;
std::shared_ptr<segment_v2::AnnTopNRuntime> _ann_topn_runtime;
diff --git a/be/src/exec/scan/scanner_scheduler.cpp
b/be/src/exec/scan/scanner_scheduler.cpp
index 7e1deb2bae7..230a54048a2 100644
--- a/be/src/exec/scan/scanner_scheduler.cpp
+++ b/be/src/exec/scan/scanner_scheduler.cpp
@@ -362,18 +362,17 @@ void
ScannerScheduler::_make_sure_virtual_col_is_materialized(
continue;
}
- std::vector<std::string> vcid_to_idx;
-
- for (const auto& pair : olap_scanner->_vir_cid_to_idx_in_block) {
- vcid_to_idx.push_back(fmt::format("{}-{}", pair.first,
pair.second));
+ std::vector<ColumnId> virtual_column_ids;
+ for (const auto& pair : olap_scanner->_virtual_column_exprs) {
+ virtual_column_ids.push_back(pair.first);
}
std::string error_msg = fmt::format(
"Column in idx {} is nothing, block columns {}, normal_columns
"
"{}, "
- "vir_cid_to_idx_in_block_msg {}",
+ "virtual_column_ids [{}]",
idx, free_block->columns(),
olap_scanner->_return_columns.size(),
- fmt::format("_vir_cid_to_idx_in_block:[{}]",
fmt::join(vcid_to_idx, ",")));
+ fmt::join(virtual_column_ids, ","));
throw doris::Exception(ErrorCode::INTERNAL_ERROR, error_msg);
}
#endif
diff --git a/be/src/storage/iterator/vcollect_iterator.cpp
b/be/src/storage/iterator/vcollect_iterator.cpp
index e6386ea9029..e4fa85042d3 100644
--- a/be/src/storage/iterator/vcollect_iterator.cpp
+++ b/be/src/storage/iterator/vcollect_iterator.cpp
@@ -287,23 +287,23 @@ Status VCollectIterator::_topn_next(Block* block) {
return Status::Error<END_OF_FILE>("");
}
+ // `block` is reused below as the per-rowset read buffer. Keep TopN
candidates in a
+ // separate mutable block with the same schema so stored row positions
remain stable.
auto clone_block = block->clone_empty();
- // Initialize virtual slot columns by schema (avoid runtime type checks):
- // use _reader_context.vir_col_idx_to_type to construct real columns for
those positions.
- if (!_reader->_reader_context.vir_col_idx_to_type.empty()) {
- const auto& idx_to_type = _reader->_reader_context.vir_col_idx_to_type;
- for (const auto& kv : idx_to_type) {
- size_t idx = kv.first;
- if (idx < clone_block.columns()) {
- clone_block.get_by_position(idx).column =
kv.second->create_column();
- }
- }
- }
+ // Initialize virtual slot columns by schema (avoid runtime type checks).
+ for (const auto& [cid, expr_ctx] :
_reader->_reader_context.virtual_column_exprs) {
+ auto it = std::find(_reader->_return_columns.begin(),
_reader->_return_columns.end(), cid);
+ DORIS_CHECK(it != _reader->_return_columns.end());
+ auto idx =
cast_set<size_t>(std::distance(_reader->_return_columns.begin(), it));
+ DORIS_CHECK(idx < clone_block.columns());
+ clone_block.get_by_position(idx).column =
expr_ctx->root()->data_type()->create_column();
+ }
+ const size_t clone_block_columns = clone_block.columns();
MutableBlock mutable_block =
MutableBlock::build_mutable_block(std::move(clone_block));
const std::vector<uint32_t>* sort_columns =
_reader->_reader_context.read_orderby_key_columns;
for (auto column_idx : *sort_columns) {
- DORIS_CHECK(column_idx < clone_block.columns());
+ DORIS_CHECK(column_idx < clone_block_columns);
}
size_t first_sort_column_idx = (*sort_columns)[0];
diff --git a/be/src/storage/iterators.h b/be/src/storage/iterators.h
index 59071909da7..3b6fa114be3 100644
--- a/be/src/storage/iterators.h
+++ b/be/src/storage/iterators.h
@@ -19,6 +19,7 @@
#include <cstddef>
#include <memory>
+#include <set>
#include "common/status.h"
#include "core/block/block.h"
@@ -130,6 +131,11 @@ public:
io::IOContext io_ctx;
VExprContextSPtrs common_expr_ctxs_push_down;
const std::set<int32_t>* output_columns = nullptr;
+ // Extra storage key columns that are included only to keep the scan schema
+ // aligned with the storage key prefix. SegmentIterator can synthesize
+ // placeholders only after proving predicates, delete conditions, and
+ // expressions do not need their real values.
+ std::set<ColumnId> extra_columns;
// runtime state
RuntimeState* runtime_state = nullptr;
RowsetId rowset_id;
@@ -148,8 +154,6 @@ public:
std::map<ColumnId, VExprContextSPtr> virtual_column_exprs;
std::shared_ptr<segment_v2::AnnTopNRuntime> ann_topn_runtime;
- std::map<ColumnId, size_t> vir_cid_to_idx_in_block;
- std::map<size_t, DataTypePtr> vir_col_idx_to_type;
std::map<int32_t, TColumnAccessPaths> all_access_paths;
std::map<int32_t, TColumnAccessPaths> predicate_access_paths;
diff --git a/be/src/storage/rowset/beta_rowset_reader.cpp
b/be/src/storage/rowset/beta_rowset_reader.cpp
index 56b580e152d..6798c1c6934 100644
--- a/be/src/storage/rowset/beta_rowset_reader.cpp
+++ b/be/src/storage/rowset/beta_rowset_reader.cpp
@@ -107,8 +107,6 @@ Status
BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context
_read_options.predicate_access_paths =
_read_context->predicate_access_paths;
_read_options.ann_topn_runtime = _read_context->ann_topn_runtime;
- _read_options.vir_cid_to_idx_in_block =
_read_context->vir_cid_to_idx_in_block;
- _read_options.vir_col_idx_to_type = _read_context->vir_col_idx_to_type;
_read_options.score_runtime = _read_context->score_runtime;
_read_options.collection_statistics = _read_context->collection_statistics;
_read_options.rowset_id = _rowset->rowset_id();
@@ -152,6 +150,7 @@ Status
BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context
// create segment iterators
VLOG_NOTICE << "read columns size: " << read_columns.size();
_input_schema =
std::make_shared<Schema>(_read_context->tablet_schema->columns(), read_columns);
+ _read_options.extra_columns = _read_context->extra_columns;
// output_schema only contains return_columns (excludes extra columns like
delete-predicate columns).
// It is used by merge/union iterators to determine how many columns to
copy to the output block.
_output_schema =
std::make_shared<Schema>(_read_context->tablet_schema->columns(),
diff --git a/be/src/storage/rowset/rowset_reader_context.h
b/be/src/storage/rowset/rowset_reader_context.h
index cb61a6b0ad9..13090805ff4 100644
--- a/be/src/storage/rowset/rowset_reader_context.h
+++ b/be/src/storage/rowset/rowset_reader_context.h
@@ -18,6 +18,7 @@
#ifndef DORIS_BE_SRC_OLAP_ROWSET_ROWSET_READER_CONTEXT_H
#define DORIS_BE_SRC_OLAP_ROWSET_ROWSET_READER_CONTEXT_H
+#include <set>
#include <vector>
#include "exprs/score_runtime.h"
@@ -77,9 +78,6 @@ struct RowsetReaderContext {
// Effective adaptive batch size byte budget. 0 means disabled internally.
size_t preferred_block_size_bytes = 8388608UL;
- // Points to the "true" output column list before non-direct-mode
expansion.
- // Used by BlockReader to map expanded storage columns back to the
requested output layout.
- const std::vector<ColumnId>* origin_return_columns = nullptr;
bool is_unique = false;
//record row num merged in generic iterator
uint64_t* merged_rows = nullptr;
@@ -90,14 +88,13 @@ struct RowsetReaderContext {
RowIdConversion* rowid_conversion = nullptr;
bool is_key_column_group = false;
const std::set<int32_t>* output_columns = nullptr;
+ std::set<ColumnId> extra_columns;
RowsetId rowset_id;
// slots that cast may be eliminated in storage layer
std::map<std::string, DataTypePtr> target_cast_type_for_variants;
int64_t ttl_seconds = 0;
std::map<ColumnId, VExprContextSPtr> virtual_column_exprs;
- std::map<ColumnId, size_t> vir_cid_to_idx_in_block;
- std::map<size_t, DataTypePtr> vir_col_idx_to_type;
std::map<int32_t, TColumnAccessPaths> all_access_paths;
std::map<int32_t, TColumnAccessPaths> predicate_access_paths;
diff --git a/be/src/storage/schema.cpp b/be/src/storage/schema.cpp
index 27dd40e9d74..7ddd766a8ac 100644
--- a/be/src/storage/schema.cpp
+++ b/be/src/storage/schema.cpp
@@ -55,6 +55,7 @@ Schema& Schema::operator=(const Schema& other) {
void Schema::_copy_from(const Schema& other) {
_col_ids = other._col_ids;
+ _column_id_to_index = other._column_id_to_index;
_num_key_columns = other._num_key_columns;
_delete_sign_idx = other._delete_sign_idx;
_has_sequence_col = other._has_sequence_col;
@@ -77,6 +78,11 @@ void Schema::_init(const std::vector<TabletColumnPtr>& cols,
const std::vector<C
_cols.resize(cols.size());
std::unordered_set<uint32_t> col_id_set(col_ids.begin(), col_ids.end());
+ _column_id_to_index.assign(cols.size(), -1);
+ for (size_t i = 0; i < col_ids.size(); ++i) {
+ _column_id_to_index[col_ids[i]] = static_cast<int>(i);
+ }
+
for (int cid = 0; cid < cols.size(); ++cid) {
if (col_id_set.find(cid) == col_id_set.end()) {
continue;
diff --git a/be/src/storage/schema.h b/be/src/storage/schema.h
index dcd27fe9c0b..5fd5a4b01f2 100644
--- a/be/src/storage/schema.h
+++ b/be/src/storage/schema.h
@@ -96,6 +96,8 @@ public:
size_t num_column_ids() const { return _col_ids.size(); }
const std::vector<ColumnId>& column_ids() const { return _col_ids; }
ColumnId column_id(size_t index) const { return _col_ids[index]; }
+ int column_index(ColumnId cid) const { return _column_id_to_index[cid]; }
+ const std::vector<int>& column_id_to_index() const { return
_column_id_to_index; }
int32_t delete_sign_idx() const { return _delete_sign_idx; }
bool has_sequence_col() const { return _has_sequence_col; }
int32_t rowid_col_idx() const { return _rowid_col_idx; }
@@ -119,6 +121,8 @@ private:
// NOTE: _cols[cid] can only be accessed when the cid is
// contained in _col_ids
std::vector<TabletColumnPtr> _cols;
+ // Tablet column id -> slot index in this Schema.
+ std::vector<int> _column_id_to_index;
size_t _num_key_columns;
int32_t _delete_sign_idx = -1;
diff --git a/be/src/storage/segment/segment_iterator.cpp
b/be/src/storage/segment/segment_iterator.cpp
index d0b5a0f3c05..9d3e5d98e47 100644
--- a/be/src/storage/segment/segment_iterator.cpp
+++ b/be/src/storage/segment/segment_iterator.cpp
@@ -17,7 +17,6 @@
#include "storage/segment/segment_iterator.h"
-#include <assert.h>
#include <gen_cpp/Exprs_types.h>
#include <gen_cpp/Opcodes_types.h>
#include <gen_cpp/Types_types.h>
@@ -25,6 +24,7 @@
#include <algorithm>
#include <boost/iterator/iterator_facade.hpp>
+#include <cassert>
#include <cstdint>
#include <memory>
#include <numeric>
@@ -71,7 +71,6 @@
#include "io/cache/cached_remote_file_reader.h"
#include "io/fs/file_reader.h"
#include "io/io_common.h"
-#include "runtime/descriptors.h"
#include "runtime/query_context.h"
#include "runtime/runtime_predicate.h"
#include "runtime/runtime_state.h"
@@ -113,127 +112,11 @@
#include "storage/utils.h"
#include "util/concurrency_stats.h"
#include "util/defer_op.h"
-#include "util/json/path_in_data.h"
#include "util/simd/bits.h"
namespace doris {
using namespace ErrorCode;
namespace segment_v2 {
-namespace {
-
-Status tablet_column_id_by_slot(const TabletSchemaSPtr& tablet_schema, const
SlotDescriptor* slot,
- ColumnId* cid) {
- int32_t field_index = -1;
- if (slot->type()->get_primitive_type() == PrimitiveType::TYPE_VARIANT) {
- field_index = tablet_schema->field_index(
-
PathInData(tablet_schema->column_by_uid(slot->col_unique_id()).name_lower_case(),
- slot->column_paths()));
- } else {
- field_index = slot->col_unique_id() >= 0 ?
tablet_schema->field_index(slot->col_unique_id())
- :
tablet_schema->field_index(slot->col_name());
- }
- if (field_index < 0) {
- return Status::InternalError(
- "field name is invalid. field={}, field_name_to_index={},
col_unique_id={}",
- slot->col_name(), tablet_schema->get_all_field_names(),
slot->col_unique_id());
- }
- *cid = field_index;
- return Status::OK();
-}
-
-Status rebind_storage_expr_to_reader_schema(
- const StorageReadOptions& opts, const VExprSPtr& expr,
- const std::unordered_map<ColumnId, size_t>& cid_to_pos) {
- DORIS_CHECK(expr != nullptr);
-
- if (expr->is_slot_ref()) {
- auto slot_ref = std::static_pointer_cast<VSlotRef>(expr);
- auto* slot =
opts.runtime_state->desc_tbl().get_slot_descriptor(slot_ref->slot_id());
- if (slot == nullptr) {
- return Status::InternalError("slot {} is not found in descriptor
table",
- slot_ref->slot_id());
- }
-
- ColumnId cid = 0;
- RETURN_IF_ERROR(tablet_column_id_by_slot(opts.tablet_schema, slot,
&cid));
- auto pos_it = cid_to_pos.find(cid);
- if (pos_it == cid_to_pos.end()) {
- return Status::InternalError("slot {} column {} with cid {} is not
in reader schema",
- slot_ref->slot_id(),
slot->col_name(), cid);
- }
- slot_ref->set_column_id(cast_set<int>(pos_it->second));
- } else if (expr->is_virtual_slot_ref()) {
- auto virtual_slot_ref = std::static_pointer_cast<VirtualSlotRef>(expr);
- auto* slot =
-
opts.runtime_state->desc_tbl().get_slot_descriptor(virtual_slot_ref->slot_id());
- if (slot == nullptr) {
- return Status::InternalError("slot {} is not found in descriptor
table",
- virtual_slot_ref->slot_id());
- }
-
- ColumnId cid = 0;
- RETURN_IF_ERROR(tablet_column_id_by_slot(opts.tablet_schema, slot,
&cid));
- auto pos_it = cid_to_pos.find(cid);
- if (pos_it == cid_to_pos.end()) {
- return Status::InternalError(
- "virtual slot {} column {} with cid {} is not in reader
schema",
- virtual_slot_ref->slot_id(), slot->col_name(), cid);
- }
- virtual_slot_ref->set_column_id(cast_set<int>(pos_it->second));
- // A virtual slot has its own output position in the reader block, and
its
- // materialization expression may also contain real slot refs. Rebind
both
- // sides so evaluating the virtual expression reads from the same block
- // layout used by SegmentIterator.
- RETURN_IF_ERROR(rebind_storage_expr_to_reader_schema(
- opts, virtual_slot_ref->get_virtual_column_expr(),
cid_to_pos));
- }
-
- for (const auto& child : expr->children()) {
- RETURN_IF_ERROR(rebind_storage_expr_to_reader_schema(opts, child,
cid_to_pos));
- }
- return Status::OK();
-}
-
-} // namespace
-
-Status rebind_storage_exprs_to_reader_schema(const StorageReadOptions& opts,
const Schema& schema,
- const VExprContextSPtrs&
common_exprs,
- std::map<ColumnId,
VExprContextSPtr>& virtual_exprs) {
- if (common_exprs.empty() && virtual_exprs.empty()) {
- return Status::OK();
- }
- DORIS_CHECK(opts.runtime_state != nullptr);
- DORIS_CHECK(opts.tablet_schema != nullptr);
-
- const auto keys_type = opts.tablet_schema->keys_type();
- if (keys_type == KeysType::DUP_KEYS ||
- (keys_type == KeysType::UNIQUE_KEYS &&
opts.enable_unique_key_merge_on_write)) {
- return Status::OK();
- }
-
- // Storage exprs are prepared with RowDescriptor, so
VSlotRef/VirtualSlotRef column_id points to
- // the scan tuple column ordinal. SegmentIterator evaluates cloned exprs
on a block built from
- // the reader schema instead. AGG_KEYS and non-MOW UNIQUE_KEYS readers may
expand the reader
- // schema, for example by filling all key columns before
merging/aggregating rows, so the scan
- // tuple ordinal is not always the same as the runtime block ordinal.
- //
- // DUP_KEYS and UNIQUE_KEYS MOW use direct readers for query scans, so
their reader block keeps
- // the scan tuple layout and can skip this per-segment expression-tree
traversal. For merge/agg
- // readers, the reader schema is the source of truth: map tablet column id
to reader-block
- // position and rebind every storage expr slot to that position.
- std::unordered_map<ColumnId, size_t> cid_to_pos;
- for (size_t pos = 0; pos < schema.num_column_ids(); ++pos) {
- cid_to_pos.emplace(schema.column_id(cast_set<int>(pos)), pos);
- }
-
- for (const auto& ctx : common_exprs) {
- RETURN_IF_ERROR(rebind_storage_expr_to_reader_schema(opts,
ctx->root(), cid_to_pos));
- }
- for (const auto& [_, ctx] : virtual_exprs) {
- RETURN_IF_ERROR(rebind_storage_expr_to_reader_schema(opts,
ctx->root(), cid_to_pos));
- }
- return Status::OK();
-}
SegmentIterator::~SegmentIterator() = default;
@@ -530,7 +413,6 @@ Status SegmentIterator::_init_impl(const
StorageReadOptions& opts) {
}
_virtual_column_exprs = _opts.virtual_column_exprs;
- _vir_cid_to_idx_in_block = _opts.vir_cid_to_idx_in_block;
_score_runtime = _opts.score_runtime;
_ann_topn_runtime = _opts.ann_topn_runtime;
@@ -582,10 +464,8 @@ Status SegmentIterator::_init_impl(const
StorageReadOptions& opts) {
RETURN_IF_ERROR(_construct_compound_expr_context());
VLOG_DEBUG << fmt::format(
- "Segment iterator init, virtual_column_exprs size: {}, "
- "_vir_cid_to_idx_in_block size: {}, common_expr_pushdown size: {}",
- _opts.virtual_column_exprs.size(),
_opts.vir_cid_to_idx_in_block.size(),
- _common_expr_ctxs_push_down.size());
+ "Segment iterator init, virtual_column_exprs size: {},
common_expr_pushdown size: {}",
+ _opts.virtual_column_exprs.size(),
_common_expr_ctxs_push_down.size());
_initialize_predicate_results();
return Status::OK();
}
@@ -1002,6 +882,7 @@ Status SegmentIterator::_apply_ann_topn_predicate() {
VLOG_DEBUG << fmt::format("Try apply ann topn: {}",
_ann_topn_runtime->debug_string());
size_t src_col_idx = _ann_topn_runtime->get_src_column_idx();
+ // AnnTopNRuntime keeps VSlotRef::column_id(), which is the scan schema
ordinal.
ColumnId src_cid = _schema->column_id(src_col_idx);
IndexIterator* ann_index_iterator = _index_iterators[src_cid].get();
bool has_ann_index = _column_has_ann_index(src_cid);
@@ -1290,8 +1171,9 @@ Status
SegmentIterator::_extract_common_expr_columns(const VExprSPtr& expr) {
auto node_type = expr->node_type();
if (node_type == TExprNodeType::SLOT_REF) {
auto slot_expr = std::dynamic_pointer_cast<doris::VSlotRef>(expr);
- _is_common_expr_column[_schema->column_id(slot_expr->column_id())] =
true;
-
_common_expr_columns.insert(_schema->column_id(slot_expr->column_id()));
+ auto cid = _schema->column_id(slot_expr->column_id());
+ _is_common_expr_column[cid] = true;
+ _common_expr_columns.insert(cid);
} else if (node_type == TExprNodeType::VIRTUAL_SLOT_REF) {
std::shared_ptr<VirtualSlotRef> virtual_slot_ref =
std::dynamic_pointer_cast<VirtualSlotRef>(expr);
@@ -1512,6 +1394,9 @@ bool SegmentIterator::_need_read_data(ColumnId cid) {
if (_opts.runtime_state &&
!_opts.runtime_state->query_options().enable_no_need_read_data_opt) {
return true;
}
+ if (_can_skip_reading_extra_column(cid)) {
+ return false;
+ }
// only support DUP_KEYS and UNIQUE_KEYS with MOW
if (!((_opts.tablet_schema->keys_type() == KeysType::DUP_KEYS ||
(_opts.tablet_schema->keys_type() == KeysType::UNIQUE_KEYS &&
@@ -1519,7 +1404,7 @@ bool SegmentIterator::_need_read_data(ColumnId cid) {
return true;
}
// this is a virtual column, we always need to read data
- if (this->_vir_cid_to_idx_in_block.contains(cid)) {
+ if (_virtual_column_exprs.contains(cid)) {
return true;
}
@@ -1684,8 +1569,8 @@ Status SegmentIterator::_init_return_column_iterators() {
}
#ifndef NDEBUG
- for (auto pair : _vir_cid_to_idx_in_block) {
- ColumnId vir_col_cid = pair.first;
+ for (const auto& entry : _virtual_column_exprs) {
+ ColumnId vir_col_cid = entry.first;
DCHECK(_column_iterators[vir_col_cid] != nullptr)
<< "Virtual column iterator for " << vir_col_cid << " should
not be null";
ColumnIterator* column_iter = _column_iterators[vir_col_cid].get();
@@ -2083,19 +1968,6 @@ Status SegmentIterator::_vec_init_lazy_materialization()
{
_is_need_short_eval = true;
}
- // ColumnId to column index in block
- // ColumnId will contail all columns in tablet schema, including virtual
columns and global rowid column,
- _schema_block_id_map.resize(_schema->columns().size(), -1);
- // Use cols read by query to initialize _schema_block_id_map.
- // We need to know the index of each column in the block.
- // There is an assumption here that the columns in the block are in the
same order as in the read schema.
- // TODO: A probelm is that, delete condition columns will exist in
_schema->column_ids but not in block if
- // delete column is not read by the query.
- for (int i = 0; i < _schema->num_column_ids(); i++) {
- auto cid = _schema->column_id(i);
- _schema_block_id_map[cid] = i;
- }
-
// Step2: extract columns that can execute expr context
_is_common_expr_column.resize(_schema->columns().size(), false);
if (!_common_expr_ctxs_push_down.empty()) {
@@ -2109,13 +1981,13 @@ Status
SegmentIterator::_vec_init_lazy_materialization() {
// if delete condition column not in the block, no filter is
needed
// and will be removed from _columns_to_filter in the first
next_batch.
if (_is_common_expr_column[cid] || _is_pred_column[cid]) {
- auto loc = _schema_block_id_map[cid];
+ auto loc = _schema->column_index(cid);
_columns_to_filter.push_back(loc);
}
}
- for (auto pair : _vir_cid_to_idx_in_block) {
- _columns_to_filter.push_back(cast_set<ColumnId>(pair.second));
+ for (const auto& entry : _virtual_column_exprs) {
+
_columns_to_filter.push_back(_schema->column_index(entry.first));
}
}
}
@@ -2187,11 +2059,11 @@ Status
SegmentIterator::_vec_init_lazy_materialization() {
"_non_predicate_columns: [{}], "
"_cols_read_by_common_expr: [{}], "
"columns_to_filter: [{}], "
- "_schema_block_id_map: [{}]",
+ "schema_column_id_to_index: [{}]",
_lazy_materialization_read, _col_predicates.size(),
fmt::join(_predicate_column_ids, ","),
fmt::join(_non_predicate_columns, ","),
fmt::join(_common_expr_column_ids, ","),
fmt::join(_columns_to_filter, ","),
- fmt::join(_schema_block_id_map, ","));
+ fmt::join(_schema->column_id_to_index(), ","));
return Status::OK();
}
@@ -2226,31 +2098,47 @@ bool
SegmentIterator::_can_evaluated_by_vectorized(std::shared_ptr<ColumnPredica
}
}
-bool SegmentIterator::_prune_column(ColumnId cid, MutableColumnPtr& column,
bool fill_defaults,
+// These placeholders are used only when the real column data is skipped after
+// index/count pushdown has already identified the matching rows. The value is
+// irrelevant, but nullable columns must stay non-NULL so COUNT(col) can count
+// the matched rows instead of treating every placeholder as NULL.
+static void insert_many_not_null_defaults(MutableColumnPtr& column, size_t
num) {
+ if (auto* nullable_column =
check_and_get_column<ColumnNullable>(column.get())) {
+ nullable_column->insert_not_null_elements(num);
+ return;
+ }
+ column->insert_many_defaults(num);
+}
+
+bool SegmentIterator::_prune_column(ColumnId cid, MutableColumnPtr& column,
size_t num_of_defaults) {
if (_need_read_data(cid)) {
return false;
}
- if (!fill_defaults) {
- return true;
- }
- if (is_column_nullable(*column)) {
- auto nullable_col_ptr =
reinterpret_cast<ColumnNullable*>(column.get());
-
nullable_col_ptr->get_null_map_column().insert_many_defaults(num_of_defaults);
-
nullable_col_ptr->get_nested_column_ptr()->insert_many_defaults(num_of_defaults);
- } else {
- // assert(column->is_const());
- column->insert_many_defaults(num_of_defaults);
- }
+ insert_many_not_null_defaults(column, num_of_defaults);
return true;
}
+bool SegmentIterator::_can_skip_reading_extra_column(ColumnId cid) {
+ if (!_opts.extra_columns.contains(cid) || _is_pred_column.empty()) {
+ return false;
+ }
+ DCHECK_EQ(_is_pred_column.size(), _is_common_expr_column.size());
+ DCHECK_LT(cid, _is_pred_column.size());
+
+ // extra_columns is only an optimization hint. The real value is still
+ // required when the column participates in expression materialization or
+ // any predicate path.
+ return !_virtual_column_exprs.contains(cid) && !_has_delete_predicate(cid)
&&
+ !_is_pred_column[cid] && !_is_common_expr_column[cid];
+}
+
Status SegmentIterator::_read_columns(const std::vector<ColumnId>& column_ids,
MutableColumns& column_block, size_t
nrows) {
for (auto cid : column_ids) {
auto& column = column_block[cid];
size_t rows_read = nrows;
- if (_prune_column(cid, column, true, rows_read)) {
+ if (_prune_column(cid, column, rows_read)) {
continue;
}
RETURN_IF_ERROR(_column_iterators[cid]->next_batch(&rows_read,
column));
@@ -2304,7 +2192,7 @@ Status SegmentIterator::_init_current_block(Block* block,
}
}
- for (auto entry : _virtual_column_exprs) {
+ for (const auto& entry : _virtual_column_exprs) {
auto cid = entry.first;
current_columns[cid] = ColumnNothing::create(0);
current_columns[cid]->reserve(nrows_read_limit);
@@ -2317,11 +2205,11 @@ Status SegmentIterator::_output_non_pred_columns(Block*
block) {
SCOPED_RAW_TIMER(&_opts.stats->output_col_ns);
VLOG_DEBUG << fmt::format(
"Output non-predicate columns, _non_predicate_columns: [{}], "
- "_schema_block_id_map: [{}]",
- fmt::join(_non_predicate_columns, ","),
fmt::join(_schema_block_id_map, ","));
+ "schema_column_id_to_index: [{}]",
+ fmt::join(_non_predicate_columns, ","),
fmt::join(_schema->column_id_to_index(), ","));
RETURN_IF_ERROR(_convert_to_expected_type(_non_predicate_columns));
for (auto cid : _non_predicate_columns) {
- auto loc = _schema_block_id_map[cid];
+ auto loc = _schema->column_index(cid);
// Whether a delete predicate column gets output depends on how the
caller builds
// the block passed to next_batch(). Both calling paths now build the
block with
// only the output schema (return_columns), so delete predicate
columns are skipped:
@@ -2338,7 +2226,7 @@ Status SegmentIterator::_output_non_pred_columns(Block*
block) {
if (loc < block->columns()) {
bool column_in_block_is_nothing = check_and_get_column<const
ColumnNothing>(
block->get_by_position(loc).column.get());
- bool column_is_normal = !_vir_cid_to_idx_in_block.contains(cid);
+ bool column_is_normal = !_virtual_column_exprs.contains(cid);
bool return_column_is_nothing =
check_and_get_column<const
ColumnNothing>(_current_return_columns[cid].get());
VLOG_DEBUG << fmt::format(
@@ -2408,7 +2296,7 @@ Status SegmentIterator::_read_columns_by_index(uint32_t
nrows_read_limit, uint16
VLOG_DEBUG << fmt::format("Column {} no need to read.", cid);
continue;
}
- if (_prune_column(cid, column, true, nrows_read)) {
+ if (_prune_column(cid, column, nrows_read)) {
VLOG_DEBUG << fmt::format("Column {} is pruned. No need to
read data.", cid);
continue;
}
@@ -2789,7 +2677,7 @@ Status
SegmentIterator::_read_columns_by_rowids(std::vector<ColumnId>& read_colu
if (_no_need_read_key_data(cid, colunm, select_size)) {
continue;
}
- if (_prune_column(cid, colunm, true, select_size)) {
+ if (_prune_column(cid, colunm, select_size)) {
continue;
}
@@ -2850,10 +2738,9 @@ Status SegmentIterator::next_batch(Block* block) {
if (res.is<END_OF_FILE>()) {
// Since we have a type check at the caller.
// So a replacement of nothing column with real column is
needed.
- const auto& idx_to_datatype = _opts.vir_col_idx_to_type;
- for (const auto& pair : _vir_cid_to_idx_in_block) {
- size_t idx = pair.second;
- auto type = idx_to_datatype.find(idx)->second;
+ for (const auto& [cid, expr_ctx] : _virtual_column_exprs) {
+ auto idx = _schema->column_index(cid);
+ auto type = expr_ctx->root()->data_type();
block->replace_by_position(idx, type->create_column());
}
@@ -3039,7 +2926,7 @@ Status SegmentIterator::_next_batch_internal(Block*
block) {
RETURN_IF_ERROR(_process_columns(_common_expr_column_ids, block));
}
- DCHECK(block->columns() >
_schema_block_id_map[*_common_expr_columns.begin()]);
+ DCHECK(block->columns() >
_schema->column_index(*_common_expr_columns.begin()));
RETURN_IF_ERROR(
_process_common_expr(_sel_rowid_idx.data(),
_selected_size, block));
}
@@ -3091,12 +2978,12 @@ Status SegmentIterator::_next_batch_internal(Block*
block) {
if (!_virtual_column_exprs.empty()) {
bool use_sel = _is_need_vec_eval || _is_need_short_eval ||
_is_need_expr_eval;
uint16_t* sel_rowid_idx = use_sel ? _sel_rowid_idx.data() : nullptr;
- std::vector<VExprContext*> vir_ctxs;
+ VExprContextSPtrs vir_ctxs;
vir_ctxs.reserve(_virtual_column_exprs.size());
for (auto& [cid, ctx] : _virtual_column_exprs) {
- vir_ctxs.push_back(ctx.get());
+ vir_ctxs.push_back(ctx);
}
- _output_index_result_column(vir_ctxs, sel_rowid_idx, _selected_size,
block);
+ _output_index_result_column(vir_ctxs, sel_rowid_idx, _selected_size);
}
RETURN_IF_ERROR(_materialization_of_virtual_column(block));
if (_opts.read_limit > 0) {
@@ -3108,7 +2995,7 @@ Status SegmentIterator::_next_batch_internal(Block*
block) {
Status SegmentIterator::_process_columns(const std::vector<ColumnId>&
column_ids, Block* block) {
RETURN_IF_ERROR(_convert_to_expected_type(column_ids));
for (auto cid : column_ids) {
- auto loc = _schema_block_id_map[cid];
+ auto loc = _schema->column_index(cid);
block->replace_by_position(loc,
std::move(_current_return_columns[cid]));
}
return Status::OK();
@@ -3119,12 +3006,10 @@ void SegmentIterator::_fill_column_nothing() {
// Because:
// 1. Before each batch, _init_return_columns is called to initialize
_current_return_columns, and virtual columns in _current_return_columns are
initialized as ColumnNothing.
// 2. When select_size == 0, the read method of VirtualColumnIterator will
definitely not be called, so the corresponding Column remains a ColumnNothing
- for (const auto pair : _vir_cid_to_idx_in_block) {
- auto cid = pair.first;
- auto pos = pair.second;
+ for (const auto& [cid, expr_ctx] : _virtual_column_exprs) {
[[maybe_unused]] const auto* nothing_col =
assert_cast<const
ColumnNothing*>(_current_return_columns[cid].get());
- _current_return_columns[cid] =
_opts.vir_col_idx_to_type[pos]->create_column();
+ _current_return_columns[cid] =
expr_ctx->root()->data_type()->create_column();
}
}
@@ -3140,17 +3025,15 @@ Status SegmentIterator::_check_output_block(Block*
block) {
idx, block->columns(), _schema->num_column_ids(),
_virtual_column_exprs.size());
} else if (check_and_get_column<ColumnNothing>(entry.column.get())) {
if (rows > 0) {
- std::vector<std::string> vcid_to_idx;
- for (const auto& pair : _vir_cid_to_idx_in_block) {
- vcid_to_idx.push_back(fmt::format("{}-{}", pair.first,
pair.second));
+ std::vector<ColumnId> virtual_column_ids;
+ for (const auto& pair : _virtual_column_exprs) {
+ virtual_column_ids.push_back(pair.first);
}
- std::string vir_cid_to_idx_in_block_msg =
- fmt::format("_vir_cid_to_idx_in_block:[{}]",
fmt::join(vcid_to_idx, ","));
return Status::InternalError(
"Column in idx {} is nothing, block columns {},
normal_columns {}, "
- "vir_cid_to_idx_in_block_msg {}",
+ "virtual_column_ids [{}]",
idx, block->columns(), _schema->num_column_ids(),
- vir_cid_to_idx_in_block_msg);
+ fmt::join(virtual_column_ids, ","));
}
} else if (entry.column->size() != rows) {
return Status::InternalError(
@@ -3165,10 +3048,6 @@ Status SegmentIterator::_check_output_block(Block*
block) {
return Status::OK();
}
-Status SegmentIterator::_process_column_predicate() {
- return Status::OK();
-}
-
Status SegmentIterator::_process_eof(Block* block) {
// Convert all columns in _current_return_columns to schema column
RETURN_IF_ERROR(_convert_to_expected_type(_schema->column_ids()));
@@ -3187,32 +3066,10 @@ Status SegmentIterator::_process_eof(Block* block) {
Status SegmentIterator::_process_common_expr(uint16_t* sel_rowid_idx,
uint16_t& selected_size,
Block* block) {
- // Here we just use col0 as row_number indicator. when reach here, we will
calculate the predicates first.
- // then use the result to reduce our data read(that is, expr push down).
there's now row in block means the first
- // column is not in common expr. so it's safe to replace it temporarily
to provide correct `selected_size`.
VLOG_DEBUG << fmt::format("Execute common expr. block rows {}, selected
size {}", block->rows(),
_selected_size);
- bool need_mock_col = block->rows() != selected_size;
- MutableColumnPtr col0;
- if (need_mock_col) {
- col0 = std::move(*block->get_by_position(0).column).mutate();
- block->replace_by_position(
- 0,
block->get_by_position(0).type->create_column_const_with_default_value(
- _selected_size));
- }
-
- std::vector<VExprContext*> common_ctxs;
- common_ctxs.reserve(_common_expr_ctxs_push_down.size());
- for (auto& ctx : _common_expr_ctxs_push_down) {
- common_ctxs.push_back(ctx.get());
- }
- _output_index_result_column(common_ctxs, _sel_rowid_idx.data(),
_selected_size, block);
- RETURN_IF_ERROR(_execute_common_expr(_sel_rowid_idx.data(),
_selected_size, block));
-
- if (need_mock_col) {
- block->replace_by_position(0, std::move(col0));
- }
+ RETURN_IF_ERROR(_execute_common_expr(sel_rowid_idx, selected_size, block));
VLOG_DEBUG << fmt::format("Execute common expr end. block rows {},
selected size {}",
block->rows(), _selected_size);
@@ -3223,14 +3080,24 @@ Status SegmentIterator::_execute_common_expr(uint16_t*
sel_rowid_idx, uint16_t&
Block* block) {
SCOPED_RAW_TIMER(&_opts.stats->expr_filter_ns);
DCHECK(!_common_expr_ctxs_push_down.empty());
- DCHECK(block->rows() != 0);
- int prev_columns = block->columns();
+ _output_index_result_column(_common_expr_ctxs_push_down, sel_rowid_idx,
selected_size);
+
uint16_t original_size = selected_size;
_opts.stats->expr_cond_input_rows += original_size;
- IColumn::Filter filter;
- RETURN_IF_ERROR(VExprContext::execute_conjuncts_and_filter_block(
- _common_expr_ctxs_push_down, block, _columns_to_filter,
prev_columns, filter));
+ // Some output columns may stay empty until after common expr filtering.
Use the
+ // selected row count instead of Block::rows(), which is derived from the
first column.
+ IColumn::Filter filter(selected_size, 1);
+ bool can_filter_all = false;
+ auto* __restrict filter_data = filter.data();
+ for (const auto& expr_ctx : _common_expr_ctxs_push_down) {
+ RETURN_IF_ERROR(expr_ctx->execute_filter(block, filter_data,
selected_size, false,
+ &can_filter_all));
+ if (can_filter_all) {
+ break;
+ }
+ }
+ RETURN_IF_CATCH_EXCEPTION(Block::filter_block_internal(block,
_columns_to_filter, filter));
selected_size = _evaluate_common_expr_filter(sel_rowid_idx, selected_size,
filter);
_opts.stats->rows_expr_cond_filtered += original_size - selected_size;
@@ -3279,15 +3146,14 @@ uint16_t
SegmentIterator::_evaluate_common_expr_filter(uint16_t* sel_rowid_idx,
}
}
-void SegmentIterator::_output_index_result_column(const
std::vector<VExprContext*>& expr_ctxs,
- uint16_t* sel_rowid_idx,
uint16_t select_size,
- Block* block) {
+void SegmentIterator::_output_index_result_column(const VExprContextSPtrs&
expr_ctxs,
+ uint16_t* sel_rowid_idx,
uint16_t select_size) {
SCOPED_RAW_TIMER(&_opts.stats->output_index_result_column_timer);
- if (block->rows() == 0) {
+ if (select_size == 0) {
return;
}
- for (auto* expr_ctx_ptr : expr_ctxs) {
- auto index_ctx = expr_ctx_ptr->get_index_context();
+ for (const auto& expr_ctx : expr_ctxs) {
+ auto index_ctx = expr_ctx->get_index_context();
if (index_ctx == nullptr) {
continue;
}
@@ -3297,7 +3163,7 @@ void SegmentIterator::_output_index_result_column(const
std::vector<VExprContext
const auto& index_result_bitmap = result_bitmap.get_data_bitmap();
auto index_result_column = ColumnUInt8::create();
ColumnUInt8::Container& vec_match_pred =
index_result_column->get_data();
- vec_match_pred.resize(block->rows());
+ vec_match_pred.resize(select_size);
std::fill(vec_match_pred.begin(), vec_match_pred.end(), 0);
const auto& null_bitmap = result_bitmap.get_null_bitmap();
@@ -3309,7 +3175,7 @@ void SegmentIterator::_output_index_result_column(const
std::vector<VExprContext
if (has_null_bitmap && expr_returns_nullable) {
null_map_column = ColumnUInt8::create();
auto& null_map_vec = null_map_column->get_data();
- null_map_vec.resize(block->rows());
+ null_map_vec.resize(select_size);
std::fill(null_map_vec.begin(), null_map_vec.end(), 0);
null_map_data = &null_map_column->get_data();
}
@@ -3326,7 +3192,7 @@ void SegmentIterator::_output_index_result_column(const
std::vector<VExprContext
}
}
- DCHECK(block->rows() == vec_match_pred.size());
+ DCHECK(select_size == vec_match_pred.size());
if (null_map_column) {
index_ctx->set_index_result_column_for_expr(
@@ -3414,8 +3280,6 @@ Status
SegmentIterator::_construct_compound_expr_context() {
context->set_index_context(inverted_index_context);
expr_ctx = context;
}
- RETURN_IF_ERROR(rebind_storage_exprs_to_reader_schema(
- _opts, *_schema, _common_expr_ctxs_push_down,
_virtual_column_exprs));
return Status::OK();
}
@@ -3547,8 +3411,8 @@ void
SegmentIterator::_calculate_common_expr_index_exec_status() {
for (const auto& vir_child : vir_node->children())
{
if (vir_child->is_slot_ref()) {
auto* inner_slot_ref =
assert_cast<VSlotRef*>(vir_child.get());
-
_common_expr_index_exec_status[_schema->column_id(
-
inner_slot_ref->column_id())][expr.get()] = false;
+ auto cid =
_schema->column_id(inner_slot_ref->column_id());
+
_common_expr_index_exec_status[cid][expr.get()] = false;
_common_expr_to_slotref_map[root_expr_ctx.get()]
[inner_slot_ref->column_id()] =
expr.get();
@@ -3565,8 +3429,8 @@ void
SegmentIterator::_calculate_common_expr_index_exec_status() {
auto expr_without_cast = VExpr::expr_without_cast(child);
if (expr_without_cast->is_slot_ref() && expr->op() !=
TExprOpcode::CAST) {
auto* column_slot_ref =
assert_cast<VSlotRef*>(expr_without_cast.get());
-
_common_expr_index_exec_status[_schema->column_id(column_slot_ref->column_id())]
- [expr.get()] = false;
+ auto cid =
_schema->column_id(column_slot_ref->column_id());
+ _common_expr_index_exec_status[cid][expr.get()] = false;
_common_expr_to_slotref_map[root_expr_ctx.get()][column_slot_ref->column_id()] =
expr.get();
}
@@ -3610,14 +3474,7 @@ bool SegmentIterator::_no_need_read_key_data(ColumnId
cid, MutableColumnPtr& col
return false;
}
- if (is_column_nullable(*column)) {
- auto* nullable_col_ptr =
reinterpret_cast<ColumnNullable*>(column.get());
-
nullable_col_ptr->get_null_map_column().insert_many_defaults(nrows_read);
-
nullable_col_ptr->get_nested_column_ptr()->insert_many_defaults(nrows_read);
- } else {
- column->insert_many_defaults(nrows_read);
- }
-
+ insert_many_not_null_defaults(column, nrows_read);
return true;
}
@@ -3668,55 +3525,46 @@ bool SegmentIterator::_can_opt_limit_reads() {
// Before get next batch. make sure all virtual columns in block has type
ColumnNothing.
void SegmentIterator::_init_virtual_columns(Block* block) {
- for (const auto& pair : _vir_cid_to_idx_in_block) {
- auto& col_with_type_and_name = block->get_by_position(pair.second);
+ for (const auto& [cid, expr_ctx] : _virtual_column_exprs) {
+ auto idx = _schema->column_index(cid);
+ auto& col_with_type_and_name = block->get_by_position(idx);
col_with_type_and_name.column = ColumnNothing::create(0);
- col_with_type_and_name.type = _opts.vir_col_idx_to_type[pair.second];
+ col_with_type_and_name.type = expr_ctx->root()->data_type();
}
}
Status SegmentIterator::_materialization_of_virtual_column(Block* block) {
// Some expr can not process empty block, such as function `element_at`.
// So materialize virtual column in advance to avoid errors.
- if (block->rows() == 0) {
- for (const auto& pair : _vir_cid_to_idx_in_block) {
- auto& col_with_type_and_name = block->get_by_position(pair.second);
- col_with_type_and_name.column =
_opts.vir_col_idx_to_type[pair.second]->create_column();
- col_with_type_and_name.type =
_opts.vir_col_idx_to_type[pair.second];
+ if (_selected_size == 0) {
+ for (const auto& [cid, expr_ctx] : _virtual_column_exprs) {
+ auto idx = _schema->column_index(cid);
+ auto& col_with_type_and_name = block->get_by_position(idx);
+ col_with_type_and_name.column =
expr_ctx->root()->data_type()->create_column();
+ col_with_type_and_name.type = expr_ctx->root()->data_type();
}
return Status::OK();
}
+ if (_virtual_column_exprs.empty()) {
+ return Status::OK();
+ }
for (const auto& cid_and_expr : _virtual_column_exprs) {
auto cid = cid_and_expr.first;
auto column_expr = cid_and_expr.second;
- size_t idx_in_block = _vir_cid_to_idx_in_block[cid];
- if (block->columns() <= idx_in_block) {
- return Status::InternalError(
- "Virtual column index {} is out of range, block columns
{}, "
- "virtual columns size {}, virtual column expr {}",
- idx_in_block, block->columns(),
_vir_cid_to_idx_in_block.size(),
- column_expr->root()->debug_string());
- } else if (block->get_by_position(idx_in_block).column.get() ==
nullptr) {
- return Status::InternalError(
- "Virtual column index {} is null, block columns {},
virtual columns size "
- "{}, "
- "virtual column expr {}",
- idx_in_block, block->columns(),
_vir_cid_to_idx_in_block.size(),
- column_expr->root()->debug_string());
- }
- if (check_and_get_column<const ColumnNothing>(
- block->get_by_position(idx_in_block).column.get())) {
+ auto materialized_pos = _schema->column_index(cid);
+ auto& column = block->get_by_position(materialized_pos).column;
+ if (check_and_get_column<const ColumnNothing>(column.get())) {
VLOG_DEBUG << fmt::format("Virtual column is doing
materialization, cid {}, col idx {}",
- cid, idx_in_block);
+ cid, materialized_pos);
ColumnPtr result_column;
- RETURN_IF_ERROR(column_expr->execute(block, result_column));
+ // The first block column may still be ColumnNothing(0) for a
virtual column, while
+ // predicates have already reduced _selected_size. Evaluate the
expression over the
+ // selected row count instead of Block::rows().
+
RETURN_IF_ERROR(column_expr->root()->execute_column(column_expr.get(), block,
nullptr,
+
_selected_size, result_column));
- block->replace_by_position(idx_in_block, std::move(result_column));
- if (block->get_by_position(idx_in_block).column->size() == 0) {
- LOG_WARNING("Result of expr column {} is empty. cid {},
idx_in_block {}",
- column_expr->root()->debug_string(), cid,
idx_in_block);
- }
+ block->replace_by_position(materialized_pos,
std::move(result_column));
}
}
return Status::OK();
diff --git a/be/src/storage/segment/segment_iterator.h
b/be/src/storage/segment/segment_iterator.h
index da6b5b870ab..c6ae5a47ae5 100644
--- a/be/src/storage/segment/segment_iterator.h
+++ b/be/src/storage/segment/segment_iterator.h
@@ -18,9 +18,9 @@
#pragma once
#include <gen_cpp/Exprs_types.h>
-#include <stddef.h>
-#include <stdint.h>
+#include <cstddef>
+#include <cstdint>
#include <map>
#include <memory>
#include <ostream>
@@ -241,7 +241,7 @@ private:
uint16_t* sel_rowid_idx,
uint16_t select_size) {
SCOPED_RAW_TIMER(&_opts.stats->output_col_ns);
for (auto cid : column_ids) {
- int block_cid = _schema_block_id_map[cid];
+ int block_cid = _schema->column_index(cid);
// Only the additional deleted filter condition need to
materialize column be at the end of the block
// We should not to materialize the column of query engine do not
need. So here just return OK.
// Eg:
@@ -276,7 +276,6 @@ private:
bool _can_evaluated_by_vectorized(std::shared_ptr<ColumnPredicate>
predicate);
[[nodiscard]] Status _extract_common_expr_columns(const VExprSPtr& expr);
- // same with _extract_common_expr_columns, but only extract columns that
can be used for index
[[nodiscard]] Status _execute_common_expr(uint16_t* sel_rowid_idx,
uint16_t& selected_size,
Block* block);
Status _process_common_expr(uint16_t* sel_rowid_idx, uint16_t&
selected_size, Block* block);
@@ -292,12 +291,11 @@ private:
bool _check_apply_by_inverted_index(std::shared_ptr<ColumnPredicate> pred);
- void _output_index_result_column(const std::vector<VExprContext*>&
expr_ctxs,
- uint16_t* sel_rowid_idx, uint16_t
select_size, Block* block);
+ void _output_index_result_column(const VExprContextSPtrs& expr_ctxs,
uint16_t* sel_rowid_idx,
+ uint16_t select_size);
bool _need_read_data(ColumnId cid);
- bool _prune_column(ColumnId cid, MutableColumnPtr& column, bool
fill_defaults,
- size_t num_of_defaults);
+ bool _prune_column(ColumnId cid, MutableColumnPtr& column, size_t
num_of_defaults);
Status _construct_compound_expr_context();
@@ -306,7 +304,7 @@ private:
for (auto cid : col_ids) {
auto ord = key.field(cid) <=> (*_seek_block[cid])[0];
if (ord != std::strong_ordering::equal) {
- return ord < 0 ? -1 : 1;
+ return ord == std::strong_ordering::less ? -1 : 1;
}
}
return 0;
@@ -317,6 +315,7 @@ private:
bool _no_need_read_key_data(ColumnId cid, MutableColumnPtr& column, size_t
nrows_read);
bool _has_delete_predicate(ColumnId cid);
+ bool _can_skip_reading_extra_column(ColumnId cid);
bool _can_opt_limit_reads();
@@ -328,8 +327,6 @@ private:
Status _process_eof(Block* block);
- Status _process_column_predicate();
-
void _fill_column_nothing();
Status _process_columns(const std::vector<ColumnId>& column_ids, Block*
block);
@@ -394,11 +391,11 @@ private:
// so we need a field to stand for columns first time to read
std::vector<ColumnId> _predicate_column_ids;
std::vector<ColumnId> _common_expr_column_ids;
- // TODO: Should use std::vector<size_t>
+ // Block slot indexes to filter after common expr evaluation. This is not
+ // tablet column ids because Block::filter_block_internal filters by block
+ // position.
std::vector<ColumnId> _columns_to_filter;
std::vector<bool> _converted_column_ids;
- // TODO: Should use std::vector<size_t>
- std::vector<int> _schema_block_id_map; // map from schema column id to
column idx in Block
// the actual init process is delayed to the first call to next_batch()
bool _lazy_inited;
@@ -473,7 +470,6 @@ private:
// cid to virtual column expr
std::map<ColumnId, VExprContextSPtr> _virtual_column_exprs;
- std::map<ColumnId, size_t> _vir_cid_to_idx_in_block;
IndexQueryContextPtr _index_query_context;
diff --git a/be/src/storage/tablet/tablet_reader.cpp
b/be/src/storage/tablet/tablet_reader.cpp
index 2e7e73a3676..4ae7945c688 100644
--- a/be/src/storage/tablet/tablet_reader.cpp
+++ b/be/src/storage/tablet/tablet_reader.cpp
@@ -204,14 +204,13 @@ Status TabletReader::_capture_rs_readers(const
ReaderParams& read_params) {
_reader_context.is_key_column_group = read_params.is_key_column_group;
_reader_context.common_expr_ctxs_push_down =
read_params.common_expr_ctxs_push_down;
_reader_context.output_columns = &read_params.output_columns;
+ _reader_context.extra_columns = read_params.extra_columns;
_reader_context.push_down_agg_type_opt =
read_params.push_down_agg_type_opt;
_reader_context.ttl_seconds = _tablet->ttl_seconds();
_reader_context.score_runtime = read_params.score_runtime;
_reader_context.collection_statistics = read_params.collection_statistics;
_reader_context.virtual_column_exprs = read_params.virtual_column_exprs;
- _reader_context.vir_cid_to_idx_in_block =
read_params.vir_cid_to_idx_in_block;
- _reader_context.vir_col_idx_to_type = read_params.vir_col_idx_to_type;
_reader_context.ann_topn_runtime = read_params.ann_topn_runtime;
_reader_context.condition_cache_digest =
read_params.condition_cache_digest;
@@ -229,10 +228,6 @@ Status TabletReader::_capture_rs_readers(const
ReaderParams& read_params) {
// Propagate general read limit for DUP_KEYS and UNIQUE_KEYS with MOW
_reader_context.general_read_limit = read_params.general_read_limit;
- // Preserve the original requested output layout so BlockReader can map
expanded storage
- // columns (for non-direct AGG/UNIQUE paths) back to the final output
block.
- _reader_context.origin_return_columns = read_params.origin_return_columns;
-
return Status::OK();
}
diff --git a/be/src/storage/tablet/tablet_reader.h
b/be/src/storage/tablet/tablet_reader.h
index 3d03caf6d74..4eaeaa5f6c5 100644
--- a/be/src/storage/tablet/tablet_reader.h
+++ b/be/src/storage/tablet/tablet_reader.h
@@ -158,6 +158,12 @@ public:
std::vector<ColumnId> return_columns;
// output_columns only contain columns in OrderByExprs and outputExprs
std::set<int32_t> output_columns;
+ // Extra storage key columns that are present only for scan-schema
alignment.
+ // Example: for AGG keys (k1, k2), a query that returns k2 can scan
+ // (k1, k2) and project away k1. Direct readers may avoid reading such
+ // columns only if the lower iterator proves their real values are not
+ // required by predicates, delete conditions, or expressions.
+ std::set<ColumnId> extra_columns;
RuntimeProfile* profile = nullptr;
RuntimeState* runtime_state = nullptr;
@@ -209,8 +215,6 @@ public:
int64_t batch_size = -1;
std::map<ColumnId, VExprContextSPtr> virtual_column_exprs;
- std::map<ColumnId, size_t> vir_cid_to_idx_in_block;
- std::map<size_t, DataTypePtr> vir_col_idx_to_type;
std::shared_ptr<ScoreRuntime> score_runtime;
CollectionStatisticsPtr collection_statistics;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index 14e063ac7f2..1c037c65edb 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -36,6 +36,7 @@ import org.apache.doris.analysis.TupleId;
import org.apache.doris.catalog.AliasFunction;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.KeysType;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.RowBinlogTableWrapper;
import org.apache.doris.catalog.TableIf;
@@ -3002,6 +3003,9 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
.collect(Collectors.toSet());
requiredWithVirtualColumns.addAll(virtualColumnInputSlotIds);
}
+ if (scanNode instanceof OlapScanNode) {
+ preserveExtraStorageKeySlots((OlapScanNode) scanNode,
requiredWithVirtualColumns);
+ }
// Find the smallest column, for count(*) or other situation that slot
is empty after prune
SlotDescriptor smallest =
getSmallestSlot(scanNode.getTupleDesc().getSlots());
scanNode.getTupleDesc().getSlots().removeIf(s ->
!requiredWithVirtualColumns.contains(s.getId()));
@@ -3021,6 +3025,35 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
}
}
+ private void preserveExtraStorageKeySlots(OlapScanNode scanNode,
Set<SlotId> requiredSlotIds) {
+ if (!shouldPreserveStorageKeySlots(scanNode)) {
+ return;
+ }
+ for (SlotDescriptor slot : scanNode.getTupleDesc().getSlots()) {
+ Column column = slot.getColumn();
+ if (column == null || !column.isKey()) {
+ // OLAP scan tuples follow the storage schema, where key
columns form the prefix.
+ break;
+ }
+ if (!requiredSlotIds.contains(slot.getId())) {
+ scanNode.getExtraKeyColumnSlotIds().add(slot.getId().asInt());
+ requiredSlotIds.add(slot.getId());
+ }
+ }
+ }
+
+ private boolean shouldPreserveStorageKeySlots(OlapScanNode scanNode) {
+ if (scanNode.isIncrementalScan()) {
+ return false;
+ }
+ long selectedIndexId = scanNode.getSelectedIndexId() == -1
+ ? scanNode.getOlapTable().getBaseIndexId()
+ : scanNode.getSelectedIndexId();
+ KeysType keysType =
scanNode.getOlapTable().getIndexMetaByIndexId(selectedIndexId).getKeysType();
+ return keysType == KeysType.AGG_KEYS
+ || (keysType == KeysType.UNIQUE_KEYS &&
!scanNode.getOlapTable().getEnableUniqueKeyMergeOnWrite());
+ }
+
/**
* Get the smallest slot in the slot list.
*
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapScan.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapScan.java
index d60ff2d5e34..d65ddcd5c6f 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapScan.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapScan.java
@@ -436,9 +436,8 @@ public class PhysicalOlapScan extends
PhysicalCatalogRelation implements OlapSca
selectedIndexId, selectedTabletIds, selectedPartitionIds,
hasPartitionPredicate,
distributionSpec, preAggStatus, baseOutputs, groupExpression,
getLogicalProperties(),
getPhysicalProperties(), statistics, tableSample,
operativeSlots, virtualColumns, scoreOrderKeys,
- scoreLimit,
- scoreRangeInfo, annOrderKeys, annLimit, tableAlias,
partitionPrunablePredicates, incrementalScan,
- scanParams));
+ scoreLimit, scoreRangeInfo, annOrderKeys, annLimit,
tableAlias, partitionPrunablePredicates,
+ incrementalScan, scanParams));
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
index 2f93e50d3d5..1f2c7e97746 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
@@ -189,6 +189,7 @@ public class OlapScanNode extends ScanNode {
private SortInfo sortInfo = null;
private Set<Integer> outputColumnUniqueIds = new HashSet<>();
+ private Set<Integer> extraKeyColumnSlotIds = new HashSet<>();
// When scan match sort_info, we can push limit into OlapScanNode.
// It's limit for scanner instead of scanNode so we add a new limit.
@@ -269,6 +270,10 @@ public class OlapScanNode extends ScanNode {
this.tableSample = tSample;
}
+ public Set<Integer> getExtraKeyColumnSlotIds() {
+ return extraKeyColumnSlotIds;
+ }
+
public void setNereidsPrunedTabletIds(Set<Long> nereidsPrunedTabletIds) {
this.nereidsPrunedTabletIds = nereidsPrunedTabletIds;
}
@@ -1124,6 +1129,12 @@ public class OlapScanNode extends ScanNode {
.map(node -> node.getId().asInt() +
"").collect(Collectors.toList()));
output.append(prefix).append("TOPN
OPT:").append(topnFilterSources).append("\n");
}
+ if (!extraKeyColumnSlotIds.isEmpty()) {
+ String extraKeyColumns = extraKeyColumnSlotIds.stream().sorted()
+ .map(this::getExtraKeyColumnExplainName)
+ .collect(Collectors.joining(","));
+ output.append(prefix).append("EXTRA KEY COLUMNS:
").append(extraKeyColumns).append("\n");
+ }
if (!conjuncts.isEmpty()) {
Expr expr = convertConjunctsToAndCompoundPredicate(conjuncts);
@@ -1164,6 +1175,17 @@ public class OlapScanNode extends ScanNode {
return output.toString();
}
+ private String getExtraKeyColumnExplainName(Integer slotId) {
+ SlotDescriptor extraKeySlot = desc.getSlots().stream()
+ .filter(slot -> slot.getId().asInt() == slotId)
+ .findFirst()
+ .orElse(null);
+ Preconditions.checkNotNull(extraKeySlot, "missing extra key slot %s",
slotId);
+ Column column = extraKeySlot.getColumn();
+ Preconditions.checkNotNull(column, "missing column for extra key slot
%s", slotId);
+ return column.getName();
+ }
+
@Override
public int getNumInstances() {
// In pipeline exec engine, the instance num equals be_num * parallel
instance.
@@ -1318,6 +1340,9 @@ public class OlapScanNode extends ScanNode {
if (outputColumnUniqueIds != null) {
msg.olap_scan_node.setOutputColumnUniqueIds(outputColumnUniqueIds);
}
+ if (!extraKeyColumnSlotIds.isEmpty()) {
+ msg.olap_scan_node.setExtraKeyColumnSlotIds(extraKeyColumnSlotIds);
+ }
msg.olap_scan_node.setDistributeColumnIds(new
ArrayList<>(distributionColumnIds));
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslatorTest.java
b/fe/fe-core/src/test/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslatorTest.java
index c31f96792f2..a6617e595ba 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslatorTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslatorTest.java
@@ -55,6 +55,8 @@ import org.apache.doris.planner.Planner;
import org.apache.doris.planner.RepeatNode;
import org.apache.doris.planner.ScanContext;
import org.apache.doris.planner.ScanNode;
+import org.apache.doris.thrift.TExplainLevel;
+import org.apache.doris.thrift.TPlanNode;
import org.apache.doris.thrift.TScanRangeLocations;
import org.apache.doris.utframe.TestWithFeService;
@@ -101,13 +103,11 @@ public class PhysicalPlanTranslatorTest extends
TestWithFeService {
OlapTable t1 = PlanConstructor.newOlapTable(0, "t1", 0,
KeysType.AGG_KEYS);
List<String> qualifier = new ArrayList<>();
qualifier.add("test");
- List<Slot> t1Output = new ArrayList<>();
- SlotReference col1 = new SlotReference("col1", IntegerType.INSTANCE);
- SlotReference col2 = new SlotReference("col2", IntegerType.INSTANCE);
- SlotReference col3 = new SlotReference("col2", IntegerType.INSTANCE);
- t1Output.add(col1);
- t1Output.add(col2);
- t1Output.add(col3);
+ SlotReference col1 =
SlotReference.fromColumn(StatementScopeIdGenerator.newExprId(),
+ t1, t1.getBaseSchema().get(0), qualifier);
+ SlotReference col2 =
SlotReference.fromColumn(StatementScopeIdGenerator.newExprId(),
+ t1, t1.getBaseSchema().get(1), qualifier);
+ List<Slot> t1Output = ImmutableList.of(col1, col2);
LogicalProperties t1Properties = new LogicalProperties(new
Supplier<List<Slot>>() {
@Override
public List<Slot> get() {
@@ -119,11 +119,11 @@ public class PhysicalPlanTranslatorTest extends TestWithFeService {
return DataTrait.EMPTY_TRAIT;
}
});
- PhysicalOlapScan scan = new
PhysicalOlapScan(StatementScopeIdGenerator.newRelationId(), t1, qualifier,
t1.getBaseIndexId(),
- Collections.emptyList(), Collections.emptyList(), null,
PreAggStatus.on(),
- ImmutableList.of(), Optional.empty(), t1Properties,
Optional.empty(),
- ImmutableList.of(), ImmutableList.of(), ImmutableList.of(),
Optional.empty(),
- Optional.empty(), ImmutableList.of(), Optional.empty());
+ PhysicalOlapScan scan = new
PhysicalOlapScan(StatementScopeIdGenerator.newRelationId(), t1, qualifier,
+ t1.getBaseIndexId(), Collections.emptyList(),
Collections.emptyList(), null, PreAggStatus.on(),
+ ImmutableList.of(), Optional.empty(), t1Properties,
Optional.empty(), ImmutableList.of(),
+ ImmutableList.of(), ImmutableList.of(), Optional.empty(),
Optional.empty(), ImmutableList.of(),
+ Optional.empty());
Literal t1FilterRight = new IntegerLiteral(1);
Expression t1FilterExpr = new GreaterThan(col1, t1FilterRight);
PhysicalFilter<PhysicalOlapScan> filter =
@@ -139,6 +139,34 @@ public class PhysicalPlanTranslatorTest extends TestWithFeService {
List<OlapScanNode> scanNodeList = new ArrayList<>();
planNode.collect(OlapScanNode.class, scanNodeList);
Assertions.assertEquals(2,
scanNodeList.get(0).getTupleDesc().getSlots().size());
+
Assertions.assertTrue(scanNodeList.get(0).getExtraKeyColumnSlotIds().isEmpty());
+
+ List<NamedExpression> extraKeyProjectList = new ArrayList<>();
+ extraKeyProjectList.add(col2);
+ PhysicalProject<PhysicalOlapScan> extraKeyProject = new
PhysicalProject<>(extraKeyProjectList,
+ placeHolder, scan);
+ PlanTranslatorContext extraKeyPlanTranslatorContext = new
PlanTranslatorContext();
+ PhysicalPlanTranslator extraKeyTranslator = new
PhysicalPlanTranslator(extraKeyPlanTranslatorContext, null);
+ PlanFragment extraKeyFragment =
extraKeyTranslator.visitPhysicalProject(extraKeyProject,
+ extraKeyPlanTranslatorContext);
+ List<OlapScanNode> extraKeyScanNodeList = new ArrayList<>();
+ extraKeyFragment.getPlanRoot().collect(OlapScanNode.class,
extraKeyScanNodeList);
+ OlapScanNode extraKeyScanNode = extraKeyScanNodeList.get(0);
+ Assertions.assertEquals(2,
extraKeyScanNode.getTupleDesc().getSlots().size());
+ Assertions.assertEquals("id",
extraKeyScanNode.getTupleDesc().getSlots().get(0).getColumn().getName());
+ Assertions.assertEquals("name",
extraKeyScanNode.getTupleDesc().getSlots().get(1).getColumn().getName());
+ Assertions.assertEquals(1,
extraKeyScanNode.getOutputTupleDesc().getSlots().size());
+ Assertions.assertEquals("name",
extraKeyScanNode.getOutputTupleDesc().getSlots().get(0).getColumn().getName());
+ Assertions.assertEquals(1, extraKeyScanNode.getProjectList().size());
+
+ int extraKeySlotId =
extraKeyScanNode.getTupleDesc().getSlots().get(0).getId().asInt();
+ Assertions.assertEquals(ImmutableSet.of(extraKeySlotId),
extraKeyScanNode.getExtraKeyColumnSlotIds());
+ Assertions.assertTrue(extraKeyScanNode.getNodeExplainString("",
TExplainLevel.NORMAL)
+ .contains("EXTRA KEY COLUMNS: id"));
+ TPlanNode thriftScanNode =
extraKeyScanNode.treeToThrift().getNodes().get(0);
+
Assertions.assertTrue(thriftScanNode.olap_scan_node.isSetExtraKeyColumnSlotIds());
+ Assertions.assertEquals(ImmutableSet.of(extraKeySlotId),
+ thriftScanNode.olap_scan_node.getExtraKeyColumnSlotIds());
}
@Test
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index fb8ef30150e..5079fb2b710 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -1008,6 +1008,8 @@ struct TOlapScanNode {
// Only partitions that are candidates for pruning are included; partitions FE
// does not want pruned (e.g. default catch-all) are omitted from this list.
27: optional list<TPartitionBoundary> partition_boundaries
+ // Slot ids of storage key columns that FE added to the scan tuple only to align it with the storage key order; they are not part of the scan's original output.
+ 28: optional set<i32> extra_key_column_slot_ids
}
struct TEqJoinCondition {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]