Jefffrey commented on code in PR #6758:
URL: https://github.com/apache/arrow-rs/pull/6758#discussion_r1903173879


##########
arrow-array/src/record_batch.rs:
##########
@@ -394,6 +396,104 @@ impl RecordBatch {
         )
     }
 
+    /// Normalize a semi-structured [`RecordBatch`] into a flat table.
+    ///
+    /// `separator`: Nested [`Field`]s will generate names separated by 
`separator`, e.g. for
+    /// separator= "." and the schema:
+    /// ```text
+    ///     "foo": StructArray<"bar": Utf8>
+    /// ```
+    /// will generate:
+    /// ```text
+    ///     "foo.bar": Utf8
+    /// ```
+    /// `max_level`: The maximum number of levels (depth of the `Schema` and 
`Columns`) to
+    /// normalize. If `0`, normalizes all levels.
+    ///
+    /// # Example
+    ///
+    /// ```
+    /// # use std::sync::Arc;
+    /// # use arrow_array::{ArrayRef, Int64Array, StringArray, StructArray, 
RecordBatch};
+    /// # use arrow_schema::{DataType, Field, Fields, Schema};
+    ///
+    /// let animals: ArrayRef = Arc::new(StringArray::from(vec!["Parrot", 
""]));
+    /// let n_legs: ArrayRef = Arc::new(Int64Array::from(vec![Some(2), 
Some(4)]));
+    ///
+    /// let animals_field = Arc::new(Field::new("animals", DataType::Utf8, 
true));
+    /// let n_legs_field = Arc::new(Field::new("n_legs", DataType::Int64, 
true));
+    ///
+    /// let a = Arc::new(StructArray::from(vec![
+    ///     (animals_field.clone(), Arc::new(animals.clone()) as ArrayRef),
+    ///     (n_legs_field.clone(), Arc::new(n_legs.clone()) as ArrayRef),
+    /// ]));
+    ///
+    /// let schema = Schema::new(vec![
+    ///     Field::new(
+    ///         "a",
+    ///         DataType::Struct(Fields::from(vec![animals_field, 
n_legs_field])),
+    ///         false,
+    ///     )
+    /// ]);
+    ///
+    /// let normalized = RecordBatch::try_new(Arc::new(schema), vec![a])
+    ///     .expect("valid conversion")
+    ///     .normalize(".", 0)
+    ///     .expect("valid normalization");
+    ///
+    /// let expected = RecordBatch::try_from_iter_with_nullable(vec![
+    ///     ("a.animals", animals.clone(), true),
+    ///     ("a.n_legs", n_legs.clone(), true),
+    /// ])
+    /// .expect("valid conversion");
+    ///
+    /// assert_eq!(expected, normalized);
+    /// ```
+    pub fn normalize(&self, separator: &str, mut max_level: usize) -> 
Result<Self, ArrowError> {
+        if max_level == 0 {
+            max_level = usize::MAX;
+        }
+        let mut queue: VecDeque<(usize, &ArrayRef, Vec<&str>, &DataType, 
bool)> = VecDeque::new();

Review Comment:
   This queue actually behaves like a stack: it uses push_back only when initializing the queue, and otherwise uses pop_front/push_front. Would it be more intuitive to use a plain Vec, to indicate more accurately that this is a stack?
   
   Another note: you could remove the need to store DataType and nullability as separate tuple fields. Instead, store the original `&FieldRef` and retrieve the DataType and nullability from it on demand. That reduces the number of tuple fields by one, which might be worth it considering there are already quite a few fields.



##########
arrow-array/src/record_batch.rs:
##########
@@ -394,6 +396,104 @@ impl RecordBatch {
         )
     }
 
