This is an automated email from the ASF dual-hosted git repository.

suxiaogang223 pushed a commit to branch refact_reader_branch
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/refact_reader_branch by this 
push:
     new 6f4dac32501 [feature](be) Add basic parquet list reader
6f4dac32501 is described below

commit 6f4dac325011df9b4c4df11f5087c0aa5e5db1c3
Author: Socrates <[email protected]>
AuthorDate: Thu May 28 11:02:39 2026 +0800

    [feature](be) Add basic parquet list reader
    
    ### What problem does this PR solve?
    
    Issue Number: close #xxx
    
    Related PR: #xxx
    
    Problem Summary: Take the next step toward complex type support in the new
parquet reader: normalize the standard three-level LIST schema to
Array(element), allow RecordReader usage for nested leaf columns, and read
non-empty LIST<required primitive> columns using repetition levels.
    
    ### Release note
    
    None
    
    ### Check List (For Author)
    
    - Test: Manual test
        - Ran clang-format dry-run and git diff --check for modified files.
        - Ran BUILD_TYPE=DEBUG ./build.sh --be on Fedora successfully with the 
patch applied.
        - Attempted to run ParquetColumnReaderTest on Fedora, but stopped the
ASAN_UT build because it triggered a full rebuild of all unit tests; the test
binary was never run, so no test result is available.
    - Behavior changed: Yes. The new parquet reader can now read non-empty
LIST<required primitive> columns (a limited set of list shapes) and returns
NotSupported for unsupported list shapes, instead of rejecting every LIST column.
    - Does this need documentation: No
---
 be/src/format/new_parquet/column_reader.cpp        | 235 ++++++++++++++++++---
 be/src/format/new_parquet/column_reader.h          |   7 +
 .../format/new_parquet/parquet_column_schema.cpp   |  24 ++-
 be/src/format/new_parquet/parquet_type.cpp         |   4 -
 .../new_parquet/parquet_column_reader_test.cpp     |  39 ++++
 5 files changed, 274 insertions(+), 35 deletions(-)

diff --git a/be/src/format/new_parquet/column_reader.cpp 
b/be/src/format/new_parquet/column_reader.cpp
index f1674b767b0..9952016832c 100644
--- a/be/src/format/new_parquet/column_reader.cpp
+++ b/be/src/format/new_parquet/column_reader.cpp
@@ -31,7 +31,9 @@
 #include <vector>
 
 #include "core/column/column.h"
+#include "core/column/column_array.h"
 #include "core/column/column_struct.h"
+#include "core/data_type/data_type_array.h"
 #include "core/data_type/data_type_nullable.h"
 #include "core/data_type/data_type_struct.h"
 #include "core/data_type_serde/decoded_column_view.h"
@@ -66,6 +68,7 @@ public:
     const std::shared_ptr<::parquet::internal::RecordReader>& record_reader() 
const {
         return _record_reader;
     }
+    const ParquetTypeDescriptor& type_descriptor() const { return 
_type_descriptor; }
 
 private:
     int _file_column_id = -1;
@@ -101,6 +104,32 @@ private:
     std::vector<std::unique_ptr<ParquetColumnReader>> _children;
 };
 
