andygrove commented on a change in pull request #7936:
URL: https://github.com/apache/arrow/pull/7936#discussion_r469309756



##########
File path: rust/datafusion/src/execution/physical_plan/hash_aggregate.rs
##########
@@ -677,6 +532,152 @@ impl RecordBatchReader for HashAggregateIterator {
     }
 }
 
+/// Append a grouping expression value to a builder
+macro_rules! append_group_value {
+    ($BUILDER:expr, $BUILDER_TY:ident, $VALUE:expr) => {{
+        let builder = $BUILDER
+            .downcast_mut::<$BUILDER_TY>()
+            .expect("failed to downcast group value builder to expected type");
+        builder.append_value($VALUE)?;
+    }};
+}
+
+/// Append an aggregate expression value to a builder
+macro_rules! append_aggr_value {
+    ($BUILDER:expr, $BUILDER_TY:ident, $VALUE:expr, $SCALAR_TY:ident) => {{
+        println!("downcast {:#?} to {:#?}", $BUILDER, $VALUE);
+        let builder = $BUILDER
+            .downcast_mut::<$BUILDER_TY>()
+            .expect("failed to downcast aggregate value builder to expected 
type");
+        match $VALUE {
+            Some(ScalarValue::$SCALAR_TY(n)) => builder.append_value(n)?,
+            None => builder.append_null()?,
+            Some(_) => panic!(),
+        }
+    }};
+}
+
+/// Create a RecordBatch representing the accumulated results in a map
+fn create_batch_from_map(
+    map: &FnvHashMap<Vec<GroupByScalar>, Rc<AccumulatorSet>>,
+    num_group_expr: usize,
+    num_aggr_expr: usize,
+    output_schema: &Schema,
+) -> Result<RecordBatch> {
+    let mut builders: Vec<Box<dyn ArrayBuilder>> = vec![];
+    for i in 0..num_group_expr + num_aggr_expr {
+        let builder: Box<dyn ArrayBuilder> = match 
output_schema.field(i).data_type() {
+            DataType::Int8 => Box::new(Int8Builder::new(map.len())),
+            DataType::Int16 => Box::new(Int16Builder::new(map.len())),
+            DataType::Int32 => Box::new(Int32Builder::new(map.len())),
+            DataType::Int64 => Box::new(Int64Builder::new(map.len())),
+            DataType::UInt8 => Box::new(UInt8Builder::new(map.len())),
+            DataType::UInt16 => Box::new(UInt16Builder::new(map.len())),
+            DataType::UInt32 => Box::new(UInt32Builder::new(map.len())),
+            DataType::UInt64 => Box::new(UInt64Builder::new(map.len())),
+            DataType::Float32 => Box::new(Float32Builder::new(map.len())),
+            DataType::Float64 => Box::new(Float64Builder::new(map.len())),
+            DataType::Utf8 => Box::new(StringBuilder::new(map.len())),
+            _ => {
+                return Err(ExecutionError::ExecutionError(
+                    "Unsupported group data type".to_string(),
+                ))
+            }
+        };
+        builders.push(builder);
+    }
+
+    // iterate over the map
+    for (k, v) in map.iter() {
+        // add group values to builders
+        for i in 0..num_group_expr {
+            let builder = builders[i].as_any_mut();
+            match &k[i] {
+                GroupByScalar::Int8(n) => append_group_value!(builder, 
Int8Builder, *n),
+                GroupByScalar::Int16(n) => append_group_value!(builder, 
Int16Builder, *n),
+                GroupByScalar::Int32(n) => append_group_value!(builder, 
Int32Builder, *n),
+                GroupByScalar::Int64(n) => append_group_value!(builder, 
Int64Builder, *n),
+                GroupByScalar::UInt8(n) => append_group_value!(builder, 
UInt8Builder, *n),
+                GroupByScalar::UInt16(n) => {
+                    append_group_value!(builder, UInt16Builder, *n)
+                }
+                GroupByScalar::UInt32(n) => {
+                    append_group_value!(builder, UInt32Builder, *n)
+                }
+                GroupByScalar::UInt64(n) => {
+                    append_group_value!(builder, UInt64Builder, *n)
+                }
+                GroupByScalar::Utf8(str) => {
+                    append_group_value!(builder, StringBuilder, str)
+                }
+            }
+        }
+
+        // add agggregate values to builders
+        for i in 0..num_aggr_expr {
+            let value = v[i].borrow().get_value()?;
+            let index = num_group_expr + i;
+            let builder = builders[index].as_any_mut();
+            match output_schema.field(i).data_type() {
+                DataType::Int8 => append_aggr_value!(builder, Int8Builder, 
value, Int8),
+                DataType::Int16 => {
+                    append_aggr_value!(builder, Int16Builder, value, Int16)
+                }
+                DataType::Int32 => {
+                    append_aggr_value!(builder, Int32Builder, value, Int32)
+                }
+                DataType::Int64 => {
+                    append_aggr_value!(builder, Int64Builder, value, Int64)
+                }
+                DataType::UInt8 => {
+                    append_aggr_value!(builder, UInt8Builder, value, UInt8)
+                }
+                DataType::UInt16 => {
+                    append_aggr_value!(builder, UInt16Builder, value, UInt16)
+                }
+                DataType::UInt32 => {
+                    append_aggr_value!(builder, UInt32Builder, value, UInt32)
+                }
+                DataType::UInt64 => {
+                    append_aggr_value!(builder, UInt64Builder, value, UInt64)
+                }
+                DataType::Float32 => {
+                    append_aggr_value!(builder, Float32Builder, value, Float32)
+                }
+                DataType::Float64 => {
+                    append_aggr_value!(builder, Float64Builder, value, Float64)
+                }
+                DataType::Utf8 => {

Review comment:
       Done.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to