+    /// Normalize a semi-structured [`RecordBatch`] into a flat table.
+    ///
+    /// `separator`: Nested [`Field`]s will generate names separated by 
`separator`, e.g. for
+    /// separator= "." and the schema:
+    /// ```text
+    ///     "foo": StructArray<"bar": Utf8>
+    /// ```
+    /// will generate:
+    /// ```text
+    ///     "foo.bar": Utf8
+    /// ```
+    /// `max_level`: The maximum number of levels (depth of the `Schema` and 
`Columns`) to
+    /// normalize. If `0`, normalizes all levels.
+    ///
+    /// # Example
+    ///
+    /// ```
+    /// # use std::sync::Arc;
+    /// # use arrow_array::{ArrayRef, Int64Array, StringArray, StructArray, 
RecordBatch};
+    /// # use arrow_schema::{DataType, Field, Fields, Schema};
+    ///
+    /// let animals: ArrayRef = Arc::new(StringArray::from(vec!["Parrot", 
""]));
+    /// let n_legs: ArrayRef = Arc::new(Int64Array::from(vec![Some(2), 
Some(4)]));
+    ///
+    /// let animals_field = Arc::new(Field::new("animals", DataType::Utf8, 
true));
+    /// let n_legs_field = Arc::new(Field::new("n_legs", DataType::Int64, 
true));
+    ///
+    /// let a = Arc::new(StructArray::from(vec![
+    ///     (animals_field.clone(), Arc::new(animals.clone()) as ArrayRef),
+    ///     (n_legs_field.clone(), Arc::new(n_legs.clone()) as ArrayRef),
+    /// ]));
+    ///
+    /// let schema = Schema::new(vec![
+    ///     Field::new(
+    ///         "a",
+    ///         DataType::Struct(Fields::from(vec![animals_field, 
n_legs_field])),
+    ///         false,
+    ///     )
+    /// ]);
+    ///
+    /// let normalized = RecordBatch::try_new(Arc::new(schema), vec![a])
+    ///     .expect("valid conversion")
+    ///     .normalize(".", 0)
+    ///     .expect("valid normalization");
+    ///
+    /// let expected = RecordBatch::try_from_iter_with_nullable(vec![
+    ///     ("a.animals", animals.clone(), true),
+    ///     ("a.n_legs", n_legs.clone(), true),
+    /// ])
+    /// .expect("valid conversion");
+    ///
+    /// assert_eq!(expected, normalized);
+    /// ```
+    pub fn normalize(&self, separator: &str, mut max_level: usize) -> 
Result<Self, ArrowError> {
+        if max_level == 0 {
+            max_level = usize::MAX;
+        }

Review Comment:
   ```suggestion
       pub fn normalize(&self, separator: &str, max_level: Option<usize>) -> 
Result<Self, ArrowError> {
           let max_level = max_level.unwrap_or(usize::MAX);
   ```
   
   IMO this is the more idiomatic Rust approach, using `Option` instead of a sentinel value (though I'm not sure whether `Some(0)` should be a valid input?)



##########
arrow-array/src/record_batch.rs:
##########
@@ -394,6 +396,104 @@ impl RecordBatch {
         )
     }
 
+    /// Normalize a semi-structured [`RecordBatch`] into a flat table.
+    ///
+    /// `separator`: Nested [`Field`]s will generate names separated by 
`separator`, e.g. for
+    /// separator= "." and the schema:
+    /// ```text
+    ///     "foo": StructArray<"bar": Utf8>
+    /// ```
+    /// will generate:
+    /// ```text
+    ///     "foo.bar": Utf8
+    /// ```
+    /// `max_level`: The maximum number of levels (depth of the `Schema` and 
`Columns`) to
+    /// normalize. If `0`, normalizes all levels.
+    ///
+    /// # Example
+    ///
+    /// ```
+    /// # use std::sync::Arc;
+    /// # use arrow_array::{ArrayRef, Int64Array, StringArray, StructArray, 
RecordBatch};
+    /// # use arrow_schema::{DataType, Field, Fields, Schema};
+    ///
+    /// let animals: ArrayRef = Arc::new(StringArray::from(vec!["Parrot", 
""]));
+    /// let n_legs: ArrayRef = Arc::new(Int64Array::from(vec![Some(2), 
Some(4)]));
+    ///
+    /// let animals_field = Arc::new(Field::new("animals", DataType::Utf8, 
true));
+    /// let n_legs_field = Arc::new(Field::new("n_legs", DataType::Int64, 
true));
+    ///
+    /// let a = Arc::new(StructArray::from(vec![
+    ///     (animals_field.clone(), Arc::new(animals.clone()) as ArrayRef),
+    ///     (n_legs_field.clone(), Arc::new(n_legs.clone()) as ArrayRef),
+    /// ]));
+    ///
+    /// let schema = Schema::new(vec![
+    ///     Field::new(
+    ///         "a",
+    ///         DataType::Struct(Fields::from(vec![animals_field, 
n_legs_field])),
+    ///         false,
+    ///     )
+    /// ]);
+    ///
+    /// let normalized = RecordBatch::try_new(Arc::new(schema), vec![a])
+    ///     .expect("valid conversion")
+    ///     .normalize(".", 0)
+    ///     .expect("valid normalization");
+    ///
+    /// let expected = RecordBatch::try_from_iter_with_nullable(vec![
+    ///     ("a.animals", animals.clone(), true),
+    ///     ("a.n_legs", n_legs.clone(), true),
+    /// ])
+    /// .expect("valid conversion");
+    ///
+    /// assert_eq!(expected, normalized);
+    /// ```
+    pub fn normalize(&self, separator: &str, mut max_level: usize) -> 
Result<Self, ArrowError> {
+        if max_level == 0 {
+            max_level = usize::MAX;
+        }
+        let mut queue: VecDeque<(usize, &ArrayRef, Vec<&str>, &DataType, 
bool)> = VecDeque::new();
+        for (c, f) in self.columns.iter().zip(self.schema.fields()) {
+            let name_vec: Vec<&str> = vec![f.name()];
+            queue.push_back((0, c, name_vec, f.data_type(), f.is_nullable()));
+        }
+        let mut columns: Vec<ArrayRef> = Vec::new();
+        let mut fields: Vec<FieldRef> = Vec::new();
+
+        while let Some((depth, c, name, data_type, nullable)) = 
queue.pop_front() {
+            if depth < max_level {
+                match data_type {
+                    DataType::Struct(ff) => {
+                        // Need to zip these in reverse to maintain original 
order
+                        for (cff, fff) in 
c.as_struct().columns().iter().zip(ff.into_iter()).rev() {
+                            let mut name = name.clone();
+                            name.push(separator);
+                            name.push(fff.name().as_str());
+                            queue.push_front((
+                                depth + 1,
+                                cff,
+                                name.clone(),
+                                fff.data_type(),
+                                fff.is_nullable(),
+                            ))
+                        }
+                    }
+                    _ => {
+                        let updated_field = Field::new(name.concat(), 
data_type.clone(), nullable);
+                        columns.push(c.clone());
+                        fields.push(Arc::new(updated_field));
+                    }
+                }
+            } else {
+                let updated_field = Field::new(name.concat(), 
data_type.clone(), nullable);
+                columns.push(c.clone());
+                fields.push(Arc::new(updated_field));
+            }

Review Comment:
   ```suggestion
               match data_type {
                   DataType::Struct(ff) if depth < max_level => {
                       // Need to zip these in reverse to maintain original 
order
                       for (cff, fff) in 
c.as_struct().columns().iter().zip(ff.into_iter()).rev() {
                           let mut name = name.clone();
                           name.push(separator);
                           name.push(fff.name().as_str());
                           queue.push_front((
                               depth + 1,
                               cff,
                               name.clone(),
                               fff.data_type(),
                               fff.is_nullable(),
                           ))
                       }
                   }
                   _ => {
                       let updated_field = Field::new(name.concat(), 
data_type.clone(), nullable);
                       columns.push(c.clone());
                       fields.push(Arc::new(updated_field));
                   }
               }
   ```
   
   I noticed that the `else` branch of `if depth < max_level {` and the catch-all arm of the `match` had identical actions. In that case we can simplify by leveraging match expressions more, i.e. moving the depth check into a match guard.



##########
arrow-array/src/record_batch.rs:
##########
@@ -394,6 +396,104 @@ impl RecordBatch {
         )
     }
 
+    /// Normalize a semi-structured [`RecordBatch`] into a flat table.
+    ///
+    /// `separator`: Nested [`Field`]s will generate names separated by 
`separator`, e.g. for
+    /// separator= "." and the schema:
+    /// ```text
+    ///     "foo": StructArray<"bar": Utf8>
+    /// ```
+    /// will generate:
+    /// ```text
+    ///     "foo.bar": Utf8
+    /// ```
+    /// `max_level`: The maximum number of levels (depth of the `Schema` and 
`Columns`) to
+    /// normalize. If `0`, normalizes all levels.
+    ///
+    /// # Example
+    ///
+    /// ```
+    /// # use std::sync::Arc;
+    /// # use arrow_array::{ArrayRef, Int64Array, StringArray, StructArray, 
RecordBatch};
+    /// # use arrow_schema::{DataType, Field, Fields, Schema};
+    ///
+    /// let animals: ArrayRef = Arc::new(StringArray::from(vec!["Parrot", 
""]));
+    /// let n_legs: ArrayRef = Arc::new(Int64Array::from(vec![Some(2), 
Some(4)]));
+    ///
+    /// let animals_field = Arc::new(Field::new("animals", DataType::Utf8, 
true));
+    /// let n_legs_field = Arc::new(Field::new("n_legs", DataType::Int64, 
true));
+    ///
+    /// let a = Arc::new(StructArray::from(vec![
+    ///     (animals_field.clone(), Arc::new(animals.clone()) as ArrayRef),
+    ///     (n_legs_field.clone(), Arc::new(n_legs.clone()) as ArrayRef),
+    /// ]));
+    ///
+    /// let schema = Schema::new(vec![
+    ///     Field::new(
+    ///         "a",
+    ///         DataType::Struct(Fields::from(vec![animals_field, 
n_legs_field])),
+    ///         false,
+    ///     )
+    /// ]);
+    ///
+    /// let normalized = RecordBatch::try_new(Arc::new(schema), vec![a])
+    ///     .expect("valid conversion")
+    ///     .normalize(".", 0)
+    ///     .expect("valid normalization");
+    ///
+    /// let expected = RecordBatch::try_from_iter_with_nullable(vec![
+    ///     ("a.animals", animals.clone(), true),
+    ///     ("a.n_legs", n_legs.clone(), true),
+    /// ])
+    /// .expect("valid conversion");
+    ///
+    /// assert_eq!(expected, normalized);
+    /// ```
+    pub fn normalize(&self, separator: &str, mut max_level: usize) -> 
Result<Self, ArrowError> {
+        if max_level == 0 {
+            max_level = usize::MAX;
+        }
+        let mut queue: VecDeque<(usize, &ArrayRef, Vec<&str>, &DataType, 
bool)> = VecDeque::new();
+        for (c, f) in self.columns.iter().zip(self.schema.fields()) {
+            let name_vec: Vec<&str> = vec![f.name()];
+            queue.push_back((0, c, name_vec, f.data_type(), f.is_nullable()));
+        }
+        let mut columns: Vec<ArrayRef> = Vec::new();
+        let mut fields: Vec<FieldRef> = Vec::new();
+
+        while let Some((depth, c, name, data_type, nullable)) = 
queue.pop_front() {
+            if depth < max_level {
+                match data_type {
+                    DataType::Struct(ff) => {
+                        // Need to zip these in reverse to maintain original 
order
+                        for (cff, fff) in 
c.as_struct().columns().iter().zip(ff.into_iter()).rev() {
+                            let mut name = name.clone();
+                            name.push(separator);
+                            name.push(fff.name().as_str());
+                            queue.push_front((
+                                depth + 1,
+                                cff,
+                                name.clone(),

Review Comment:
   ```suggestion
                                   name,
   ```
   
   `name` was already cloned above, so there's no need to clone it again.



##########
arrow-array/src/record_batch.rs:
##########
@@ -1197,6 +1297,172 @@ mod tests {
         assert_ne!(batch1, batch2);
     }
 
+    #[test]
+    fn normalize_simple() {
+        let animals: ArrayRef = Arc::new(StringArray::from(vec!["Parrot", 
""]));
+        let n_legs: ArrayRef = Arc::new(Int64Array::from(vec![Some(2), 
Some(4)]));
+        let year: ArrayRef = Arc::new(Int64Array::from(vec![None, 
Some(2022)]));
+
+        let animals_field = Arc::new(Field::new("animals", DataType::Utf8, 
true));
+        let n_legs_field = Arc::new(Field::new("n_legs", DataType::Int64, 
true));
+        let year_field = Arc::new(Field::new("year", DataType::Int64, true));
+
+        let a = Arc::new(StructArray::from(vec![
+            (animals_field.clone(), Arc::new(animals.clone()) as ArrayRef),
+            (n_legs_field.clone(), Arc::new(n_legs.clone()) as ArrayRef),
+            (year_field.clone(), Arc::new(year.clone()) as ArrayRef),
+        ]));
+
+        let month = Arc::new(Int64Array::from(vec![Some(4), Some(6)]));
+
+        let schema = Schema::new(vec![
+            Field::new(
+                "a",
+                DataType::Struct(Fields::from(vec![animals_field, 
n_legs_field, year_field])),
+                false,
+            ),
+            Field::new("month", DataType::Int64, true),
+        ]);
+
+        let normalized = RecordBatch::try_new(Arc::new(schema), vec![a, 
month.clone()])
+            .expect("valid conversion")
+            .normalize(".", 0)
+            .expect("valid normalization");
+
+        let expected = RecordBatch::try_from_iter_with_nullable(vec![
+            ("a.animals", animals.clone(), true),
+            ("a.n_legs", n_legs.clone(), true),
+            ("a.year", year.clone(), true),
+            ("month", month.clone(), true),
+        ])
+        .expect("valid conversion");
+
+        assert_eq!(expected, normalized);
+    }
+
+    #[test]
+    fn normalize_nested() {

Review Comment:
   Perhaps add some test cases with more complex types as well? e.g. a ListArray containing a StructArray
   
   (even if only to prove that the struct within the list shouldn't be affected)



##########
arrow-array/src/record_batch.rs:
##########
@@ -394,6 +396,104 @@ impl RecordBatch {
         )
     }
 
+    /// Normalize a semi-structured [`RecordBatch`] into a flat table.
+    ///
+    /// `separator`: Nested [`Field`]s will generate names separated by 
`separator`, e.g. for
+    /// separator= "." and the schema:
+    /// ```text
+    ///     "foo": StructArray<"bar": Utf8>
+    /// ```
+    /// will generate:
+    /// ```text
+    ///     "foo.bar": Utf8
+    /// ```
+    /// `max_level`: The maximum number of levels (depth of the `Schema` and 
`Columns`) to
+    /// normalize. If `0`, normalizes all levels.
+    ///
+    /// # Example
+    ///
+    /// ```
+    /// # use std::sync::Arc;
+    /// # use arrow_array::{ArrayRef, Int64Array, StringArray, StructArray, 
RecordBatch};
+    /// # use arrow_schema::{DataType, Field, Fields, Schema};
+    ///
+    /// let animals: ArrayRef = Arc::new(StringArray::from(vec!["Parrot", 
""]));
+    /// let n_legs: ArrayRef = Arc::new(Int64Array::from(vec![Some(2), 
Some(4)]));
+    ///
+    /// let animals_field = Arc::new(Field::new("animals", DataType::Utf8, 
true));
+    /// let n_legs_field = Arc::new(Field::new("n_legs", DataType::Int64, 
true));
+    ///
+    /// let a = Arc::new(StructArray::from(vec![
+    ///     (animals_field.clone(), Arc::new(animals.clone()) as ArrayRef),
+    ///     (n_legs_field.clone(), Arc::new(n_legs.clone()) as ArrayRef),
+    /// ]));
+    ///
+    /// let schema = Schema::new(vec![
+    ///     Field::new(
+    ///         "a",
+    ///         DataType::Struct(Fields::from(vec![animals_field, 
n_legs_field])),
+    ///         false,
+    ///     )
+    /// ]);
+    ///
+    /// let normalized = RecordBatch::try_new(Arc::new(schema), vec![a])
+    ///     .expect("valid conversion")
+    ///     .normalize(".", 0)
+    ///     .expect("valid normalization");
+    ///
+    /// let expected = RecordBatch::try_from_iter_with_nullable(vec![
+    ///     ("a.animals", animals.clone(), true),
+    ///     ("a.n_legs", n_legs.clone(), true),
+    /// ])
+    /// .expect("valid conversion");
+    ///
+    /// assert_eq!(expected, normalized);
+    /// ```
+    pub fn normalize(&self, separator: &str, mut max_level: usize) -> 
Result<Self, ArrowError> {
+        if max_level == 0 {
+            max_level = usize::MAX;
+        }
+        let mut queue: VecDeque<(usize, &ArrayRef, Vec<&str>, &DataType, 
bool)> = VecDeque::new();
+        for (c, f) in self.columns.iter().zip(self.schema.fields()) {
+            let name_vec: Vec<&str> = vec![f.name()];
+            queue.push_back((0, c, name_vec, f.data_type(), f.is_nullable()));
+        }
+        let mut columns: Vec<ArrayRef> = Vec::new();
+        let mut fields: Vec<FieldRef> = Vec::new();
+
+        while let Some((depth, c, name, data_type, nullable)) = 
queue.pop_front() {
+            if depth < max_level {
+                match data_type {
+                    DataType::Struct(ff) => {
+                        // Need to zip these in reverse to maintain original 
order
+                        for (cff, fff) in 
c.as_struct().columns().iter().zip(ff.into_iter()).rev() {
+                            let mut name = name.clone();
+                            name.push(separator);
+                            name.push(fff.name().as_str());

Review Comment:
   ```suggestion
                               name.push(fff.name());
   ```
   
   I think this works too, since deref coercion should turn the `&String` returned by `name()` into a `&str`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to