This is an automated email from the ASF dual-hosted git repository.
yjshen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 41d2ff2aa Make PhysicalAggregateExprNode has repeated PhysicalExprNode (#2184)
41d2ff2aa is described below
commit 41d2ff2aaefb5145f27ce20cec83d884411ce866
Author: Yang Jiang <[email protected]>
AuthorDate: Sat Apr 9 22:50:56 2022 +0800
Make PhysicalAggregateExprNode has repeated PhysicalExprNode (#2184)
---
ballista/rust/client/src/context.rs | 48 ++++++++++++++++++++++
ballista/rust/core/proto/ballista.proto | 2 +-
ballista/rust/core/src/serde/physical_plan/mod.rs | 8 ++--
.../rust/core/src/serde/physical_plan/to_proto.rs | 6 +--
.../src/expressions/approx_percentile_cont.rs | 11 ++---
5 files changed, 58 insertions(+), 17 deletions(-)
diff --git a/ballista/rust/client/src/context.rs
b/ballista/rust/client/src/context.rs
index 3a8a68505..5899598ba 100644
--- a/ballista/rust/client/src/context.rs
+++ b/ballista/rust/client/src/context.rs
@@ -693,4 +693,52 @@ mod tests {
.collect::<Vec<&str>>()
);
}
+
+ #[tokio::test]
+ #[cfg(feature = "standalone")]
+ async fn test_percentile_func() {
+ use crate::context::BallistaContext;
+ use ballista_core::config::{
+ BallistaConfigBuilder, BALLISTA_WITH_INFORMATION_SCHEMA,
+ };
+ use datafusion::arrow::util::pretty::pretty_format_batches;
+ use datafusion::prelude::ParquetReadOptions;
+ let config = BallistaConfigBuilder::default()
+ .set(BALLISTA_WITH_INFORMATION_SCHEMA, "true")
+ .build()
+ .unwrap();
+ let context = BallistaContext::standalone(&config, 1).await.unwrap();
+
+ let testdata = datafusion::test_util::parquet_test_data();
+ context
+ .register_parquet(
+ "test",
+ &format!("{}/alltypes_plain.parquet", testdata),
+ ParquetReadOptions::default(),
+ )
+ .await
+ .unwrap();
+ let df = context
+ .sql("select approx_percentile_cont(\"double_col\", 0.5) from
test")
+ .await
+ .unwrap();
+ let res = df.collect().await.unwrap();
+ let expected = vec![
+ "+----------------------------------------------------+",
+ "| APPROXPERCENTILECONT(test.double_col,Float64(0.5)) |",
+ "+----------------------------------------------------+",
+ "| 7.574999999999999 |",
+ "+----------------------------------------------------+",
+ ];
+
+ assert_eq!(
+ expected,
+ pretty_format_batches(&*res)
+ .unwrap()
+ .to_string()
+ .trim()
+ .lines()
+ .collect::<Vec<&str>>()
+ );
+ }
}
diff --git a/ballista/rust/core/proto/ballista.proto
b/ballista/rust/core/proto/ballista.proto
index 2f69d4a26..87c6ccabb 100644
--- a/ballista/rust/core/proto/ballista.proto
+++ b/ballista/rust/core/proto/ballista.proto
@@ -320,7 +320,7 @@ message PhysicalScalarUdfNode {
message PhysicalAggregateExprNode {
datafusion.AggregateFunction aggr_function = 1;
- PhysicalExprNode expr = 2;
+ repeated PhysicalExprNode expr = 2;
}
message PhysicalWindowExprNode {
diff --git a/ballista/rust/core/src/serde/physical_plan/mod.rs
b/ballista/rust/core/src/serde/physical_plan/mod.rs
index 32eda9f3a..955725957 100644
--- a/ballista/rust/core/src/serde/physical_plan/mod.rs
+++ b/ballista/rust/core/src/serde/physical_plan/mod.rs
@@ -372,15 +372,13 @@ impl AsExecutionPlan for PhysicalPlanNode {
},
)?;
- let input_phy_expr = agg_node.expr.as_ref()
- .map(|e| parse_physical_expr(e.as_ref(),
registry))
- .transpose()?
- .ok_or_else(|| proto_error("missing
aggregate expression".to_string()))?;
+ let input_phy_expr: Vec<Arc<dyn PhysicalExpr>>
= agg_node.expr.iter()
+ .map(|e| parse_physical_expr(e,
registry).unwrap()).collect();
Ok(create_aggregate_expr(
&aggr_function.into(),
false,
- &[input_phy_expr],
+ input_phy_expr.as_slice(),
&physical_schema,
name.to_string(),
)?)
diff --git a/ballista/rust/core/src/serde/physical_plan/to_proto.rs
b/ballista/rust/core/src/serde/physical_plan/to_proto.rs
index 1a1276ec1..5b9fb1b25 100644
--- a/ballista/rust/core/src/serde/physical_plan/to_proto.rs
+++ b/ballista/rust/core/src/serde/physical_plan/to_proto.rs
@@ -142,10 +142,10 @@ impl TryInto<protobuf::PhysicalExprNode> for Arc<dyn
AggregateExpr> {
.collect::<Result<Vec<_>, BallistaError>>()?;
Ok(protobuf::PhysicalExprNode {
expr_type:
Some(protobuf::physical_expr_node::ExprType::AggregateExpr(
- Box::new(protobuf::PhysicalAggregateExprNode {
+ protobuf::PhysicalAggregateExprNode {
aggr_function,
- expr: Some(Box::new(expressions[0].clone())),
- }),
+ expr: expressions,
+ },
)),
})
}
diff --git a/datafusion/physical-expr/src/expressions/approx_percentile_cont.rs
b/datafusion/physical-expr/src/expressions/approx_percentile_cont.rs
index 395a43f5b..36a57093a 100644
--- a/datafusion/physical-expr/src/expressions/approx_percentile_cont.rs
+++ b/datafusion/physical-expr/src/expressions/approx_percentile_cont.rs
@@ -58,7 +58,7 @@ pub fn is_approx_percentile_cont_supported_arg_type(arg_type:
&DataType) -> bool
pub struct ApproxPercentileCont {
name: String,
input_data_type: DataType,
- expr: Arc<dyn PhysicalExpr>,
+ expr: Vec<Arc<dyn PhysicalExpr>>,
percentile: f64,
}
@@ -103,7 +103,7 @@ impl ApproxPercentileCont {
name: name.into(),
input_data_type,
// The physical expr to evaluate during accumulation
- expr: expr[0].clone(),
+ expr,
percentile,
})
}
@@ -181,7 +181,7 @@ impl AggregateExpr for ApproxPercentileCont {
}
fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
- vec![self.expr.clone()]
+ self.expr.clone()
}
fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
@@ -314,11 +314,6 @@ impl Accumulator for ApproxPercentileAccumulator {
}
fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
- debug_assert_eq!(
- values.len(),
- 1,
- "invalid number of values in batch percentile update"
- );
let values = &values[0];
let unsorted_values =
ApproxPercentileAccumulator::convert_to_ordered_float(values)?;