This is an automated email from the ASF dual-hosted git repository.

Gabriel39 pushed a commit to branch refact_reader_branch
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/refact_reader_branch by this 
push:
     new ac24a827685 [fix](be) Fill partition columns in TableReader (#63773)
ac24a827685 is described below

commit ac24a827685562d3adb877ccd2082ceb719e642e
Author: Gabriel <[email protected]>
AuthorDate: Thu May 28 11:11:00 2026 +0800

    [fix](be) Fill partition columns in TableReader (#63773)
    
    Problem Summary: TableReader could map partition columns to physical
    file columns before checking split partition values, and
    constant/default expression materialization used the file-local block
    row count. For scans where partition values should be filled from split
    metadata, especially when the file-local block row count differs from
    the batch row count, this could produce incorrect materialized columns.
---
 .../data_type_serde/data_type_datetimev2_serde.cpp |   3 +-
 .../data_type_serde/data_type_datev2_serde.cpp     |   7 +-
 .../data_type_serde/data_type_decimal_serde.cpp    |  14 +-
 .../data_type_serde/data_type_nullable_serde.cpp   |   6 +-
 .../data_type_serde/data_type_number_serde.cpp     |  10 +-
 .../data_type_serde/data_type_string_serde.cpp     |   2 +-
 .../core/data_type_serde/data_type_time_serde.cpp  |  20 +-
 be/src/format/new_parquet/parquet_reader.cpp       |   6 +-
 be/src/format/reader/column_mapper.cpp             |  14 +-
 be/src/format/reader/column_mapper.h               |   3 +-
 be/src/format/reader/table_reader.h                |  17 +-
 .../data_type_serde_decoded_values_test.cpp        |   3 +-
 be/test/format/new_parquet/parquet_reader_test.cpp |   4 +-
 be/test/format/reader/expr/cast_test.cpp           |  10 +-
 be/test/format/reader/table_reader_test.cpp        | 278 ++++++++++++---------
 15 files changed, 225 insertions(+), 172 deletions(-)

diff --git a/be/src/core/data_type_serde/data_type_datetimev2_serde.cpp 
b/be/src/core/data_type_serde/data_type_datetimev2_serde.cpp
index fc2c14d1829..ce0599080c6 100644
--- a/be/src/core/data_type_serde/data_type_datetimev2_serde.cpp
+++ b/be/src/core/data_type_serde/data_type_datetimev2_serde.cpp
@@ -22,12 +22,13 @@
 
 #include <chrono> // IWYU pragma: keep
 #include <cstdint>
+
 #include "common/status.h"
 #include "core/column/column_const.h"
-#include "core/data_type_serde/decoded_column_view.h"
 #include "core/data_type/data_type_decimal.h"
 #include "core/data_type/data_type_number.h"
 #include "core/data_type/primitive_type.h"
+#include "core/data_type_serde/decoded_column_view.h"
 #include "core/types.h"
 #include "core/value/vdatetime_value.h"
 #include "exprs/function/cast/cast_to_datetimev2_impl.hpp"
diff --git a/be/src/core/data_type_serde/data_type_datev2_serde.cpp 
b/be/src/core/data_type_serde/data_type_datev2_serde.cpp
index 9410df86eaa..94b86312d61 100644
--- a/be/src/core/data_type_serde/data_type_datev2_serde.cpp
+++ b/be/src/core/data_type_serde/data_type_datev2_serde.cpp
@@ -22,11 +22,12 @@
 #include <fmt/core.h>
 
 #include <cstdint>
+
 #include "core/column/column_const.h"
-#include "core/data_type_serde/decoded_column_view.h"
 #include "core/data_type/data_type_decimal.h"
 #include "core/data_type/data_type_number.h"
 #include "core/data_type/define_primitive_type.h"
+#include "core/data_type_serde/decoded_column_view.h"
 #include "core/types.h"
 #include "core/value/vdatetime_value.h"
 #include "exprs/function/cast/cast_to_datev2_impl.hpp"
@@ -125,8 +126,8 @@ Status DataTypeDateV2SerDe::read_column_from_arrow(IColumn& 
column, const arrow:
     return Status::OK();
 }
 
-Status DataTypeDateV2SerDe::read_column_from_decoded_values(
-        IColumn& column, const DecodedColumnView& view) const {
+Status DataTypeDateV2SerDe::read_column_from_decoded_values(IColumn& column,
+                                                            const 
DecodedColumnView& view) const {
     if (view.value_kind != DecodedValueKind::INT32) {
         return Status::NotSupported("DATEV2 decoded reader expects INT32 
source");
     }
diff --git a/be/src/core/data_type_serde/data_type_decimal_serde.cpp 
b/be/src/core/data_type_serde/data_type_decimal_serde.cpp
index c744cdb8a2f..9fa2e0c6ebd 100644
--- a/be/src/core/data_type_serde/data_type_decimal_serde.cpp
+++ b/be/src/core/data_type_serde/data_type_decimal_serde.cpp
@@ -61,8 +61,8 @@ NativeType decode_big_endian_signed_integer(const uint8_t* 
data, int length) {
 }
 
 template <PrimitiveType T>
-typename PrimitiveTypeTraits<T>::CppType read_decimal_decoded_value(
-        const DecodedColumnView& view, int64_t row) {
+typename PrimitiveTypeTraits<T>::CppType read_decimal_decoded_value(const 
DecodedColumnView& view,
+                                                                    int64_t 
row) {
     using FieldType = typename PrimitiveTypeTraits<T>::CppType;
     if (view.value_kind == DecodedValueKind::INT32) {
         const auto* values = reinterpret_cast<const int32_t*>(view.values);
@@ -76,9 +76,9 @@ typename PrimitiveTypeTraits<T>::CppType 
read_decimal_decoded_value(
     const auto length = view.value_kind == DecodedValueKind::FIXED_BINARY
                                 ? view.fixed_length
                                 : cast_set<int, size_t, false>(value.size);
-    return FieldType {static_cast<typename FieldType::NativeType>(
-            decode_big_endian_signed_integer<Int128>(reinterpret_cast<const 
uint8_t*>(value.data),
-                                                     length))};
+    return FieldType {
+            static_cast<typename 
FieldType::NativeType>(decode_big_endian_signed_integer<Int128>(
+                    reinterpret_cast<const uint8_t*>(value.data), length))};
 }
 
 template <PrimitiveType T>
@@ -441,8 +441,8 @@ Status 
DataTypeDecimalSerDe<T>::read_column_from_decoded_values(
             return read_decimal_decoded_values<T>(column, view);
         }
     }
-    return Status::NotSupported("Unsupported decoded values for {} from source 
kind {}",
-                                get_name(), static_cast<int>(view.value_kind));
+    return Status::NotSupported("Unsupported decoded values for {} from source 
kind {}", get_name(),
+                                static_cast<int>(view.value_kind));
 }
 
 template <PrimitiveType T>
diff --git a/be/src/core/data_type_serde/data_type_nullable_serde.cpp 
b/be/src/core/data_type_serde/data_type_nullable_serde.cpp
index 6b15b29c63a..b02c8606332 100644
--- a/be/src/core/data_type_serde/data_type_nullable_serde.cpp
+++ b/be/src/core/data_type_serde/data_type_nullable_serde.cpp
@@ -29,9 +29,9 @@
 #include "core/column/column_const.h"
 #include "core/column/column_nullable.h"
 #include "core/column/column_vector.h"
-#include "core/data_type_serde/decoded_column_view.h"
 #include "core/data_type_serde/data_type_serde.h"
 #include "core/data_type_serde/data_type_string_serde.h"
+#include "core/data_type_serde/decoded_column_view.h"
 #include "exprs/function/cast/cast_base.h"
 #include "format/transformer/vcsv_transformer.h"
 #include "util/jsonb_document.h"
@@ -351,8 +351,8 @@ Status 
DataTypeNullableSerDe::read_column_from_arrow(IColumn& column,
                                                 ctz);
 }
 
-Status DataTypeNullableSerDe::read_column_from_decoded_values(
-        IColumn& column, const DecodedColumnView& view) const {
+Status DataTypeNullableSerDe::read_column_from_decoded_values(IColumn& column,
+                                                              const 
DecodedColumnView& view) const {
     auto& nullable_column = assert_cast<ColumnNullable&>(column);
     auto& null_map = nullable_column.get_null_map_data();
     const auto old_size = null_map.size();
diff --git a/be/src/core/data_type_serde/data_type_number_serde.cpp 
b/be/src/core/data_type_serde/data_type_number_serde.cpp
index 131e6d05941..6cd30449083 100644
--- a/be/src/core/data_type_serde/data_type_number_serde.cpp
+++ b/be/src/core/data_type_serde/data_type_number_serde.cpp
@@ -26,8 +26,8 @@
 #include "core/column/column_nullable.h"
 #include "core/data_type/define_primitive_type.h"
 #include "core/data_type/primitive_type.h"
-#include "core/data_type_serde/decoded_column_view.h"
 #include "core/data_type_serde/data_type_serde.h"
+#include "core/data_type_serde/decoded_column_view.h"
 #include "core/packed_int128.h"
 #include "core/types.h"
 #include "core/value/timestamptz_value.h"
@@ -55,8 +55,8 @@ Status read_number_decoded_values(IColumn& column, const 
DecodedColumnView& view
     if (view.values == nullptr && view.row_count > 0) {
         return Status::Corruption("Decoded value buffer is null for {}", 
column.get_name());
     }
-    auto& data = assert_cast<typename 
PrimitiveTypeTraits<DorisType>::ColumnType&>(column)
-                         .get_data();
+    auto& data =
+            assert_cast<typename 
PrimitiveTypeTraits<DorisType>::ColumnType&>(column).get_data();
     const auto* values = decoded_values_as<SourceType>(view);
     for (int64_t row = 0; row < view.row_count; ++row) {
         using DorisCppType = typename PrimitiveTypeTraits<DorisType>::CppType;
@@ -204,8 +204,8 @@ Status 
DataTypeNumberSerDe<T>::read_column_from_decoded_values(
             return read_number_decoded_values<TYPE_DOUBLE, double>(column, 
view);
         }
     }
-    return Status::NotSupported("Unsupported decoded values for {} from source 
kind {}",
-                                get_name(), static_cast<int>(view.value_kind));
+    return Status::NotSupported("Unsupported decoded values for {} from source 
kind {}", get_name(),
+                                static_cast<int>(view.value_kind));
 }
 
 template <PrimitiveType T>
diff --git a/be/src/core/data_type_serde/data_type_string_serde.cpp 
b/be/src/core/data_type_serde/data_type_string_serde.cpp
index 478cdf3b5e6..0a9a5cd7dab 100644
--- a/be/src/core/data_type_serde/data_type_string_serde.cpp
+++ b/be/src/core/data_type_serde/data_type_string_serde.cpp
@@ -18,8 +18,8 @@
 #include "core/data_type_serde/data_type_string_serde.h"
 
 #include "core/column/column_string.h"
-#include "core/data_type_serde/decoded_column_view.h"
 #include "core/data_type/define_primitive_type.h"
+#include "core/data_type_serde/decoded_column_view.h"
 #include "util/jsonb_document_cast.h"
 #include "util/jsonb_utils.h"
 #include "util/jsonb_writer.h"
diff --git a/be/src/core/data_type_serde/data_type_time_serde.cpp 
b/be/src/core/data_type_serde/data_type_time_serde.cpp
index 65e1afa577d..a40a8d217c9 100644
--- a/be/src/core/data_type_serde/data_type_time_serde.cpp
+++ b/be/src/core/data_type_serde/data_type_time_serde.cpp
@@ -17,10 +17,10 @@
 
 #include "core/data_type_serde/data_type_time_serde.h"
 
-#include "core/data_type_serde/decoded_column_view.h"
 #include "core/data_type/data_type_decimal.h"
 #include "core/data_type/data_type_number.h"
 #include "core/data_type/primitive_type.h"
+#include "core/data_type_serde/decoded_column_view.h"
 #include "core/value/time_value.h"
 #include "exprs/function/cast/cast_base.h"
 #include "exprs/function/cast/cast_to_time_impl.hpp"
@@ -44,12 +44,11 @@ TimeValue::TimeType read_time_decoded_value(const 
DecodedColumnView& view, int64
     }
     const bool negative = micros < 0;
     const int64_t abs_micros = std::abs(micros);
-    return TimeValue::make_time(abs_micros / TimeValue::ONE_HOUR_MICROSECONDS,
-                                (abs_micros % 
TimeValue::ONE_HOUR_MICROSECONDS) /
-                                        TimeValue::ONE_MINUTE_MICROSECONDS,
-                                (abs_micros % 
TimeValue::ONE_MINUTE_MICROSECONDS) /
-                                        TimeValue::ONE_SECOND_MICROSECONDS,
-                                abs_micros % 
TimeValue::ONE_SECOND_MICROSECONDS, negative);
+    return TimeValue::make_time(
+            abs_micros / TimeValue::ONE_HOUR_MICROSECONDS,
+            (abs_micros % TimeValue::ONE_HOUR_MICROSECONDS) / 
TimeValue::ONE_MINUTE_MICROSECONDS,
+            (abs_micros % TimeValue::ONE_MINUTE_MICROSECONDS) / 
TimeValue::ONE_SECOND_MICROSECONDS,
+            abs_micros % TimeValue::ONE_SECOND_MICROSECONDS, negative);
 }
 
 } // namespace
@@ -173,10 +172,9 @@ Status 
DataTypeTimeV2SerDe::from_string_strict_mode(StringRef& str, IColumn& col
     return Status::OK();
 }
 
-Status DataTypeTimeV2SerDe::read_column_from_decoded_values(
-        IColumn& column, const DecodedColumnView& view) const {
-    if (view.value_kind != DecodedValueKind::INT32 &&
-        view.value_kind != DecodedValueKind::INT64) {
+Status DataTypeTimeV2SerDe::read_column_from_decoded_values(IColumn& column,
+                                                            const 
DecodedColumnView& view) const {
+    if (view.value_kind != DecodedValueKind::INT32 && view.value_kind != 
DecodedValueKind::INT64) {
         return Status::NotSupported("TIMEV2 decoded reader expects INT32 or 
INT64 source");
     }
     if (view.values == nullptr && view.row_count > 0) {
diff --git a/be/src/format/new_parquet/parquet_reader.cpp 
b/be/src/format/new_parquet/parquet_reader.cpp
index 6d0ef3eb742..70902d936ee 100644
--- a/be/src/format/new_parquet/parquet_reader.cpp
+++ b/be/src/format/new_parquet/parquet_reader.cpp
@@ -328,9 +328,9 @@ Status ParquetReader::_execute_filter_conjuncts(int64_t 
batch_rows, Block* file_
         }
         IColumn::Filter filter(static_cast<size_t>(batch_rows), 1);
         bool can_filter_all = false;
-        RETURN_IF_ERROR(expression_filter.conjunct->execute_filter(
-                file_block, filter.data(), static_cast<size_t>(batch_rows), 
false,
-                &can_filter_all));
+        RETURN_IF_ERROR(expression_filter.conjunct->execute_filter(file_block, 
filter.data(),
+                                                                   
static_cast<size_t>(batch_rows),
+                                                                   false, 
&can_filter_all));
         *selected_rows =
                 can_filter_all ? 0 : _apply_filter_to_selection(filter, 
selection, *selected_rows);
     }
diff --git a/be/src/format/reader/column_mapper.cpp 
b/be/src/format/reader/column_mapper.cpp
index 80a81f6c76d..e8e7442a8d7 100644
--- a/be/src/format/reader/column_mapper.cpp
+++ b/be/src/format/reader/column_mapper.cpp
@@ -249,18 +249,20 @@ Status TableColumnMapper::create_mapping(const 
std::vector<TableColumn>& project
         ColumnMapping mapping;
         mapping.table_column_id = table_column.id;
         mapping.table_type = table_column.type;
-        if (const auto* file_field = _find_file_field(table_column, 
file_schema)) {
-            RETURN_IF_ERROR(_create_direct_mapping(table_column, *file_field, 
&mapping));
-        } else if (table_column.is_partition_key && 
partition_values.count(table_column.name) > 0) {
-            // 3. Partition column, use partition value as a constant mapping. 
Note that partition column may also have default expression, but partition 
value should take precedence if it exists.
+        if (table_column.is_partition_key && 
partition_values.count(table_column.name) > 0) {
+            // 1. Partition column, use partition value as a constant mapping. 
Note that partition column may also have default expression, but partition 
value should take precedence if it exists.
+            mapping.is_constant = true;
             mapping.default_expr = 
VExprContext::create_shared(TableLiteral::create_shared(
                     mapping.table_type, 
partition_values.at(table_column.name)));
+        } else if (const auto* file_field = _find_file_field(table_column, 
file_schema)) {
+            // 2. Table column has a matching file column, use it as a direct 
mapping.
+            RETURN_IF_ERROR(_create_direct_mapping(table_column, *file_field, 
&mapping));
         } else if (table_column.default_expr != nullptr) {
-            // 4. Table column does not exist in file (column adding by schema 
evolution), which has a default expression, use it as a constant mapping.
+            // 3. Table column does not exist in file (column adding by schema 
evolution), which has a default expression, use it as a constant mapping.
             mapping.is_constant = true;
             mapping.default_expr = table_column.default_expr;
         } else if (table_column.name == ROW_LINEAGE_ROW_ID) {
-            // 5. Virtual column, use special mapping to indicate it should be 
materialized by table reader instead of read from file or evaluated from 
expression.
+            // 4. Virtual column, use special mapping to indicate it should be 
materialized by table reader instead of read from file or evaluated from 
expression.
             mapping.virtual_column_type = TableVirtualColumnType::ROW_ID;
         } else if (table_column.name == ROW_LINEAGE_LAST_UPDATED_SEQ_NUMBER) {
             mapping.virtual_column_type = 
TableVirtualColumnType::LAST_UPDATED_SEQUENCE_NUMBER;
diff --git a/be/src/format/reader/column_mapper.h 
b/be/src/format/reader/column_mapper.h
index bcfe7152208..75b53f68d2d 100644
--- a/be/src/format/reader/column_mapper.h
+++ b/be/src/format/reader/column_mapper.h
@@ -43,8 +43,7 @@ struct SchemaField;
 struct FileScanRequest;
 struct FieldProjection;
 
-using TableColumnPredicates =
-        std::map<int32_t, std::vector<std::shared_ptr<ColumnPredicate>>>;
+using TableColumnPredicates = std::map<int32_t, 
std::vector<std::shared_ptr<ColumnPredicate>>>;
 
 enum class TableColumnMappingMode {
     BY_FIELD_ID,
diff --git a/be/src/format/reader/table_reader.h 
b/be/src/format/reader/table_reader.h
index 2cf5eb30468..ee252817d40 100644
--- a/be/src/format/reader/table_reader.h
+++ b/be/src/format/reader/table_reader.h
@@ -295,9 +295,20 @@ protected:
             return Status::OK();
         }
         if (mapping.default_expr != nullptr) {
-            int res_id;
-            RETURN_IF_ERROR(mapping.default_expr->execute(current_block, 
&res_id));
-            *column = current_block->get_columns()[res_id];
+            if (current_block->rows() == current_rows) {
+                int res_id;
+                RETURN_IF_ERROR(mapping.default_expr->execute(current_block, 
&res_id));
+                *column = current_block->get_columns()[res_id];
+            } else {
+                DORIS_CHECK(mapping.is_constant);
+                Block eval_block;
+                eval_block.insert(
+                        
{mapping.table_type->create_column_const_with_default_value(current_rows),
+                         mapping.table_type, "__table_reader_const_rows"});
+                int res_id;
+                RETURN_IF_ERROR(mapping.default_expr->execute(&eval_block, 
&res_id));
+                *column = eval_block.get_columns()[res_id];
+            }
             return Status::OK();
         }
         *column = 
mapping.table_type->create_column_const_with_default_value(current_rows);
diff --git 
a/be/test/core/data_type_serde/data_type_serde_decoded_values_test.cpp 
b/be/test/core/data_type_serde/data_type_serde_decoded_values_test.cpp
index 10f15bb28b1..1622775b6a8 100644
--- a/be/test/core/data_type_serde/data_type_serde_decoded_values_test.cpp
+++ b/be/test/core/data_type_serde/data_type_serde_decoded_values_test.cpp
@@ -237,7 +237,8 @@ TEST(DataTypeSerDeDecodedValuesTest, 
ReadNullableInt32Values) {
     ASSERT_TRUE(st.ok()) << st;
 
     const auto& nullable_column = assert_cast<const ColumnNullable&>(*column);
-    const auto& nested_column = assert_cast<const 
ColumnInt32&>(nullable_column.get_nested_column());
+    const auto& nested_column =
+            assert_cast<const 
ColumnInt32&>(nullable_column.get_nested_column());
     ASSERT_EQ(nullable_column.size(), 4);
     EXPECT_FALSE(nullable_column.is_null_at(0));
     EXPECT_TRUE(nullable_column.is_null_at(1));
diff --git a/be/test/format/new_parquet/parquet_reader_test.cpp 
b/be/test/format/new_parquet/parquet_reader_test.cpp
index 43ec9cc0ab1..f393da6822c 100644
--- a/be/test/format/new_parquet/parquet_reader_test.cpp
+++ b/be/test/format/new_parquet/parquet_reader_test.cpp
@@ -196,8 +196,8 @@ void write_int_pair_parquet_file(const std::string& 
file_path, int64_t row_group
     builder.version(::parquet::ParquetVersion::PARQUET_2_6);
     builder.data_page_version(::parquet::ParquetDataPageVersion::V2);
     builder.compression(::parquet::Compression::UNCOMPRESSED);
-    PARQUET_THROW_NOT_OK(::parquet::arrow::WriteTable(
-            *table, arrow::default_memory_pool(), out, row_group_size, 
builder.build()));
+    PARQUET_THROW_NOT_OK(::parquet::arrow::WriteTable(*table, 
arrow::default_memory_pool(), out,
+                                                      row_group_size, 
builder.build()));
 }
 
 Block build_file_block(const std::vector<reader::SchemaField>& schema) {
diff --git a/be/test/format/reader/expr/cast_test.cpp 
b/be/test/format/reader/expr/cast_test.cpp
index cab4e6c5b0d..a236d327a1f 100644
--- a/be/test/format/reader/expr/cast_test.cpp
+++ b/be/test/format/reader/expr/cast_test.cpp
@@ -72,7 +72,8 @@ public:
     Status execute_column_impl(VExprContext* context, const Block* block, 
const Selector* selector,
                                size_t count, ColumnPtr& result_column) const 
override {
         ColumnPtr child_column;
-        RETURN_IF_ERROR(get_child(0)->execute_column(context, block, selector, 
count, child_column));
+        RETURN_IF_ERROR(
+                get_child(0)->execute_column(context, block, selector, count, 
child_column));
         const auto& input = assert_cast<const ColumnInt64&>(*child_column);
         auto result = ColumnUInt8::create();
         auto& result_data = result->get_data();
@@ -261,8 +262,8 @@ TEST_F(CastTest, 
ColumnMapperBuildsCastFilterForTypeMismatch) {
     table_filter.slot_ids = {7};
 
     reader::FileScanRequest file_request;
-    ASSERT_TRUE(mapper.create_scan_request({table_filter}, {}, 
projected_columns, &file_request)
-                        .ok());
+    ASSERT_TRUE(
+            mapper.create_scan_request({table_filter}, {}, projected_columns, 
&file_request).ok());
     ASSERT_EQ(file_request.expression_filters.size(), 1);
     ASSERT_EQ(file_request.predicate_columns, 
std::vector<reader::ColumnId>({0}));
     const auto& localized_expr = 
file_request.expression_filters[0].conjunct->root();
@@ -285,8 +286,7 @@ TEST_F(CastTest, 
ColumnMapperBuildsCastFilterForTypeMismatch) {
     ASSERT_TRUE(status.ok()) << status;
     IColumn::Filter filter(block.rows(), 1);
     bool can_filter_all = false;
-    status = conjunct->execute_filter(&block, filter.data(), block.rows(), 
false,
-                                      &can_filter_all);
+    status = conjunct->execute_filter(&block, filter.data(), block.rows(), 
false, &can_filter_all);
     ASSERT_TRUE(status.ok()) << status;
     EXPECT_FALSE(can_filter_all);
     ASSERT_EQ(filter.size(), 2);
diff --git a/be/test/format/reader/table_reader_test.cpp 
b/be/test/format/reader/table_reader_test.cpp
index 3d132244122..f770fddb723 100644
--- a/be/test/format/reader/table_reader_test.cpp
+++ b/be/test/format/reader/table_reader_test.cpp
@@ -56,9 +56,8 @@ public:
     Status execute_column_impl(VExprContext* context, const Block* block, 
const Selector* selector,
                                size_t count, ColumnPtr& result_column) const 
override {
         const auto* slot_ref = assert_cast<const 
VSlotRef*>(get_child(0).get());
-        const auto& input =
-                assert_cast<const ColumnInt32&>(
-                        *block->get_by_position(slot_ref->column_id()).column);
+        const auto& input = assert_cast<const ColumnInt32&>(
+                *block->get_by_position(slot_ref->column_id()).column);
         auto result = ColumnUInt8::create();
         auto& result_data = result->get_data();
         result_data.resize(count);
@@ -184,8 +183,7 @@ void write_parquet_file(const std::string& file_path, 
int32_t id, const std::str
             arrow::field("id", arrow::int32(), false),
             arrow::field("value", arrow::utf8(), false),
     });
-    auto table =
-            arrow::Table::Make(schema, {build_int32_array({id}), 
build_string_array({value})});
+    auto table = arrow::Table::Make(schema, {build_int32_array({id}), 
build_string_array({value})});
 
     auto file_result = arrow::io::FileOutputStream::Open(file_path);
     ASSERT_TRUE(file_result.ok()) << file_result.status();
@@ -195,8 +193,8 @@ void write_parquet_file(const std::string& file_path, 
int32_t id, const std::str
     builder.version(::parquet::ParquetVersion::PARQUET_2_6);
     builder.data_page_version(::parquet::ParquetDataPageVersion::V2);
     builder.compression(::parquet::Compression::UNCOMPRESSED);
-    PARQUET_THROW_NOT_OK(::parquet::arrow::WriteTable(
-            *table, arrow::default_memory_pool(), out, 1, builder.build()));
+    PARQUET_THROW_NOT_OK(::parquet::arrow::WriteTable(*table, 
arrow::default_memory_pool(), out, 1,
+                                                      builder.build()));
 }
 
 void write_int_pair_parquet_file(const std::string& file_path, const 
std::vector<int32_t>& ids,
@@ -221,8 +219,8 @@ void write_int_pair_parquet_file(const std::string& 
file_path, const std::vector
     builder.compression(::parquet::Compression::UNCOMPRESSED);
     const auto write_row_group_size =
             row_group_size > 0 ? row_group_size : 
static_cast<int64_t>(ids.size());
-    PARQUET_THROW_NOT_OK(::parquet::arrow::WriteTable(
-            *table, arrow::default_memory_pool(), out, write_row_group_size, 
builder.build()));
+    PARQUET_THROW_NOT_OK(::parquet::arrow::WriteTable(*table, 
arrow::default_memory_pool(), out,
+                                                      write_row_group_size, 
builder.build()));
 }
 
 Block build_table_block(const std::vector<TableColumn>& columns) {
@@ -269,20 +267,19 @@ TEST(TableReaderTest, ReopenSplitAfterClose) {
 
     RuntimeState state {TQueryOptions(), TQueryGlobals()};
     TableReader reader;
-    ASSERT_TRUE(reader
-                        .init({
-                                .projected_columns = projected_columns,
-                                .column_predicates = {},
-                                .conjuncts = VExprContext(
-                                        
std::make_shared<TableInt32GreaterThanExpr>(0, 0, 0)),
-                                .format = FileFormat::PARQUET,
-                                .scan_params = nullptr,
-                                .io_ctx = nullptr,
-                                .runtime_state = &state,
-                                .scanner_profile = nullptr,
-                                .allow_missing_columns = true,
-                                .profile = nullptr,
-                        })
+    ASSERT_TRUE(reader.init({
+                                    .projected_columns = projected_columns,
+                                    .column_predicates = {},
+                                    .conjuncts = VExprContext(
+                                            
std::make_shared<TableInt32GreaterThanExpr>(0, 0, 0)),
+                                    .format = FileFormat::PARQUET,
+                                    .scan_params = nullptr,
+                                    .io_ctx = nullptr,
+                                    .runtime_state = &state,
+                                    .scanner_profile = nullptr,
+                                    .allow_missing_columns = true,
+                                    .profile = nullptr,
+                            })
                         .ok());
 
     // Simulate the scanner lifecycle for three different splits:
@@ -335,20 +332,19 @@ TEST(TableReaderTest, 
OpenReaderBuildsTableFiltersFromConjuncts) {
 
     RuntimeState state {TQueryOptions(), TQueryGlobals()};
     TableReader reader;
-    ASSERT_TRUE(reader
-                        .init({
-                                .projected_columns = projected_columns,
-                                .column_predicates = {},
-                                .conjuncts = VExprContext(
-                                        
std::make_shared<TableInt32GreaterThanExpr>(0, 0, 2)),
-                                .format = FileFormat::PARQUET,
-                                .scan_params = nullptr,
-                                .io_ctx = nullptr,
-                                .runtime_state = &state,
-                                .scanner_profile = nullptr,
-                                .allow_missing_columns = true,
-                                .profile = nullptr,
-                        })
+    ASSERT_TRUE(reader.init({
+                                    .projected_columns = projected_columns,
+                                    .column_predicates = {},
+                                    .conjuncts = VExprContext(
+                                            
std::make_shared<TableInt32GreaterThanExpr>(0, 0, 2)),
+                                    .format = FileFormat::PARQUET,
+                                    .scan_params = nullptr,
+                                    .io_ctx = nullptr,
+                                    .runtime_state = &state,
+                                    .scanner_profile = nullptr,
+                                    .allow_missing_columns = true,
+                                    .profile = nullptr,
+                            })
                         .ok());
 
     ASSERT_TRUE(reader.prepare_split(build_split_options(file_path)).ok());
@@ -417,19 +413,18 @@ TEST(TableReaderTest, 
OpenReaderBuildsColumnPredicateFilters) {
 
     RuntimeState state {TQueryOptions(), TQueryGlobals()};
     TableReader reader;
-    ASSERT_TRUE(reader
-                        .init({
-                                .projected_columns = projected_columns,
-                                .column_predicates = 
std::move(column_predicates),
-                                .conjuncts = VExprContext(nullptr),
-                                .format = FileFormat::PARQUET,
-                                .scan_params = nullptr,
-                                .io_ctx = nullptr,
-                                .runtime_state = &state,
-                                .scanner_profile = nullptr,
-                                .allow_missing_columns = true,
-                                .profile = nullptr,
-                        })
+    ASSERT_TRUE(reader.init({
+                                    .projected_columns = projected_columns,
+                                    .column_predicates = 
std::move(column_predicates),
+                                    .conjuncts = VExprContext(nullptr),
+                                    .format = FileFormat::PARQUET,
+                                    .scan_params = nullptr,
+                                    .io_ctx = nullptr,
+                                    .runtime_state = &state,
+                                    .scanner_profile = nullptr,
+                                    .allow_missing_columns = true,
+                                    .profile = nullptr,
+                            })
                         .ok());
 
     ASSERT_TRUE(reader.prepare_split(build_split_options(file_path)).ok());
@@ -509,8 +504,8 @@ TEST(TableReaderTest, 
CreateScanRequestDeduplicatesSharedPredicateColumns) {
     });
 
     FileScanRequest file_request;
-    ASSERT_TRUE(mapper.create_scan_request(table_filters, {}, 
projected_columns, &file_request)
-                        .ok());
+    ASSERT_TRUE(
+            mapper.create_scan_request(table_filters, {}, projected_columns, 
&file_request).ok());
 
     // Both filters reference column a. It must still be read once as a 
predicate column, and a
     // predicate column must not be repeated as a non-predicate column.
@@ -544,21 +539,20 @@ TEST(TableReaderTest, 
OpenReaderPushesMultiColumnConjunctToParquetReader) {
 
     RuntimeState state {TQueryOptions(), TQueryGlobals()};
     TableReader reader;
-    ASSERT_TRUE(reader
-                        .init({
-                                .projected_columns = projected_columns,
-                                .column_predicates = {},
-                                .conjuncts = VExprContext(
-                                        
std::make_shared<TableInt32SumGreaterThanExpr>(
-                                                0, 0, 1, 1, 8)),
-                                .format = FileFormat::PARQUET,
-                                .scan_params = nullptr,
-                                .io_ctx = nullptr,
-                                .runtime_state = &state,
-                                .scanner_profile = nullptr,
-                                .allow_missing_columns = true,
-                                .profile = nullptr,
-                        })
+    ASSERT_TRUE(reader.init({
+                                    .projected_columns = projected_columns,
+                                    .column_predicates = {},
+                                    .conjuncts = VExprContext(
+                                            
std::make_shared<TableInt32SumGreaterThanExpr>(0, 0, 1,
+                                                                               
            1, 8)),
+                                    .format = FileFormat::PARQUET,
+                                    .scan_params = nullptr,
+                                    .io_ctx = nullptr,
+                                    .runtime_state = &state,
+                                    .scanner_profile = nullptr,
+                                    .allow_missing_columns = true,
+                                    .profile = nullptr,
+                            })
                         .ok());
 
     ASSERT_TRUE(reader.prepare_split(build_split_options(file_path)).ok());
@@ -600,19 +594,18 @@ TEST(TableReaderTest, 
ProjectedColumnsFillDefaultForParquetSchemaMismatch) {
 
     RuntimeState state {TQueryOptions(), TQueryGlobals()};
     TableReader reader;
-    ASSERT_TRUE(reader
-                        .init({
-                                .projected_columns = projected_columns,
-                                .column_predicates = {},
-                                .conjuncts = VExprContext(nullptr),
-                                .format = FileFormat::PARQUET,
-                                .scan_params = nullptr,
-                                .io_ctx = nullptr,
-                                .runtime_state = &state,
-                                .scanner_profile = nullptr,
-                                .allow_missing_columns = true,
-                                .profile = nullptr,
-                        })
+    ASSERT_TRUE(reader.init({
+                                    .projected_columns = projected_columns,
+                                    .column_predicates = {},
+                                    .conjuncts = VExprContext(nullptr),
+                                    .format = FileFormat::PARQUET,
+                                    .scan_params = nullptr,
+                                    .io_ctx = nullptr,
+                                    .runtime_state = &state,
+                                    .scanner_profile = nullptr,
+                                    .allow_missing_columns = true,
+                                    .profile = nullptr,
+                            })
                         .ok());
 
     ASSERT_TRUE(reader.prepare_split(build_split_options(file_path)).ok());
@@ -645,19 +638,18 @@ TEST(TableReaderTest, 
ProjectedColumnsRejectParquetSchemaMismatchWhenMissingColu
 
     RuntimeState state {TQueryOptions(), TQueryGlobals()};
     TableReader reader;
-    ASSERT_TRUE(reader
-                        .init({
-                                .projected_columns = projected_columns,
-                                .column_predicates = {},
-                                .conjuncts = VExprContext(nullptr),
-                                .format = FileFormat::PARQUET,
-                                .scan_params = nullptr,
-                                .io_ctx = nullptr,
-                                .runtime_state = &state,
-                                .scanner_profile = nullptr,
-                                .allow_missing_columns = false,
-                                .profile = nullptr,
-                        })
+    ASSERT_TRUE(reader.init({
+                                    .projected_columns = projected_columns,
+                                    .column_predicates = {},
+                                    .conjuncts = VExprContext(nullptr),
+                                    .format = FileFormat::PARQUET,
+                                    .scan_params = nullptr,
+                                    .io_ctx = nullptr,
+                                    .runtime_state = &state,
+                                    .scanner_profile = nullptr,
+                                    .allow_missing_columns = false,
+                                    .profile = nullptr,
+                            })
                         .ok());
 
     ASSERT_TRUE(reader.prepare_split(build_split_options(file_path)).ok());
@@ -674,6 +666,56 @@ TEST(TableReaderTest, 
ProjectedColumnsRejectParquetSchemaMismatchWhenMissingColu
     std::filesystem::remove_all(test_dir);
 }
 
+TEST(TableReaderTest, ProjectedPartitionColumnUsesSplitPartitionValue) {
+    const auto test_dir =
+            std::filesystem::temp_directory_path() / 
"doris_table_reader_partition_value_test";
+    std::filesystem::remove_all(test_dir);
+    std::filesystem::create_directories(test_dir);
+
+    const auto file_path = (test_dir / "split.parquet").string();
+    write_parquet_file(file_path, 1, "one");
+
+    std::vector<TableColumn> projected_columns;
+    auto partition_column = make_table_column(1, "value", 
std::make_shared<DataTypeString>());
+    partition_column.is_partition_key = true;
+    projected_columns.push_back(std::move(partition_column));
+
+    RuntimeState state {TQueryOptions(), TQueryGlobals()};
+    TableReader reader;
+    ASSERT_TRUE(reader.init({
+                                    .projected_columns = projected_columns,
+                                    .column_predicates = {},
+                                    .conjuncts = VExprContext(nullptr),
+                                    .format = FileFormat::PARQUET,
+                                    .scan_params = nullptr,
+                                    .io_ctx = nullptr,
+                                    .runtime_state = &state,
+                                    .scanner_profile = nullptr,
+                                    .allow_missing_columns = true,
+                                    .profile = nullptr,
+                            })
+                        .ok());
+
+    auto split_options = build_split_options(file_path);
+    split_options.partition_values.emplace("value", 
Field::create_field<TYPE_STRING>("p1"));
+    ASSERT_TRUE(reader.prepare_split(split_options).ok());
+
+    // The file has a physical column with the same id/name. The split 
partition value should still
+    // take precedence and be materialized by TableReader.
+    Block block = build_table_block(projected_columns);
+    bool eos = false;
+    ASSERT_TRUE(reader.get_block(&block, &eos).ok());
+    ASSERT_FALSE(eos);
+
+    const auto& partition_value =
+            assert_cast<const ColumnString&>(*block.get_by_position(0).column);
+    ASSERT_EQ(partition_value.size(), 1);
+    EXPECT_EQ(partition_value.get_data_at(0).to_string(), "p1");
+
+    ASSERT_TRUE(reader.close().ok());
+    std::filesystem::remove_all(test_dir);
+}
+
 TEST(TableReaderTest, 
ProjectedColumnsUseMapperExpressionForSameNameDifferentIdParquetSchema) {
     const auto test_dir =
             std::filesystem::temp_directory_path() / 
"doris_table_reader_same_name_diff_id_test";
@@ -688,19 +730,18 @@ TEST(TableReaderTest, 
ProjectedColumnsUseMapperExpressionForSameNameDifferentIdP
 
     RuntimeState state {TQueryOptions(), TQueryGlobals()};
     TableReader reader;
-    ASSERT_TRUE(reader
-                        .init({
-                                .projected_columns = projected_columns,
-                                .column_predicates = {},
-                                .conjuncts = VExprContext(nullptr),
-                                .format = FileFormat::PARQUET,
-                                .scan_params = nullptr,
-                                .io_ctx = nullptr,
-                                .runtime_state = &state,
-                                .scanner_profile = nullptr,
-                                .allow_missing_columns = true,
-                                .profile = nullptr,
-                        })
+    ASSERT_TRUE(reader.init({
+                                    .projected_columns = projected_columns,
+                                    .column_predicates = {},
+                                    .conjuncts = VExprContext(nullptr),
+                                    .format = FileFormat::PARQUET,
+                                    .scan_params = nullptr,
+                                    .io_ctx = nullptr,
+                                    .runtime_state = &state,
+                                    .scanner_profile = nullptr,
+                                    .allow_missing_columns = true,
+                                    .profile = nullptr,
+                            })
                         .ok());
 
     ASSERT_TRUE(reader.prepare_split(build_split_options(file_path)).ok());
@@ -738,19 +779,18 @@ TEST(TableReaderTest, 
ProjectedColumnsUseMapperExpressionsForParquetSchemaMismat
 
     RuntimeState state {TQueryOptions(), TQueryGlobals()};
     TableReader reader;
-    ASSERT_TRUE(reader
-                        .init({
-                                .projected_columns = projected_columns,
-                                .column_predicates = {},
-                                .conjuncts = VExprContext(nullptr),
-                                .format = FileFormat::PARQUET,
-                                .scan_params = nullptr,
-                                .io_ctx = nullptr,
-                                .runtime_state = &state,
-                                .scanner_profile = nullptr,
-                                .allow_missing_columns = true,
-                                .profile = nullptr,
-                        })
+    ASSERT_TRUE(reader.init({
+                                    .projected_columns = projected_columns,
+                                    .column_predicates = {},
+                                    .conjuncts = VExprContext(nullptr),
+                                    .format = FileFormat::PARQUET,
+                                    .scan_params = nullptr,
+                                    .io_ctx = nullptr,
+                                    .runtime_state = &state,
+                                    .scanner_profile = nullptr,
+                                    .allow_missing_columns = true,
+                                    .profile = nullptr,
+                            })
                         .ok());
 
     ASSERT_TRUE(reader.prepare_split(build_split_options(file_path)).ok());


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to