This is an automated email from the ASF dual-hosted git repository.
suxiaogang223 pushed a commit to branch refact_reader_branch
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/refact_reader_branch by this
push:
new cf14d1e05a7 [fix](be) Read required nested parquet scalars
cf14d1e05a7 is described below
commit cf14d1e05a7e705dccb28e72f6fa9a5f6a93ff29
Author: Socrates <[email protected]>
AuthorDate: Fri May 29 10:31:47 2026 +0800
[fix](be) Read required nested parquet scalars
### What problem does this PR solve?
Issue Number: close #xxx
Related PR: #xxx
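Problem Summary: Support required nested scalar leaves for which the Arrow
RecordReader reports no level buffers (levels_written == 0), consume a
materialized value only when a nested definition level equals the leaf's max
definition level, and let the nested scalar reader accept unannotated
BOOLEAN/INT32/INT64/FLOAT/DOUBLE leaves.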
### Release note
None
### Check List (For Author)
- Test: Manual test
- Ran `git diff --check`. Fedora BE unit-test validation will follow, using
`./run-be-ut.sh --run '--filter=ParquetColumnReaderTest.*'`.
- Behavior changed: No
- Does this need documentation: No
---
be/src/format/new_parquet/column_reader.cpp | 41 ++++++++++++++++++++--
.../new_parquet/parquet_column_reader_test.cpp | 1 +
2 files changed, 39 insertions(+), 3 deletions(-)
diff --git a/be/src/format/new_parquet/column_reader.cpp
b/be/src/format/new_parquet/column_reader.cpp
index 37d1efa322e..a177e025879 100644
--- a/be/src/format/new_parquet/column_reader.cpp
+++ b/be/src/format/new_parquet/column_reader.cpp
@@ -458,6 +458,12 @@ Status read_nested_scalar_batch(ScalarColumnReader&
column_reader, int64_t batch
}
batch->levels_written = record_reader->levels_written();
batch->values_written = record_reader->values_written();
+ if (batch->levels_written == 0 && batch->records_read > 0 &&
+ batch->values_written == batch->records_read &&
+ column_reader.descriptor()->max_definition_level() == 0 &&
+ column_reader.descriptor()->max_repetition_level() == 0) {
+ batch->levels_written = batch->records_read;
+ }
if (batch->levels_written < batch->records_read || batch->values_written <
0 ||
batch->values_written > batch->levels_written) {
return Status::Corruption(
@@ -501,7 +507,8 @@ Status read_nested_scalar_batch(ScalarColumnReader&
column_reader, int64_t batch
const int16_t max_definition_level =
column_reader.descriptor()->max_definition_level();
NullMap value_null_map;
for (int64_t level_idx = 0; level_idx < batch->levels_written;
++level_idx) {
- if (batch->def_levels[level_idx] >= value_slot_definition_level) {
+ const bool has_value = batch->def_levels[level_idx] ==
max_definition_level;
+ if (batch->def_levels[level_idx] >= value_slot_definition_level &&
has_value) {
if (value_idx >= batch->values_written) {
return Status::Corruption(
"Nested parquet reader returned fewer values than
definition levels for "
@@ -509,8 +516,10 @@ Status read_nested_scalar_batch(ScalarColumnReader&
column_reader, int64_t batch
column_reader.name());
}
batch->value_indices[level_idx] = value_idx++;
+ }
+ if (batch->def_levels[level_idx] >= value_slot_definition_level) {
if (column_reader.type()->is_nullable()) {
- value_null_map.push_back(batch->def_levels[level_idx] !=
max_definition_level);
+ value_null_map.push_back(!has_value);
}
}
}
@@ -574,6 +583,32 @@ Status append_scalar_batch_value(const ScalarColumnReader&
column_reader,
return Status::OK();
}
+// Decides whether the nested scalar reader can be built for `column_schema`.
+// Returns true if supports_record_reader() accepts the type descriptor.
+// Otherwise it additionally accepts plain (unannotated) primitive leaves:
+// no extra type info, not decimal / timestamp / string-like, converted type
+// NONE or UNDEFINED, and physical type BOOLEAN, INT32, INT64, FLOAT or DOUBLE.
+// NOTE(review): presumably this fallback covers primitives that
+// supports_record_reader() rejects; confirm against its implementation.
+bool supports_nested_scalar_record_reader(const ParquetColumnSchema&
column_schema) {
+ if (supports_record_reader(column_schema.type_descriptor)) {
+ return true;
+ }
+ const auto& type_descriptor = column_schema.type_descriptor;
+ // Any logical/semantic annotation disqualifies the fallback path.
+ if (type_descriptor.extra_type_info != ParquetExtraTypeInfo::NONE ||
+ type_descriptor.is_decimal || type_descriptor.is_timestamp ||
+ type_descriptor.is_string_like) {
+ return false;
+ }
+ // Legacy converted-type annotations (e.g. DATE on INT32) are also rejected.
+ if (type_descriptor.converted_type != ::parquet::ConvertedType::NONE &&
+ type_descriptor.converted_type != ::parquet::ConvertedType::UNDEFINED)
{
+ return false;
+ }
+ // Only fixed-width numeric/boolean physical types are accepted here.
+ switch (type_descriptor.physical_type) {
+ case ::parquet::Type::BOOLEAN:
+ case ::parquet::Type::INT32:
+ case ::parquet::Type::INT64:
+ case ::parquet::Type::FLOAT:
+ case ::parquet::Type::DOUBLE:
+ return true;
+ default:
+ return false;
+ }
+}
+
ColumnArray* array_column_from_output(MutableColumnPtr& column) {
if (auto* nullable_column = check_and_get_column<ColumnNullable>(*column))
{
return
assert_cast<ColumnArray*>(&nullable_column->get_nested_column());
@@ -1349,7 +1384,7 @@ Status
ParquetColumnReaderFactory::create_nested_scalar_column_reader(
return Status::InvalidArgument("Invalid parquet leaf column id {} for
column {}",
column_schema.leaf_column_id,
column_schema.name);
}
- if (!supports_record_reader(column_schema.type_descriptor)) {
+ if (!supports_nested_scalar_record_reader(column_schema)) {
return Status::NotSupported(
"Current parquet nested scalar reader does not support column
{}",
column_schema.name);
diff --git a/be/test/format/new_parquet/parquet_column_reader_test.cpp
b/be/test/format/new_parquet/parquet_column_reader_test.cpp
index 1bacab7cf8e..ca4003cf377 100644
--- a/be/test/format/new_parquet/parquet_column_reader_test.cpp
+++ b/be/test/format/new_parquet/parquet_column_reader_test.cpp
@@ -740,6 +740,7 @@ protected:
void read_and_validate(size_t field_idx) const {
auto reader = create_reader(field_idx);
+ ASSERT_NE(reader, nullptr);
MutableColumnPtr column = reader->type()->create_column();
int64_t rows_read = 0;
auto st = reader->read(ROW_COUNT, column, &rows_read);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]