+// Reader for a standard parquet LIST column, producing a Doris Array column.
+//
+// Owns a single element reader; read() appends the element values and rebuilds
+// array offsets from the element leaf's repetition levels. read() currently
+// accepts only a ScalarColumnReader element with max definition level 1.
+class ListColumnReader final : public ParquetColumnReader {
+public:
+    // `schema` supplies the top-level field id, the column name and the
+    // repetition level of the LIST's repeated group.
+    ListColumnReader(const ParquetColumnSchema& schema, DataTypePtr type,
+                     std::unique_ptr<ParquetColumnReader> element_reader)
+            : _field_id(schema.top_level_field_id),
+              _repeated_repetition_level(schema.repeated_repetition_level),
+              _type(std::move(type)),
+              _name(schema.name),
+              _element_reader(std::move(element_reader)) {}
+
+    int file_column_id() const override { return _field_id; }
+    // A LIST is a group node rather than a leaf, so it has no leaf column id.
+    int parquet_leaf_column_id() const override { return -1; }
+    const DataTypePtr& type() const override { return _type; }
+    const std::string& name() const override { return _name; }
+
+    Status read(int64_t rows, MutableColumnPtr& column, int64_t* rows_read) 
override;
+    Status skip(int64_t rows) override;
+
+private:
+    int _field_id = -1;
+    // Repetition level of the repeated group; in read(), a repetition level
+    // below this value marks the start of a new record.
+    int16_t _repeated_repetition_level = 0;
+    DataTypePtr _type;
+    std::string _name;
+    // Reader for the single element child of the LIST.
+    std::unique_ptr<ParquetColumnReader> _element_reader;
+};
+
 Status read_records(ScalarColumnReader& column_reader, int64_t batch_rows,
                     ::parquet::internal::RecordReader** record_reader, 
int64_t* rows_read) {
     auto reader = column_reader.record_reader();
@@ -275,6 +304,34 @@ Status build_binary_values(const ScalarColumnReader& 
column_reader,
     return Status::OK();
 }
 
+// Decodes the values buffered in `record_reader` for the leaf described by
+// `column_reader` and appends `row_count` of them to `column` through the
+// column type's serde. Used by ScalarColumnReader::read (one value per row)
+// and ListColumnReader::read (one value per element level).
+// `null_map` may be null or empty, in which case the view carries no null map.
+Status append_scalar_values(const ScalarColumnReader& column_reader,
+                            ::parquet::internal::RecordReader& record_reader, 
int64_t row_count,
+                            const NullMap* null_map, MutableColumnPtr& column) 
{
+    // Kept at function scope: `view` stores a pointer to binary_values, whose
+    // StringRefs presumably point into binary_chunks, so both must outlive the
+    // serde call below.
+    std::vector<StringRef> binary_values;
+    std::vector<std::shared_ptr<::arrow::Array>> binary_chunks;
+    DecodedColumnView view;
+    view.value_kind = decoded_value_kind(column_reader.type_descriptor());
+    view.time_unit = 
decoded_time_unit(column_reader.type_descriptor().time_unit);
+    view.row_count = row_count;
+    view.decimal_precision = column_reader.type_descriptor().decimal_precision;
+    view.decimal_scale = column_reader.type_descriptor().decimal_scale;
+    view.fixed_length = column_reader.type_descriptor().fixed_length;
+    view.null_map = null_map == nullptr || null_map->empty() ? nullptr : 
null_map->data();
+    if (view.value_kind == DecodedValueKind::BINARY ||
+        view.value_kind == DecodedValueKind::FIXED_BINARY) {
+        // Binary kinds are materialized as StringRefs; other kinds are read
+        // straight from the record reader's value buffer.
+        RETURN_IF_ERROR(get_binary_chunks(column_reader, record_reader, 
&binary_chunks));
+        RETURN_IF_ERROR(
+                build_binary_values(column_reader, binary_chunks, row_count, 
&binary_values));
+        view.binary_values = &binary_values;
+    } else {
+        view.values = record_reader.values();
+    }
+
+    RETURN_IF_ERROR(
+            
column_reader.type()->get_serde()->read_column_from_decoded_values(*column, 
view));
+    return Status::OK();
+}
+
 } // namespace
 
 Status ScalarColumnReader::read(int64_t rows, MutableColumnPtr& column, 
int64_t* rows_read) {
@@ -297,26 +354,7 @@ Status ScalarColumnReader::read(int64_t rows, 
MutableColumnPtr& column, int64_t*
     NullMap null_map;
     RETURN_IF_ERROR(build_null_map(*this, *record_reader, *rows_read, 
&null_map));
 
-    std::vector<StringRef> binary_values;
-    std::vector<std::shared_ptr<::arrow::Array>> binary_chunks;
-    DecodedColumnView view;
-    view.value_kind = decoded_value_kind(_type_descriptor);
-    view.time_unit = decoded_time_unit(_type_descriptor.time_unit);
-    view.row_count = *rows_read;
-    view.decimal_precision = _type_descriptor.decimal_precision;
-    view.decimal_scale = _type_descriptor.decimal_scale;
-    view.fixed_length = _type_descriptor.fixed_length;
-    view.null_map = null_map.empty() ? nullptr : null_map.data();
-    if (view.value_kind == DecodedValueKind::BINARY ||
-        view.value_kind == DecodedValueKind::FIXED_BINARY) {
-        RETURN_IF_ERROR(get_binary_chunks(*this, *record_reader, 
&binary_chunks));
-        RETURN_IF_ERROR(build_binary_values(*this, binary_chunks, *rows_read, 
&binary_values));
-        view.binary_values = &binary_values;
-    } else {
-        view.values = record_reader->values();
-    }
-
-    
RETURN_IF_ERROR(_type->get_serde()->read_column_from_decoded_values(*column, 
view));
+    RETURN_IF_ERROR(append_scalar_values(*this, *record_reader, *rows_read, 
&null_map, column));
     return Status::OK();
 }
 
@@ -395,6 +433,92 @@ Status StructColumnReader::skip(int64_t rows) {
     return Status::OK();
 }
 
+/// Reads `rows` LIST records and appends them to `column`, which must be a
+/// ColumnArray.
+///
+/// Only non-empty lists of required scalar elements are supported: every
+/// consumed definition level must equal the element's max definition level and
+/// every consumed level must carry a value. Array offsets are rebuilt from the
+/// repetition levels: the first level, and any level whose repetition level is
+/// below `_repeated_repetition_level`, starts a new record.
+///
+/// Only the first `levels_position()` buffered levels belong to the records
+/// just read. For repeated columns the RecordReader decodes levels ahead of the
+/// current record to locate record boundaries, so `levels_written()` may
+/// already contain levels of the next record; those must not be consumed here.
+///
+/// All validation happens before `column` is modified.
+Status ListColumnReader::read(int64_t rows, MutableColumnPtr& column, int64_t* rows_read) {
+    if (column.get() == nullptr || rows_read == nullptr) {
+        return Status::InvalidArgument("Invalid parquet list read result pointer for column {}",
+                                       _name);
+    }
+    if (_element_reader == nullptr) {
+        return Status::InternalError("Parquet list element reader is not initialized for column {}",
+                                     _name);
+    }
+    auto* element_reader = dynamic_cast<ScalarColumnReader*>(_element_reader.get());
+    if (element_reader == nullptr) {
+        return Status::NotSupported(
+                "Current parquet LIST reader only supports scalar elements for column {}", _name);
+    }
+    const int16_t max_definition_level = element_reader->descriptor()->max_definition_level();
+    // For a required LIST with required elements, only the repeated group
+    // contributes a definition level, so the leaf's max definition level is 1.
+    if (max_definition_level != 1) {
+        return Status::NotSupported(
+                "Current parquet LIST reader only supports required elements for column {}", _name);
+    }
+
+    ::parquet::internal::RecordReader* record_reader = nullptr;
+    int64_t records_read = 0;
+    RETURN_IF_ERROR(read_records(*element_reader, rows, &record_reader, &records_read));
+    // Levels consumed by the records just read. levels_written() is not used
+    // because it may include read-ahead levels belonging to the next record.
+    const int64_t levels_read = record_reader->levels_position();
+    if (records_read != rows || levels_read < records_read) {
+        return Status::Corruption(
+                "Invalid parquet LIST read result for column {}: rows={}, levels={}", _name,
+                records_read, levels_read);
+    }
+    // With required elements every consumed level carries a value; fewer values
+    // means some level has a lower definition level, i.e. an empty list.
+    if (record_reader->values_written() != levels_read) {
+        return Status::NotSupported(
+                "Current parquet LIST reader only supports non-empty lists with required "
+                "elements for column {}",
+                _name);
+    }
+    if (const auto* def_levels = record_reader->def_levels(); def_levels != nullptr) {
+        for (int64_t level_idx = 0; level_idx < levels_read; ++level_idx) {
+            if (def_levels[level_idx] != max_definition_level) {
+                return Status::NotSupported(
+                        "Current parquet LIST reader only supports non-empty lists with required "
+                        "elements for column {}",
+                        _name);
+            }
+        }
+    }
+    const auto* rep_levels = record_reader->rep_levels();
+    if (rep_levels == nullptr && levels_read > 0) {
+        return Status::Corruption(
+                "Parquet LIST reader returned null repetition levels for column {}", _name);
+    }
+    // Check record boundaries before touching `column`, so a corrupt batch cannot
+    // leave element data appended without matching offsets.
+    int64_t record_starts = 0;
+    for (int64_t level_idx = 0; level_idx < levels_read; ++level_idx) {
+        if (level_idx == 0 || rep_levels[level_idx] < _repeated_repetition_level) {
+            ++record_starts;
+        }
+    }
+    if (record_starts != records_read) {
+        return Status::Corruption(
+                "Invalid parquet LIST repetition levels for column {}: rows={}, records={}",
+                _name, records_read, record_starts);
+    }
+
+    auto& array_column = assert_cast<ColumnArray&>(*column);
+    auto nested_column = array_column.get_data_ptr()->assume_mutable();
+    RETURN_IF_ERROR(append_scalar_values(*element_reader, *record_reader, levels_read, nullptr,
+                                         nested_column));
+    array_column.get_data_ptr() = std::move(nested_column);
+
+    // Offsets are cumulative end positions. Each record start after the first
+    // closes the previous record; the last record ends after the final level.
+    auto& offsets = array_column.get_offsets();
+    offsets.reserve(offsets.size() + static_cast<size_t>(records_read));
+    const size_t base_offset = offsets.empty() ? 0 : offsets.back();
+    for (int64_t level_idx = 1; level_idx < levels_read; ++level_idx) {
+        if (rep_levels[level_idx] < _repeated_repetition_level) {
+            offsets.push_back(base_offset + static_cast<size_t>(level_idx));
+        }
+    }
+    if (records_read > 0) {
+        offsets.push_back(base_offset + static_cast<size_t>(levels_read));
+    }
+    *rows_read = records_read;
+    return Status::OK();
+}
+
+/// Skips `rows` LIST records by delegating to the element reader.
+/// NOTE(review): assumes the element reader's skip() counts top-level records
+/// rather than individual element values; confirm against ScalarColumnReader.
+Status ListColumnReader::skip(int64_t rows) {
+    if (rows <= 0) {
+        return Status::OK();
+    }
+    // A missing element reader is reported as an error, matching read().
+    if (_element_reader == nullptr) {
+        return Status::InternalError("Parquet list element reader is not initialized for column {}",
+                                     _name);
+    }
+    return _element_reader->skip(rows);
+}
+
 Status ParquetColumnReader::skip(int64_t rows) {
     return Status::NotSupported("Parquet column skip is not implemented, rows={}", rows);
 }
@@ -469,6 +593,42 @@ Status 
ParquetColumnReaderFactory::create_scalar_column_reader(
                 "column {} is not supported",
                 column_schema.name);
     }
