tustvold commented on code in PR #1682:
URL: https://github.com/apache/arrow-rs/pull/1682#discussion_r869976905


##########
parquet/src/arrow/array_reader/builder.rs:
##########
@@ -52,657 +50,278 @@ pub fn build_array_reader<T>(
 where
     T: IntoIterator<Item = usize>,
 {
-    let mut leaves = HashMap::<*const Type, usize>::new();
-
-    let mut filtered_root_names = HashSet::<String>::new();
-
-    for c in column_indices {
-        let column = parquet_schema.column(c).self_type() as *const Type;
-
-        leaves.insert(column, c);
-
-        let root = parquet_schema.get_column_root_ptr(c);
-        filtered_root_names.insert(root.name().to_string());
+    let field = convert_schema(
+        parquet_schema.as_ref(),
+        column_indices,
+        Some(arrow_schema.as_ref()),
+    )?;
+
+    match &field {
+        Some(field) => build_reader(field, row_groups.as_ref()),
+        None => Ok(make_empty_array_reader(row_groups.num_rows())),
     }
-
-    // Only pass root fields that take part in the projection
-    // to avoid traversal of columns that are not read.
-    // TODO: also prune unread parts of the tree in child structures
-    let filtered_root_fields = parquet_schema
-        .root_schema()
-        .get_fields()
-        .iter()
-        .filter(|field| filtered_root_names.contains(field.name()))
-        .cloned()
-        .collect::<Vec<_>>();
-
-    let proj = Type::GroupType {
-        basic_info: parquet_schema.root_schema().get_basic_info().clone(),
-        fields: filtered_root_fields,
-    };
-
-    ArrayReaderBuilder::new(Arc::new(proj), arrow_schema, Arc::new(leaves), 
row_groups)
-        .build_array_reader()
 }
 
-/// Used to build array reader.
-struct ArrayReaderBuilder {
-    root_schema: TypePtr,
-    arrow_schema: Arc<Schema>,
-    // Key: columns that need to be included in final array builder
-    // Value: column index in schema
-    columns_included: Arc<HashMap<*const Type, usize>>,
-    row_groups: Box<dyn RowGroupCollection>,
+fn build_reader(
+    field: &ParquetField,
+    row_groups: &dyn RowGroupCollection,
+) -> Result<Box<dyn ArrayReader>> {
+    match field.field_type {
+        ParquetFieldType::Primitive { .. } => build_primitive_reader(field, 
row_groups),
+        ParquetFieldType::Group { .. } => match &field.arrow_type {
+            DataType::Map(_, _) => build_map_reader(field, row_groups),
+            DataType::Struct(_) => build_struct_reader(field, row_groups),
+            DataType::List(_) => build_list_reader(field, false, row_groups),
+            DataType::LargeList(_) => build_list_reader(field, true, 
row_groups),
+            d => unimplemented!("reading group type {} not implemented", d),
+        },
+    }
 }
 
-/// Used in type visitor.
-#[derive(Clone)]
-struct ArrayReaderBuilderContext {
-    def_level: i16,
-    rep_level: i16,
-    path: ColumnPath,
+/// Build array reader for map type.
+fn build_map_reader(
+    field: &ParquetField,
+    row_groups: &dyn RowGroupCollection,
+) -> Result<Box<dyn ArrayReader>> {
+    let children = field.children().unwrap();
+    assert_eq!(children.len(), 2);
+
+    let key_reader = build_reader(&children[0], row_groups)?;
+    let value_reader = build_reader(&children[1], row_groups)?;
+
+    Ok(Box::new(MapArrayReader::new(
+        key_reader,
+        value_reader,
+        field.arrow_type.clone(),
+        field.def_level,
+        field.rep_level,
+    )))
 }
 
-impl Default for ArrayReaderBuilderContext {
-    fn default() -> Self {
-        Self {
-            def_level: 0i16,
-            rep_level: 0i16,
-            path: ColumnPath::new(Vec::new()),
-        }
+/// Build array reader for list type.
+fn build_list_reader(
+    field: &ParquetField,
+    is_large: bool,
+    row_groups: &dyn RowGroupCollection,
+) -> Result<Box<dyn ArrayReader>> {
+    let children = field.children().unwrap();
+    assert_eq!(children.len(), 1);
+
+    let data_type = field.arrow_type.clone();
+    let item_reader = build_reader(&children[0], row_groups)?;
+    let item_type = item_reader.get_data_type().clone();
+
+    match is_large {
+        false => Ok(Box::new(ListArrayReader::<i32>::new(
+            item_reader,
+            data_type,
+            item_type,
+            field.def_level,
+            field.rep_level,
+            field.nullable,
+        )) as _),
+        true => Ok(Box::new(ListArrayReader::<i64>::new(
+            item_reader,
+            data_type,
+            item_type,
+            field.def_level,
+            field.rep_level,
+            field.nullable,
+        )) as _),
     }
 }
 
-/// Create array reader by visiting schema.
-impl<'a> TypeVisitor<Option<Box<dyn ArrayReader>>, &'a 
ArrayReaderBuilderContext>
-    for ArrayReaderBuilder
-{
-    /// Build array reader for primitive type.
-    fn visit_primitive(
-        &mut self,
-        cur_type: TypePtr,
-        context: &'a ArrayReaderBuilderContext,
-    ) -> Result<Option<Box<dyn ArrayReader>>> {
-        if self.is_included(cur_type.as_ref()) {
-            let mut new_context = context.clone();
-            new_context.path.append(vec![cur_type.name().to_string()]);
-
-            let null_mask_only = match cur_type.get_basic_info().repetition() {
-                Repetition::REPEATED => {
-                    return Err(ArrowError(format!(
-                        "Reading repeated primitive ({:?}) is not supported 
yet!",
-                        cur_type.name()
-                    )));
-                }
-                Repetition::OPTIONAL => {
-                    new_context.def_level += 1;
-
-                    // Can just compute null mask if no parent
-                    context.def_level == 0 && context.rep_level == 0
-                }
-                _ => false,
-            };
-
-            let reader = self.build_for_primitive_type_inner(
-                cur_type,
-                &new_context,
-                null_mask_only,
-            )?;
-
-            Ok(Some(reader))
-        } else {
-            Ok(None)
-        }
-    }
-
-    /// Build array reader for struct type.
-    fn visit_struct(
-        &mut self,
-        cur_type: Arc<Type>,
-        context: &'a ArrayReaderBuilderContext,
-    ) -> Result<Option<Box<dyn ArrayReader>>> {
-        let mut new_context = context.clone();
-        new_context.path.append(vec![cur_type.name().to_string()]);
-
-        if cur_type.get_basic_info().has_repetition() {
-            match cur_type.get_basic_info().repetition() {
-                Repetition::REPEATED => {
-                    return Err(ArrowError(format!(
-                        "Reading repeated struct ({:?}) is not supported yet!",
-                        cur_type.name(),
-                    )))
-                }
-                Repetition::OPTIONAL => {
-                    new_context.def_level += 1;
-                }
-                Repetition::REQUIRED => {}
-            }
-        }
-
-        self.build_for_struct_type_inner(&cur_type, &new_context)
-    }
-
-    /// Build array reader for map type.
-    fn visit_map(
-        &mut self,
-        map_type: Arc<Type>,
-        context: &'a ArrayReaderBuilderContext,
-    ) -> Result<Option<Box<dyn ArrayReader>>> {
-        // Add map type to context
-        let mut new_context = context.clone();
-        new_context.path.append(vec![map_type.name().to_string()]);
-
-        match map_type.get_basic_info().repetition() {
-            Repetition::REQUIRED => {}
-            Repetition::OPTIONAL => {
-                new_context.def_level += 1;
-            }
-            Repetition::REPEATED => {
-                return Err(ArrowError("Map cannot be repeated".to_string()))
+/// Creates primitive array reader for each primitive type.
+fn build_primitive_reader(
+    field: &ParquetField,
+    row_groups: &dyn RowGroupCollection,
+) -> Result<Box<dyn ArrayReader>> {
+    let (col_idx, primitive_type, type_len) = match &field.field_type {
+        ParquetFieldType::Primitive {
+            col_idx,
+            primitive_type,
+        } => match primitive_type.as_ref() {
+            Type::PrimitiveType { type_length, .. } => {
+                (*col_idx, primitive_type.clone(), *type_length)
             }
-        }
-
-        if map_type.get_fields().len() != 1 {
-            return Err(ArrowError(format!(
-                "Map field must have exactly one key_value child, found {}",
-                map_type.get_fields().len()
-            )));
-        }
-
-        // Add map entry (key_value) to context
-        let map_key_value = &map_type.get_fields()[0];
-        if map_key_value.get_basic_info().repetition() != Repetition::REPEATED 
{
-            return Err(ArrowError(
-                "Child of map field must be repeated".to_string(),
-            ));
-        }
-
-        new_context
-            .path
-            .append(vec![map_key_value.name().to_string()]);
-
-        new_context.rep_level += 1;
-        new_context.def_level += 1;
-
-        if map_key_value.get_fields().len() != 2 {
-            // According to the specification the values are optional (#1642)
-            return Err(ArrowError(format!(
-                "Child of map field must have two children, found {}",
-                map_key_value.get_fields().len()
-            )));
-        }
-
-        // Get key and value, and create context for each
-        let map_key = &map_key_value.get_fields()[0];
-        let map_value = &map_key_value.get_fields()[1];
-
-        if map_key.get_basic_info().repetition() != Repetition::REQUIRED {
-            return Err(ArrowError("Map keys must be required".to_string()));
-        }
-
-        if map_value.get_basic_info().repetition() == Repetition::REPEATED {
-            return Err(ArrowError("Map values cannot be 
repeated".to_string()));
-        }
-
-        let key_reader = self.dispatch(map_key.clone(), 
&new_context)?.unwrap();
-        let value_reader = self.dispatch(map_value.clone(), 
&new_context)?.unwrap();
-
-        let arrow_type = self
-            .arrow_schema
-            .field_with_name(map_type.name())
-            .ok()
-            .map(|f| f.data_type().to_owned())
-            .unwrap_or_else(|| {
-                ArrowType::Map(
-                    Box::new(Field::new(
-                        map_key_value.name(),
-                        ArrowType::Struct(vec![
-                            Field::new(
-                                map_key.name(),
-                                key_reader.get_data_type().clone(),
-                                false,
-                            ),
-                            Field::new(
-                                map_value.name(),
-                                value_reader.get_data_type().clone(),
-                                map_value.is_optional(),
-                            ),
-                        ]),
-                        map_type.is_optional(),
-                    )),
-                    false,
-                )
-            });
-
-        let key_array_reader: Box<dyn ArrayReader> = 
Box::new(MapArrayReader::new(
-            key_reader,
-            value_reader,
-            arrow_type,
-            new_context.def_level,
-            new_context.rep_level,
-        ));
-
-        Ok(Some(key_array_reader))
-    }
-
-    /// Build array reader for list type.
-    fn visit_list_with_item(
-        &mut self,
-        list_type: Arc<Type>,
-        item_type: Arc<Type>,
-        context: &'a ArrayReaderBuilderContext,
-    ) -> Result<Option<Box<dyn ArrayReader>>> {
-        let mut new_context = context.clone();
-        new_context.path.append(vec![list_type.name().to_string()]);
-
-        // If the list is nullable
-        let nullable = match list_type.get_basic_info().repetition() {
-            Repetition::REQUIRED => false,
-            Repetition::OPTIONAL => {
-                new_context.def_level += 1;
-                true
-            }
-            Repetition::REPEATED => {
-                return Err(general_err!("List type cannot be repeated"))
-            }
-        };
-
-        if list_type.get_fields().len() != 1 {
-            return Err(ArrowError(format!(
-                "List field must have exactly one child, found {}",
-                list_type.get_fields().len()
-            )));
-        }
-        let mut list_child = &list_type.get_fields()[0];
-
-        if list_child.get_basic_info().repetition() != Repetition::REPEATED {
-            return Err(ArrowError("List child must be repeated".to_string()));
-        }
-
-        // The repeated field
-        new_context.rep_level += 1;
-        new_context.def_level += 1;
-
-        match self.dispatch(item_type, &new_context) {
-            Ok(Some(item_reader)) => {
-                let item_type = item_reader.get_data_type().clone();
-
-                // a list is a group type with a single child. The list child's
-                // name comes from the child's field name.
-                // if the child's name is "list" and it has a child, then use 
this child
-                if list_child.name() == "list" && 
!list_child.get_fields().is_empty() {
-                    list_child = list_child.get_fields().first().unwrap();
-                }
-
-                let arrow_type = self
-                    .arrow_schema
-                    .field_with_name(list_type.name())
-                    .ok()
-                    .map(|f| f.data_type().to_owned())
-                    .unwrap_or_else(|| {
-                        ArrowType::List(Box::new(Field::new(
-                            list_child.name(),
-                            item_type.clone(),
-                            list_child.is_optional(),
-                        )))
-                    });
-
-                let list_array_reader: Box<dyn ArrayReader> = match arrow_type 
{
-                    ArrowType::List(_) => Box::new(ListArrayReader::<i32>::new(
-                        item_reader,
-                        arrow_type,
-                        item_type,
-                        new_context.def_level,
-                        new_context.rep_level,
-                        nullable,
-                    )),
-                    ArrowType::LargeList(_) => 
Box::new(ListArrayReader::<i64>::new(
-                        item_reader,
-                        arrow_type,
-                        item_type,
-                        new_context.def_level,
-                        new_context.rep_level,
-                        nullable,
-                    )),
-                    _ => {
-                        return Err(ArrowError(format!(
-                        "creating ListArrayReader with type {:?} should be 
unreachable",
-                        arrow_type
-                    )))
-                    }
-                };
-
-                Ok(Some(list_array_reader))
-            }
-            result => result,
-        }
-    }
-}
-
-impl<'a> ArrayReaderBuilder {
-    /// Construct array reader builder.
-    fn new(
-        root_schema: TypePtr,
-        arrow_schema: Arc<Schema>,
-        columns_included: Arc<HashMap<*const Type, usize>>,
-        file_reader: Box<dyn RowGroupCollection>,
-    ) -> Self {
-        Self {
-            root_schema,
-            arrow_schema,
-            columns_included,
-            row_groups: file_reader,
-        }
-    }
-
-    /// Main entry point.
-    fn build_array_reader(&mut self) -> Result<Box<dyn ArrayReader>> {
-        let context = ArrayReaderBuilderContext::default();
-
-        match self.visit_struct(self.root_schema.clone(), &context)? {
-            Some(reader) => Ok(reader),
-            None => Ok(make_empty_array_reader(self.row_groups.num_rows())),
-        }
-    }
-
-    // Utility functions
-
-    /// Check whether one column in included in this array reader builder.
-    fn is_included(&self, t: &Type) -> bool {
-        self.columns_included.contains_key(&(t as *const Type))
-    }
+            Type::GroupType { .. } => unreachable!(),
+        },
+        _ => unreachable!(),
+    };
 
-    /// Creates primitive array reader for each primitive type.
-    fn build_for_primitive_type_inner(
-        &self,
-        cur_type: TypePtr,
-        context: &'a ArrayReaderBuilderContext,
-        null_mask_only: bool,
-    ) -> Result<Box<dyn ArrayReader>> {
-        let column_desc = Arc::new(ColumnDescriptor::new(
-            cur_type.clone(),
-            context.def_level,
-            context.rep_level,
-            context.path.clone(),
-        ));
+    let physical_type = primitive_type.get_physical_type();
 
-        let page_iterator = self
-            .row_groups
-            .column_chunks(self.columns_included[&(cur_type.as_ref() as *const 
Type)])?;
+    let column_desc = Arc::new(ColumnDescriptor::new(
+        primitive_type,
+        field.def_level,
+        field.rep_level,
+        ColumnPath::new(vec![]),

Review Comment:
   None of the readers actually use this field (the column path passed to
   `ColumnDescriptor::new`), so rather than tracking it, we just populate an
   empty path.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to