jorgecarleitao commented on a change in pull request #8172:
URL: https://github.com/apache/arrow/pull/8172#discussion_r487305150



##########
File path: rust/datafusion/benches/aggregate_query_sql.rs
##########
@@ -39,72 +46,105 @@ fn aggregate_query(ctx: &mut ExecutionContext, sql: &str) {
     for _batch in results {}
 }
 
-fn create_context() -> ExecutionContext {
-    // define schema for data source (csv file)
+fn create_data(size: usize, null_density: f64) -> Vec<Option<f64>> {
+    // use random numbers to avoid spurious compiler optimizations wrt to 
branching
+    let mut rng = rand::thread_rng();
+
+    (0..size)
+        .map(|_| {
+            if rng.gen::<f64>() > null_density {
+                None
+            } else {
+                Some(rng.gen::<f64>())
+            }
+        })
+        .collect()
+}
+
+fn create_context(
+    partitions_len: usize,
+    array_len: usize,
+    batch_size: usize,
+) -> Result<ExecutionContext> {
+    // define a schema.
     let schema = Arc::new(Schema::new(vec![
-        Field::new("c1", DataType::Utf8, false),
-        Field::new("c2", DataType::UInt32, false),
-        Field::new("c3", DataType::Int8, false),
-        Field::new("c4", DataType::Int16, false),
-        Field::new("c5", DataType::Int32, false),
-        Field::new("c6", DataType::Int64, false),
-        Field::new("c7", DataType::UInt8, false),
-        Field::new("c8", DataType::UInt16, false),
-        Field::new("c9", DataType::UInt32, false),
-        Field::new("c10", DataType::UInt64, false),
-        Field::new("c11", DataType::Float32, false),
-        Field::new("c12", DataType::Float64, false),
-        Field::new("c13", DataType::Utf8, false),
+        Field::new("utf8", DataType::Utf8, false),
+        Field::new("f32", DataType::Float32, false),
+        Field::new("f64", DataType::Float64, false),
     ]));
 
-    let testdata = env::var("ARROW_TEST_DATA").expect("ARROW_TEST_DATA not 
defined");
+    // define data.
+    let partitions = (0..partitions_len)
+        .map(|_| {
+            (0..array_len / batch_size / partitions_len)
+                .map(|i| {
+                    let keys: Vec<String> = (0..batch_size)
+                        .map(
+                            // the 4 here is the number of different keys.
+                            // a higher number increase sparseness
+                            |i| format!("hi{}", i % 4),

Review comment:
       This should be random; `i % 4` is quite predictable.

##########
File path: rust/datafusion/benches/aggregate_query_sql.rs
##########
@@ -39,72 +46,105 @@ fn aggregate_query(ctx: &mut ExecutionContext, sql: &str) {
     for _batch in results {}
 }
 
-fn create_context() -> ExecutionContext {
-    // define schema for data source (csv file)
+fn create_data(size: usize, null_density: f64) -> Vec<Option<f64>> {
+    // use random numbers to avoid spurious compiler optimizations wrt to 
branching
+    let mut rng = rand::thread_rng();
+
+    (0..size)
+        .map(|_| {
+            if rng.gen::<f64>() > null_density {
+                None
+            } else {
+                Some(rng.gen::<f64>())
+            }
+        })
+        .collect()
+}
+
+fn create_context(
+    partitions_len: usize,
+    array_len: usize,
+    batch_size: usize,
+) -> Result<ExecutionContext> {
+    // define a schema.
     let schema = Arc::new(Schema::new(vec![
-        Field::new("c1", DataType::Utf8, false),
-        Field::new("c2", DataType::UInt32, false),
-        Field::new("c3", DataType::Int8, false),
-        Field::new("c4", DataType::Int16, false),
-        Field::new("c5", DataType::Int32, false),
-        Field::new("c6", DataType::Int64, false),
-        Field::new("c7", DataType::UInt8, false),
-        Field::new("c8", DataType::UInt16, false),
-        Field::new("c9", DataType::UInt32, false),
-        Field::new("c10", DataType::UInt64, false),
-        Field::new("c11", DataType::Float32, false),
-        Field::new("c12", DataType::Float64, false),
-        Field::new("c13", DataType::Utf8, false),
+        Field::new("utf8", DataType::Utf8, false),
+        Field::new("f32", DataType::Float32, false),
+        Field::new("f64", DataType::Float64, false),
     ]));
 
-    let testdata = env::var("ARROW_TEST_DATA").expect("ARROW_TEST_DATA not 
defined");
+    // define data.
+    let partitions = (0..partitions_len)
+        .map(|_| {
+            (0..array_len / batch_size / partitions_len)
+                .map(|i| {
+                    let keys: Vec<String> = (0..batch_size)
+                        .map(
+                            // the 4 here is the number of different keys.
+                            // a higher number increase sparseness
+                            |i| format!("hi{}", i % 4),
+                        )
+                        .collect();
+                    let keys: Vec<&str> = keys.iter().map(|e| &**e).collect();
+
+                    let values = create_data(batch_size, 0.5);
+
+                    RecordBatch::try_new(
+                        schema.clone(),
+                        vec![
+                            Arc::new(StringArray::from(keys)),
+                            Arc::new(Float32Array::from(vec![i as f32; 
batch_size])),
+                            Arc::new(Float64Array::from(values)),
+                        ],
+                    )
+                    .unwrap()
+                })
+                .collect::<Vec<_>>()
+        })
+        .collect::<Vec<_>>();
 
-    // create CSV data source
-    let csv = CsvFile::try_new(
-        &format!("{}/csv/aggregate_test_100.csv", testdata),
-        CsvReadOptions::new().schema(&schema),
-    )
-    .unwrap();
+    let mut ctx = ExecutionContext::new();
 
-    let mem_table = MemTable::load(&csv).unwrap();
+    // declare a table in memory. In spark API, this corresponds to 
createDataFrame(...).
+    let provider = MemTable::new(schema, partitions)?;
+    ctx.register_table("t", Box::new(provider));
 
-    // create local execution context
-    let mut ctx = ExecutionContext::new();
-    ctx.register_table("aggregate_test_100", Box::new(mem_table));
-    ctx
+    Ok(ctx)
 }
 
 fn criterion_benchmark(c: &mut Criterion) {
-    c.bench_function("aggregate_query_no_group_by", |b| {
-        let mut ctx = create_context();
+    let partitions_len = 4;
+    let array_len = 32768; // 2^15
+    let batch_size = 2048; // 2^11
+    let mut ctx = create_context(partitions_len, array_len, 
batch_size).unwrap();
+
+    c.bench_function("aggregate_query_no_group_by 15 12", |b| {

Review comment:
       The benchmark name should say 11, not 12: `batch_size` is 2048 = 2^11.

##########
File path: rust/datafusion/src/physical_plan/common.rs
##########
@@ -205,3 +213,91 @@ pub fn get_scalar_value(array: &ArrayRef, row: usize) -> 
Result<Option<ScalarVal
     };
     Ok(value)
 }
+
+/// Converts a scalar value into an array.
+/// This is useful for aggregations.
+pub fn to_array(value: &ScalarValue) -> Result<ArrayRef> {
+    match value {
+        ScalarValue::Boolean(e) => Ok(Arc::new(BooleanArray::from(vec![*e])) 
as ArrayRef),
+        ScalarValue::Float64(e) => Ok(Arc::new(Float64Array::from(vec![*e])) 
as ArrayRef),
+        ScalarValue::Float32(e) => Ok(Arc::new(Float32Array::from(vec![*e]))),
+        ScalarValue::Int8(e) => Ok(Arc::new(Int8Array::from(vec![*e]))),
+        ScalarValue::Int16(e) => Ok(Arc::new(Int16Array::from(vec![*e]))),
+        ScalarValue::Int32(e) => Ok(Arc::new(Int32Array::from(vec![*e]))),
+        ScalarValue::Int64(e) => Ok(Arc::new(Int64Array::from(vec![*e]))),
+        ScalarValue::UInt8(e) => Ok(Arc::new(UInt8Array::from(vec![*e]))),
+        ScalarValue::UInt16(e) => Ok(Arc::new(UInt16Array::from(vec![*e]))),
+        ScalarValue::UInt32(e) => Ok(Arc::new(UInt32Array::from(vec![*e]))),
+        ScalarValue::UInt64(e) => Ok(Arc::new(UInt64Array::from(vec![*e]))),
+        ScalarValue::Utf8(e) => {
+            // awful code...
+            let v = e.as_ref().unwrap_or(&"".to_string()).clone();
+            let v = e.as_ref().and_then(|_| Some(&*v));
+            Ok(Arc::new(StringArray::from(vec![v])))
+        }
+        ScalarValue::LargeUtf8(e) => {
+            // awful code...
+            let v = e.as_ref().unwrap_or(&"".to_string()).clone();
+            let v = e.as_ref().and_then(|_| Some(&*v));
+            Ok(Arc::new(LargeStringArray::from(vec![v])))
+        }
+        ScalarValue::Null => Err(ExecutionError::InternalError(format!(
+            "Cannot convert scalar {:?} to array",
+            value
+        ))),
+        ScalarValue::Struct(_) => Err(ExecutionError::InternalError(format!(
+            "Cannot convert scalar {:?} to array",
+            value
+        ))),
+    }
+}
+
+/// creates an empty record batch.
+pub fn create_batch_empty(schema: &Schema) -> Result<Vec<ArrayRef>> {
+    schema
+        .fields()
+        .iter()
+        .map(|f| match f.data_type() {
+            DataType::Float32 => {
+                Ok(Arc::new(Float32Array::from(vec![] as Vec<f32>)) as 
ArrayRef)
+            }
+            DataType::Float64 => {
+                Ok(Arc::new(Float64Array::from(vec![] as Vec<f64>)) as 
ArrayRef)
+            }
+            DataType::Int64 => {
+                Ok(Arc::new(Int64Array::from(vec![] as Vec<i64>)) as ArrayRef)
+            }
+            DataType::Int32 => {
+                Ok(Arc::new(Int32Array::from(vec![] as Vec<i32>)) as ArrayRef)
+            }
+            DataType::Int16 => {
+                Ok(Arc::new(Int16Array::from(vec![] as Vec<i16>)) as ArrayRef)
+            }
+            DataType::Int8 => {
+                Ok(Arc::new(Int8Array::from(vec![] as Vec<i8>)) as ArrayRef)
+            }
+            DataType::UInt64 => {
+                Ok(Arc::new(UInt64Array::from(vec![] as Vec<u64>)) as ArrayRef)
+            }
+            DataType::UInt32 => {
+                Ok(Arc::new(UInt32Array::from(vec![] as Vec<u32>)) as ArrayRef)
+            }
+            DataType::UInt16 => {
+                Ok(Arc::new(UInt16Array::from(vec![] as Vec<u16>)) as ArrayRef)
+            }
+            DataType::UInt8 => {
+                Ok(Arc::new(UInt8Array::from(vec![] as Vec<u8>)) as ArrayRef)
+            }
+            DataType::Utf8 => {
+                Ok(Arc::new(StringArray::from(vec![] as Vec<&str>)) as 
ArrayRef)
+            }
+            DataType::Boolean => {
+                Ok(Arc::new(BooleanArray::from(vec![] as Vec<bool>)) as 
ArrayRef)
+            }
+            _ => Err(ExecutionError::NotImplemented(format!(

Review comment:
       The types covered here have been enough so far because we do not have 
aggregations over other types. The function itself is needed because some 
batches can have no entries, in which case we need to build an empty record 
batch.

##########
File path: rust/datafusion/src/physical_plan/expressions.rs
##########
@@ -97,766 +104,712 @@ pub fn col(name: &str) -> Arc<dyn PhysicalExpr> {
 /// SUM aggregate expression
 #[derive(Debug)]
 pub struct Sum {
+    name: String,

Review comment:
       Each `AggregateExpr` now carries this information itself because that 
allows it to `create_accumulator` without access to the input schema. This is 
helpful because, on the second pass, we need to create accumulators on the fly, 
and the input_schema of the second pass is different: it now corresponds to 
the schema 
   
   `[group1, group2, agg1_state1, agg1_state2, ...]`

##########
File path: rust/datafusion/src/physical_plan/expressions.rs
##########
@@ -97,766 +104,712 @@ pub fn col(name: &str) -> Arc<dyn PhysicalExpr> {
 /// SUM aggregate expression
 #[derive(Debug)]
 pub struct Sum {
+    name: String,
+    data_type: DataType,
     expr: Arc<dyn PhysicalExpr>,
+    nullable: bool,
 }
 
-impl Sum {
-    /// Create a new SUM aggregate function
-    pub fn new(expr: Arc<dyn PhysicalExpr>) -> Self {
-        Self { expr }
+/// function return type of a sum
+pub fn sum_return_type(arg_type: &DataType) -> Result<DataType> {
+    match arg_type {
+        DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Int64 
=> {
+            Ok(DataType::Int64)
+        }
+        DataType::UInt8 | DataType::UInt16 | DataType::UInt32 | 
DataType::UInt64 => {
+            Ok(DataType::UInt64)
+        }
+        DataType::Float32 => Ok(DataType::Float32),
+        DataType::Float64 => Ok(DataType::Float64),
+        other => Err(ExecutionError::General(format!(
+            "SUM does not support type \"{:?}\"",
+            other
+        ))),
     }
 }
 
-impl AggregateExpr for Sum {
-    fn data_type(&self, input_schema: &Schema) -> Result<DataType> {
-        match self.expr.data_type(input_schema)? {
-            DataType::Int8 | DataType::Int16 | DataType::Int32 | 
DataType::Int64 => {
-                Ok(DataType::Int64)
-            }
-            DataType::UInt8 | DataType::UInt16 | DataType::UInt32 | 
DataType::UInt64 => {
-                Ok(DataType::UInt64)
-            }
-            DataType::Float32 => Ok(DataType::Float32),
-            DataType::Float64 => Ok(DataType::Float64),
-            other => Err(ExecutionError::General(format!(
-                "SUM does not support {:?}",
-                other
-            ))),
+impl Sum {
+    /// Create a new SUM aggregate function
+    pub fn new(expr: Arc<dyn PhysicalExpr>, name: String, data_type: DataType) 
-> Self {
+        Self {
+            name,
+            expr,
+            data_type,
+            nullable: true,
         }
     }
+}
 
-    fn nullable(&self, _input_schema: &Schema) -> Result<bool> {
-        // null should be returned if no rows are aggregated
-        Ok(true)
+impl AggregateExpr for Sum {
+    fn field(&self) -> Result<Field> {
+        Ok(Field::new(
+            &self.name,
+            self.data_type.clone(),
+            self.nullable,
+        ))
     }
 
-    fn evaluate_input(&self, batch: &RecordBatch) -> Result<ArrayRef> {
-        self.expr.evaluate(batch)
+    fn state_fields(&self) -> Result<Vec<Field>> {
+        Ok(vec![Field::new(
+            &format_state_name(&self.name, "sum"),
+            self.data_type.clone(),
+            self.nullable,
+        )])
     }
 
-    fn create_accumulator(&self) -> Rc<RefCell<dyn Accumulator>> {
-        Rc::new(RefCell::new(SumAccumulator { sum: None }))
+    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        vec![self.expr.clone()]
     }
 
-    fn create_reducer(&self, column_name: &str) -> Arc<dyn AggregateExpr> {
-        Arc::new(Sum::new(Arc::new(Column::new(column_name))))
+    fn create_accumulator(&self) -> Result<Rc<RefCell<dyn Accumulator>>> {
+        Ok(Rc::new(RefCell::new(SumAccumulator::try_new(
+            &self.data_type,
+        )?)))
     }
 }
 
-macro_rules! sum_accumulate {

Review comment:
       This macro operated on a row-by-row basis; it has been replaced by a 
batch operation using `compute::sum`.

##########
File path: rust/datafusion/src/physical_plan/expressions.rs
##########
@@ -97,766 +104,712 @@ pub fn col(name: &str) -> Arc<dyn PhysicalExpr> {
 /// SUM aggregate expression
 #[derive(Debug)]
 pub struct Sum {
+    name: String,
+    data_type: DataType,
     expr: Arc<dyn PhysicalExpr>,
+    nullable: bool,
 }
 
-impl Sum {
-    /// Create a new SUM aggregate function
-    pub fn new(expr: Arc<dyn PhysicalExpr>) -> Self {
-        Self { expr }
+/// function return type of a sum
+pub fn sum_return_type(arg_type: &DataType) -> Result<DataType> {
+    match arg_type {
+        DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Int64 
=> {
+            Ok(DataType::Int64)
+        }
+        DataType::UInt8 | DataType::UInt16 | DataType::UInt32 | 
DataType::UInt64 => {
+            Ok(DataType::UInt64)
+        }
+        DataType::Float32 => Ok(DataType::Float32),
+        DataType::Float64 => Ok(DataType::Float64),
+        other => Err(ExecutionError::General(format!(
+            "SUM does not support type \"{:?}\"",
+            other
+        ))),
     }
 }
 
-impl AggregateExpr for Sum {
-    fn data_type(&self, input_schema: &Schema) -> Result<DataType> {
-        match self.expr.data_type(input_schema)? {
-            DataType::Int8 | DataType::Int16 | DataType::Int32 | 
DataType::Int64 => {
-                Ok(DataType::Int64)
-            }
-            DataType::UInt8 | DataType::UInt16 | DataType::UInt32 | 
DataType::UInt64 => {
-                Ok(DataType::UInt64)
-            }
-            DataType::Float32 => Ok(DataType::Float32),
-            DataType::Float64 => Ok(DataType::Float64),
-            other => Err(ExecutionError::General(format!(
-                "SUM does not support {:?}",
-                other
-            ))),
+impl Sum {
+    /// Create a new SUM aggregate function
+    pub fn new(expr: Arc<dyn PhysicalExpr>, name: String, data_type: DataType) 
-> Self {
+        Self {
+            name,
+            expr,
+            data_type,
+            nullable: true,
         }
     }
+}
 
-    fn nullable(&self, _input_schema: &Schema) -> Result<bool> {
-        // null should be returned if no rows are aggregated
-        Ok(true)
+impl AggregateExpr for Sum {
+    fn field(&self) -> Result<Field> {
+        Ok(Field::new(
+            &self.name,
+            self.data_type.clone(),
+            self.nullable,
+        ))
     }
 
-    fn evaluate_input(&self, batch: &RecordBatch) -> Result<ArrayRef> {
-        self.expr.evaluate(batch)
+    fn state_fields(&self) -> Result<Vec<Field>> {
+        Ok(vec![Field::new(
+            &format_state_name(&self.name, "sum"),
+            self.data_type.clone(),
+            self.nullable,
+        )])
     }
 
-    fn create_accumulator(&self) -> Rc<RefCell<dyn Accumulator>> {
-        Rc::new(RefCell::new(SumAccumulator { sum: None }))
+    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        vec![self.expr.clone()]
     }
 
-    fn create_reducer(&self, column_name: &str) -> Arc<dyn AggregateExpr> {
-        Arc::new(Sum::new(Arc::new(Column::new(column_name))))
+    fn create_accumulator(&self) -> Result<Rc<RefCell<dyn Accumulator>>> {
+        Ok(Rc::new(RefCell::new(SumAccumulator::try_new(
+            &self.data_type,
+        )?)))
     }
 }
 
-macro_rules! sum_accumulate {
-    ($SELF:ident, $VALUE:expr, $ARRAY_TYPE:ident, $SCALAR_VARIANT:ident, 
$TY:ty) => {{
-        $SELF.sum = match $SELF.sum {
-            Some(ScalarValue::$SCALAR_VARIANT(n)) => {
-                Some(ScalarValue::$SCALAR_VARIANT(n + $VALUE as $TY))
-            }
-            Some(_) => {
-                return Err(ExecutionError::InternalError(
-                    "Unexpected ScalarValue variant".to_string(),
-                ))
-            }
-            None => Some(ScalarValue::$SCALAR_VARIANT($VALUE as $TY)),
-        };
-    }};
-}
-
 #[derive(Debug)]
 struct SumAccumulator {
-    sum: Option<ScalarValue>,
+    sum: ScalarValue,
 }
 
-impl Accumulator for SumAccumulator {
-    fn accumulate_scalar(&mut self, value: Option<ScalarValue>) -> Result<()> {
-        if let Some(value) = value {
-            match value {
-                ScalarValue::Int8(value) => {
-                    sum_accumulate!(self, value, Int8Array, Int64, i64);
-                }
-                ScalarValue::Int16(value) => {
-                    sum_accumulate!(self, value, Int16Array, Int64, i64);
-                }
-                ScalarValue::Int32(value) => {
-                    sum_accumulate!(self, value, Int32Array, Int64, i64);
-                }
-                ScalarValue::Int64(value) => {
-                    sum_accumulate!(self, value, Int64Array, Int64, i64);
-                }
-                ScalarValue::UInt8(value) => {
-                    sum_accumulate!(self, value, UInt8Array, UInt64, u64);
-                }
-                ScalarValue::UInt16(value) => {
-                    sum_accumulate!(self, value, UInt16Array, UInt64, u64);
-                }
-                ScalarValue::UInt32(value) => {
-                    sum_accumulate!(self, value, UInt32Array, UInt64, u64);
-                }
-                ScalarValue::UInt64(value) => {
-                    sum_accumulate!(self, value, UInt64Array, UInt64, u64);
-                }
-                ScalarValue::Float32(value) => {
-                    sum_accumulate!(self, value, Float32Array, Float32, f32);
-                }
-                ScalarValue::Float64(value) => {
-                    sum_accumulate!(self, value, Float64Array, Float64, f64);
-                }
-                other => {
-                    return Err(ExecutionError::General(format!(
-                        "SUM does not support {:?}",
-                        other
-                    )))
-                }
-            }
-        }
-        Ok(())
+impl SumAccumulator {
+    /// new sum accumulator
+    pub fn try_new(data_type: &DataType) -> Result<Self> {
+        Ok(Self {
+            sum: ScalarValue::try_from(data_type)?,
+        })
     }
+}
 
-    fn accumulate_batch(&mut self, array: &ArrayRef) -> Result<()> {
-        let sum = match array.data_type() {
-            DataType::UInt8 => {
-                match 
compute::sum(array.as_any().downcast_ref::<UInt8Array>().unwrap()) {
-                    Some(n) => Ok(Some(ScalarValue::UInt8(n))),
-                    None => Ok(None),
-                }
+// returns the new value after sum with the new values, taking nullability 
into account
+macro_rules! typed_sum_accumulate {
+    ($OLD_VALUE:expr, $NEW_VALUES:expr, $ARRAYTYPE:ident, $SCALAR:ident, 
$TYPE:ident) => {{
+        let array = $NEW_VALUES.as_any().downcast_ref::<$ARRAYTYPE>().unwrap();
+        let delta = compute::sum(array);
+        if $OLD_VALUE.is_none() {
+            ScalarValue::$SCALAR(delta.and_then(|e| Some(e as $TYPE)))
+        } else {
+            let delta = delta.and_then(|e| Some(e as $TYPE)).unwrap_or(0 as 
$TYPE);
+            ScalarValue::from($OLD_VALUE.unwrap() + delta)
+        }
+    }};
+}
+
+// given an existing value `old` and an `array` of new values,
+// performs a sum, returning the new value.
+fn sum_accumulate(old: &ScalarValue, array: &ArrayRef) -> Result<ScalarValue> {
+    Ok(match old {
+        ScalarValue::Float64(sum) => match array.data_type() {
+            DataType::Float64 => {
+                typed_sum_accumulate!(sum, array, Float64Array, Float64, f64)
             }
-            DataType::UInt16 => {
-                match 
compute::sum(array.as_any().downcast_ref::<UInt16Array>().unwrap())
-                {
-                    Some(n) => Ok(Some(ScalarValue::UInt16(n))),
-                    None => Ok(None),
-                }
+            DataType::Float32 => {
+                typed_sum_accumulate!(sum, array, Float32Array, Float64, f64)
             }
-            DataType::UInt32 => {
-                match 
compute::sum(array.as_any().downcast_ref::<UInt32Array>().unwrap())
-                {
-                    Some(n) => Ok(Some(ScalarValue::UInt32(n))),
-                    None => Ok(None),
-                }
+            DataType::Int64 => {
+                typed_sum_accumulate!(sum, array, Int64Array, Float64, f64)
+            }
+            DataType::Int32 => {
+                typed_sum_accumulate!(sum, array, Int32Array, Float64, f64)
+            }
+            DataType::Int16 => {
+                typed_sum_accumulate!(sum, array, Int16Array, Float64, f64)
             }
+            DataType::Int8 => typed_sum_accumulate!(sum, array, Int8Array, 
Float64, f64),
             DataType::UInt64 => {
-                match 
compute::sum(array.as_any().downcast_ref::<UInt64Array>().unwrap())
-                {
-                    Some(n) => Ok(Some(ScalarValue::UInt64(n))),
-                    None => Ok(None),
-                }
+                typed_sum_accumulate!(sum, array, UInt64Array, Float64, f64)
             }
-            DataType::Int8 => {
-                match 
compute::sum(array.as_any().downcast_ref::<Int8Array>().unwrap()) {
-                    Some(n) => Ok(Some(ScalarValue::Int8(n))),
-                    None => Ok(None),
-                }
+            DataType::UInt32 => {
+                typed_sum_accumulate!(sum, array, UInt32Array, Float64, f64)
             }
-            DataType::Int16 => {
-                match 
compute::sum(array.as_any().downcast_ref::<Int16Array>().unwrap()) {
-                    Some(n) => Ok(Some(ScalarValue::Int16(n))),
-                    None => Ok(None),
-                }
+            DataType::UInt16 => {
+                typed_sum_accumulate!(sum, array, UInt16Array, Float64, f64)
             }
-            DataType::Int32 => {
-                match 
compute::sum(array.as_any().downcast_ref::<Int32Array>().unwrap()) {
-                    Some(n) => Ok(Some(ScalarValue::Int32(n))),
-                    None => Ok(None),
-                }
+            DataType::UInt8 => {
+                typed_sum_accumulate!(sum, array, UInt8Array, Float64, f64)
             }
-            DataType::Int64 => {
-                match 
compute::sum(array.as_any().downcast_ref::<Int64Array>().unwrap()) {
-                    Some(n) => Ok(Some(ScalarValue::Int64(n))),
-                    None => Ok(None),
-                }
+            dt => {
+                return Err(ExecutionError::InternalError(format!(
+                    "Sum f64 does not expect to receive type {:?}",
+                    dt
+                )))
             }
-            DataType::Float32 => {
-                match 
compute::sum(array.as_any().downcast_ref::<Float32Array>().unwrap())
-                {
-                    Some(n) => Ok(Some(ScalarValue::Float32(n))),
-                    None => Ok(None),
-                }
+        },
+        ScalarValue::Float32(sum) => {
+            typed_sum_accumulate!(sum, array, Float32Array, Float32, f32)
+        }
+        ScalarValue::UInt64(sum) => match array.data_type() {
+            DataType::UInt64 => {
+                typed_sum_accumulate!(sum, array, UInt64Array, UInt64, u64)
             }
-            DataType::Float64 => {
-                match 
compute::sum(array.as_any().downcast_ref::<Float64Array>().unwrap())
-                {
-                    Some(n) => Ok(Some(ScalarValue::Float64(n))),
-                    None => Ok(None),
-                }
+            DataType::UInt32 => {
+                typed_sum_accumulate!(sum, array, UInt32Array, UInt64, u64)
             }
-            _ => Err(ExecutionError::ExecutionError(
-                "Unsupported data type for SUM".to_string(),
-            )),
-        }?;
-        self.accumulate_scalar(sum)
+            DataType::UInt16 => {
+                typed_sum_accumulate!(sum, array, UInt16Array, UInt64, u64)
+            }
+            DataType::UInt8 => typed_sum_accumulate!(sum, array, UInt8Array, 
UInt64, u64),
+            dt => {
+                return Err(ExecutionError::InternalError(format!(
+                    "Sum is not expected to receive type {:?}",
+                    dt
+                )))
+            }
+        },
+        ScalarValue::Int64(sum) => match array.data_type() {
+            DataType::Int64 => typed_sum_accumulate!(sum, array, Int64Array, 
Int64, i64),
+            DataType::Int32 => typed_sum_accumulate!(sum, array, Int32Array, 
Int64, i64),
+            DataType::Int16 => typed_sum_accumulate!(sum, array, Int16Array, 
Int64, i64),
+            DataType::Int8 => typed_sum_accumulate!(sum, array, Int8Array, 
Int64, i64),
+            dt => {
+                return Err(ExecutionError::InternalError(format!(
+                    "Sum is not expected to receive type {:?}",
+                    dt
+                )))
+            }
+        },
+        e => {
+            return Err(ExecutionError::InternalError(format!(
+                "Sum is not expected to receive a scalar {:?}",
+                e
+            )))
+        }
+    })
+}
+
+impl Accumulator for SumAccumulator {
+    fn update(&mut self, values: &Vec<ArrayRef>) -> Result<()> {
+        // sum(v1, v2, v3) = v1 + v2 + v3
+        self.sum = sum_accumulate(&self.sum, &values[0])?;
+        Ok(())
     }
 
-    fn get_value(&self) -> Result<Option<ScalarValue>> {
-        Ok(self.sum.clone())
+    fn merge(&mut self, states: &Vec<ArrayRef>) -> Result<()> {
+        let state = &states[0];
+        // sum(sum1, sum2, sum3) = sum1 + sum2 + sum3
+        self.sum = sum_accumulate(&self.sum, state)?;
+        Ok(())
     }
-}
 
-/// Create a sum expression
-pub fn sum(expr: Arc<dyn PhysicalExpr>) -> Arc<dyn AggregateExpr> {
-    Arc::new(Sum::new(expr))
+    fn state(&self) -> Result<Vec<ScalarValue>> {
+        Ok(vec![self.sum.clone()])
+    }
+
+    fn value(&self) -> Result<ScalarValue> {
+        Ok(self.sum.clone())
+    }
 }
 
 /// AVG aggregate expression
 #[derive(Debug)]
 pub struct Avg {
+    name: String,
+    data_type: DataType,
+    nullable: bool,
     expr: Arc<dyn PhysicalExpr>,
 }
 
-impl Avg {
-    /// Create a new AVG aggregate function
-    pub fn new(expr: Arc<dyn PhysicalExpr>) -> Self {
-        Self { expr }
+/// function return type of an average
+pub fn avg_return_type(arg_type: &DataType) -> Result<DataType> {
+    match arg_type {
+        DataType::Int8
+        | DataType::Int16
+        | DataType::Int32
+        | DataType::Int64
+        | DataType::UInt8
+        | DataType::UInt16
+        | DataType::UInt32
+        | DataType::UInt64
+        | DataType::Float32
+        | DataType::Float64 => Ok(DataType::Float64),
+        other => Err(ExecutionError::General(format!(
+            "AVG does not support {:?}",
+            other
+        ))),
     }
 }
 
-impl AggregateExpr for Avg {
-    fn data_type(&self, input_schema: &Schema) -> Result<DataType> {
-        match self.expr.data_type(input_schema)? {
-            DataType::Int8
-            | DataType::Int16
-            | DataType::Int32
-            | DataType::Int64
-            | DataType::UInt8
-            | DataType::UInt16
-            | DataType::UInt32
-            | DataType::UInt64
-            | DataType::Float32
-            | DataType::Float64 => Ok(DataType::Float64),
-            other => Err(ExecutionError::General(format!(
-                "AVG does not support {:?}",
-                other
-            ))),
+impl Avg {
+    /// Create a new AVG aggregate function
+    pub fn new(expr: Arc<dyn PhysicalExpr>, name: String, data_type: DataType) 
-> Self {
+        Self {
+            name,
+            expr,
+            data_type,
+            nullable: true,
         }
     }
+}
 
-    fn nullable(&self, _input_schema: &Schema) -> Result<bool> {
-        // null should be returned if no rows are aggregated
-        Ok(true)
+impl AggregateExpr for Avg {
+    fn field(&self) -> Result<Field> {
+        Ok(Field::new(&self.name, DataType::Float64, true))
     }
 
-    fn evaluate_input(&self, batch: &RecordBatch) -> Result<ArrayRef> {
-        self.expr.evaluate(batch)
+    fn state_fields(&self) -> Result<Vec<Field>> {
+        Ok(vec![
+            Field::new(
+                &format_state_name(&self.name, "count"),
+                DataType::UInt64,
+                true,
+            ),
+            Field::new(
+                &format_state_name(&self.name, "sum"),
+                DataType::Float64,
+                true,
+            ),
+        ])
     }
 
-    fn create_accumulator(&self) -> Rc<RefCell<dyn Accumulator>> {
-        Rc::new(RefCell::new(AvgAccumulator {
-            sum: None,
-            count: None,
-        }))
+    fn create_accumulator(&self) -> Result<Rc<RefCell<dyn Accumulator>>> {
+        Ok(Rc::new(RefCell::new(AvgAccumulator::try_new(
+            // avg is f64
+            &DataType::Float64,
+        )?)))
     }
 
-    fn create_reducer(&self, column_name: &str) -> Arc<dyn AggregateExpr> {
-        Arc::new(Avg::new(Arc::new(Column::new(column_name))))
+    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        vec![self.expr.clone()]
     }
 }
 
-macro_rules! avg_accumulate {
-    ($SELF:ident, $VALUE:expr, $ARRAY_TYPE:ident) => {{
-        match ($SELF.sum, $SELF.count) {
-            (Some(sum), Some(count)) => {
-                $SELF.sum = Some(sum + $VALUE as f64);
-                $SELF.count = Some(count + 1);
-            }
-            _ => {
-                $SELF.sum = Some($VALUE as f64);
-                $SELF.count = Some(1);
-            }
-        };
-    }};
-}
 #[derive(Debug)]
 struct AvgAccumulator {
-    sum: Option<f64>,
-    count: Option<i64>,
+    // sum is used for null
+    sum: ScalarValue,
+    count: u64,
+}
+
+impl AvgAccumulator {
+    pub fn try_new(datatype: &DataType) -> Result<Self> {
+        Ok(Self {
+            sum: ScalarValue::try_from(datatype)?,
+            count: 0,
+        })
+    }
 }
 
 impl Accumulator for AvgAccumulator {
-    fn accumulate_scalar(&mut self, value: Option<ScalarValue>) -> Result<()> {
-        if let Some(value) = value {
-            match value {
-                ScalarValue::Int8(value) => avg_accumulate!(self, value, 
Int8Array),
-                ScalarValue::Int16(value) => avg_accumulate!(self, value, 
Int16Array),
-                ScalarValue::Int32(value) => avg_accumulate!(self, value, 
Int32Array),
-                ScalarValue::Int64(value) => avg_accumulate!(self, value, 
Int64Array),
-                ScalarValue::UInt8(value) => avg_accumulate!(self, value, 
UInt8Array),
-                ScalarValue::UInt16(value) => avg_accumulate!(self, value, 
UInt16Array),
-                ScalarValue::UInt32(value) => avg_accumulate!(self, value, 
UInt32Array),
-                ScalarValue::UInt64(value) => avg_accumulate!(self, value, 
UInt64Array),
-                ScalarValue::Float32(value) => avg_accumulate!(self, value, 
Float32Array),
-                ScalarValue::Float64(value) => avg_accumulate!(self, value, 
Float64Array),
-                other => {
-                    return Err(ExecutionError::General(format!(
-                        "AVG does not support {:?}",
-                        other
-                    )))
-                }
-            }
-        }
+    fn update(&mut self, values: &Vec<ArrayRef>) -> Result<()> {
+        let values = &values[0];
+
+        self.count += (values.len() - values.data().null_count()) as u64;
+        self.sum = sum_accumulate(&self.sum, values)?;
         Ok(())
     }
 
-    fn accumulate_batch(&mut self, array: &ArrayRef) -> Result<()> {
-        for row in 0..array.len() {
-            self.accumulate_scalar(get_scalar_value(array, row)?)?;
-        }
+    fn merge(&mut self, states: &Vec<ArrayRef>) -> Result<()> {

Review comment:
       This is the prime example of this PR: the merge here uses two input 
states to update two states of the accumulator.

##########
File path: rust/datafusion/src/physical_plan/expressions.rs
##########
@@ -1835,88 +1797,18 @@ mod tests {
         Ok(())
     }
 
-    #[test]
-    fn sum_contract() -> Result<()> {

Review comment:
       I removed these because the types are now selected from the signature, 
and no longer inferred by `AggregateExpr`.

##########
File path: rust/datafusion/src/execution/context.rs
##########
@@ -848,6 +848,24 @@ mod tests {
         Ok(())
     }
 
+    #[test]
+    fn aggregate_grouped_empty() -> Result<()> {

Review comment:
       A test for grouped with an empty result

##########
File path: rust/datafusion/src/execution/context.rs
##########
@@ -1147,6 +1165,41 @@ mod tests {
         Ok(())
     }
 
+    #[test]
+    fn simple_avg() -> Result<()> {
+        let schema = Schema::new(vec![Field::new("a", DataType::Int32, 
false)]);
+
+        let batch1 = RecordBatch::try_new(
+            Arc::new(schema.clone()),
+            vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
+        )?;
+        let batch2 = RecordBatch::try_new(
+            Arc::new(schema.clone()),
+            vec![Arc::new(Int32Array::from(vec![4, 5]))],
+        )?;
+
+        let mut ctx = ExecutionContext::new();
+
+        let provider = MemTable::new(Arc::new(schema), vec![vec![batch1], 
vec![batch2]])?;
+        ctx.register_table("t", Box::new(provider));
+
+        let result = collect(&mut ctx, "SELECT AVG(a) FROM t")?;
+
+        let batch = &result[0];
+        assert_eq!(1, batch.num_columns());
+        assert_eq!(1, batch.num_rows());
+
+        let values = batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<Float64Array>()
+            .expect("failed to cast version");
+        assert_eq!(values.len(), 1);
+        // avg(1,2,3,4,5) = 3.0
+        assert_eq!(values.value(0), 3.0_f64);

Review comment:
       This test fails in master, with 3.25 != 3.0

##########
File path: rust/datafusion/src/physical_plan/hash_aggregate.rs
##########
@@ -276,117 +272,91 @@ impl RecordBatchReader for GroupedHashAggregateIterator {
                 })
                 .collect::<ArrowResult<Vec<_>>>()?;
 
-            // evaluate the inputs to the aggregate expressions for this batch
-            let aggr_input_values = self
-                .aggr_expr
-                .iter()
-                .map(|expr| {
-                    expr.evaluate_input(&batch)
-                        .map_err(ExecutionError::into_arrow_external_error)
-                })
-                .collect::<ArrowResult<Vec<_>>>()?;
+            // evaluate the aggregation expressions. We could evaluate them 
after the `take`, but since
+            // we need to evaluate all of them anyways, it is more performant 
to do it while they are together.
+            let aggr_input_values = evaluate(&expressions, &batch)
+                .map_err(ExecutionError::into_arrow_external_error)?;
 
             // create vector large enough to hold the grouping key
+            // this is an optimization to avoid allocating `key` on every row.
+            // it will be overwritten on the loop below
             let mut key = Vec::with_capacity(group_values.len());
             for _ in 0..group_values.len() {
                 key.push(GroupByScalar::UInt32(0));
             }
 
-            // iterate over each row in the batch and create the accumulators 
for each grouping key
-            let mut accums: Vec<Rc<AccumulatorSet>> =
-                Vec::with_capacity(batch.num_rows());
-
+            // 1.1 construct the key from the group values
+            // 1.2 construct/update the mapping key -> indexes (on the batch) 
used to `take` values from the batch in a single operation

Review comment:
       wrong comment, 1.2 and 1.3 are together.

##########
File path: rust/datafusion/src/physical_plan/hash_aggregate.rs
##########
@@ -709,64 +659,111 @@ fn create_key(
 #[cfg(test)]
 mod tests {
 
+    use arrow::array::Float64Array;
+
     use super::*;
-    use crate::physical_plan::csv::{CsvExec, CsvReadOptions};
-    use crate::physical_plan::expressions::{col, sum};
+    use crate::physical_plan::expressions::{col, Avg};
     use crate::physical_plan::merge::MergeExec;
-    use crate::test;
+    use crate::physical_plan::{common, memory::MemoryExec};
+
+    fn some_data() -> ArrowResult<(Arc<Schema>, Vec<RecordBatch>)> {

Review comment:
       I placed controlled data here so that we can easily check the result

##########
File path: rust/datafusion/src/physical_plan/planner.rs
##########
@@ -218,32 +217,18 @@ impl DefaultPhysicalPlanner {
                     .collect::<Result<Vec<_>>>()?;
                 let aggregates = aggr_expr
                     .iter()
-                    .map(|e| {
-                        tuple_err((
-                            self.create_aggregate_expr(e, &input_schema, 
ctx_state),
-                            e.name(&input_schema),
-                        ))
-                    })
+                    .map(|e| self.create_aggregate_expr(e, &input_schema, 
ctx_state))
                     .collect::<Result<Vec<_>>>()?;
 
-                let initial_aggr = HashAggregateExec::try_new(
+                let initial_aggr = Arc::new(HashAggregateExec::try_new(
                     AggregateMode::Partial,
                     groups.clone(),
                     aggregates.clone(),
                     input,
-                )?;
+                )?);
 
-                if initial_aggr.output_partitioning().partition_count() == 1 {

Review comment:
       This seems major for a single partition.

##########
File path: rust/datafusion/src/scalar.rs
##########
@@ -0,0 +1,232 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This module provides ScalarValue, an enum that can be used for storage of 
single elements
+
+use std::{convert::TryFrom, fmt};
+
+use arrow::datatypes::DataType;
+
+use crate::error::{ExecutionError, Result};
+
+/// ScalarValue enumeration
+#[derive(Clone, PartialEq)]
+pub enum ScalarValue {
+    /// null value
+    Null,

Review comment:
       no longer needed?

##########
File path: rust/datafusion/src/physical_plan/planner.rs
##########
@@ -218,32 +217,18 @@ impl DefaultPhysicalPlanner {
                     .collect::<Result<Vec<_>>>()?;
                 let aggregates = aggr_expr
                     .iter()
-                    .map(|e| {
-                        tuple_err((
-                            self.create_aggregate_expr(e, &input_schema, 
ctx_state),
-                            e.name(&input_schema),
-                        ))
-                    })
+                    .map(|e| self.create_aggregate_expr(e, &input_schema, 
ctx_state))
                     .collect::<Result<Vec<_>>>()?;
 
-                let initial_aggr = HashAggregateExec::try_new(
+                let initial_aggr = Arc::new(HashAggregateExec::try_new(
                     AggregateMode::Partial,
                     groups.clone(),
                     aggregates.clone(),
                     input,
-                )?;
+                )?);
 
-                if initial_aggr.output_partitioning().partition_count() == 1 {

Review comment:
       Sorry. What I meant is that, compared to the previous implementation, we 
double the amount of work even when we do not need to, because there is a single 
partition.
   
   IMO we should handle this in another way, e.g. via a physical optimizer that 
replaces the partial aggregation with a final aggregation when the input is a 
single partition.

##########
File path: rust/arrow/src/compute/kernels/aggregate.rs
##########
@@ -19,9 +19,42 @@
 
 use std::ops::Add;
 
-use crate::array::{Array, PrimitiveArray};
+use crate::array::{Array, LargeStringArray, PrimitiveArray, StringArray};
 use crate::datatypes::ArrowNumericType;
 
+/// Helper macro to perform min/max of strings
+macro_rules! min_max_string_helper {
+    ($array:expr, $cmp:tt) => {{
+        let null_count = $array.null_count();
+
+        if null_count == $array.len() {
+            return None
+        }
+        let mut n = "";
+        let mut has_value = false;
+        let data = $array.data();
+
+        if null_count == 0 {

Review comment:
       yes. Generally, operations on non-null fields are faster because there 
isn't an unpredictable branch on the loop. This check removes that 
unpredictable branch altogether when there are no nulls.
   
   We could probably still get some more juice by using some of the vertical 
operations supported by packed_simd.

##########
File path: rust/datafusion/src/physical_plan/aggregates.rs
##########
@@ -103,42 +103,54 @@ pub fn create_aggregate_expr(
     fun: &AggregateFunction,
     args: &Vec<Arc<dyn PhysicalExpr>>,
     input_schema: &Schema,
+    name: String,
 ) -> Result<Arc<dyn AggregateExpr>> {
     // coerce
     let arg = coerce(args, input_schema, &signature(fun))?[0].clone();
 
+    let arg_types = args
+        .iter()
+        .map(|e| e.data_type(input_schema))
+        .collect::<Result<Vec<_>>>()?;
+
+    let return_type = return_type(&fun, &arg_types)?;
+
     Ok(match fun {
-        AggregateFunction::Count => expressions::count(arg),
-        AggregateFunction::Sum => expressions::sum(arg),
-        AggregateFunction::Min => expressions::min(arg),
-        AggregateFunction::Max => expressions::max(arg),
-        AggregateFunction::Avg => expressions::avg(arg),
+        AggregateFunction::Count => {
+            Arc::new(expressions::Count::new(arg, name, return_type))
+        }
+        AggregateFunction::Sum => Arc::new(expressions::Sum::new(arg, name, 
return_type)),
+        AggregateFunction::Min => Arc::new(expressions::Min::new(arg, name, 
return_type)),
+        AggregateFunction::Max => Arc::new(expressions::Max::new(arg, name, 
return_type)),
+        AggregateFunction::Avg => Arc::new(expressions::Avg::new(arg, name, 
return_type)),
     })
 }
 
+static NUMERICS: &'static [DataType] = &[
+    DataType::Int8,
+    DataType::Int16,
+    DataType::Int32,
+    DataType::Int64,
+    DataType::UInt8,
+    DataType::UInt16,
+    DataType::UInt32,
+    DataType::UInt64,
+    DataType::Float32,
+    DataType::Float64,
+];
+
 /// the signatures supported by the function `fun`.
 fn signature(fun: &AggregateFunction) -> Signature {
     // note: the physical expression must accept the type returned by this 
function or the execution panics.
-
     match fun {
         AggregateFunction::Count => Signature::Any(1),
-        AggregateFunction::Min
-        | AggregateFunction::Max
-        | AggregateFunction::Avg
-        | AggregateFunction::Sum => Signature::Uniform(
-            1,
-            vec![
-                DataType::Int8,
-                DataType::Int16,
-                DataType::Int32,
-                DataType::Int64,
-                DataType::UInt8,
-                DataType::UInt16,
-                DataType::UInt32,
-                DataType::UInt64,
-                DataType::Float32,
-                DataType::Float64,
-            ],
-        ),
+        AggregateFunction::Min | AggregateFunction::Max => {
+            let mut valid = vec![DataType::Utf8, DataType::LargeUtf8];
+            valid.extend_from_slice(NUMERICS);

Review comment:
       I have not thought about that, but that is an interesting idea 👍 
   
   In this PR, `max` continues to only support a single column, which we select 
in [this 
line](https://github.com/apache/arrow/pull/8172/files#diff-a98d5d588d3c5b525c6840271a5bdddcR571).
   
   This PR does enable us to create aggregate functions with more than one 
argument, and therefore this allows that option if we so wish. My initial 
thinking was to support aggregate functions of more arguments just to support 
things like `covariance` and `correlation`, but now that you mention it, we can 
do a lot of other things also. Another one is count distinct over N columns.

##########
File path: rust/datafusion/benches/aggregate_query_sql.rs
##########
@@ -39,72 +46,105 @@ fn aggregate_query(ctx: &mut ExecutionContext, sql: &str) {
     for _batch in results {}
 }
 
-fn create_context() -> ExecutionContext {
-    // define schema for data source (csv file)
+fn create_data(size: usize, null_density: f64) -> Vec<Option<f64>> {
+    // use random numbers to avoid spurious compiler optimizations wrt to 
branching
+    let mut rng = rand::thread_rng();
+
+    (0..size)
+        .map(|_| {
+            if rng.gen::<f64>() > null_density {
+                None
+            } else {
+                Some(rng.gen::<f64>())
+            }
+        })
+        .collect()
+}
+
+fn create_context(
+    partitions_len: usize,
+    array_len: usize,
+    batch_size: usize,
+) -> Result<ExecutionContext> {
+    // define a schema.
     let schema = Arc::new(Schema::new(vec![
-        Field::new("c1", DataType::Utf8, false),
-        Field::new("c2", DataType::UInt32, false),
-        Field::new("c3", DataType::Int8, false),
-        Field::new("c4", DataType::Int16, false),
-        Field::new("c5", DataType::Int32, false),
-        Field::new("c6", DataType::Int64, false),
-        Field::new("c7", DataType::UInt8, false),
-        Field::new("c8", DataType::UInt16, false),
-        Field::new("c9", DataType::UInt32, false),
-        Field::new("c10", DataType::UInt64, false),
-        Field::new("c11", DataType::Float32, false),
-        Field::new("c12", DataType::Float64, false),
-        Field::new("c13", DataType::Utf8, false),
+        Field::new("utf8", DataType::Utf8, false),
+        Field::new("f32", DataType::Float32, false),
+        Field::new("f64", DataType::Float64, false),
     ]));
 
-    let testdata = env::var("ARROW_TEST_DATA").expect("ARROW_TEST_DATA not 
defined");
+    // define data.
+    let partitions = (0..partitions_len)
+        .map(|_| {
+            (0..array_len / batch_size / partitions_len)
+                .map(|i| {
+                    let keys: Vec<String> = (0..batch_size)
+                        .map(
+                            // the 4 here is the number of different keys.
+                            // a higher number increase sparseness
+                            |i| format!("hi{}", i % 4),

Review comment:
       This should be random, i%4 is quite predictable. 

##########
File path: rust/datafusion/benches/aggregate_query_sql.rs
##########
@@ -39,72 +46,105 @@ fn aggregate_query(ctx: &mut ExecutionContext, sql: &str) {
     for _batch in results {}
 }
 
-fn create_context() -> ExecutionContext {
-    // define schema for data source (csv file)
+fn create_data(size: usize, null_density: f64) -> Vec<Option<f64>> {
+    // use random numbers to avoid spurious compiler optimizations wrt to 
branching
+    let mut rng = rand::thread_rng();
+
+    (0..size)
+        .map(|_| {
+            if rng.gen::<f64>() > null_density {
+                None
+            } else {
+                Some(rng.gen::<f64>())
+            }
+        })
+        .collect()
+}
+
+fn create_context(
+    partitions_len: usize,
+    array_len: usize,
+    batch_size: usize,
+) -> Result<ExecutionContext> {
+    // define a schema.
     let schema = Arc::new(Schema::new(vec![
-        Field::new("c1", DataType::Utf8, false),
-        Field::new("c2", DataType::UInt32, false),
-        Field::new("c3", DataType::Int8, false),
-        Field::new("c4", DataType::Int16, false),
-        Field::new("c5", DataType::Int32, false),
-        Field::new("c6", DataType::Int64, false),
-        Field::new("c7", DataType::UInt8, false),
-        Field::new("c8", DataType::UInt16, false),
-        Field::new("c9", DataType::UInt32, false),
-        Field::new("c10", DataType::UInt64, false),
-        Field::new("c11", DataType::Float32, false),
-        Field::new("c12", DataType::Float64, false),
-        Field::new("c13", DataType::Utf8, false),
+        Field::new("utf8", DataType::Utf8, false),
+        Field::new("f32", DataType::Float32, false),
+        Field::new("f64", DataType::Float64, false),
     ]));
 
-    let testdata = env::var("ARROW_TEST_DATA").expect("ARROW_TEST_DATA not 
defined");
+    // define data.
+    let partitions = (0..partitions_len)
+        .map(|_| {
+            (0..array_len / batch_size / partitions_len)
+                .map(|i| {
+                    let keys: Vec<String> = (0..batch_size)
+                        .map(
+                            // the 4 here is the number of different keys.
+                            // a higher number increase sparseness
+                            |i| format!("hi{}", i % 4),
+                        )
+                        .collect();
+                    let keys: Vec<&str> = keys.iter().map(|e| &**e).collect();
+
+                    let values = create_data(batch_size, 0.5);
+
+                    RecordBatch::try_new(
+                        schema.clone(),
+                        vec![
+                            Arc::new(StringArray::from(keys)),
+                            Arc::new(Float32Array::from(vec![i as f32; 
batch_size])),
+                            Arc::new(Float64Array::from(values)),
+                        ],
+                    )
+                    .unwrap()
+                })
+                .collect::<Vec<_>>()
+        })
+        .collect::<Vec<_>>();
 
-    // create CSV data source
-    let csv = CsvFile::try_new(
-        &format!("{}/csv/aggregate_test_100.csv", testdata),
-        CsvReadOptions::new().schema(&schema),
-    )
-    .unwrap();
+    let mut ctx = ExecutionContext::new();
 
-    let mem_table = MemTable::load(&csv).unwrap();
+    // declare a table in memory. In spark API, this corresponds to 
createDataFrame(...).
+    let provider = MemTable::new(schema, partitions)?;
+    ctx.register_table("t", Box::new(provider));
 
-    // create local execution context
-    let mut ctx = ExecutionContext::new();
-    ctx.register_table("aggregate_test_100", Box::new(mem_table));
-    ctx
+    Ok(ctx)
 }
 
 fn criterion_benchmark(c: &mut Criterion) {
-    c.bench_function("aggregate_query_no_group_by", |b| {
-        let mut ctx = create_context();
+    let partitions_len = 4;
+    let array_len = 32768; // 2^15
+    let batch_size = 2048; // 2^11
+    let mut ctx = create_context(partitions_len, array_len, 
batch_size).unwrap();
+
+    c.bench_function("aggregate_query_no_group_by 15 12", |b| {

Review comment:
       11, not 12.

##########
File path: rust/datafusion/src/physical_plan/common.rs
##########
@@ -205,3 +213,91 @@ pub fn get_scalar_value(array: &ArrayRef, row: usize) -> 
Result<Option<ScalarVal
     };
     Ok(value)
 }
+
+/// Converts a scalar value into an array.
+/// This is useful for aggregations.
+pub fn to_array(value: &ScalarValue) -> Result<ArrayRef> {
+    match value {
+        ScalarValue::Boolean(e) => Ok(Arc::new(BooleanArray::from(vec![*e])) 
as ArrayRef),
+        ScalarValue::Float64(e) => Ok(Arc::new(Float64Array::from(vec![*e])) 
as ArrayRef),
+        ScalarValue::Float32(e) => Ok(Arc::new(Float32Array::from(vec![*e]))),
+        ScalarValue::Int8(e) => Ok(Arc::new(Int8Array::from(vec![*e]))),
+        ScalarValue::Int16(e) => Ok(Arc::new(Int16Array::from(vec![*e]))),
+        ScalarValue::Int32(e) => Ok(Arc::new(Int32Array::from(vec![*e]))),
+        ScalarValue::Int64(e) => Ok(Arc::new(Int64Array::from(vec![*e]))),
+        ScalarValue::UInt8(e) => Ok(Arc::new(UInt8Array::from(vec![*e]))),
+        ScalarValue::UInt16(e) => Ok(Arc::new(UInt16Array::from(vec![*e]))),
+        ScalarValue::UInt32(e) => Ok(Arc::new(UInt32Array::from(vec![*e]))),
+        ScalarValue::UInt64(e) => Ok(Arc::new(UInt64Array::from(vec![*e]))),
+        ScalarValue::Utf8(e) => {
+            // awful code...
+            let v = e.as_ref().unwrap_or(&"".to_string()).clone();
+            let v = e.as_ref().and_then(|_| Some(&*v));
+            Ok(Arc::new(StringArray::from(vec![v])))
+        }
+        ScalarValue::LargeUtf8(e) => {
+            // awful code...
+            let v = e.as_ref().unwrap_or(&"".to_string()).clone();
+            let v = e.as_ref().and_then(|_| Some(&*v));
+            Ok(Arc::new(LargeStringArray::from(vec![v])))
+        }
+        ScalarValue::Null => Err(ExecutionError::InternalError(format!(
+            "Cannot convert scalar {:?} to array",
+            value
+        ))),
+        ScalarValue::Struct(_) => Err(ExecutionError::InternalError(format!(
+            "Cannot convert scalar {:?} to array",
+            value
+        ))),
+    }
+}
+
+/// creates an empty record batch.
+pub fn create_batch_empty(schema: &Schema) -> Result<Vec<ArrayRef>> {
+    schema
+        .fields()
+        .iter()
+        .map(|f| match f.data_type() {
+            DataType::Float32 => {
+                Ok(Arc::new(Float32Array::from(vec![] as Vec<f32>)) as 
ArrayRef)
+            }
+            DataType::Float64 => {
+                Ok(Arc::new(Float64Array::from(vec![] as Vec<f64>)) as 
ArrayRef)
+            }
+            DataType::Int64 => {
+                Ok(Arc::new(Int64Array::from(vec![] as Vec<i64>)) as ArrayRef)
+            }
+            DataType::Int32 => {
+                Ok(Arc::new(Int32Array::from(vec![] as Vec<i32>)) as ArrayRef)
+            }
+            DataType::Int16 => {
+                Ok(Arc::new(Int16Array::from(vec![] as Vec<i16>)) as ArrayRef)
+            }
+            DataType::Int8 => {
+                Ok(Arc::new(Int8Array::from(vec![] as Vec<i8>)) as ArrayRef)
+            }
+            DataType::UInt64 => {
+                Ok(Arc::new(UInt64Array::from(vec![] as Vec<u64>)) as ArrayRef)
+            }
+            DataType::UInt32 => {
+                Ok(Arc::new(UInt32Array::from(vec![] as Vec<u32>)) as ArrayRef)
+            }
+            DataType::UInt16 => {
+                Ok(Arc::new(UInt16Array::from(vec![] as Vec<u16>)) as ArrayRef)
+            }
+            DataType::UInt8 => {
+                Ok(Arc::new(UInt8Array::from(vec![] as Vec<u8>)) as ArrayRef)
+            }
+            DataType::Utf8 => {
+                Ok(Arc::new(StringArray::from(vec![] as Vec<&str>)) as 
ArrayRef)
+            }
+            DataType::Boolean => {
+                Ok(Arc::new(BooleanArray::from(vec![] as Vec<bool>)) as 
ArrayRef)
+            }
+            _ => Err(ExecutionError::NotImplemented(format!(

Review comment:
       So far these types have been enough because we do not have aggregations 
with other types. The function itself is needed because some batches can have no 
entries, in which case we need to build an empty record batch.

##########
File path: rust/datafusion/src/physical_plan/expressions.rs
##########
@@ -97,766 +104,712 @@ pub fn col(name: &str) -> Arc<dyn PhysicalExpr> {
 /// SUM aggregate expression
 #[derive(Debug)]
 pub struct Sum {
+    name: String,

Review comment:
       Each `AggregateExpr` now carries this information itself because that 
allows it to `create_accumulator` without access to the input schema. This is 
helpful because, on the second pass, we need to create accumulators on the fly, 
and the input_schema of the second pass is different, as it now corresponds to 
the schema 
   
   `[group1, group2, agg1_state1, agg1_state2, ...]`

##########
File path: rust/datafusion/src/physical_plan/expressions.rs
##########
@@ -97,766 +104,712 @@ pub fn col(name: &str) -> Arc<dyn PhysicalExpr> {
 /// SUM aggregate expression
 #[derive(Debug)]
 pub struct Sum {
+    name: String,
+    data_type: DataType,
     expr: Arc<dyn PhysicalExpr>,
+    nullable: bool,
 }
 
-impl Sum {
-    /// Create a new SUM aggregate function
-    pub fn new(expr: Arc<dyn PhysicalExpr>) -> Self {
-        Self { expr }
+/// function return type of a sum
+pub fn sum_return_type(arg_type: &DataType) -> Result<DataType> {
+    match arg_type {
+        DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Int64 
=> {
+            Ok(DataType::Int64)
+        }
+        DataType::UInt8 | DataType::UInt16 | DataType::UInt32 | 
DataType::UInt64 => {
+            Ok(DataType::UInt64)
+        }
+        DataType::Float32 => Ok(DataType::Float32),
+        DataType::Float64 => Ok(DataType::Float64),
+        other => Err(ExecutionError::General(format!(
+            "SUM does not support type \"{:?}\"",
+            other
+        ))),
     }
 }
 
-impl AggregateExpr for Sum {
-    fn data_type(&self, input_schema: &Schema) -> Result<DataType> {
-        match self.expr.data_type(input_schema)? {
-            DataType::Int8 | DataType::Int16 | DataType::Int32 | 
DataType::Int64 => {
-                Ok(DataType::Int64)
-            }
-            DataType::UInt8 | DataType::UInt16 | DataType::UInt32 | 
DataType::UInt64 => {
-                Ok(DataType::UInt64)
-            }
-            DataType::Float32 => Ok(DataType::Float32),
-            DataType::Float64 => Ok(DataType::Float64),
-            other => Err(ExecutionError::General(format!(
-                "SUM does not support {:?}",
-                other
-            ))),
+impl Sum {
+    /// Create a new SUM aggregate function
+    pub fn new(expr: Arc<dyn PhysicalExpr>, name: String, data_type: DataType) 
-> Self {
+        Self {
+            name,
+            expr,
+            data_type,
+            nullable: true,
         }
     }
+}
 
-    fn nullable(&self, _input_schema: &Schema) -> Result<bool> {
-        // null should be returned if no rows are aggregated
-        Ok(true)
+impl AggregateExpr for Sum {
+    fn field(&self) -> Result<Field> {
+        Ok(Field::new(
+            &self.name,
+            self.data_type.clone(),
+            self.nullable,
+        ))
     }
 
-    fn evaluate_input(&self, batch: &RecordBatch) -> Result<ArrayRef> {
-        self.expr.evaluate(batch)
+    fn state_fields(&self) -> Result<Vec<Field>> {
+        Ok(vec![Field::new(
+            &format_state_name(&self.name, "sum"),
+            self.data_type.clone(),
+            self.nullable,
+        )])
     }
 
-    fn create_accumulator(&self) -> Rc<RefCell<dyn Accumulator>> {
-        Rc::new(RefCell::new(SumAccumulator { sum: None }))
+    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        vec![self.expr.clone()]
     }
 
-    fn create_reducer(&self, column_name: &str) -> Arc<dyn AggregateExpr> {
-        Arc::new(Sum::new(Arc::new(Column::new(column_name))))
+    fn create_accumulator(&self) -> Result<Rc<RefCell<dyn Accumulator>>> {
+        Ok(Rc::new(RefCell::new(SumAccumulator::try_new(
+            &self.data_type,
+        )?)))
     }
 }
 
-macro_rules! sum_accumulate {

Review comment:
       This was operating on a row-by-row basis, which was replaced by a batch 
operation using `compute::sum`.

##########
File path: rust/datafusion/src/physical_plan/expressions.rs
##########
@@ -97,766 +104,712 @@ pub fn col(name: &str) -> Arc<dyn PhysicalExpr> {
 /// SUM aggregate expression
 #[derive(Debug)]
 pub struct Sum {
+    name: String,
+    data_type: DataType,
     expr: Arc<dyn PhysicalExpr>,
+    nullable: bool,
 }
 
-impl Sum {
-    /// Create a new SUM aggregate function
-    pub fn new(expr: Arc<dyn PhysicalExpr>) -> Self {
-        Self { expr }
+/// function return type of a sum
+pub fn sum_return_type(arg_type: &DataType) -> Result<DataType> {
+    match arg_type {
+        DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Int64 
=> {
+            Ok(DataType::Int64)
+        }
+        DataType::UInt8 | DataType::UInt16 | DataType::UInt32 | 
DataType::UInt64 => {
+            Ok(DataType::UInt64)
+        }
+        DataType::Float32 => Ok(DataType::Float32),
+        DataType::Float64 => Ok(DataType::Float64),
+        other => Err(ExecutionError::General(format!(
+            "SUM does not support type \"{:?}\"",
+            other
+        ))),
     }
 }
 
-impl AggregateExpr for Sum {
-    fn data_type(&self, input_schema: &Schema) -> Result<DataType> {
-        match self.expr.data_type(input_schema)? {
-            DataType::Int8 | DataType::Int16 | DataType::Int32 | 
DataType::Int64 => {
-                Ok(DataType::Int64)
-            }
-            DataType::UInt8 | DataType::UInt16 | DataType::UInt32 | 
DataType::UInt64 => {
-                Ok(DataType::UInt64)
-            }
-            DataType::Float32 => Ok(DataType::Float32),
-            DataType::Float64 => Ok(DataType::Float64),
-            other => Err(ExecutionError::General(format!(
-                "SUM does not support {:?}",
-                other
-            ))),
+impl Sum {
+    /// Create a new SUM aggregate function
+    pub fn new(expr: Arc<dyn PhysicalExpr>, name: String, data_type: DataType) 
-> Self {
+        Self {
+            name,
+            expr,
+            data_type,
+            nullable: true,
         }
     }
+}
 
-    fn nullable(&self, _input_schema: &Schema) -> Result<bool> {
-        // null should be returned if no rows are aggregated
-        Ok(true)
+impl AggregateExpr for Sum {
+    fn field(&self) -> Result<Field> {
+        Ok(Field::new(
+            &self.name,
+            self.data_type.clone(),
+            self.nullable,
+        ))
     }
 
-    fn evaluate_input(&self, batch: &RecordBatch) -> Result<ArrayRef> {
-        self.expr.evaluate(batch)
+    fn state_fields(&self) -> Result<Vec<Field>> {
+        Ok(vec![Field::new(
+            &format_state_name(&self.name, "sum"),
+            self.data_type.clone(),
+            self.nullable,
+        )])
     }
 
-    fn create_accumulator(&self) -> Rc<RefCell<dyn Accumulator>> {
-        Rc::new(RefCell::new(SumAccumulator { sum: None }))
+    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        vec![self.expr.clone()]
     }
 
-    fn create_reducer(&self, column_name: &str) -> Arc<dyn AggregateExpr> {
-        Arc::new(Sum::new(Arc::new(Column::new(column_name))))
+    fn create_accumulator(&self) -> Result<Rc<RefCell<dyn Accumulator>>> {
+        Ok(Rc::new(RefCell::new(SumAccumulator::try_new(
+            &self.data_type,
+        )?)))
     }
 }
 
-macro_rules! sum_accumulate {
-    ($SELF:ident, $VALUE:expr, $ARRAY_TYPE:ident, $SCALAR_VARIANT:ident, 
$TY:ty) => {{
-        $SELF.sum = match $SELF.sum {
-            Some(ScalarValue::$SCALAR_VARIANT(n)) => {
-                Some(ScalarValue::$SCALAR_VARIANT(n + $VALUE as $TY))
-            }
-            Some(_) => {
-                return Err(ExecutionError::InternalError(
-                    "Unexpected ScalarValue variant".to_string(),
-                ))
-            }
-            None => Some(ScalarValue::$SCALAR_VARIANT($VALUE as $TY)),
-        };
-    }};
-}
-
 #[derive(Debug)]
 struct SumAccumulator {
-    sum: Option<ScalarValue>,
+    sum: ScalarValue,
 }
 
-impl Accumulator for SumAccumulator {
-    fn accumulate_scalar(&mut self, value: Option<ScalarValue>) -> Result<()> {
-        if let Some(value) = value {
-            match value {
-                ScalarValue::Int8(value) => {
-                    sum_accumulate!(self, value, Int8Array, Int64, i64);
-                }
-                ScalarValue::Int16(value) => {
-                    sum_accumulate!(self, value, Int16Array, Int64, i64);
-                }
-                ScalarValue::Int32(value) => {
-                    sum_accumulate!(self, value, Int32Array, Int64, i64);
-                }
-                ScalarValue::Int64(value) => {
-                    sum_accumulate!(self, value, Int64Array, Int64, i64);
-                }
-                ScalarValue::UInt8(value) => {
-                    sum_accumulate!(self, value, UInt8Array, UInt64, u64);
-                }
-                ScalarValue::UInt16(value) => {
-                    sum_accumulate!(self, value, UInt16Array, UInt64, u64);
-                }
-                ScalarValue::UInt32(value) => {
-                    sum_accumulate!(self, value, UInt32Array, UInt64, u64);
-                }
-                ScalarValue::UInt64(value) => {
-                    sum_accumulate!(self, value, UInt64Array, UInt64, u64);
-                }
-                ScalarValue::Float32(value) => {
-                    sum_accumulate!(self, value, Float32Array, Float32, f32);
-                }
-                ScalarValue::Float64(value) => {
-                    sum_accumulate!(self, value, Float64Array, Float64, f64);
-                }
-                other => {
-                    return Err(ExecutionError::General(format!(
-                        "SUM does not support {:?}",
-                        other
-                    )))
-                }
-            }
-        }
-        Ok(())
+impl SumAccumulator {
+    /// new sum accumulator
+    pub fn try_new(data_type: &DataType) -> Result<Self> {
+        Ok(Self {
+            sum: ScalarValue::try_from(data_type)?,
+        })
     }
+}
 
-    fn accumulate_batch(&mut self, array: &ArrayRef) -> Result<()> {
-        let sum = match array.data_type() {
-            DataType::UInt8 => {
-                match 
compute::sum(array.as_any().downcast_ref::<UInt8Array>().unwrap()) {
-                    Some(n) => Ok(Some(ScalarValue::UInt8(n))),
-                    None => Ok(None),
-                }
+// returns the new value after sum with the new values, taking nullability 
into account
+macro_rules! typed_sum_accumulate {
+    ($OLD_VALUE:expr, $NEW_VALUES:expr, $ARRAYTYPE:ident, $SCALAR:ident, 
$TYPE:ident) => {{
+        let array = $NEW_VALUES.as_any().downcast_ref::<$ARRAYTYPE>().unwrap();
+        let delta = compute::sum(array);
+        if $OLD_VALUE.is_none() {
+            ScalarValue::$SCALAR(delta.and_then(|e| Some(e as $TYPE)))
+        } else {
+            let delta = delta.and_then(|e| Some(e as $TYPE)).unwrap_or(0 as 
$TYPE);
+            ScalarValue::from($OLD_VALUE.unwrap() + delta)
+        }
+    }};
+}
+
+// given an existing value `old` and an `array` of new values,
+// performs a sum, returning the new value.
+fn sum_accumulate(old: &ScalarValue, array: &ArrayRef) -> Result<ScalarValue> {
+    Ok(match old {
+        ScalarValue::Float64(sum) => match array.data_type() {
+            DataType::Float64 => {
+                typed_sum_accumulate!(sum, array, Float64Array, Float64, f64)
             }
-            DataType::UInt16 => {
-                match 
compute::sum(array.as_any().downcast_ref::<UInt16Array>().unwrap())
-                {
-                    Some(n) => Ok(Some(ScalarValue::UInt16(n))),
-                    None => Ok(None),
-                }
+            DataType::Float32 => {
+                typed_sum_accumulate!(sum, array, Float32Array, Float64, f64)
             }
-            DataType::UInt32 => {
-                match 
compute::sum(array.as_any().downcast_ref::<UInt32Array>().unwrap())
-                {
-                    Some(n) => Ok(Some(ScalarValue::UInt32(n))),
-                    None => Ok(None),
-                }
+            DataType::Int64 => {
+                typed_sum_accumulate!(sum, array, Int64Array, Float64, f64)
+            }
+            DataType::Int32 => {
+                typed_sum_accumulate!(sum, array, Int32Array, Float64, f64)
+            }
+            DataType::Int16 => {
+                typed_sum_accumulate!(sum, array, Int16Array, Float64, f64)
             }
+            DataType::Int8 => typed_sum_accumulate!(sum, array, Int8Array, 
Float64, f64),
             DataType::UInt64 => {
-                match 
compute::sum(array.as_any().downcast_ref::<UInt64Array>().unwrap())
-                {
-                    Some(n) => Ok(Some(ScalarValue::UInt64(n))),
-                    None => Ok(None),
-                }
+                typed_sum_accumulate!(sum, array, UInt64Array, Float64, f64)
             }
-            DataType::Int8 => {
-                match 
compute::sum(array.as_any().downcast_ref::<Int8Array>().unwrap()) {
-                    Some(n) => Ok(Some(ScalarValue::Int8(n))),
-                    None => Ok(None),
-                }
+            DataType::UInt32 => {
+                typed_sum_accumulate!(sum, array, UInt32Array, Float64, f64)
             }
-            DataType::Int16 => {
-                match 
compute::sum(array.as_any().downcast_ref::<Int16Array>().unwrap()) {
-                    Some(n) => Ok(Some(ScalarValue::Int16(n))),
-                    None => Ok(None),
-                }
+            DataType::UInt16 => {
+                typed_sum_accumulate!(sum, array, UInt16Array, Float64, f64)
             }
-            DataType::Int32 => {
-                match 
compute::sum(array.as_any().downcast_ref::<Int32Array>().unwrap()) {
-                    Some(n) => Ok(Some(ScalarValue::Int32(n))),
-                    None => Ok(None),
-                }
+            DataType::UInt8 => {
+                typed_sum_accumulate!(sum, array, UInt8Array, Float64, f64)
             }
-            DataType::Int64 => {
-                match 
compute::sum(array.as_any().downcast_ref::<Int64Array>().unwrap()) {
-                    Some(n) => Ok(Some(ScalarValue::Int64(n))),
-                    None => Ok(None),
-                }
+            dt => {
+                return Err(ExecutionError::InternalError(format!(
+                    "Sum f64 does not expect to receive type {:?}",
+                    dt
+                )))
             }
-            DataType::Float32 => {
-                match 
compute::sum(array.as_any().downcast_ref::<Float32Array>().unwrap())
-                {
-                    Some(n) => Ok(Some(ScalarValue::Float32(n))),
-                    None => Ok(None),
-                }
+        },
+        ScalarValue::Float32(sum) => {
+            typed_sum_accumulate!(sum, array, Float32Array, Float32, f32)
+        }
+        ScalarValue::UInt64(sum) => match array.data_type() {
+            DataType::UInt64 => {
+                typed_sum_accumulate!(sum, array, UInt64Array, UInt64, u64)
             }
-            DataType::Float64 => {
-                match 
compute::sum(array.as_any().downcast_ref::<Float64Array>().unwrap())
-                {
-                    Some(n) => Ok(Some(ScalarValue::Float64(n))),
-                    None => Ok(None),
-                }
+            DataType::UInt32 => {
+                typed_sum_accumulate!(sum, array, UInt32Array, UInt64, u64)
             }
-            _ => Err(ExecutionError::ExecutionError(
-                "Unsupported data type for SUM".to_string(),
-            )),
-        }?;
-        self.accumulate_scalar(sum)
+            DataType::UInt16 => {
+                typed_sum_accumulate!(sum, array, UInt16Array, UInt64, u64)
+            }
+            DataType::UInt8 => typed_sum_accumulate!(sum, array, UInt8Array, 
UInt64, u64),
+            dt => {
+                return Err(ExecutionError::InternalError(format!(
+                    "Sum is not expected to receive type {:?}",
+                    dt
+                )))
+            }
+        },
+        ScalarValue::Int64(sum) => match array.data_type() {
+            DataType::Int64 => typed_sum_accumulate!(sum, array, Int64Array, 
Int64, i64),
+            DataType::Int32 => typed_sum_accumulate!(sum, array, Int32Array, 
Int64, i64),
+            DataType::Int16 => typed_sum_accumulate!(sum, array, Int16Array, 
Int64, i64),
+            DataType::Int8 => typed_sum_accumulate!(sum, array, Int8Array, 
Int64, i64),
+            dt => {
+                return Err(ExecutionError::InternalError(format!(
+                    "Sum is not expected to receive type {:?}",
+                    dt
+                )))
+            }
+        },
+        e => {
+            return Err(ExecutionError::InternalError(format!(
+                "Sum is not expected to receive a scalar {:?}",
+                e
+            )))
+        }
+    })
+}
+
+impl Accumulator for SumAccumulator {
+    fn update(&mut self, values: &Vec<ArrayRef>) -> Result<()> {
+        // sum(v1, v2, v3) = v1 + v2 + v3
+        self.sum = sum_accumulate(&self.sum, &values[0])?;
+        Ok(())
     }
 
-    fn get_value(&self) -> Result<Option<ScalarValue>> {
-        Ok(self.sum.clone())
+    fn merge(&mut self, states: &Vec<ArrayRef>) -> Result<()> {
+        let state = &states[0];
+        // sum(sum1, sum2, sum3) = sum1 + sum2 + sum3
+        self.sum = sum_accumulate(&self.sum, state)?;
+        Ok(())
     }
-}
 
-/// Create a sum expression
-pub fn sum(expr: Arc<dyn PhysicalExpr>) -> Arc<dyn AggregateExpr> {
-    Arc::new(Sum::new(expr))
+    fn state(&self) -> Result<Vec<ScalarValue>> {
+        Ok(vec![self.sum.clone()])
+    }
+
+    fn value(&self) -> Result<ScalarValue> {
+        Ok(self.sum.clone())
+    }
 }
 
 /// AVG aggregate expression
 #[derive(Debug)]
 pub struct Avg {
+    name: String,
+    data_type: DataType,
+    nullable: bool,
     expr: Arc<dyn PhysicalExpr>,
 }
 
-impl Avg {
-    /// Create a new AVG aggregate function
-    pub fn new(expr: Arc<dyn PhysicalExpr>) -> Self {
-        Self { expr }
+/// function return type of an average
+pub fn avg_return_type(arg_type: &DataType) -> Result<DataType> {
+    match arg_type {
+        DataType::Int8
+        | DataType::Int16
+        | DataType::Int32
+        | DataType::Int64
+        | DataType::UInt8
+        | DataType::UInt16
+        | DataType::UInt32
+        | DataType::UInt64
+        | DataType::Float32
+        | DataType::Float64 => Ok(DataType::Float64),
+        other => Err(ExecutionError::General(format!(
+            "AVG does not support {:?}",
+            other
+        ))),
     }
 }
 
-impl AggregateExpr for Avg {
-    fn data_type(&self, input_schema: &Schema) -> Result<DataType> {
-        match self.expr.data_type(input_schema)? {
-            DataType::Int8
-            | DataType::Int16
-            | DataType::Int32
-            | DataType::Int64
-            | DataType::UInt8
-            | DataType::UInt16
-            | DataType::UInt32
-            | DataType::UInt64
-            | DataType::Float32
-            | DataType::Float64 => Ok(DataType::Float64),
-            other => Err(ExecutionError::General(format!(
-                "AVG does not support {:?}",
-                other
-            ))),
+impl Avg {
+    /// Create a new AVG aggregate function
+    pub fn new(expr: Arc<dyn PhysicalExpr>, name: String, data_type: DataType) 
-> Self {
+        Self {
+            name,
+            expr,
+            data_type,
+            nullable: true,
         }
     }
+}
 
-    fn nullable(&self, _input_schema: &Schema) -> Result<bool> {
-        // null should be returned if no rows are aggregated
-        Ok(true)
+impl AggregateExpr for Avg {
+    fn field(&self) -> Result<Field> {
+        Ok(Field::new(&self.name, DataType::Float64, true))
     }
 
-    fn evaluate_input(&self, batch: &RecordBatch) -> Result<ArrayRef> {
-        self.expr.evaluate(batch)
+    fn state_fields(&self) -> Result<Vec<Field>> {
+        Ok(vec![
+            Field::new(
+                &format_state_name(&self.name, "count"),
+                DataType::UInt64,
+                true,
+            ),
+            Field::new(
+                &format_state_name(&self.name, "sum"),
+                DataType::Float64,
+                true,
+            ),
+        ])
     }
 
-    fn create_accumulator(&self) -> Rc<RefCell<dyn Accumulator>> {
-        Rc::new(RefCell::new(AvgAccumulator {
-            sum: None,
-            count: None,
-        }))
+    fn create_accumulator(&self) -> Result<Rc<RefCell<dyn Accumulator>>> {
+        Ok(Rc::new(RefCell::new(AvgAccumulator::try_new(
+            // avg is f64
+            &DataType::Float64,
+        )?)))
     }
 
-    fn create_reducer(&self, column_name: &str) -> Arc<dyn AggregateExpr> {
-        Arc::new(Avg::new(Arc::new(Column::new(column_name))))
+    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        vec![self.expr.clone()]
     }
 }
 
-macro_rules! avg_accumulate {
-    ($SELF:ident, $VALUE:expr, $ARRAY_TYPE:ident) => {{
-        match ($SELF.sum, $SELF.count) {
-            (Some(sum), Some(count)) => {
-                $SELF.sum = Some(sum + $VALUE as f64);
-                $SELF.count = Some(count + 1);
-            }
-            _ => {
-                $SELF.sum = Some($VALUE as f64);
-                $SELF.count = Some(1);
-            }
-        };
-    }};
-}
 #[derive(Debug)]
 struct AvgAccumulator {
-    sum: Option<f64>,
-    count: Option<i64>,
+    // sum is used for null
+    sum: ScalarValue,
+    count: u64,
+}
+
+impl AvgAccumulator {
+    pub fn try_new(datatype: &DataType) -> Result<Self> {
+        Ok(Self {
+            sum: ScalarValue::try_from(datatype)?,
+            count: 0,
+        })
+    }
 }
 
 impl Accumulator for AvgAccumulator {
-    fn accumulate_scalar(&mut self, value: Option<ScalarValue>) -> Result<()> {
-        if let Some(value) = value {
-            match value {
-                ScalarValue::Int8(value) => avg_accumulate!(self, value, 
Int8Array),
-                ScalarValue::Int16(value) => avg_accumulate!(self, value, 
Int16Array),
-                ScalarValue::Int32(value) => avg_accumulate!(self, value, 
Int32Array),
-                ScalarValue::Int64(value) => avg_accumulate!(self, value, 
Int64Array),
-                ScalarValue::UInt8(value) => avg_accumulate!(self, value, 
UInt8Array),
-                ScalarValue::UInt16(value) => avg_accumulate!(self, value, 
UInt16Array),
-                ScalarValue::UInt32(value) => avg_accumulate!(self, value, 
UInt32Array),
-                ScalarValue::UInt64(value) => avg_accumulate!(self, value, 
UInt64Array),
-                ScalarValue::Float32(value) => avg_accumulate!(self, value, 
Float32Array),
-                ScalarValue::Float64(value) => avg_accumulate!(self, value, 
Float64Array),
-                other => {
-                    return Err(ExecutionError::General(format!(
-                        "AVG does not support {:?}",
-                        other
-                    )))
-                }
-            }
-        }
+    fn update(&mut self, values: &Vec<ArrayRef>) -> Result<()> {
+        let values = &values[0];
+
+        self.count += (values.len() - values.data().null_count()) as u64;
+        self.sum = sum_accumulate(&self.sum, values)?;
         Ok(())
     }
 
-    fn accumulate_batch(&mut self, array: &ArrayRef) -> Result<()> {
-        for row in 0..array.len() {
-            self.accumulate_scalar(get_scalar_value(array, row)?)?;
-        }
+    fn merge(&mut self, states: &Vec<ArrayRef>) -> Result<()> {

Review comment:
       This is the prime example of this PR: the merge here uses two states to 
change two states from the accumulator.

##########
File path: rust/datafusion/src/physical_plan/expressions.rs
##########
@@ -1835,88 +1797,18 @@ mod tests {
         Ok(())
     }
 
-    #[test]
-    fn sum_contract() -> Result<()> {

Review comment:
       I removed these because the types are now selected from the signature, 
and no longer inferred by `AggregateExpr`.

##########
File path: rust/datafusion/src/execution/context.rs
##########
@@ -848,6 +848,24 @@ mod tests {
         Ok(())
     }
 
+    #[test]
+    fn aggregate_grouped_empty() -> Result<()> {

Review comment:
       A test for a grouped aggregation with an empty result

##########
File path: rust/datafusion/src/execution/context.rs
##########
@@ -1147,6 +1165,41 @@ mod tests {
         Ok(())
     }
 
+    #[test]
+    fn simple_avg() -> Result<()> {
+        let schema = Schema::new(vec![Field::new("a", DataType::Int32, 
false)]);
+
+        let batch1 = RecordBatch::try_new(
+            Arc::new(schema.clone()),
+            vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
+        )?;
+        let batch2 = RecordBatch::try_new(
+            Arc::new(schema.clone()),
+            vec![Arc::new(Int32Array::from(vec![4, 5]))],
+        )?;
+
+        let mut ctx = ExecutionContext::new();
+
+        let provider = MemTable::new(Arc::new(schema), vec![vec![batch1], 
vec![batch2]])?;
+        ctx.register_table("t", Box::new(provider));
+
+        let result = collect(&mut ctx, "SELECT AVG(a) FROM t")?;
+
+        let batch = &result[0];
+        assert_eq!(1, batch.num_columns());
+        assert_eq!(1, batch.num_rows());
+
+        let values = batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<Float64Array>()
+            .expect("failed to cast version");
+        assert_eq!(values.len(), 1);
+        // avg(1,2,3,4,5) = 3.0
+        assert_eq!(values.value(0), 3.0_f64);

Review comment:
       This test fails in master, with 3.25 != 3.0

##########
File path: rust/datafusion/src/physical_plan/hash_aggregate.rs
##########
@@ -276,117 +272,91 @@ impl RecordBatchReader for GroupedHashAggregateIterator {
                 })
                 .collect::<ArrowResult<Vec<_>>>()?;
 
-            // evaluate the inputs to the aggregate expressions for this batch
-            let aggr_input_values = self
-                .aggr_expr
-                .iter()
-                .map(|expr| {
-                    expr.evaluate_input(&batch)
-                        .map_err(ExecutionError::into_arrow_external_error)
-                })
-                .collect::<ArrowResult<Vec<_>>>()?;
+            // evaluate the aggregation expressions. We could evaluate them 
after the `take`, but since
+            // we need to evaluate all of them anyways, it is more performant 
to do it while they are together.
+            let aggr_input_values = evaluate(&expressions, &batch)
+                .map_err(ExecutionError::into_arrow_external_error)?;
 
             // create vector large enough to hold the grouping key
+            // this is an optimization to avoid allocating `key` on every row.
+            // it will be overwritten on the loop below
             let mut key = Vec::with_capacity(group_values.len());
             for _ in 0..group_values.len() {
                 key.push(GroupByScalar::UInt32(0));
             }
 
-            // iterate over each row in the batch and create the accumulators 
for each grouping key
-            let mut accums: Vec<Rc<AccumulatorSet>> =
-                Vec::with_capacity(batch.num_rows());
-
+            // 1.1 construct the key from the group values
+            // 1.2 construct/update the mapping key -> indexes (on the batch) 
used to `take` values from the batch in a single operation

Review comment:
       wrong comment, 1.2 and 1.3 are together.

##########
File path: rust/datafusion/src/physical_plan/hash_aggregate.rs
##########
@@ -709,64 +659,111 @@ fn create_key(
 #[cfg(test)]
 mod tests {
 
+    use arrow::array::Float64Array;
+
     use super::*;
-    use crate::physical_plan::csv::{CsvExec, CsvReadOptions};
-    use crate::physical_plan::expressions::{col, sum};
+    use crate::physical_plan::expressions::{col, Avg};
     use crate::physical_plan::merge::MergeExec;
-    use crate::test;
+    use crate::physical_plan::{common, memory::MemoryExec};
+
+    fn some_data() -> ArrowResult<(Arc<Schema>, Vec<RecordBatch>)> {

Review comment:
       I placed controlled data here so that we can easily check the result

##########
File path: rust/datafusion/src/physical_plan/planner.rs
##########
@@ -218,32 +217,18 @@ impl DefaultPhysicalPlanner {
                     .collect::<Result<Vec<_>>>()?;
                 let aggregates = aggr_expr
                     .iter()
-                    .map(|e| {
-                        tuple_err((
-                            self.create_aggregate_expr(e, &input_schema, 
ctx_state),
-                            e.name(&input_schema),
-                        ))
-                    })
+                    .map(|e| self.create_aggregate_expr(e, &input_schema, 
ctx_state))
                     .collect::<Result<Vec<_>>>()?;
 
-                let initial_aggr = HashAggregateExec::try_new(
+                let initial_aggr = Arc::new(HashAggregateExec::try_new(
                     AggregateMode::Partial,
                     groups.clone(),
                     aggregates.clone(),
                     input,
-                )?;
+                )?);
 
-                if initial_aggr.output_partitioning().partition_count() == 1 {

Review comment:
       This seems major for a single partition.

##########
File path: rust/datafusion/src/scalar.rs
##########
@@ -0,0 +1,232 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This module provides ScalarValue, an enum that can be used for storage of 
single elements
+
+use std::{convert::TryFrom, fmt};
+
+use arrow::datatypes::DataType;
+
+use crate::error::{ExecutionError, Result};
+
+/// ScalarValue enumeration
+#[derive(Clone, PartialEq)]
+pub enum ScalarValue {
+    /// null value
+    Null,

Review comment:
       no longer needed?

##########
File path: rust/datafusion/src/physical_plan/planner.rs
##########
@@ -218,32 +217,18 @@ impl DefaultPhysicalPlanner {
                     .collect::<Result<Vec<_>>>()?;
                 let aggregates = aggr_expr
                     .iter()
-                    .map(|e| {
-                        tuple_err((
-                            self.create_aggregate_expr(e, &input_schema, 
ctx_state),
-                            e.name(&input_schema),
-                        ))
-                    })
+                    .map(|e| self.create_aggregate_expr(e, &input_schema, 
ctx_state))
                     .collect::<Result<Vec<_>>>()?;
 
-                let initial_aggr = HashAggregateExec::try_new(
+                let initial_aggr = Arc::new(HashAggregateExec::try_new(
                     AggregateMode::Partial,
                     groups.clone(),
                     aggregates.clone(),
                     input,
-                )?;
+                )?);
 
-                if initial_aggr.output_partitioning().partition_count() == 1 {

Review comment:
       Sorry. What I meant is that, compared to the previous implementation, we 
double the amount of work even when we do not need to, because there is a single 
partition.
   
   IMO we should handle this in another way, e.g. via a physical optimizer that 
replaces the partial aggregation with a final aggregation when the input is a 
single partition.

##########
File path: rust/arrow/src/compute/kernels/aggregate.rs
##########
@@ -19,9 +19,42 @@
 
 use std::ops::Add;
 
-use crate::array::{Array, PrimitiveArray};
+use crate::array::{Array, LargeStringArray, PrimitiveArray, StringArray};
 use crate::datatypes::ArrowNumericType;
 
+/// Helper macro to perform min/max of strings
+macro_rules! min_max_string_helper {
+    ($array:expr, $cmp:tt) => {{
+        let null_count = $array.null_count();
+
+        if null_count == $array.len() {
+            return None
+        }
+        let mut n = "";
+        let mut has_value = false;
+        let data = $array.data();
+
+        if null_count == 0 {

Review comment:
       yes. Generally, operations on non-null fields are faster because there 
isn't an unpredictable branch on the loop. This check removes that 
unpredictable branch altogether when there are no nulls.
   
   We could probably still get some more juice by using some of the vertical 
operations supported by packed_simd.

##########
File path: rust/datafusion/src/physical_plan/aggregates.rs
##########
@@ -103,42 +103,54 @@ pub fn create_aggregate_expr(
     fun: &AggregateFunction,
     args: &Vec<Arc<dyn PhysicalExpr>>,
     input_schema: &Schema,
+    name: String,
 ) -> Result<Arc<dyn AggregateExpr>> {
     // coerce
     let arg = coerce(args, input_schema, &signature(fun))?[0].clone();
 
+    let arg_types = args
+        .iter()
+        .map(|e| e.data_type(input_schema))
+        .collect::<Result<Vec<_>>>()?;
+
+    let return_type = return_type(&fun, &arg_types)?;
+
     Ok(match fun {
-        AggregateFunction::Count => expressions::count(arg),
-        AggregateFunction::Sum => expressions::sum(arg),
-        AggregateFunction::Min => expressions::min(arg),
-        AggregateFunction::Max => expressions::max(arg),
-        AggregateFunction::Avg => expressions::avg(arg),
+        AggregateFunction::Count => {
+            Arc::new(expressions::Count::new(arg, name, return_type))
+        }
+        AggregateFunction::Sum => Arc::new(expressions::Sum::new(arg, name, 
return_type)),
+        AggregateFunction::Min => Arc::new(expressions::Min::new(arg, name, 
return_type)),
+        AggregateFunction::Max => Arc::new(expressions::Max::new(arg, name, 
return_type)),
+        AggregateFunction::Avg => Arc::new(expressions::Avg::new(arg, name, 
return_type)),
     })
 }
 
+static NUMERICS: &'static [DataType] = &[
+    DataType::Int8,
+    DataType::Int16,
+    DataType::Int32,
+    DataType::Int64,
+    DataType::UInt8,
+    DataType::UInt16,
+    DataType::UInt32,
+    DataType::UInt64,
+    DataType::Float32,
+    DataType::Float64,
+];
+
 /// the signatures supported by the function `fun`.
 fn signature(fun: &AggregateFunction) -> Signature {
     // note: the physical expression must accept the type returned by this 
function or the execution panics.
-
     match fun {
         AggregateFunction::Count => Signature::Any(1),
-        AggregateFunction::Min
-        | AggregateFunction::Max
-        | AggregateFunction::Avg
-        | AggregateFunction::Sum => Signature::Uniform(
-            1,
-            vec![
-                DataType::Int8,
-                DataType::Int16,
-                DataType::Int32,
-                DataType::Int64,
-                DataType::UInt8,
-                DataType::UInt16,
-                DataType::UInt32,
-                DataType::UInt64,
-                DataType::Float32,
-                DataType::Float64,
-            ],
-        ),
+        AggregateFunction::Min | AggregateFunction::Max => {
+            let mut valid = vec![DataType::Utf8, DataType::LargeUtf8];
+            valid.extend_from_slice(NUMERICS);

Review comment:
       I have not thought about that, but that is an interesting idea 👍 
   
   In this PR, `max` continues to only support a single column, which we select 
in [this 
line](https://github.com/apache/arrow/pull/8172/files#diff-a98d5d588d3c5b525c6840271a5bdddcR571).
   
   This PR does enable us to create aggregate functions with more than one 
argument, and therefore this allows that option if we so wish. My initial 
thinking was to support aggregate functions with more arguments just for 
things like `covariance` and `correlation`, but now that you mention it, we can 
do a lot of other things as well. Another one is count distinct over N columns.

##########
File path: rust/datafusion/benches/aggregate_query_sql.rs
##########
@@ -39,72 +46,105 @@ fn aggregate_query(ctx: &mut ExecutionContext, sql: &str) {
     for _batch in results {}
 }
 
-fn create_context() -> ExecutionContext {
-    // define schema for data source (csv file)
+fn create_data(size: usize, null_density: f64) -> Vec<Option<f64>> {
+    // use random numbers to avoid spurious compiler optimizations wrt to 
branching
+    let mut rng = rand::thread_rng();
+
+    (0..size)
+        .map(|_| {
+            if rng.gen::<f64>() > null_density {
+                None
+            } else {
+                Some(rng.gen::<f64>())
+            }
+        })
+        .collect()
+}
+
+fn create_context(
+    partitions_len: usize,
+    array_len: usize,
+    batch_size: usize,
+) -> Result<ExecutionContext> {
+    // define a schema.
     let schema = Arc::new(Schema::new(vec![
-        Field::new("c1", DataType::Utf8, false),
-        Field::new("c2", DataType::UInt32, false),
-        Field::new("c3", DataType::Int8, false),
-        Field::new("c4", DataType::Int16, false),
-        Field::new("c5", DataType::Int32, false),
-        Field::new("c6", DataType::Int64, false),
-        Field::new("c7", DataType::UInt8, false),
-        Field::new("c8", DataType::UInt16, false),
-        Field::new("c9", DataType::UInt32, false),
-        Field::new("c10", DataType::UInt64, false),
-        Field::new("c11", DataType::Float32, false),
-        Field::new("c12", DataType::Float64, false),
-        Field::new("c13", DataType::Utf8, false),
+        Field::new("utf8", DataType::Utf8, false),
+        Field::new("f32", DataType::Float32, false),
+        Field::new("f64", DataType::Float64, false),
     ]));
 
-    let testdata = env::var("ARROW_TEST_DATA").expect("ARROW_TEST_DATA not 
defined");
+    // define data.
+    let partitions = (0..partitions_len)
+        .map(|_| {
+            (0..array_len / batch_size / partitions_len)
+                .map(|i| {
+                    let keys: Vec<String> = (0..batch_size)
+                        .map(
+                            // the 4 here is the number of different keys.
+                            // a higher number increase sparseness
+                            |i| format!("hi{}", i % 4),

Review comment:
       This should be random; `i % 4` is quite predictable. 

##########
File path: rust/datafusion/benches/aggregate_query_sql.rs
##########
@@ -39,72 +46,105 @@ fn aggregate_query(ctx: &mut ExecutionContext, sql: &str) {
     for _batch in results {}
 }
 
-fn create_context() -> ExecutionContext {
-    // define schema for data source (csv file)
+fn create_data(size: usize, null_density: f64) -> Vec<Option<f64>> {
+    // use random numbers to avoid spurious compiler optimizations wrt to 
branching
+    let mut rng = rand::thread_rng();
+
+    (0..size)
+        .map(|_| {
+            if rng.gen::<f64>() > null_density {
+                None
+            } else {
+                Some(rng.gen::<f64>())
+            }
+        })
+        .collect()
+}
+
+fn create_context(
+    partitions_len: usize,
+    array_len: usize,
+    batch_size: usize,
+) -> Result<ExecutionContext> {
+    // define a schema.
     let schema = Arc::new(Schema::new(vec![
-        Field::new("c1", DataType::Utf8, false),
-        Field::new("c2", DataType::UInt32, false),
-        Field::new("c3", DataType::Int8, false),
-        Field::new("c4", DataType::Int16, false),
-        Field::new("c5", DataType::Int32, false),
-        Field::new("c6", DataType::Int64, false),
-        Field::new("c7", DataType::UInt8, false),
-        Field::new("c8", DataType::UInt16, false),
-        Field::new("c9", DataType::UInt32, false),
-        Field::new("c10", DataType::UInt64, false),
-        Field::new("c11", DataType::Float32, false),
-        Field::new("c12", DataType::Float64, false),
-        Field::new("c13", DataType::Utf8, false),
+        Field::new("utf8", DataType::Utf8, false),
+        Field::new("f32", DataType::Float32, false),
+        Field::new("f64", DataType::Float64, false),
     ]));
 
-    let testdata = env::var("ARROW_TEST_DATA").expect("ARROW_TEST_DATA not 
defined");
+    // define data.
+    let partitions = (0..partitions_len)
+        .map(|_| {
+            (0..array_len / batch_size / partitions_len)
+                .map(|i| {
+                    let keys: Vec<String> = (0..batch_size)
+                        .map(
+                            // the 4 here is the number of different keys.
+                            // a higher number increase sparseness
+                            |i| format!("hi{}", i % 4),
+                        )
+                        .collect();
+                    let keys: Vec<&str> = keys.iter().map(|e| &**e).collect();
+
+                    let values = create_data(batch_size, 0.5);
+
+                    RecordBatch::try_new(
+                        schema.clone(),
+                        vec![
+                            Arc::new(StringArray::from(keys)),
+                            Arc::new(Float32Array::from(vec![i as f32; 
batch_size])),
+                            Arc::new(Float64Array::from(values)),
+                        ],
+                    )
+                    .unwrap()
+                })
+                .collect::<Vec<_>>()
+        })
+        .collect::<Vec<_>>();
 
-    // create CSV data source
-    let csv = CsvFile::try_new(
-        &format!("{}/csv/aggregate_test_100.csv", testdata),
-        CsvReadOptions::new().schema(&schema),
-    )
-    .unwrap();
+    let mut ctx = ExecutionContext::new();
 
-    let mem_table = MemTable::load(&csv).unwrap();
+    // declare a table in memory. In spark API, this corresponds to 
createDataFrame(...).
+    let provider = MemTable::new(schema, partitions)?;
+    ctx.register_table("t", Box::new(provider));
 
-    // create local execution context
-    let mut ctx = ExecutionContext::new();
-    ctx.register_table("aggregate_test_100", Box::new(mem_table));
-    ctx
+    Ok(ctx)
 }
 
 fn criterion_benchmark(c: &mut Criterion) {
-    c.bench_function("aggregate_query_no_group_by", |b| {
-        let mut ctx = create_context();
+    let partitions_len = 4;
+    let array_len = 32768; // 2^15
+    let batch_size = 2048; // 2^11
+    let mut ctx = create_context(partitions_len, array_len, 
batch_size).unwrap();
+
+    c.bench_function("aggregate_query_no_group_by 15 12", |b| {

Review comment:
       The benchmark name should say 11, not 12: `batch_size` is 2048, i.e. 2^11.

##########
File path: rust/datafusion/src/physical_plan/common.rs
##########
@@ -205,3 +213,91 @@ pub fn get_scalar_value(array: &ArrayRef, row: usize) -> 
Result<Option<ScalarVal
     };
     Ok(value)
 }
+
+/// Converts a scalar value into an array.
+/// This is useful for aggregations.
+pub fn to_array(value: &ScalarValue) -> Result<ArrayRef> {
+    match value {
+        ScalarValue::Boolean(e) => Ok(Arc::new(BooleanArray::from(vec![*e])) 
as ArrayRef),
+        ScalarValue::Float64(e) => Ok(Arc::new(Float64Array::from(vec![*e])) 
as ArrayRef),
+        ScalarValue::Float32(e) => Ok(Arc::new(Float32Array::from(vec![*e]))),
+        ScalarValue::Int8(e) => Ok(Arc::new(Int8Array::from(vec![*e]))),
+        ScalarValue::Int16(e) => Ok(Arc::new(Int16Array::from(vec![*e]))),
+        ScalarValue::Int32(e) => Ok(Arc::new(Int32Array::from(vec![*e]))),
+        ScalarValue::Int64(e) => Ok(Arc::new(Int64Array::from(vec![*e]))),
+        ScalarValue::UInt8(e) => Ok(Arc::new(UInt8Array::from(vec![*e]))),
+        ScalarValue::UInt16(e) => Ok(Arc::new(UInt16Array::from(vec![*e]))),
+        ScalarValue::UInt32(e) => Ok(Arc::new(UInt32Array::from(vec![*e]))),
+        ScalarValue::UInt64(e) => Ok(Arc::new(UInt64Array::from(vec![*e]))),
+        ScalarValue::Utf8(e) => {
+            // awful code...
+            let v = e.as_ref().unwrap_or(&"".to_string()).clone();
+            let v = e.as_ref().and_then(|_| Some(&*v));
+            Ok(Arc::new(StringArray::from(vec![v])))
+        }
+        ScalarValue::LargeUtf8(e) => {
+            // awful code...
+            let v = e.as_ref().unwrap_or(&"".to_string()).clone();
+            let v = e.as_ref().and_then(|_| Some(&*v));
+            Ok(Arc::new(LargeStringArray::from(vec![v])))
+        }
+        ScalarValue::Null => Err(ExecutionError::InternalError(format!(
+            "Cannot convert scalar {:?} to array",
+            value
+        ))),
+        ScalarValue::Struct(_) => Err(ExecutionError::InternalError(format!(
+            "Cannot convert scalar {:?} to array",
+            value
+        ))),
+    }
+}
+
+/// creates an empty record batch.
+pub fn create_batch_empty(schema: &Schema) -> Result<Vec<ArrayRef>> {
+    schema
+        .fields()
+        .iter()
+        .map(|f| match f.data_type() {
+            DataType::Float32 => {
+                Ok(Arc::new(Float32Array::from(vec![] as Vec<f32>)) as 
ArrayRef)
+            }
+            DataType::Float64 => {
+                Ok(Arc::new(Float64Array::from(vec![] as Vec<f64>)) as 
ArrayRef)
+            }
+            DataType::Int64 => {
+                Ok(Arc::new(Int64Array::from(vec![] as Vec<i64>)) as ArrayRef)
+            }
+            DataType::Int32 => {
+                Ok(Arc::new(Int32Array::from(vec![] as Vec<i32>)) as ArrayRef)
+            }
+            DataType::Int16 => {
+                Ok(Arc::new(Int16Array::from(vec![] as Vec<i16>)) as ArrayRef)
+            }
+            DataType::Int8 => {
+                Ok(Arc::new(Int8Array::from(vec![] as Vec<i8>)) as ArrayRef)
+            }
+            DataType::UInt64 => {
+                Ok(Arc::new(UInt64Array::from(vec![] as Vec<u64>)) as ArrayRef)
+            }
+            DataType::UInt32 => {
+                Ok(Arc::new(UInt32Array::from(vec![] as Vec<u32>)) as ArrayRef)
+            }
+            DataType::UInt16 => {
+                Ok(Arc::new(UInt16Array::from(vec![] as Vec<u16>)) as ArrayRef)
+            }
+            DataType::UInt8 => {
+                Ok(Arc::new(UInt8Array::from(vec![] as Vec<u8>)) as ArrayRef)
+            }
+            DataType::Utf8 => {
+                Ok(Arc::new(StringArray::from(vec![] as Vec<&str>)) as 
ArrayRef)
+            }
+            DataType::Boolean => {
+                Ok(Arc::new(BooleanArray::from(vec![] as Vec<bool>)) as 
ArrayRef)
+            }
+            _ => Err(ExecutionError::NotImplemented(format!(

Review comment:
       So far the types covered here have been enough, because we do not have 
aggregations over other types. The function itself is needed because some 
batches can have no entries, in which case we need to build an empty record batch.

##########
File path: rust/datafusion/src/physical_plan/expressions.rs
##########
@@ -97,766 +104,712 @@ pub fn col(name: &str) -> Arc<dyn PhysicalExpr> {
 /// SUM aggregate expression
 #[derive(Debug)]
 pub struct Sum {
+    name: String,

Review comment:
       `AggregateExpr`s now carry this information themselves because it allows 
them to `create_accumulator` without access to the input schema. This is 
helpful because, on the second pass, we need to create accumulators on the fly, 
and the input_schema of the second pass is different: it now corresponds to 
the schema 
   
   `[group1, group2, agg1_state1, agg1_state2, ...]`

##########
File path: rust/datafusion/src/physical_plan/expressions.rs
##########
@@ -97,766 +104,712 @@ pub fn col(name: &str) -> Arc<dyn PhysicalExpr> {
 /// SUM aggregate expression
 #[derive(Debug)]
 pub struct Sum {
+    name: String,
+    data_type: DataType,
     expr: Arc<dyn PhysicalExpr>,
+    nullable: bool,
 }
 
-impl Sum {
-    /// Create a new SUM aggregate function
-    pub fn new(expr: Arc<dyn PhysicalExpr>) -> Self {
-        Self { expr }
+/// function return type of a sum
+pub fn sum_return_type(arg_type: &DataType) -> Result<DataType> {
+    match arg_type {
+        DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Int64 
=> {
+            Ok(DataType::Int64)
+        }
+        DataType::UInt8 | DataType::UInt16 | DataType::UInt32 | 
DataType::UInt64 => {
+            Ok(DataType::UInt64)
+        }
+        DataType::Float32 => Ok(DataType::Float32),
+        DataType::Float64 => Ok(DataType::Float64),
+        other => Err(ExecutionError::General(format!(
+            "SUM does not support type \"{:?}\"",
+            other
+        ))),
     }
 }
 
-impl AggregateExpr for Sum {
-    fn data_type(&self, input_schema: &Schema) -> Result<DataType> {
-        match self.expr.data_type(input_schema)? {
-            DataType::Int8 | DataType::Int16 | DataType::Int32 | 
DataType::Int64 => {
-                Ok(DataType::Int64)
-            }
-            DataType::UInt8 | DataType::UInt16 | DataType::UInt32 | 
DataType::UInt64 => {
-                Ok(DataType::UInt64)
-            }
-            DataType::Float32 => Ok(DataType::Float32),
-            DataType::Float64 => Ok(DataType::Float64),
-            other => Err(ExecutionError::General(format!(
-                "SUM does not support {:?}",
-                other
-            ))),
+impl Sum {
+    /// Create a new SUM aggregate function
+    pub fn new(expr: Arc<dyn PhysicalExpr>, name: String, data_type: DataType) 
-> Self {
+        Self {
+            name,
+            expr,
+            data_type,
+            nullable: true,
         }
     }
+}
 
-    fn nullable(&self, _input_schema: &Schema) -> Result<bool> {
-        // null should be returned if no rows are aggregated
-        Ok(true)
+impl AggregateExpr for Sum {
+    fn field(&self) -> Result<Field> {
+        Ok(Field::new(
+            &self.name,
+            self.data_type.clone(),
+            self.nullable,
+        ))
     }
 
-    fn evaluate_input(&self, batch: &RecordBatch) -> Result<ArrayRef> {
-        self.expr.evaluate(batch)
+    fn state_fields(&self) -> Result<Vec<Field>> {
+        Ok(vec![Field::new(
+            &format_state_name(&self.name, "sum"),
+            self.data_type.clone(),
+            self.nullable,
+        )])
     }
 
-    fn create_accumulator(&self) -> Rc<RefCell<dyn Accumulator>> {
-        Rc::new(RefCell::new(SumAccumulator { sum: None }))
+    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        vec![self.expr.clone()]
     }
 
-    fn create_reducer(&self, column_name: &str) -> Arc<dyn AggregateExpr> {
-        Arc::new(Sum::new(Arc::new(Column::new(column_name))))
+    fn create_accumulator(&self) -> Result<Rc<RefCell<dyn Accumulator>>> {
+        Ok(Rc::new(RefCell::new(SumAccumulator::try_new(
+            &self.data_type,
+        )?)))
     }
 }
 
-macro_rules! sum_accumulate {

Review comment:
       This was operating on a row-by-row basis, which was replaced by a batch 
operation using `compute::sum`.

##########
File path: rust/datafusion/src/physical_plan/expressions.rs
##########
@@ -97,766 +104,712 @@ pub fn col(name: &str) -> Arc<dyn PhysicalExpr> {
 /// SUM aggregate expression
 #[derive(Debug)]
 pub struct Sum {
+    name: String,
+    data_type: DataType,
     expr: Arc<dyn PhysicalExpr>,
+    nullable: bool,
 }
 
-impl Sum {
-    /// Create a new SUM aggregate function
-    pub fn new(expr: Arc<dyn PhysicalExpr>) -> Self {
-        Self { expr }
+/// function return type of a sum
+pub fn sum_return_type(arg_type: &DataType) -> Result<DataType> {
+    match arg_type {
+        DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Int64 
=> {
+            Ok(DataType::Int64)
+        }
+        DataType::UInt8 | DataType::UInt16 | DataType::UInt32 | 
DataType::UInt64 => {
+            Ok(DataType::UInt64)
+        }
+        DataType::Float32 => Ok(DataType::Float32),
+        DataType::Float64 => Ok(DataType::Float64),
+        other => Err(ExecutionError::General(format!(
+            "SUM does not support type \"{:?}\"",
+            other
+        ))),
     }
 }
 
-impl AggregateExpr for Sum {
-    fn data_type(&self, input_schema: &Schema) -> Result<DataType> {
-        match self.expr.data_type(input_schema)? {
-            DataType::Int8 | DataType::Int16 | DataType::Int32 | 
DataType::Int64 => {
-                Ok(DataType::Int64)
-            }
-            DataType::UInt8 | DataType::UInt16 | DataType::UInt32 | 
DataType::UInt64 => {
-                Ok(DataType::UInt64)
-            }
-            DataType::Float32 => Ok(DataType::Float32),
-            DataType::Float64 => Ok(DataType::Float64),
-            other => Err(ExecutionError::General(format!(
-                "SUM does not support {:?}",
-                other
-            ))),
+impl Sum {
+    /// Create a new SUM aggregate function
+    pub fn new(expr: Arc<dyn PhysicalExpr>, name: String, data_type: DataType) 
-> Self {
+        Self {
+            name,
+            expr,
+            data_type,
+            nullable: true,
         }
     }
+}
 
-    fn nullable(&self, _input_schema: &Schema) -> Result<bool> {
-        // null should be returned if no rows are aggregated
-        Ok(true)
+impl AggregateExpr for Sum {
+    fn field(&self) -> Result<Field> {
+        Ok(Field::new(
+            &self.name,
+            self.data_type.clone(),
+            self.nullable,
+        ))
     }
 
-    fn evaluate_input(&self, batch: &RecordBatch) -> Result<ArrayRef> {
-        self.expr.evaluate(batch)
+    fn state_fields(&self) -> Result<Vec<Field>> {
+        Ok(vec![Field::new(
+            &format_state_name(&self.name, "sum"),
+            self.data_type.clone(),
+            self.nullable,
+        )])
     }
 
-    fn create_accumulator(&self) -> Rc<RefCell<dyn Accumulator>> {
-        Rc::new(RefCell::new(SumAccumulator { sum: None }))
+    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        vec![self.expr.clone()]
     }
 
-    fn create_reducer(&self, column_name: &str) -> Arc<dyn AggregateExpr> {
-        Arc::new(Sum::new(Arc::new(Column::new(column_name))))
+    fn create_accumulator(&self) -> Result<Rc<RefCell<dyn Accumulator>>> {
+        Ok(Rc::new(RefCell::new(SumAccumulator::try_new(
+            &self.data_type,
+        )?)))
     }
 }
 
-macro_rules! sum_accumulate {
-    ($SELF:ident, $VALUE:expr, $ARRAY_TYPE:ident, $SCALAR_VARIANT:ident, 
$TY:ty) => {{
-        $SELF.sum = match $SELF.sum {
-            Some(ScalarValue::$SCALAR_VARIANT(n)) => {
-                Some(ScalarValue::$SCALAR_VARIANT(n + $VALUE as $TY))
-            }
-            Some(_) => {
-                return Err(ExecutionError::InternalError(
-                    "Unexpected ScalarValue variant".to_string(),
-                ))
-            }
-            None => Some(ScalarValue::$SCALAR_VARIANT($VALUE as $TY)),
-        };
-    }};
-}
-
 #[derive(Debug)]
 struct SumAccumulator {
-    sum: Option<ScalarValue>,
+    sum: ScalarValue,
 }
 
-impl Accumulator for SumAccumulator {
-    fn accumulate_scalar(&mut self, value: Option<ScalarValue>) -> Result<()> {
-        if let Some(value) = value {
-            match value {
-                ScalarValue::Int8(value) => {
-                    sum_accumulate!(self, value, Int8Array, Int64, i64);
-                }
-                ScalarValue::Int16(value) => {
-                    sum_accumulate!(self, value, Int16Array, Int64, i64);
-                }
-                ScalarValue::Int32(value) => {
-                    sum_accumulate!(self, value, Int32Array, Int64, i64);
-                }
-                ScalarValue::Int64(value) => {
-                    sum_accumulate!(self, value, Int64Array, Int64, i64);
-                }
-                ScalarValue::UInt8(value) => {
-                    sum_accumulate!(self, value, UInt8Array, UInt64, u64);
-                }
-                ScalarValue::UInt16(value) => {
-                    sum_accumulate!(self, value, UInt16Array, UInt64, u64);
-                }
-                ScalarValue::UInt32(value) => {
-                    sum_accumulate!(self, value, UInt32Array, UInt64, u64);
-                }
-                ScalarValue::UInt64(value) => {
-                    sum_accumulate!(self, value, UInt64Array, UInt64, u64);
-                }
-                ScalarValue::Float32(value) => {
-                    sum_accumulate!(self, value, Float32Array, Float32, f32);
-                }
-                ScalarValue::Float64(value) => {
-                    sum_accumulate!(self, value, Float64Array, Float64, f64);
-                }
-                other => {
-                    return Err(ExecutionError::General(format!(
-                        "SUM does not support {:?}",
-                        other
-                    )))
-                }
-            }
-        }
-        Ok(())
+impl SumAccumulator {
+    /// new sum accumulator
+    pub fn try_new(data_type: &DataType) -> Result<Self> {
+        Ok(Self {
+            sum: ScalarValue::try_from(data_type)?,
+        })
     }
+}
 
-    fn accumulate_batch(&mut self, array: &ArrayRef) -> Result<()> {
-        let sum = match array.data_type() {
-            DataType::UInt8 => {
-                match 
compute::sum(array.as_any().downcast_ref::<UInt8Array>().unwrap()) {
-                    Some(n) => Ok(Some(ScalarValue::UInt8(n))),
-                    None => Ok(None),
-                }
+// returns the new value after sum with the new values, taking nullability 
into account
+macro_rules! typed_sum_accumulate {
+    ($OLD_VALUE:expr, $NEW_VALUES:expr, $ARRAYTYPE:ident, $SCALAR:ident, 
$TYPE:ident) => {{
+        let array = $NEW_VALUES.as_any().downcast_ref::<$ARRAYTYPE>().unwrap();
+        let delta = compute::sum(array);
+        if $OLD_VALUE.is_none() {
+            ScalarValue::$SCALAR(delta.and_then(|e| Some(e as $TYPE)))
+        } else {
+            let delta = delta.and_then(|e| Some(e as $TYPE)).unwrap_or(0 as 
$TYPE);
+            ScalarValue::from($OLD_VALUE.unwrap() + delta)
+        }
+    }};
+}
+
+// given an existing value `old` and an `array` of new values,
+// performs a sum, returning the new value.
+fn sum_accumulate(old: &ScalarValue, array: &ArrayRef) -> Result<ScalarValue> {
+    Ok(match old {
+        ScalarValue::Float64(sum) => match array.data_type() {
+            DataType::Float64 => {
+                typed_sum_accumulate!(sum, array, Float64Array, Float64, f64)
             }
-            DataType::UInt16 => {
-                match 
compute::sum(array.as_any().downcast_ref::<UInt16Array>().unwrap())
-                {
-                    Some(n) => Ok(Some(ScalarValue::UInt16(n))),
-                    None => Ok(None),
-                }
+            DataType::Float32 => {
+                typed_sum_accumulate!(sum, array, Float32Array, Float64, f64)
             }
-            DataType::UInt32 => {
-                match 
compute::sum(array.as_any().downcast_ref::<UInt32Array>().unwrap())
-                {
-                    Some(n) => Ok(Some(ScalarValue::UInt32(n))),
-                    None => Ok(None),
-                }
+            DataType::Int64 => {
+                typed_sum_accumulate!(sum, array, Int64Array, Float64, f64)
+            }
+            DataType::Int32 => {
+                typed_sum_accumulate!(sum, array, Int32Array, Float64, f64)
+            }
+            DataType::Int16 => {
+                typed_sum_accumulate!(sum, array, Int16Array, Float64, f64)
             }
+            DataType::Int8 => typed_sum_accumulate!(sum, array, Int8Array, 
Float64, f64),
             DataType::UInt64 => {
-                match 
compute::sum(array.as_any().downcast_ref::<UInt64Array>().unwrap())
-                {
-                    Some(n) => Ok(Some(ScalarValue::UInt64(n))),
-                    None => Ok(None),
-                }
+                typed_sum_accumulate!(sum, array, UInt64Array, Float64, f64)
             }
-            DataType::Int8 => {
-                match 
compute::sum(array.as_any().downcast_ref::<Int8Array>().unwrap()) {
-                    Some(n) => Ok(Some(ScalarValue::Int8(n))),
-                    None => Ok(None),
-                }
+            DataType::UInt32 => {
+                typed_sum_accumulate!(sum, array, UInt32Array, Float64, f64)
             }
-            DataType::Int16 => {
-                match 
compute::sum(array.as_any().downcast_ref::<Int16Array>().unwrap()) {
-                    Some(n) => Ok(Some(ScalarValue::Int16(n))),
-                    None => Ok(None),
-                }
+            DataType::UInt16 => {
+                typed_sum_accumulate!(sum, array, UInt16Array, Float64, f64)
             }
-            DataType::Int32 => {
-                match 
compute::sum(array.as_any().downcast_ref::<Int32Array>().unwrap()) {
-                    Some(n) => Ok(Some(ScalarValue::Int32(n))),
-                    None => Ok(None),
-                }
+            DataType::UInt8 => {
+                typed_sum_accumulate!(sum, array, UInt8Array, Float64, f64)
             }
-            DataType::Int64 => {
-                match 
compute::sum(array.as_any().downcast_ref::<Int64Array>().unwrap()) {
-                    Some(n) => Ok(Some(ScalarValue::Int64(n))),
-                    None => Ok(None),
-                }
+            dt => {
+                return Err(ExecutionError::InternalError(format!(
+                    "Sum f64 does not expect to receive type {:?}",
+                    dt
+                )))
             }
-            DataType::Float32 => {
-                match 
compute::sum(array.as_any().downcast_ref::<Float32Array>().unwrap())
-                {
-                    Some(n) => Ok(Some(ScalarValue::Float32(n))),
-                    None => Ok(None),
-                }
+        },
+        ScalarValue::Float32(sum) => {
+            typed_sum_accumulate!(sum, array, Float32Array, Float32, f32)
+        }
+        ScalarValue::UInt64(sum) => match array.data_type() {
+            DataType::UInt64 => {
+                typed_sum_accumulate!(sum, array, UInt64Array, UInt64, u64)
             }
-            DataType::Float64 => {
-                match 
compute::sum(array.as_any().downcast_ref::<Float64Array>().unwrap())
-                {
-                    Some(n) => Ok(Some(ScalarValue::Float64(n))),
-                    None => Ok(None),
-                }
+            DataType::UInt32 => {
+                typed_sum_accumulate!(sum, array, UInt32Array, UInt64, u64)
             }
-            _ => Err(ExecutionError::ExecutionError(
-                "Unsupported data type for SUM".to_string(),
-            )),
-        }?;
-        self.accumulate_scalar(sum)
+            DataType::UInt16 => {
+                typed_sum_accumulate!(sum, array, UInt16Array, UInt64, u64)
+            }
+            DataType::UInt8 => typed_sum_accumulate!(sum, array, UInt8Array, 
UInt64, u64),
+            dt => {
+                return Err(ExecutionError::InternalError(format!(
+                    "Sum is not expected to receive type {:?}",
+                    dt
+                )))
+            }
+        },
+        ScalarValue::Int64(sum) => match array.data_type() {
+            DataType::Int64 => typed_sum_accumulate!(sum, array, Int64Array, 
Int64, i64),
+            DataType::Int32 => typed_sum_accumulate!(sum, array, Int32Array, 
Int64, i64),
+            DataType::Int16 => typed_sum_accumulate!(sum, array, Int16Array, 
Int64, i64),
+            DataType::Int8 => typed_sum_accumulate!(sum, array, Int8Array, 
Int64, i64),
+            dt => {
+                return Err(ExecutionError::InternalError(format!(
+                    "Sum is not expected to receive type {:?}",
+                    dt
+                )))
+            }
+        },
+        e => {
+            return Err(ExecutionError::InternalError(format!(
+                "Sum is not expected to receive a scalar {:?}",
+                e
+            )))
+        }
+    })
+}
+
+impl Accumulator for SumAccumulator {
+    fn update(&mut self, values: &Vec<ArrayRef>) -> Result<()> {
+        // sum(v1, v2, v3) = v1 + v2 + v3
+        self.sum = sum_accumulate(&self.sum, &values[0])?;
+        Ok(())
     }
 
-    fn get_value(&self) -> Result<Option<ScalarValue>> {
-        Ok(self.sum.clone())
+    fn merge(&mut self, states: &Vec<ArrayRef>) -> Result<()> {
+        let state = &states[0];
+        // sum(sum1, sum2, sum3) = sum1 + sum2 + sum3
+        self.sum = sum_accumulate(&self.sum, state)?;
+        Ok(())
     }
-}
 
-/// Create a sum expression
-pub fn sum(expr: Arc<dyn PhysicalExpr>) -> Arc<dyn AggregateExpr> {
-    Arc::new(Sum::new(expr))
+    fn state(&self) -> Result<Vec<ScalarValue>> {
+        Ok(vec![self.sum.clone()])
+    }
+
+    fn value(&self) -> Result<ScalarValue> {
+        Ok(self.sum.clone())
+    }
 }
 
 /// AVG aggregate expression
 #[derive(Debug)]
 pub struct Avg {
+    name: String,
+    data_type: DataType,
+    nullable: bool,
     expr: Arc<dyn PhysicalExpr>,
 }
 
-impl Avg {
-    /// Create a new AVG aggregate function
-    pub fn new(expr: Arc<dyn PhysicalExpr>) -> Self {
-        Self { expr }
+/// function return type of an average
+pub fn avg_return_type(arg_type: &DataType) -> Result<DataType> {
+    match arg_type {
+        DataType::Int8
+        | DataType::Int16
+        | DataType::Int32
+        | DataType::Int64
+        | DataType::UInt8
+        | DataType::UInt16
+        | DataType::UInt32
+        | DataType::UInt64
+        | DataType::Float32
+        | DataType::Float64 => Ok(DataType::Float64),
+        other => Err(ExecutionError::General(format!(
+            "AVG does not support {:?}",
+            other
+        ))),
     }
 }
 
-impl AggregateExpr for Avg {
-    fn data_type(&self, input_schema: &Schema) -> Result<DataType> {
-        match self.expr.data_type(input_schema)? {
-            DataType::Int8
-            | DataType::Int16
-            | DataType::Int32
-            | DataType::Int64
-            | DataType::UInt8
-            | DataType::UInt16
-            | DataType::UInt32
-            | DataType::UInt64
-            | DataType::Float32
-            | DataType::Float64 => Ok(DataType::Float64),
-            other => Err(ExecutionError::General(format!(
-                "AVG does not support {:?}",
-                other
-            ))),
+impl Avg {
+    /// Create a new AVG aggregate function
+    pub fn new(expr: Arc<dyn PhysicalExpr>, name: String, data_type: DataType) 
-> Self {
+        Self {
+            name,
+            expr,
+            data_type,
+            nullable: true,
         }
     }
+}
 
-    fn nullable(&self, _input_schema: &Schema) -> Result<bool> {
-        // null should be returned if no rows are aggregated
-        Ok(true)
+impl AggregateExpr for Avg {
+    fn field(&self) -> Result<Field> {
+        Ok(Field::new(&self.name, DataType::Float64, true))
     }
 
-    fn evaluate_input(&self, batch: &RecordBatch) -> Result<ArrayRef> {
-        self.expr.evaluate(batch)
+    fn state_fields(&self) -> Result<Vec<Field>> {
+        Ok(vec![
+            Field::new(
+                &format_state_name(&self.name, "count"),
+                DataType::UInt64,
+                true,
+            ),
+            Field::new(
+                &format_state_name(&self.name, "sum"),
+                DataType::Float64,
+                true,
+            ),
+        ])
     }
 
-    fn create_accumulator(&self) -> Rc<RefCell<dyn Accumulator>> {
-        Rc::new(RefCell::new(AvgAccumulator {
-            sum: None,
-            count: None,
-        }))
+    fn create_accumulator(&self) -> Result<Rc<RefCell<dyn Accumulator>>> {
+        Ok(Rc::new(RefCell::new(AvgAccumulator::try_new(
+            // avg is f64
+            &DataType::Float64,
+        )?)))
     }
 
-    fn create_reducer(&self, column_name: &str) -> Arc<dyn AggregateExpr> {
-        Arc::new(Avg::new(Arc::new(Column::new(column_name))))
+    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        vec![self.expr.clone()]
     }
 }
 
-macro_rules! avg_accumulate {
-    ($SELF:ident, $VALUE:expr, $ARRAY_TYPE:ident) => {{
-        match ($SELF.sum, $SELF.count) {
-            (Some(sum), Some(count)) => {
-                $SELF.sum = Some(sum + $VALUE as f64);
-                $SELF.count = Some(count + 1);
-            }
-            _ => {
-                $SELF.sum = Some($VALUE as f64);
-                $SELF.count = Some(1);
-            }
-        };
-    }};
-}
 #[derive(Debug)]
 struct AvgAccumulator {
-    sum: Option<f64>,
-    count: Option<i64>,
+    // sum is used for null
+    sum: ScalarValue,
+    count: u64,
+}
+
+impl AvgAccumulator {
+    pub fn try_new(datatype: &DataType) -> Result<Self> {
+        Ok(Self {
+            sum: ScalarValue::try_from(datatype)?,
+            count: 0,
+        })
+    }
 }
 
 impl Accumulator for AvgAccumulator {
-    fn accumulate_scalar(&mut self, value: Option<ScalarValue>) -> Result<()> {
-        if let Some(value) = value {
-            match value {
-                ScalarValue::Int8(value) => avg_accumulate!(self, value, 
Int8Array),
-                ScalarValue::Int16(value) => avg_accumulate!(self, value, 
Int16Array),
-                ScalarValue::Int32(value) => avg_accumulate!(self, value, 
Int32Array),
-                ScalarValue::Int64(value) => avg_accumulate!(self, value, 
Int64Array),
-                ScalarValue::UInt8(value) => avg_accumulate!(self, value, 
UInt8Array),
-                ScalarValue::UInt16(value) => avg_accumulate!(self, value, 
UInt16Array),
-                ScalarValue::UInt32(value) => avg_accumulate!(self, value, 
UInt32Array),
-                ScalarValue::UInt64(value) => avg_accumulate!(self, value, 
UInt64Array),
-                ScalarValue::Float32(value) => avg_accumulate!(self, value, 
Float32Array),
-                ScalarValue::Float64(value) => avg_accumulate!(self, value, 
Float64Array),
-                other => {
-                    return Err(ExecutionError::General(format!(
-                        "AVG does not support {:?}",
-                        other
-                    )))
-                }
-            }
-        }
+    fn update(&mut self, values: &Vec<ArrayRef>) -> Result<()> {
+        let values = &values[0];
+
+        self.count += (values.len() - values.data().null_count()) as u64;
+        self.sum = sum_accumulate(&self.sum, values)?;
         Ok(())
     }
 
-    fn accumulate_batch(&mut self, array: &ArrayRef) -> Result<()> {
-        for row in 0..array.len() {
-            self.accumulate_scalar(get_scalar_value(array, row)?)?;
-        }
+    fn merge(&mut self, states: &Vec<ArrayRef>) -> Result<()> {

Review comment:
       This is the prime example of this PR: the merge here uses the two 
incoming states to update the two states held by the accumulator.

##########
File path: rust/datafusion/src/physical_plan/expressions.rs
##########
@@ -1835,88 +1797,18 @@ mod tests {
         Ok(())
     }
 
-    #[test]
-    fn sum_contract() -> Result<()> {

Review comment:
       I removed these because the types are now selected from the signature, 
and no longer inferred by `AggregateExpr`.

##########
File path: rust/datafusion/src/execution/context.rs
##########
@@ -848,6 +848,24 @@ mod tests {
         Ok(())
     }
 
+    #[test]
+    fn aggregate_grouped_empty() -> Result<()> {

Review comment:
       A test for a grouped aggregation with an empty result.

##########
File path: rust/datafusion/src/execution/context.rs
##########
@@ -1147,6 +1165,41 @@ mod tests {
         Ok(())
     }
 
+    #[test]
+    fn simple_avg() -> Result<()> {
+        let schema = Schema::new(vec![Field::new("a", DataType::Int32, 
false)]);
+
+        let batch1 = RecordBatch::try_new(
+            Arc::new(schema.clone()),
+            vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
+        )?;
+        let batch2 = RecordBatch::try_new(
+            Arc::new(schema.clone()),
+            vec![Arc::new(Int32Array::from(vec![4, 5]))],
+        )?;
+
+        let mut ctx = ExecutionContext::new();
+
+        let provider = MemTable::new(Arc::new(schema), vec![vec![batch1], 
vec![batch2]])?;
+        ctx.register_table("t", Box::new(provider));
+
+        let result = collect(&mut ctx, "SELECT AVG(a) FROM t")?;
+
+        let batch = &result[0];
+        assert_eq!(1, batch.num_columns());
+        assert_eq!(1, batch.num_rows());
+
+        let values = batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<Float64Array>()
+            .expect("failed to cast version");
+        assert_eq!(values.len(), 1);
+        // avg(1,2,3,4,5) = 3.0
+        assert_eq!(values.value(0), 3.0_f64);

Review comment:
       This test fails in master, with 3.25 != 3.0

##########
File path: rust/datafusion/src/physical_plan/hash_aggregate.rs
##########
@@ -276,117 +272,91 @@ impl RecordBatchReader for GroupedHashAggregateIterator {
                 })
                 .collect::<ArrowResult<Vec<_>>>()?;
 
-            // evaluate the inputs to the aggregate expressions for this batch
-            let aggr_input_values = self
-                .aggr_expr
-                .iter()
-                .map(|expr| {
-                    expr.evaluate_input(&batch)
-                        .map_err(ExecutionError::into_arrow_external_error)
-                })
-                .collect::<ArrowResult<Vec<_>>>()?;
+            // evaluate the aggregation expressions. We could evaluate them 
after the `take`, but since
+            // we need to evaluate all of them anyways, it is more performant 
to do it while they are together.
+            let aggr_input_values = evaluate(&expressions, &batch)
+                .map_err(ExecutionError::into_arrow_external_error)?;
 
             // create vector large enough to hold the grouping key
+            // this is an optimization to avoid allocating `key` on every row.
+            // it will be overwritten on the loop below
             let mut key = Vec::with_capacity(group_values.len());
             for _ in 0..group_values.len() {
                 key.push(GroupByScalar::UInt32(0));
             }
 
-            // iterate over each row in the batch and create the accumulators 
for each grouping key
-            let mut accums: Vec<Rc<AccumulatorSet>> =
-                Vec::with_capacity(batch.num_rows());
-
+            // 1.1 construct the key from the group values
+            // 1.2 construct/update the mapping key -> indexes (on the batch) 
used to `take` values from the batch in a single operation

Review comment:
       wrong comment, 1.2 and 1.3 are together.

##########
File path: rust/datafusion/src/physical_plan/hash_aggregate.rs
##########
@@ -709,64 +659,111 @@ fn create_key(
 #[cfg(test)]
 mod tests {
 
+    use arrow::array::Float64Array;
+
     use super::*;
-    use crate::physical_plan::csv::{CsvExec, CsvReadOptions};
-    use crate::physical_plan::expressions::{col, sum};
+    use crate::physical_plan::expressions::{col, Avg};
     use crate::physical_plan::merge::MergeExec;
-    use crate::test;
+    use crate::physical_plan::{common, memory::MemoryExec};
+
+    fn some_data() -> ArrowResult<(Arc<Schema>, Vec<RecordBatch>)> {

Review comment:
       I placed controlled data here so that we can easily check the result.

##########
File path: rust/datafusion/src/physical_plan/planner.rs
##########
@@ -218,32 +217,18 @@ impl DefaultPhysicalPlanner {
                     .collect::<Result<Vec<_>>>()?;
                 let aggregates = aggr_expr
                     .iter()
-                    .map(|e| {
-                        tuple_err((
-                            self.create_aggregate_expr(e, &input_schema, 
ctx_state),
-                            e.name(&input_schema),
-                        ))
-                    })
+                    .map(|e| self.create_aggregate_expr(e, &input_schema, 
ctx_state))
                     .collect::<Result<Vec<_>>>()?;
 
-                let initial_aggr = HashAggregateExec::try_new(
+                let initial_aggr = Arc::new(HashAggregateExec::try_new(
                     AggregateMode::Partial,
                     groups.clone(),
                     aggregates.clone(),
                     input,
-                )?;
+                )?);
 
-                if initial_aggr.output_partitioning().partition_count() == 1 {

Review comment:
       This seems major for a single partition.

##########
File path: rust/datafusion/src/scalar.rs
##########
@@ -0,0 +1,232 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This module provides ScalarValue, an enum that can be used for storage of 
single elements
+
+use std::{convert::TryFrom, fmt};
+
+use arrow::datatypes::DataType;
+
+use crate::error::{ExecutionError, Result};
+
+/// ScalarValue enumeration
+#[derive(Clone, PartialEq)]
+pub enum ScalarValue {
+    /// null value
+    Null,

Review comment:
       no longer needed?

##########
File path: rust/datafusion/src/physical_plan/planner.rs
##########
@@ -218,32 +217,18 @@ impl DefaultPhysicalPlanner {
                     .collect::<Result<Vec<_>>>()?;
                 let aggregates = aggr_expr
                     .iter()
-                    .map(|e| {
-                        tuple_err((
-                            self.create_aggregate_expr(e, &input_schema, 
ctx_state),
-                            e.name(&input_schema),
-                        ))
-                    })
+                    .map(|e| self.create_aggregate_expr(e, &input_schema, 
ctx_state))
                     .collect::<Result<Vec<_>>>()?;
 
-                let initial_aggr = HashAggregateExec::try_new(
+                let initial_aggr = Arc::new(HashAggregateExec::try_new(
                     AggregateMode::Partial,
                     groups.clone(),
                     aggregates.clone(),
                     input,
-                )?;
+                )?);
 
-                if initial_aggr.output_partitioning().partition_count() == 1 {

Review comment:
       Sorry. What I meant is that, compared to the previous implementation, we 
double the amount of work even when we do not need to, because there is a single 
partition.
   
   IMO we should handle this in another way, e.g. via a physical optimizer that 
re-writes the partial aggregation by a final aggregation when the input is a 
single partition.

##########
File path: rust/arrow/src/compute/kernels/aggregate.rs
##########
@@ -19,9 +19,42 @@
 
 use std::ops::Add;
 
-use crate::array::{Array, PrimitiveArray};
+use crate::array::{Array, LargeStringArray, PrimitiveArray, StringArray};
 use crate::datatypes::ArrowNumericType;
 
+/// Helper macro to perform min/max of strings
+macro_rules! min_max_string_helper {
+    ($array:expr, $cmp:tt) => {{
+        let null_count = $array.null_count();
+
+        if null_count == $array.len() {
+            return None
+        }
+        let mut n = "";
+        let mut has_value = false;
+        let data = $array.data();
+
+        if null_count == 0 {

Review comment:
       yes. Generally, operations on non-null fields are faster because there 
isn't an unpredictable branch on the loop. This check removes that 
unpredictable branch altogether when there are no nulls.
   
   We could probably still get some more juice by using some of the vertical 
operations supported by packed_simd.

##########
File path: rust/datafusion/src/physical_plan/aggregates.rs
##########
@@ -103,42 +103,54 @@ pub fn create_aggregate_expr(
     fun: &AggregateFunction,
     args: &Vec<Arc<dyn PhysicalExpr>>,
     input_schema: &Schema,
+    name: String,
 ) -> Result<Arc<dyn AggregateExpr>> {
     // coerce
     let arg = coerce(args, input_schema, &signature(fun))?[0].clone();
 
+    let arg_types = args
+        .iter()
+        .map(|e| e.data_type(input_schema))
+        .collect::<Result<Vec<_>>>()?;
+
+    let return_type = return_type(&fun, &arg_types)?;
+
     Ok(match fun {
-        AggregateFunction::Count => expressions::count(arg),
-        AggregateFunction::Sum => expressions::sum(arg),
-        AggregateFunction::Min => expressions::min(arg),
-        AggregateFunction::Max => expressions::max(arg),
-        AggregateFunction::Avg => expressions::avg(arg),
+        AggregateFunction::Count => {
+            Arc::new(expressions::Count::new(arg, name, return_type))
+        }
+        AggregateFunction::Sum => Arc::new(expressions::Sum::new(arg, name, 
return_type)),
+        AggregateFunction::Min => Arc::new(expressions::Min::new(arg, name, 
return_type)),
+        AggregateFunction::Max => Arc::new(expressions::Max::new(arg, name, 
return_type)),
+        AggregateFunction::Avg => Arc::new(expressions::Avg::new(arg, name, 
return_type)),
     })
 }
 
+static NUMERICS: &'static [DataType] = &[
+    DataType::Int8,
+    DataType::Int16,
+    DataType::Int32,
+    DataType::Int64,
+    DataType::UInt8,
+    DataType::UInt16,
+    DataType::UInt32,
+    DataType::UInt64,
+    DataType::Float32,
+    DataType::Float64,
+];
+
 /// the signatures supported by the function `fun`.
 fn signature(fun: &AggregateFunction) -> Signature {
     // note: the physical expression must accept the type returned by this 
function or the execution panics.
-
     match fun {
         AggregateFunction::Count => Signature::Any(1),
-        AggregateFunction::Min
-        | AggregateFunction::Max
-        | AggregateFunction::Avg
-        | AggregateFunction::Sum => Signature::Uniform(
-            1,
-            vec![
-                DataType::Int8,
-                DataType::Int16,
-                DataType::Int32,
-                DataType::Int64,
-                DataType::UInt8,
-                DataType::UInt16,
-                DataType::UInt32,
-                DataType::UInt64,
-                DataType::Float32,
-                DataType::Float64,
-            ],
-        ),
+        AggregateFunction::Min | AggregateFunction::Max => {
+            let mut valid = vec![DataType::Utf8, DataType::LargeUtf8];
+            valid.extend_from_slice(NUMERICS);

Review comment:
       I have not thought about that, but that is an interesting idea 👍 
   
   In this PR, `max` continues to only support a single column, which we select 
in [this 
line](https://github.com/apache/arrow/pull/8172/files#diff-a98d5d588d3c5b525c6840271a5bdddcR571).
   
   This PR does enable us to create aggregate functions with more than one 
argument, and therefore it allows that option if we so wish. My initial 
thinking was to support aggregate functions of more arguments just to support 
things like `covariance` and `correlation`, but now that you mention it, we can 
do a lot of other things as well. Another one is count distinct over N columns.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to