This is an automated email from the ASF dual-hosted git repository.
Gabriel39 pushed a commit to branch refact_reader_branch
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/refact_reader_branch by this push:
new ac24a827685 [fix](be) Fill partition columns in TableReader (#63773)
ac24a827685 is described below
commit ac24a827685562d3adb877ccd2082ceb719e642e
Author: Gabriel <[email protected]>
AuthorDate: Thu May 28 11:11:00 2026 +0800
[fix](be) Fill partition columns in TableReader (#63773)
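Problem Summary: TableReader could map a partition column to a physical
file column before checking the split's partition values, so a file
column with the same name shadowed the partition value supplied by the
split. In addition, constant/default expression materialization used the
file-local block's row count rather than the batch row count. For scans
where partition values should be filled from split metadata, especially
when the file-local block row count differs from the batch row count,
this could produce incorrectly materialized columns. The fix checks split
partition values before physical file columns. It also evaluates constant
expressions against a block sized to the batch row count whenever the
two row counts differ.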
---
.../data_type_serde/data_type_datetimev2_serde.cpp | 3 +-
.../data_type_serde/data_type_datev2_serde.cpp | 7 +-
.../data_type_serde/data_type_decimal_serde.cpp | 14 +-
.../data_type_serde/data_type_nullable_serde.cpp | 6 +-
.../data_type_serde/data_type_number_serde.cpp | 10 +-
.../data_type_serde/data_type_string_serde.cpp | 2 +-
.../core/data_type_serde/data_type_time_serde.cpp | 20 +-
be/src/format/new_parquet/parquet_reader.cpp | 6 +-
be/src/format/reader/column_mapper.cpp | 14 +-
be/src/format/reader/column_mapper.h | 3 +-
be/src/format/reader/table_reader.h | 17 +-
.../data_type_serde_decoded_values_test.cpp | 3 +-
be/test/format/new_parquet/parquet_reader_test.cpp | 4 +-
be/test/format/reader/expr/cast_test.cpp | 10 +-
be/test/format/reader/table_reader_test.cpp | 278 ++++++++++++---------
15 files changed, 225 insertions(+), 172 deletions(-)
diff --git a/be/src/core/data_type_serde/data_type_datetimev2_serde.cpp
b/be/src/core/data_type_serde/data_type_datetimev2_serde.cpp
index fc2c14d1829..ce0599080c6 100644
--- a/be/src/core/data_type_serde/data_type_datetimev2_serde.cpp
+++ b/be/src/core/data_type_serde/data_type_datetimev2_serde.cpp
@@ -22,12 +22,13 @@
#include <chrono> // IWYU pragma: keep
#include <cstdint>
+
#include "common/status.h"
#include "core/column/column_const.h"
-#include "core/data_type_serde/decoded_column_view.h"
#include "core/data_type/data_type_decimal.h"
#include "core/data_type/data_type_number.h"
#include "core/data_type/primitive_type.h"
+#include "core/data_type_serde/decoded_column_view.h"
#include "core/types.h"
#include "core/value/vdatetime_value.h"
#include "exprs/function/cast/cast_to_datetimev2_impl.hpp"
diff --git a/be/src/core/data_type_serde/data_type_datev2_serde.cpp
b/be/src/core/data_type_serde/data_type_datev2_serde.cpp
index 9410df86eaa..94b86312d61 100644
--- a/be/src/core/data_type_serde/data_type_datev2_serde.cpp
+++ b/be/src/core/data_type_serde/data_type_datev2_serde.cpp
@@ -22,11 +22,12 @@
#include <fmt/core.h>
#include <cstdint>
+
#include "core/column/column_const.h"
-#include "core/data_type_serde/decoded_column_view.h"
#include "core/data_type/data_type_decimal.h"
#include "core/data_type/data_type_number.h"
#include "core/data_type/define_primitive_type.h"
+#include "core/data_type_serde/decoded_column_view.h"
#include "core/types.h"
#include "core/value/vdatetime_value.h"
#include "exprs/function/cast/cast_to_datev2_impl.hpp"
@@ -125,8 +126,8 @@ Status DataTypeDateV2SerDe::read_column_from_arrow(IColumn&
column, const arrow:
return Status::OK();
}
-Status DataTypeDateV2SerDe::read_column_from_decoded_values(
- IColumn& column, const DecodedColumnView& view) const {
+Status DataTypeDateV2SerDe::read_column_from_decoded_values(IColumn& column,
+ const
DecodedColumnView& view) const {
if (view.value_kind != DecodedValueKind::INT32) {
return Status::NotSupported("DATEV2 decoded reader expects INT32
source");
}
diff --git a/be/src/core/data_type_serde/data_type_decimal_serde.cpp
b/be/src/core/data_type_serde/data_type_decimal_serde.cpp
index c744cdb8a2f..9fa2e0c6ebd 100644
--- a/be/src/core/data_type_serde/data_type_decimal_serde.cpp
+++ b/be/src/core/data_type_serde/data_type_decimal_serde.cpp
@@ -61,8 +61,8 @@ NativeType decode_big_endian_signed_integer(const uint8_t*
data, int length) {
}
template <PrimitiveType T>
-typename PrimitiveTypeTraits<T>::CppType read_decimal_decoded_value(
- const DecodedColumnView& view, int64_t row) {
+typename PrimitiveTypeTraits<T>::CppType read_decimal_decoded_value(const
DecodedColumnView& view,
+ int64_t
row) {
using FieldType = typename PrimitiveTypeTraits<T>::CppType;
if (view.value_kind == DecodedValueKind::INT32) {
const auto* values = reinterpret_cast<const int32_t*>(view.values);
@@ -76,9 +76,9 @@ typename PrimitiveTypeTraits<T>::CppType
read_decimal_decoded_value(
const auto length = view.value_kind == DecodedValueKind::FIXED_BINARY
? view.fixed_length
: cast_set<int, size_t, false>(value.size);
- return FieldType {static_cast<typename FieldType::NativeType>(
- decode_big_endian_signed_integer<Int128>(reinterpret_cast<const
uint8_t*>(value.data),
- length))};
+ return FieldType {
+ static_cast<typename
FieldType::NativeType>(decode_big_endian_signed_integer<Int128>(
+ reinterpret_cast<const uint8_t*>(value.data), length))};
}
template <PrimitiveType T>
@@ -441,8 +441,8 @@ Status
DataTypeDecimalSerDe<T>::read_column_from_decoded_values(
return read_decimal_decoded_values<T>(column, view);
}
}
- return Status::NotSupported("Unsupported decoded values for {} from source
kind {}",
- get_name(), static_cast<int>(view.value_kind));
+ return Status::NotSupported("Unsupported decoded values for {} from source
kind {}", get_name(),
+ static_cast<int>(view.value_kind));
}
template <PrimitiveType T>
diff --git a/be/src/core/data_type_serde/data_type_nullable_serde.cpp
b/be/src/core/data_type_serde/data_type_nullable_serde.cpp
index 6b15b29c63a..b02c8606332 100644
--- a/be/src/core/data_type_serde/data_type_nullable_serde.cpp
+++ b/be/src/core/data_type_serde/data_type_nullable_serde.cpp
@@ -29,9 +29,9 @@
#include "core/column/column_const.h"
#include "core/column/column_nullable.h"
#include "core/column/column_vector.h"
-#include "core/data_type_serde/decoded_column_view.h"
#include "core/data_type_serde/data_type_serde.h"
#include "core/data_type_serde/data_type_string_serde.h"
+#include "core/data_type_serde/decoded_column_view.h"
#include "exprs/function/cast/cast_base.h"
#include "format/transformer/vcsv_transformer.h"
#include "util/jsonb_document.h"
@@ -351,8 +351,8 @@ Status
DataTypeNullableSerDe::read_column_from_arrow(IColumn& column,
ctz);
}
-Status DataTypeNullableSerDe::read_column_from_decoded_values(
- IColumn& column, const DecodedColumnView& view) const {
+Status DataTypeNullableSerDe::read_column_from_decoded_values(IColumn& column,
+ const
DecodedColumnView& view) const {
auto& nullable_column = assert_cast<ColumnNullable&>(column);
auto& null_map = nullable_column.get_null_map_data();
const auto old_size = null_map.size();
diff --git a/be/src/core/data_type_serde/data_type_number_serde.cpp
b/be/src/core/data_type_serde/data_type_number_serde.cpp
index 131e6d05941..6cd30449083 100644
--- a/be/src/core/data_type_serde/data_type_number_serde.cpp
+++ b/be/src/core/data_type_serde/data_type_number_serde.cpp
@@ -26,8 +26,8 @@
#include "core/column/column_nullable.h"
#include "core/data_type/define_primitive_type.h"
#include "core/data_type/primitive_type.h"
-#include "core/data_type_serde/decoded_column_view.h"
#include "core/data_type_serde/data_type_serde.h"
+#include "core/data_type_serde/decoded_column_view.h"
#include "core/packed_int128.h"
#include "core/types.h"
#include "core/value/timestamptz_value.h"
@@ -55,8 +55,8 @@ Status read_number_decoded_values(IColumn& column, const
DecodedColumnView& view
if (view.values == nullptr && view.row_count > 0) {
return Status::Corruption("Decoded value buffer is null for {}",
column.get_name());
}
- auto& data = assert_cast<typename
PrimitiveTypeTraits<DorisType>::ColumnType&>(column)
- .get_data();
+ auto& data =
+ assert_cast<typename
PrimitiveTypeTraits<DorisType>::ColumnType&>(column).get_data();
const auto* values = decoded_values_as<SourceType>(view);
for (int64_t row = 0; row < view.row_count; ++row) {
using DorisCppType = typename PrimitiveTypeTraits<DorisType>::CppType;
@@ -204,8 +204,8 @@ Status
DataTypeNumberSerDe<T>::read_column_from_decoded_values(
return read_number_decoded_values<TYPE_DOUBLE, double>(column,
view);
}
}
- return Status::NotSupported("Unsupported decoded values for {} from source
kind {}",
- get_name(), static_cast<int>(view.value_kind));
+ return Status::NotSupported("Unsupported decoded values for {} from source
kind {}", get_name(),
+ static_cast<int>(view.value_kind));
}
template <PrimitiveType T>
diff --git a/be/src/core/data_type_serde/data_type_string_serde.cpp
b/be/src/core/data_type_serde/data_type_string_serde.cpp
index 478cdf3b5e6..0a9a5cd7dab 100644
--- a/be/src/core/data_type_serde/data_type_string_serde.cpp
+++ b/be/src/core/data_type_serde/data_type_string_serde.cpp
@@ -18,8 +18,8 @@
#include "core/data_type_serde/data_type_string_serde.h"
#include "core/column/column_string.h"
-#include "core/data_type_serde/decoded_column_view.h"
#include "core/data_type/define_primitive_type.h"
+#include "core/data_type_serde/decoded_column_view.h"
#include "util/jsonb_document_cast.h"
#include "util/jsonb_utils.h"
#include "util/jsonb_writer.h"
diff --git a/be/src/core/data_type_serde/data_type_time_serde.cpp
b/be/src/core/data_type_serde/data_type_time_serde.cpp
index 65e1afa577d..a40a8d217c9 100644
--- a/be/src/core/data_type_serde/data_type_time_serde.cpp
+++ b/be/src/core/data_type_serde/data_type_time_serde.cpp
@@ -17,10 +17,10 @@
#include "core/data_type_serde/data_type_time_serde.h"
-#include "core/data_type_serde/decoded_column_view.h"
#include "core/data_type/data_type_decimal.h"
#include "core/data_type/data_type_number.h"
#include "core/data_type/primitive_type.h"
+#include "core/data_type_serde/decoded_column_view.h"
#include "core/value/time_value.h"
#include "exprs/function/cast/cast_base.h"
#include "exprs/function/cast/cast_to_time_impl.hpp"
@@ -44,12 +44,11 @@ TimeValue::TimeType read_time_decoded_value(const
DecodedColumnView& view, int64
}
const bool negative = micros < 0;
const int64_t abs_micros = std::abs(micros);
- return TimeValue::make_time(abs_micros / TimeValue::ONE_HOUR_MICROSECONDS,
- (abs_micros %
TimeValue::ONE_HOUR_MICROSECONDS) /
- TimeValue::ONE_MINUTE_MICROSECONDS,
- (abs_micros %
TimeValue::ONE_MINUTE_MICROSECONDS) /
- TimeValue::ONE_SECOND_MICROSECONDS,
- abs_micros %
TimeValue::ONE_SECOND_MICROSECONDS, negative);
+ return TimeValue::make_time(
+ abs_micros / TimeValue::ONE_HOUR_MICROSECONDS,
+ (abs_micros % TimeValue::ONE_HOUR_MICROSECONDS) /
TimeValue::ONE_MINUTE_MICROSECONDS,
+ (abs_micros % TimeValue::ONE_MINUTE_MICROSECONDS) /
TimeValue::ONE_SECOND_MICROSECONDS,
+ abs_micros % TimeValue::ONE_SECOND_MICROSECONDS, negative);
}
} // namespace
@@ -173,10 +172,9 @@ Status
DataTypeTimeV2SerDe::from_string_strict_mode(StringRef& str, IColumn& col
return Status::OK();
}
-Status DataTypeTimeV2SerDe::read_column_from_decoded_values(
- IColumn& column, const DecodedColumnView& view) const {
- if (view.value_kind != DecodedValueKind::INT32 &&
- view.value_kind != DecodedValueKind::INT64) {
+Status DataTypeTimeV2SerDe::read_column_from_decoded_values(IColumn& column,
+ const
DecodedColumnView& view) const {
+ if (view.value_kind != DecodedValueKind::INT32 && view.value_kind !=
DecodedValueKind::INT64) {
return Status::NotSupported("TIMEV2 decoded reader expects INT32 or
INT64 source");
}
if (view.values == nullptr && view.row_count > 0) {
diff --git a/be/src/format/new_parquet/parquet_reader.cpp
b/be/src/format/new_parquet/parquet_reader.cpp
index 6d0ef3eb742..70902d936ee 100644
--- a/be/src/format/new_parquet/parquet_reader.cpp
+++ b/be/src/format/new_parquet/parquet_reader.cpp
@@ -328,9 +328,9 @@ Status ParquetReader::_execute_filter_conjuncts(int64_t
batch_rows, Block* file_
}
IColumn::Filter filter(static_cast<size_t>(batch_rows), 1);
bool can_filter_all = false;
- RETURN_IF_ERROR(expression_filter.conjunct->execute_filter(
- file_block, filter.data(), static_cast<size_t>(batch_rows),
false,
- &can_filter_all));
+ RETURN_IF_ERROR(expression_filter.conjunct->execute_filter(file_block,
filter.data(),
+
static_cast<size_t>(batch_rows),
+ false,
&can_filter_all));
*selected_rows =
can_filter_all ? 0 : _apply_filter_to_selection(filter,
selection, *selected_rows);
}
diff --git a/be/src/format/reader/column_mapper.cpp
b/be/src/format/reader/column_mapper.cpp
index 80a81f6c76d..e8e7442a8d7 100644
--- a/be/src/format/reader/column_mapper.cpp
+++ b/be/src/format/reader/column_mapper.cpp
@@ -249,18 +249,20 @@ Status TableColumnMapper::create_mapping(const
std::vector<TableColumn>& project
ColumnMapping mapping;
mapping.table_column_id = table_column.id;
mapping.table_type = table_column.type;
- if (const auto* file_field = _find_file_field(table_column,
file_schema)) {
- RETURN_IF_ERROR(_create_direct_mapping(table_column, *file_field,
&mapping));
- } else if (table_column.is_partition_key &&
partition_values.count(table_column.name) > 0) {
- // 3. Partition column, use partition value as a constant mapping.
Note that partition column may also have default expression, but partition
value should take precedence if it exists.
+ if (table_column.is_partition_key &&
partition_values.count(table_column.name) > 0) {
+ // 1. Partition column, use partition value as a constant mapping.
Note that partition column may also have default expression, but partition
value should take precedence if it exists.
+ mapping.is_constant = true;
mapping.default_expr =
VExprContext::create_shared(TableLiteral::create_shared(
mapping.table_type,
partition_values.at(table_column.name)));
+ } else if (const auto* file_field = _find_file_field(table_column,
file_schema)) {
+ // 2. Table column has a matching file column, use it as a direct
mapping.
+ RETURN_IF_ERROR(_create_direct_mapping(table_column, *file_field,
&mapping));
} else if (table_column.default_expr != nullptr) {
- // 4. Table column does not exist in file (column adding by schema
evolution), which has a default expression, use it as a constant mapping.
+ // 3. Table column does not exist in file (column adding by schema
evolution), which has a default expression, use it as a constant mapping.
mapping.is_constant = true;
mapping.default_expr = table_column.default_expr;
} else if (table_column.name == ROW_LINEAGE_ROW_ID) {
- // 5. Virtual column, use special mapping to indicate it should be
materialized by table reader instead of read from file or evaluated from
expression.
+ // 4. Virtual column, use special mapping to indicate it should be
materialized by table reader instead of read from file or evaluated from
expression.
mapping.virtual_column_type = TableVirtualColumnType::ROW_ID;
} else if (table_column.name == ROW_LINEAGE_LAST_UPDATED_SEQ_NUMBER) {
mapping.virtual_column_type =
TableVirtualColumnType::LAST_UPDATED_SEQUENCE_NUMBER;
diff --git a/be/src/format/reader/column_mapper.h
b/be/src/format/reader/column_mapper.h
index bcfe7152208..75b53f68d2d 100644
--- a/be/src/format/reader/column_mapper.h
+++ b/be/src/format/reader/column_mapper.h
@@ -43,8 +43,7 @@ struct SchemaField;
struct FileScanRequest;
struct FieldProjection;
-using TableColumnPredicates =
- std::map<int32_t, std::vector<std::shared_ptr<ColumnPredicate>>>;
+using TableColumnPredicates = std::map<int32_t,
std::vector<std::shared_ptr<ColumnPredicate>>>;
enum class TableColumnMappingMode {
BY_FIELD_ID,
diff --git a/be/src/format/reader/table_reader.h
b/be/src/format/reader/table_reader.h
index 2cf5eb30468..ee252817d40 100644
--- a/be/src/format/reader/table_reader.h
+++ b/be/src/format/reader/table_reader.h
@@ -295,9 +295,20 @@ protected:
return Status::OK();
}
if (mapping.default_expr != nullptr) {
- int res_id;
- RETURN_IF_ERROR(mapping.default_expr->execute(current_block,
&res_id));
- *column = current_block->get_columns()[res_id];
+ if (current_block->rows() == current_rows) {
+ int res_id;
+ RETURN_IF_ERROR(mapping.default_expr->execute(current_block,
&res_id));
+ *column = current_block->get_columns()[res_id];
+ } else {
+ DORIS_CHECK(mapping.is_constant);
+ Block eval_block;
+ eval_block.insert(
+
{mapping.table_type->create_column_const_with_default_value(current_rows),
+ mapping.table_type, "__table_reader_const_rows"});
+ int res_id;
+ RETURN_IF_ERROR(mapping.default_expr->execute(&eval_block,
&res_id));
+ *column = eval_block.get_columns()[res_id];
+ }
return Status::OK();
}
*column =
mapping.table_type->create_column_const_with_default_value(current_rows);
diff --git
a/be/test/core/data_type_serde/data_type_serde_decoded_values_test.cpp
b/be/test/core/data_type_serde/data_type_serde_decoded_values_test.cpp
index 10f15bb28b1..1622775b6a8 100644
--- a/be/test/core/data_type_serde/data_type_serde_decoded_values_test.cpp
+++ b/be/test/core/data_type_serde/data_type_serde_decoded_values_test.cpp
@@ -237,7 +237,8 @@ TEST(DataTypeSerDeDecodedValuesTest,
ReadNullableInt32Values) {
ASSERT_TRUE(st.ok()) << st;
const auto& nullable_column = assert_cast<const ColumnNullable&>(*column);
- const auto& nested_column = assert_cast<const
ColumnInt32&>(nullable_column.get_nested_column());
+ const auto& nested_column =
+ assert_cast<const
ColumnInt32&>(nullable_column.get_nested_column());
ASSERT_EQ(nullable_column.size(), 4);
EXPECT_FALSE(nullable_column.is_null_at(0));
EXPECT_TRUE(nullable_column.is_null_at(1));
diff --git a/be/test/format/new_parquet/parquet_reader_test.cpp
b/be/test/format/new_parquet/parquet_reader_test.cpp
index 43ec9cc0ab1..f393da6822c 100644
--- a/be/test/format/new_parquet/parquet_reader_test.cpp
+++ b/be/test/format/new_parquet/parquet_reader_test.cpp
@@ -196,8 +196,8 @@ void write_int_pair_parquet_file(const std::string&
file_path, int64_t row_group
builder.version(::parquet::ParquetVersion::PARQUET_2_6);
builder.data_page_version(::parquet::ParquetDataPageVersion::V2);
builder.compression(::parquet::Compression::UNCOMPRESSED);
- PARQUET_THROW_NOT_OK(::parquet::arrow::WriteTable(
- *table, arrow::default_memory_pool(), out, row_group_size,
builder.build()));
+ PARQUET_THROW_NOT_OK(::parquet::arrow::WriteTable(*table,
arrow::default_memory_pool(), out,
+ row_group_size,
builder.build()));
}
Block build_file_block(const std::vector<reader::SchemaField>& schema) {
diff --git a/be/test/format/reader/expr/cast_test.cpp
b/be/test/format/reader/expr/cast_test.cpp
index cab4e6c5b0d..a236d327a1f 100644
--- a/be/test/format/reader/expr/cast_test.cpp
+++ b/be/test/format/reader/expr/cast_test.cpp
@@ -72,7 +72,8 @@ public:
Status execute_column_impl(VExprContext* context, const Block* block,
const Selector* selector,
size_t count, ColumnPtr& result_column) const
override {
ColumnPtr child_column;
- RETURN_IF_ERROR(get_child(0)->execute_column(context, block, selector,
count, child_column));
+ RETURN_IF_ERROR(
+ get_child(0)->execute_column(context, block, selector, count,
child_column));
const auto& input = assert_cast<const ColumnInt64&>(*child_column);
auto result = ColumnUInt8::create();
auto& result_data = result->get_data();
@@ -261,8 +262,8 @@ TEST_F(CastTest,
ColumnMapperBuildsCastFilterForTypeMismatch) {
table_filter.slot_ids = {7};
reader::FileScanRequest file_request;
- ASSERT_TRUE(mapper.create_scan_request({table_filter}, {},
projected_columns, &file_request)
- .ok());
+ ASSERT_TRUE(
+ mapper.create_scan_request({table_filter}, {}, projected_columns,
&file_request).ok());
ASSERT_EQ(file_request.expression_filters.size(), 1);
ASSERT_EQ(file_request.predicate_columns,
std::vector<reader::ColumnId>({0}));
const auto& localized_expr =
file_request.expression_filters[0].conjunct->root();
@@ -285,8 +286,7 @@ TEST_F(CastTest,
ColumnMapperBuildsCastFilterForTypeMismatch) {
ASSERT_TRUE(status.ok()) << status;
IColumn::Filter filter(block.rows(), 1);
bool can_filter_all = false;
- status = conjunct->execute_filter(&block, filter.data(), block.rows(),
false,
- &can_filter_all);
+ status = conjunct->execute_filter(&block, filter.data(), block.rows(),
false, &can_filter_all);
ASSERT_TRUE(status.ok()) << status;
EXPECT_FALSE(can_filter_all);
ASSERT_EQ(filter.size(), 2);
diff --git a/be/test/format/reader/table_reader_test.cpp
b/be/test/format/reader/table_reader_test.cpp
index 3d132244122..f770fddb723 100644
--- a/be/test/format/reader/table_reader_test.cpp
+++ b/be/test/format/reader/table_reader_test.cpp
@@ -56,9 +56,8 @@ public:
Status execute_column_impl(VExprContext* context, const Block* block,
const Selector* selector,
size_t count, ColumnPtr& result_column) const
override {
const auto* slot_ref = assert_cast<const
VSlotRef*>(get_child(0).get());
- const auto& input =
- assert_cast<const ColumnInt32&>(
- *block->get_by_position(slot_ref->column_id()).column);
+ const auto& input = assert_cast<const ColumnInt32&>(
+ *block->get_by_position(slot_ref->column_id()).column);
auto result = ColumnUInt8::create();
auto& result_data = result->get_data();
result_data.resize(count);
@@ -184,8 +183,7 @@ void write_parquet_file(const std::string& file_path,
int32_t id, const std::str
arrow::field("id", arrow::int32(), false),
arrow::field("value", arrow::utf8(), false),
});
- auto table =
- arrow::Table::Make(schema, {build_int32_array({id}),
build_string_array({value})});
+ auto table = arrow::Table::Make(schema, {build_int32_array({id}),
build_string_array({value})});
auto file_result = arrow::io::FileOutputStream::Open(file_path);
ASSERT_TRUE(file_result.ok()) << file_result.status();
@@ -195,8 +193,8 @@ void write_parquet_file(const std::string& file_path,
int32_t id, const std::str
builder.version(::parquet::ParquetVersion::PARQUET_2_6);
builder.data_page_version(::parquet::ParquetDataPageVersion::V2);
builder.compression(::parquet::Compression::UNCOMPRESSED);
- PARQUET_THROW_NOT_OK(::parquet::arrow::WriteTable(
- *table, arrow::default_memory_pool(), out, 1, builder.build()));
+ PARQUET_THROW_NOT_OK(::parquet::arrow::WriteTable(*table,
arrow::default_memory_pool(), out, 1,
+ builder.build()));
}
void write_int_pair_parquet_file(const std::string& file_path, const
std::vector<int32_t>& ids,
@@ -221,8 +219,8 @@ void write_int_pair_parquet_file(const std::string&
file_path, const std::vector
builder.compression(::parquet::Compression::UNCOMPRESSED);
const auto write_row_group_size =
row_group_size > 0 ? row_group_size :
static_cast<int64_t>(ids.size());
- PARQUET_THROW_NOT_OK(::parquet::arrow::WriteTable(
- *table, arrow::default_memory_pool(), out, write_row_group_size,
builder.build()));
+ PARQUET_THROW_NOT_OK(::parquet::arrow::WriteTable(*table,
arrow::default_memory_pool(), out,
+ write_row_group_size,
builder.build()));
}
Block build_table_block(const std::vector<TableColumn>& columns) {
@@ -269,20 +267,19 @@ TEST(TableReaderTest, ReopenSplitAfterClose) {
RuntimeState state {TQueryOptions(), TQueryGlobals()};
TableReader reader;
- ASSERT_TRUE(reader
- .init({
- .projected_columns = projected_columns,
- .column_predicates = {},
- .conjuncts = VExprContext(
-
std::make_shared<TableInt32GreaterThanExpr>(0, 0, 0)),
- .format = FileFormat::PARQUET,
- .scan_params = nullptr,
- .io_ctx = nullptr,
- .runtime_state = &state,
- .scanner_profile = nullptr,
- .allow_missing_columns = true,
- .profile = nullptr,
- })
+ ASSERT_TRUE(reader.init({
+ .projected_columns = projected_columns,
+ .column_predicates = {},
+ .conjuncts = VExprContext(
+
std::make_shared<TableInt32GreaterThanExpr>(0, 0, 0)),
+ .format = FileFormat::PARQUET,
+ .scan_params = nullptr,
+ .io_ctx = nullptr,
+ .runtime_state = &state,
+ .scanner_profile = nullptr,
+ .allow_missing_columns = true,
+ .profile = nullptr,
+ })
.ok());
// Simulate the scanner lifecycle for three different splits:
@@ -335,20 +332,19 @@ TEST(TableReaderTest,
OpenReaderBuildsTableFiltersFromConjuncts) {
RuntimeState state {TQueryOptions(), TQueryGlobals()};
TableReader reader;
- ASSERT_TRUE(reader
- .init({
- .projected_columns = projected_columns,
- .column_predicates = {},
- .conjuncts = VExprContext(
-
std::make_shared<TableInt32GreaterThanExpr>(0, 0, 2)),
- .format = FileFormat::PARQUET,
- .scan_params = nullptr,
- .io_ctx = nullptr,
- .runtime_state = &state,
- .scanner_profile = nullptr,
- .allow_missing_columns = true,
- .profile = nullptr,
- })
+ ASSERT_TRUE(reader.init({
+ .projected_columns = projected_columns,
+ .column_predicates = {},
+ .conjuncts = VExprContext(
+
std::make_shared<TableInt32GreaterThanExpr>(0, 0, 2)),
+ .format = FileFormat::PARQUET,
+ .scan_params = nullptr,
+ .io_ctx = nullptr,
+ .runtime_state = &state,
+ .scanner_profile = nullptr,
+ .allow_missing_columns = true,
+ .profile = nullptr,
+ })
.ok());
ASSERT_TRUE(reader.prepare_split(build_split_options(file_path)).ok());
@@ -417,19 +413,18 @@ TEST(TableReaderTest,
OpenReaderBuildsColumnPredicateFilters) {
RuntimeState state {TQueryOptions(), TQueryGlobals()};
TableReader reader;
- ASSERT_TRUE(reader
- .init({
- .projected_columns = projected_columns,
- .column_predicates =
std::move(column_predicates),
- .conjuncts = VExprContext(nullptr),
- .format = FileFormat::PARQUET,
- .scan_params = nullptr,
- .io_ctx = nullptr,
- .runtime_state = &state,
- .scanner_profile = nullptr,
- .allow_missing_columns = true,
- .profile = nullptr,
- })
+ ASSERT_TRUE(reader.init({
+ .projected_columns = projected_columns,
+ .column_predicates =
std::move(column_predicates),
+ .conjuncts = VExprContext(nullptr),
+ .format = FileFormat::PARQUET,
+ .scan_params = nullptr,
+ .io_ctx = nullptr,
+ .runtime_state = &state,
+ .scanner_profile = nullptr,
+ .allow_missing_columns = true,
+ .profile = nullptr,
+ })
.ok());
ASSERT_TRUE(reader.prepare_split(build_split_options(file_path)).ok());
@@ -509,8 +504,8 @@ TEST(TableReaderTest,
CreateScanRequestDeduplicatesSharedPredicateColumns) {
});
FileScanRequest file_request;
- ASSERT_TRUE(mapper.create_scan_request(table_filters, {},
projected_columns, &file_request)
- .ok());
+ ASSERT_TRUE(
+ mapper.create_scan_request(table_filters, {}, projected_columns,
&file_request).ok());
// Both filters reference column a. It must still be read once as a
predicate column, and a
// predicate column must not be repeated as a non-predicate column.
@@ -544,21 +539,20 @@ TEST(TableReaderTest,
OpenReaderPushesMultiColumnConjunctToParquetReader) {
RuntimeState state {TQueryOptions(), TQueryGlobals()};
TableReader reader;
- ASSERT_TRUE(reader
- .init({
- .projected_columns = projected_columns,
- .column_predicates = {},
- .conjuncts = VExprContext(
-
std::make_shared<TableInt32SumGreaterThanExpr>(
- 0, 0, 1, 1, 8)),
- .format = FileFormat::PARQUET,
- .scan_params = nullptr,
- .io_ctx = nullptr,
- .runtime_state = &state,
- .scanner_profile = nullptr,
- .allow_missing_columns = true,
- .profile = nullptr,
- })
+ ASSERT_TRUE(reader.init({
+ .projected_columns = projected_columns,
+ .column_predicates = {},
+ .conjuncts = VExprContext(
+
std::make_shared<TableInt32SumGreaterThanExpr>(0, 0, 1,
+
1, 8)),
+ .format = FileFormat::PARQUET,
+ .scan_params = nullptr,
+ .io_ctx = nullptr,
+ .runtime_state = &state,
+ .scanner_profile = nullptr,
+ .allow_missing_columns = true,
+ .profile = nullptr,
+ })
.ok());
ASSERT_TRUE(reader.prepare_split(build_split_options(file_path)).ok());
@@ -600,19 +594,18 @@ TEST(TableReaderTest,
ProjectedColumnsFillDefaultForParquetSchemaMismatch) {
RuntimeState state {TQueryOptions(), TQueryGlobals()};
TableReader reader;
- ASSERT_TRUE(reader
- .init({
- .projected_columns = projected_columns,
- .column_predicates = {},
- .conjuncts = VExprContext(nullptr),
- .format = FileFormat::PARQUET,
- .scan_params = nullptr,
- .io_ctx = nullptr,
- .runtime_state = &state,
- .scanner_profile = nullptr,
- .allow_missing_columns = true,
- .profile = nullptr,
- })
+ ASSERT_TRUE(reader.init({
+ .projected_columns = projected_columns,
+ .column_predicates = {},
+ .conjuncts = VExprContext(nullptr),
+ .format = FileFormat::PARQUET,
+ .scan_params = nullptr,
+ .io_ctx = nullptr,
+ .runtime_state = &state,
+ .scanner_profile = nullptr,
+ .allow_missing_columns = true,
+ .profile = nullptr,
+ })
.ok());
ASSERT_TRUE(reader.prepare_split(build_split_options(file_path)).ok());
@@ -645,19 +638,18 @@ TEST(TableReaderTest,
ProjectedColumnsRejectParquetSchemaMismatchWhenMissingColu
RuntimeState state {TQueryOptions(), TQueryGlobals()};
TableReader reader;
- ASSERT_TRUE(reader
- .init({
- .projected_columns = projected_columns,
- .column_predicates = {},
- .conjuncts = VExprContext(nullptr),
- .format = FileFormat::PARQUET,
- .scan_params = nullptr,
- .io_ctx = nullptr,
- .runtime_state = &state,
- .scanner_profile = nullptr,
- .allow_missing_columns = false,
- .profile = nullptr,
- })
+ ASSERT_TRUE(reader.init({
+ .projected_columns = projected_columns,
+ .column_predicates = {},
+ .conjuncts = VExprContext(nullptr),
+ .format = FileFormat::PARQUET,
+ .scan_params = nullptr,
+ .io_ctx = nullptr,
+ .runtime_state = &state,
+ .scanner_profile = nullptr,
+ .allow_missing_columns = false,
+ .profile = nullptr,
+ })
.ok());
ASSERT_TRUE(reader.prepare_split(build_split_options(file_path)).ok());
@@ -674,6 +666,56 @@ TEST(TableReaderTest,
ProjectedColumnsRejectParquetSchemaMismatchWhenMissingColu
std::filesystem::remove_all(test_dir);
}
+TEST(TableReaderTest, ProjectedPartitionColumnUsesSplitPartitionValue) {
+ const auto test_dir =
+ std::filesystem::temp_directory_path() /
"doris_table_reader_partition_value_test";
+ std::filesystem::remove_all(test_dir);
+ std::filesystem::create_directories(test_dir);
+
+ const auto file_path = (test_dir / "split.parquet").string();
+ write_parquet_file(file_path, 1, "one");
+
+ std::vector<TableColumn> projected_columns;
+ auto partition_column = make_table_column(1, "value",
std::make_shared<DataTypeString>());
+ partition_column.is_partition_key = true;
+ projected_columns.push_back(std::move(partition_column));
+
+ RuntimeState state {TQueryOptions(), TQueryGlobals()};
+ TableReader reader;
+ ASSERT_TRUE(reader.init({
+ .projected_columns = projected_columns,
+ .column_predicates = {},
+ .conjuncts = VExprContext(nullptr),
+ .format = FileFormat::PARQUET,
+ .scan_params = nullptr,
+ .io_ctx = nullptr,
+ .runtime_state = &state,
+ .scanner_profile = nullptr,
+ .allow_missing_columns = true,
+ .profile = nullptr,
+ })
+ .ok());
+
+ auto split_options = build_split_options(file_path);
+ split_options.partition_values.emplace("value",
Field::create_field<TYPE_STRING>("p1"));
+ ASSERT_TRUE(reader.prepare_split(split_options).ok());
+
+ // The file has a physical column with the same id/name. The split
partition value should still
+ // take precedence and be materialized by TableReader.
+ Block block = build_table_block(projected_columns);
+ bool eos = false;
+ ASSERT_TRUE(reader.get_block(&block, &eos).ok());
+ ASSERT_FALSE(eos);
+
+ const auto& partition_value =
+ assert_cast<const ColumnString&>(*block.get_by_position(0).column);
+ ASSERT_EQ(partition_value.size(), 1);
+ EXPECT_EQ(partition_value.get_data_at(0).to_string(), "p1");
+
+ ASSERT_TRUE(reader.close().ok());
+ std::filesystem::remove_all(test_dir);
+}
+
TEST(TableReaderTest,
ProjectedColumnsUseMapperExpressionForSameNameDifferentIdParquetSchema) {
const auto test_dir =
std::filesystem::temp_directory_path() /
"doris_table_reader_same_name_diff_id_test";
@@ -688,19 +730,18 @@ TEST(TableReaderTest,
ProjectedColumnsUseMapperExpressionForSameNameDifferentIdP
RuntimeState state {TQueryOptions(), TQueryGlobals()};
TableReader reader;
- ASSERT_TRUE(reader
- .init({
- .projected_columns = projected_columns,
- .column_predicates = {},
- .conjuncts = VExprContext(nullptr),
- .format = FileFormat::PARQUET,
- .scan_params = nullptr,
- .io_ctx = nullptr,
- .runtime_state = &state,
- .scanner_profile = nullptr,
- .allow_missing_columns = true,
- .profile = nullptr,
- })
+ ASSERT_TRUE(reader.init({
+ .projected_columns = projected_columns,
+ .column_predicates = {},
+ .conjuncts = VExprContext(nullptr),
+ .format = FileFormat::PARQUET,
+ .scan_params = nullptr,
+ .io_ctx = nullptr,
+ .runtime_state = &state,
+ .scanner_profile = nullptr,
+ .allow_missing_columns = true,
+ .profile = nullptr,
+ })
.ok());
ASSERT_TRUE(reader.prepare_split(build_split_options(file_path)).ok());
@@ -738,19 +779,18 @@ TEST(TableReaderTest,
ProjectedColumnsUseMapperExpressionsForParquetSchemaMismat
RuntimeState state {TQueryOptions(), TQueryGlobals()};
TableReader reader;
- ASSERT_TRUE(reader
- .init({
- .projected_columns = projected_columns,
- .column_predicates = {},
- .conjuncts = VExprContext(nullptr),
- .format = FileFormat::PARQUET,
- .scan_params = nullptr,
- .io_ctx = nullptr,
- .runtime_state = &state,
- .scanner_profile = nullptr,
- .allow_missing_columns = true,
- .profile = nullptr,
- })
+ ASSERT_TRUE(reader.init({
+ .projected_columns = projected_columns,
+ .column_predicates = {},
+ .conjuncts = VExprContext(nullptr),
+ .format = FileFormat::PARQUET,
+ .scan_params = nullptr,
+ .io_ctx = nullptr,
+ .runtime_state = &state,
+ .scanner_profile = nullptr,
+ .allow_missing_columns = true,
+ .profile = nullptr,
+ })
.ok());
ASSERT_TRUE(reader.prepare_split(build_split_options(file_path)).ok());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]