This is an automated email from the ASF dual-hosted git repository.

yjshen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 41d2ff2aa Make PhysicalAggregateExprNode has repeated PhysicalExprNode (#2184)
41d2ff2aa is described below

commit 41d2ff2aaefb5145f27ce20cec83d884411ce866
Author: Yang Jiang <[email protected]>
AuthorDate: Sat Apr 9 22:50:56 2022 +0800

    Make PhysicalAggregateExprNode has repeated PhysicalExprNode (#2184)
---
 ballista/rust/client/src/context.rs                | 48 ++++++++++++++++++++++
 ballista/rust/core/proto/ballista.proto            |  2 +-
 ballista/rust/core/src/serde/physical_plan/mod.rs  |  8 ++--
 .../rust/core/src/serde/physical_plan/to_proto.rs  |  6 +--
 .../src/expressions/approx_percentile_cont.rs      | 11 ++---
 5 files changed, 58 insertions(+), 17 deletions(-)

diff --git a/ballista/rust/client/src/context.rs b/ballista/rust/client/src/context.rs
index 3a8a68505..5899598ba 100644
--- a/ballista/rust/client/src/context.rs
+++ b/ballista/rust/client/src/context.rs
@@ -693,4 +693,52 @@ mod tests {
                 .collect::<Vec<&str>>()
         );
     }
+
+    #[tokio::test]
+    #[cfg(feature = "standalone")]
+    async fn test_percentile_func() {
+        use crate::context::BallistaContext;
+        use ballista_core::config::{
+            BallistaConfigBuilder, BALLISTA_WITH_INFORMATION_SCHEMA,
+        };
+        use datafusion::arrow::util::pretty::pretty_format_batches;
+        use datafusion::prelude::ParquetReadOptions;
+        let config = BallistaConfigBuilder::default()
+            .set(BALLISTA_WITH_INFORMATION_SCHEMA, "true")
+            .build()
+            .unwrap();
+        let context = BallistaContext::standalone(&config, 1).await.unwrap();
+
+        let testdata = datafusion::test_util::parquet_test_data();
+        context
+            .register_parquet(
+                "test",
+                &format!("{}/alltypes_plain.parquet", testdata),
+                ParquetReadOptions::default(),
+            )
+            .await
+            .unwrap();
+        let df = context
+            .sql("select approx_percentile_cont(\"double_col\", 0.5) from 
test")
+            .await
+            .unwrap();
+        let res = df.collect().await.unwrap();
+        let expected = vec![
+            "+----------------------------------------------------+",
+            "| APPROXPERCENTILECONT(test.double_col,Float64(0.5)) |",
+            "+----------------------------------------------------+",
+            "| 7.574999999999999                                  |",
+            "+----------------------------------------------------+",
+        ];
+
+        assert_eq!(
+            expected,
+            pretty_format_batches(&*res)
+                .unwrap()
+                .to_string()
+                .trim()
+                .lines()
+                .collect::<Vec<&str>>()
+        );
+    }
 }
diff --git a/ballista/rust/core/proto/ballista.proto b/ballista/rust/core/proto/ballista.proto
index 2f69d4a26..87c6ccabb 100644
--- a/ballista/rust/core/proto/ballista.proto
+++ b/ballista/rust/core/proto/ballista.proto
@@ -320,7 +320,7 @@ message PhysicalScalarUdfNode {
 
 message PhysicalAggregateExprNode {
   datafusion.AggregateFunction aggr_function = 1;
-  PhysicalExprNode expr = 2;
+  repeated PhysicalExprNode expr = 2;
 }
 
 message PhysicalWindowExprNode {
diff --git a/ballista/rust/core/src/serde/physical_plan/mod.rs b/ballista/rust/core/src/serde/physical_plan/mod.rs
index 32eda9f3a..955725957 100644
--- a/ballista/rust/core/src/serde/physical_plan/mod.rs
+++ b/ballista/rust/core/src/serde/physical_plan/mod.rs
@@ -372,15 +372,13 @@ impl AsExecutionPlan for PhysicalPlanNode {
                                             },
                                         )?;
 
-                                let input_phy_expr = agg_node.expr.as_ref()
-                                    .map(|e| parse_physical_expr(e.as_ref(), 
registry))
-                                    .transpose()?
-                                    .ok_or_else(|| proto_error("missing 
aggregate expression".to_string()))?;
+                                let input_phy_expr: Vec<Arc<dyn PhysicalExpr>> 
= agg_node.expr.iter()
+                                    .map(|e| parse_physical_expr(e, 
registry).unwrap()).collect();
 
                                 Ok(create_aggregate_expr(
                                     &aggr_function.into(),
                                     false,
-                                    &[input_phy_expr],
+                                    input_phy_expr.as_slice(),
                                     &physical_schema,
                                     name.to_string(),
                                 )?)
diff --git a/ballista/rust/core/src/serde/physical_plan/to_proto.rs b/ballista/rust/core/src/serde/physical_plan/to_proto.rs
index 1a1276ec1..5b9fb1b25 100644
--- a/ballista/rust/core/src/serde/physical_plan/to_proto.rs
+++ b/ballista/rust/core/src/serde/physical_plan/to_proto.rs
@@ -142,10 +142,10 @@ impl TryInto<protobuf::PhysicalExprNode> for Arc<dyn AggregateExpr> {
             .collect::<Result<Vec<_>, BallistaError>>()?;
         Ok(protobuf::PhysicalExprNode {
             expr_type: 
Some(protobuf::physical_expr_node::ExprType::AggregateExpr(
-                Box::new(protobuf::PhysicalAggregateExprNode {
+                protobuf::PhysicalAggregateExprNode {
                     aggr_function,
-                    expr: Some(Box::new(expressions[0].clone())),
-                }),
+                    expr: expressions,
+                },
             )),
         })
     }
diff --git a/datafusion/physical-expr/src/expressions/approx_percentile_cont.rs b/datafusion/physical-expr/src/expressions/approx_percentile_cont.rs
index 395a43f5b..36a57093a 100644
--- a/datafusion/physical-expr/src/expressions/approx_percentile_cont.rs
+++ b/datafusion/physical-expr/src/expressions/approx_percentile_cont.rs
@@ -58,7 +58,7 @@ pub fn is_approx_percentile_cont_supported_arg_type(arg_type: &DataType) -> bool
 pub struct ApproxPercentileCont {
     name: String,
     input_data_type: DataType,
-    expr: Arc<dyn PhysicalExpr>,
+    expr: Vec<Arc<dyn PhysicalExpr>>,
     percentile: f64,
 }
 
@@ -103,7 +103,7 @@ impl ApproxPercentileCont {
             name: name.into(),
             input_data_type,
             // The physical expr to evaluate during accumulation
-            expr: expr[0].clone(),
+            expr,
             percentile,
         })
     }
@@ -181,7 +181,7 @@ impl AggregateExpr for ApproxPercentileCont {
     }
 
     fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
-        vec![self.expr.clone()]
+        self.expr.clone()
     }
 
     fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
@@ -314,11 +314,6 @@ impl Accumulator for ApproxPercentileAccumulator {
     }
 
     fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
-        debug_assert_eq!(
-            values.len(),
-            1,
-            "invalid number of values in batch percentile update"
-        );
         let values = &values[0];
         let unsorted_values =
             ApproxPercentileAccumulator::convert_to_ordered_float(values)?;

Reply via email to