+    if (column_schema.descriptor == nullptr ||
+        column_schema.descriptor->max_repetition_level() != 0 ||
+        column_schema.descriptor->max_definition_level() > 1) {
+        return Status::NotSupported(
+                "Current parquet scalar reader only supports flat primitive 
columns; column {} is "
+                "not supported",
+                column_schema.name);
+    }
+    std::shared_ptr<::parquet::internal::RecordReader> record_reader;
+    RETURN_IF_ERROR(get_record_reader(column_schema.leaf_column_id, 
column_schema.descriptor,
+                                      column_schema.name, &record_reader));
+    return create_scalar_reader(column_schema.leaf_column_id, 
column_schema.type_descriptor,
+                                column_schema.descriptor, column_schema.type, 
column_schema.name,
+                                std::move(record_reader), reader);
+}
+
+Status ParquetColumnReaderFactory::create_nested_scalar_column_reader(
+        const ParquetColumnSchema& column_schema,
+        std::unique_ptr<ParquetColumnReader>* reader) const {
+    if (reader == nullptr) {
+        return Status::InvalidArgument("reader is null");
+    }
+    if (column_schema.kind != ParquetColumnSchemaKind::PRIMITIVE) {
+        return Status::InvalidArgument("Parquet nested scalar reader requires 
primitive column {}",
+                                       column_schema.name);
+    }
+    if (column_schema.leaf_column_id < 0 ||
+        column_schema.leaf_column_id >= 
static_cast<int>(_record_readers.size())) {
+        return Status::InvalidArgument("Invalid parquet leaf column id {} for 
column {}",
+                                       column_schema.leaf_column_id, 
column_schema.name);
+    }
+    if (!supports_record_reader(column_schema.type_descriptor)) {
+        return Status::NotSupported(
+                "Current parquet nested scalar reader does not support column 
{}",
+                column_schema.name);
+    }
     std::shared_ptr<::parquet::internal::RecordReader> record_reader;
     RETURN_IF_ERROR(get_record_reader(column_schema.leaf_column_id, 
column_schema.descriptor,
                                       column_schema.name, &record_reader));
@@ -494,12 +654,6 @@ Status ParquetColumnReaderFactory::get_record_reader(
     if (descriptor == nullptr) {
         return Status::InvalidArgument("Parquet column descriptor is null for 
column {}", name);
     }
-    if (descriptor->max_repetition_level() != 0 || 
descriptor->max_definition_level() > 1) {
-        return Status::NotSupported(
-                "Current parquet reader only supports RecordReader-backed 
columns; column {} is "
-                "not supported",
-                name);
-    }
     if (_record_readers[leaf_column_id] == nullptr) {
         try {
             _record_readers[leaf_column_id] =
@@ -569,6 +723,32 @@ Status 
ParquetColumnReaderFactory::create_struct_column_reader(
     return Status::OK();
 }
 
+// Builds a ListColumnReader for a LIST schema node.
+//
+// Returns NotSupported for a partial child projection, for a nullable list
+// type, and for any LIST layout that does not have exactly one child. The
+// element reader is built by create_nested_scalar_column_reader.
+Status ParquetColumnReaderFactory::create_list_column_reader(
+        const ParquetColumnSchema& column_schema, const 
reader::FieldProjection* projection,
+        std::unique_ptr<ParquetColumnReader>* reader) const {
+    if (reader == nullptr) {
+        return Status::InvalidArgument("reader is null");
+    }
+    if (projection != nullptr && !projection->project_all_children) {
+        return Status::NotSupported("Parquet LIST projection is not 
implemented for column {}",
+                                    column_schema.name);
+    }
+    if (column_schema.type != nullptr && column_schema.type->is_nullable()) {
+        return Status::NotSupported("Nullable parquet LIST reader is not 
implemented for column {}",
+                                    column_schema.name);
+    }
+    if (column_schema.children.size() != 1) {
+        return Status::NotSupported("Unsupported parquet LIST layout for 
column {}",
+                                    column_schema.name);
+    }
+    // The element must be a primitive leaf; create_nested_scalar_column_reader
+    // returns InvalidArgument for non-primitive children.
+    std::unique_ptr<ParquetColumnReader> element_reader;
+    RETURN_IF_ERROR(
+            create_nested_scalar_column_reader(*column_schema.children[0], 
&element_reader));
+    *reader = std::make_unique<ListColumnReader>(column_schema, 
column_schema.type,
+                                                 std::move(element_reader));
+    return Status::OK();
+}
+
 Status ParquetColumnReaderFactory::create(const ParquetColumnSchema& 
column_schema,
                                           const reader::FieldProjection* 
projection,
                                           
std::unique_ptr<ParquetColumnReader>* reader) const {
@@ -581,8 +761,7 @@ Status ParquetColumnReaderFactory::create(const 
ParquetColumnSchema& column_sche
     case ParquetColumnSchemaKind::STRUCT:
         return create_struct_column_reader(column_schema, projection, reader);
     case ParquetColumnSchemaKind::LIST:
-        return Status::NotSupported("Parquet LIST reader is not implemented 
for column {}",
-                                    column_schema.name);
+        return create_list_column_reader(column_schema, projection, reader);
     case ParquetColumnSchemaKind::MAP:
         return Status::NotSupported("Parquet MAP reader is not implemented for 
column {}",
                                     column_schema.name);
diff --git a/be/src/format/new_parquet/column_reader.h 
b/be/src/format/new_parquet/column_reader.h
index 93881ac8c48..ec691a9743e 100644
--- a/be/src/format/new_parquet/column_reader.h
+++ b/be/src/format/new_parquet/column_reader.h
@@ -104,10 +104,17 @@ private:
     Status create_scalar_column_reader(const ParquetColumnSchema& 
column_schema,
                                        std::unique_ptr<ParquetColumnReader>* 
reader) const;
 
+    Status create_nested_scalar_column_reader(const ParquetColumnSchema& 
column_schema,
+                                              
std::unique_ptr<ParquetColumnReader>* reader) const;
+
     Status create_struct_column_reader(const ParquetColumnSchema& 
column_schema,
                                        const reader::FieldProjection* 
projection,
                                        std::unique_ptr<ParquetColumnReader>* 
reader) const;
 
+    Status create_list_column_reader(const ParquetColumnSchema& column_schema,
+                                     const reader::FieldProjection* projection,
+                                     std::unique_ptr<ParquetColumnReader>* 
reader) const;
+
     Status get_record_reader(int leaf_column_id, const 
::parquet::ColumnDescriptor* descriptor,
                              const std::string& name,
                              
std::shared_ptr<::parquet::internal::RecordReader>* reader) const;
diff --git a/be/src/format/new_parquet/parquet_column_schema.cpp 
b/be/src/format/new_parquet/parquet_column_schema.cpp
index 3235ea38a06..8541769c1d2 100644
--- a/be/src/format/new_parquet/parquet_column_schema.cpp
+++ b/be/src/format/new_parquet/parquet_column_schema.cpp
@@ -140,6 +140,9 @@ Status build_node_schema(const ::parquet::SchemaDescriptor& 
schema,
             return Status::NotSupported("Unsupported parquet column type for 
column {}",
                                         node.name());
         }
+        column_schema->type = node.is_optional()
+                                      ? 
make_nullable(remove_nullable(column_schema->type))
+                                      : remove_nullable(column_schema->type);
         *result = std::move(column_schema);
         return Status::OK();
     }
@@ -151,10 +154,25 @@ Status build_node_schema(const 
::parquet::SchemaDescriptor& schema,
             return Status::NotSupported("Unsupported parquet LIST encoding for 
column {}",
                                         node.name());
         }
+        const auto& repeated_node = *group.field(0);
+        if (!repeated_node.is_repeated() || repeated_node.is_primitive()) {
+            return Status::NotSupported("Unsupported parquet LIST encoding for 
column {}",
+                                        node.name());
+        }
+        const auto& repeated_group =
+                static_cast<const 
::parquet::schema::GroupNode&>(repeated_node);
+        if (repeated_group.field_count() != 1) {
+            return Status::NotSupported("Unsupported parquet LIST element 
layout for column {}",
+                                        node.name());
+        }
+        auto repeated_context =
+                child_context(context, repeated_node, 0, 
column_schema->schema_node_id);
+        column_schema->repeated_repetition_level = 
repeated_context.repeated_repetition_level;
         std::unique_ptr<ParquetColumnSchema> child;
-        RETURN_IF_ERROR(build_node_schema(
-                schema, *group.field(0),
-                child_context(context, *group.field(0), 0, 
column_schema->schema_node_id), &child));
+        RETURN_IF_ERROR(build_node_schema(schema, *repeated_group.field(0),
+                                          child_context(repeated_context, 
*repeated_group.field(0),
+                                                        0, 
column_schema->schema_node_id),
+                                          &child));
         column_schema->type =
                 
nullable_if_needed(std::make_shared<DataTypeArray>(child->type), node);
         column_schema->children.push_back(std::move(child));
diff --git a/be/src/format/new_parquet/parquet_type.cpp 
b/be/src/format/new_parquet/parquet_type.cpp
index 53c7b4f2ed9..4079c989f7d 100644
--- a/be/src/format/new_parquet/parquet_type.cpp
+++ b/be/src/format/new_parquet/parquet_type.cpp
@@ -323,10 +323,6 @@ ParquetTypeDescriptor resolve_parquet_type(const 
::parquet::ColumnDescriptor* co
             !result.is_decimal && (result.physical_type == 
::parquet::Type::BYTE_ARRAY ||
                                    result.physical_type == 
::parquet::Type::FIXED_LEN_BYTE_ARRAY);
 
-    if (column->max_repetition_level() != 0 || column->max_definition_level() 
> 1) {
-        result.supports_record_reader = false;
-        return result;
-    }
     if (!record_reader_physical_type_supported(result.physical_type)) {
         result.supports_record_reader = false;
         return result;
diff --git a/be/test/format/new_parquet/parquet_column_reader_test.cpp 
b/be/test/format/new_parquet/parquet_column_reader_test.cpp
index 97773a5bada..b85bbb80a6a 100644
--- a/be/test/format/new_parquet/parquet_column_reader_test.cpp
+++ b/be/test/format/new_parquet/parquet_column_reader_test.cpp
@@ -29,12 +29,14 @@
 #include <vector>
 
 #include "core/assert_cast.h"
+#include "core/column/column_array.h"
 #include "core/column/column_decimal.h"
 #include "core/column/column_nullable.h"
 #include "core/column/column_string.h"
 #include "core/column/column_struct.h"
 #include "core/column/column_vector.h"
 #include "core/data_type/data_type.h"
+#include "core/data_type/data_type_array.h"
 #include "core/data_type/data_type_nullable.h"
 #include "core/data_type/data_type_struct.h"
 #include "core/types.h"
@@ -144,6 +146,21 @@ protected:
         return finish_array(&builder);
     }
 
+    // Builds a 5-row Arrow list<int32> array in which every row is a non-empty
+    // list of non-null values: [[1, 2], [3], [4, 5, 6], [7], [8, 9]].
+    std::shared_ptr<arrow::Array> build_required_int_list_array() {
+        auto value_builder = std::make_shared<arrow::Int32Builder>();
+        arrow::ListBuilder builder(arrow::default_memory_pool(), 
value_builder);
+        const std::vector<std::vector<int32_t>> values = {
+                {1, 2}, {3}, {4, 5, 6}, {7}, {8, 9},
+        };
+        for (const auto& row : values) {
+            // Start a new list slot, then append its elements to the child builder.
+            EXPECT_TRUE(builder.Append().ok());
+            for (const auto value : row) {
+                EXPECT_TRUE(value_builder->Append(value).ok());
+            }
+        }
+        return finish_array(&builder);
+    }
+
     std::shared_ptr<arrow::Array> build_time32_array(const 
std::shared_ptr<arrow::DataType>& type,
                                                      const 
std::vector<int32_t>& values) {
         arrow::Time32Builder builder(type, arrow::default_memory_pool());
@@ -365,6 +382,28 @@ protected:
                       EXPECT_EQ(b_values.get_data_at(1).to_string(), "sb");
                       EXPECT_EQ(b_values.get_data_at(4).to_string(), "se");
                   });
+        add_field(arrow::field("list_int_col",
+                               arrow::list(arrow::field("element", 
arrow::int32(), false)), false),
+                  build_required_int_list_array(),
+                  [](const ParquetColumnSchema& schema, const IColumn& column) 
{
+                      
EXPECT_EQ(remove_nullable(schema.type)->get_primitive_type(), TYPE_ARRAY);
+                      const auto* array_type =
+                              assert_cast<const 
DataTypeArray*>(remove_nullable(schema.type).get());
+                      EXPECT_EQ(
+                              
remove_nullable(array_type->get_nested_type())->get_primitive_type(),
+                              TYPE_INT);
+                      const auto& array_column = assert_cast<const 
ColumnArray&>(column);
+                      ASSERT_EQ(array_column.size(), ROW_COUNT);
+                      EXPECT_EQ(array_column.size_at(0), 2);
+                      EXPECT_EQ(array_column.size_at(1), 1);
+                      EXPECT_EQ(array_column.size_at(2), 3);
+                      EXPECT_EQ(array_column.size_at(4), 2);
+                      const auto& values = assert_cast<const 
ColumnInt32&>(array_column.get_data());
+                      ASSERT_EQ(values.size(), 9);
+                      EXPECT_EQ(values.get_element(0), 1);
+                      EXPECT_EQ(values.get_element(5), 6);
+                      EXPECT_EQ(values.get_element(8), 9);
+                  });
 
         auto schema = arrow::schema(_arrow_fields);
         auto table = arrow::Table::Make(schema, _arrays);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to