This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 63ba5b6cd0 Consolidate Example: simplify_udwf_expression.rs into
advanced_udwf.rs (#13883)
63ba5b6cd0 is described below
commit 63ba5b6cd0f06398b7ca4b235ea7ef66cece3c44
Author: Jack Park <[email protected]>
AuthorDate: Mon Dec 23 13:28:15 2024 -0800
Consolidate Example: simplify_udwf_expression.rs into advanced_udwf.rs
(#13883)
---
datafusion-examples/examples/advanced_udwf.rs | 94 ++++++++++++++-
.../examples/simplify_udwf_expression.rs | 133 ---------------------
datafusion/expr/src/udwf.rs | 2 +-
3 files changed, 90 insertions(+), 139 deletions(-)
diff --git a/datafusion-examples/examples/advanced_udwf.rs
b/datafusion-examples/examples/advanced_udwf.rs
index 1c20e292f0..49e890467d 100644
--- a/datafusion-examples/examples/advanced_udwf.rs
+++ b/datafusion-examples/examples/advanced_udwf.rs
@@ -24,11 +24,14 @@ use arrow::{
};
use arrow_schema::Field;
use datafusion::error::Result;
+use datafusion::functions_aggregate::average::avg_udaf;
use datafusion::prelude::*;
use datafusion_common::ScalarValue;
-use datafusion_expr::function::WindowUDFFieldArgs;
+use datafusion_expr::expr::WindowFunction;
+use datafusion_expr::function::{WindowFunctionSimplification,
WindowUDFFieldArgs};
+use datafusion_expr::simplify::SimplifyInfo;
use datafusion_expr::{
- PartitionEvaluator, Signature, WindowFrame, WindowUDF, WindowUDFImpl,
+ Expr, PartitionEvaluator, Signature, WindowFrame, WindowUDF, WindowUDFImpl,
};
use datafusion_functions_window_common::partition::PartitionEvaluatorArgs;
@@ -142,6 +145,67 @@ impl PartitionEvaluator for MyPartitionEvaluator {
}
}
+/// This UDWF will show how to use the WindowUDFImpl::simplify() API
+#[derive(Debug, Clone)]
+struct SimplifySmoothItUdf {
+ signature: Signature,
+}
+
+impl SimplifySmoothItUdf {
+ fn new() -> Self {
+ Self {
+ signature: Signature::exact(
+ // this function will always take one argument of type f64
+ vec![DataType::Float64],
+ // this function is deterministic and will always return the same
+ // result for the same input
+ Volatility::Immutable,
+ ),
+ }
+ }
+}
+impl WindowUDFImpl for SimplifySmoothItUdf {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn name(&self) -> &str {
+ "simplify_smooth_it"
+ }
+
+ fn signature(&self) -> &Signature {
+ &self.signature
+ }
+
+ fn partition_evaluator(
+ &self,
+ _partition_evaluator_args: PartitionEvaluatorArgs,
+ ) -> Result<Box<dyn PartitionEvaluator>> {
+ todo!()
+ }
+
+ /// This function simplifies a call to `SimplifySmoothItUdf` into the `AggregateUDF` for `Avg`.
+ /// `partition_evaluator` is not expected to be called, so it is left as `todo!()`.
+ fn simplify(&self) -> Option<WindowFunctionSimplification> {
+ let simplify = |window_function: WindowFunction, _: &dyn SimplifyInfo|
{
+ Ok(Expr::WindowFunction(WindowFunction {
+ fun:
datafusion_expr::WindowFunctionDefinition::AggregateUDF(avg_udaf()),
+ args: window_function.args,
+ partition_by: window_function.partition_by,
+ order_by: window_function.order_by,
+ window_frame: window_function.window_frame,
+ null_treatment: window_function.null_treatment,
+ }))
+ };
+
+ Some(Box::new(simplify))
+ }
+
+ fn field(&self, field_args: WindowUDFFieldArgs) -> Result<Field> {
+ Ok(Field::new(field_args.name(), DataType::Float64, true))
+ }
+}
+
// create local execution context with `cars.csv` registered as a table named
`cars`
async fn create_context() -> Result<SessionContext> {
// declare a new context. In spark API, this corresponds to a new spark
SQL session
@@ -162,12 +226,15 @@ async fn main() -> Result<()> {
let smooth_it = WindowUDF::from(SmoothItUdf::new());
ctx.register_udwf(smooth_it.clone());
- // Use SQL to run the new window function
+ let simplify_smooth_it = WindowUDF::from(SimplifySmoothItUdf::new());
+ ctx.register_udwf(simplify_smooth_it.clone());
+
+ // Use SQL to retrieve entire table
let df = ctx.sql("SELECT * from cars").await?;
// print the results
df.show().await?;
- // Use SQL to run the new window function:
+ // Use SQL to run smooth_it:
//
// `PARTITION BY car`:each distinct value of car (red, and green)
// should be treated as a separate partition (and will result in
@@ -201,7 +268,7 @@ async fn main() -> Result<()> {
// print the results
df.show().await?;
- // this time, call the new widow function with an explicit
+ // this time, call the function with an explicit
// window so evaluate will be invoked with each window.
//
// `ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING`: each invocation
@@ -232,5 +299,22 @@ async fn main() -> Result<()> {
// print the results
df.show().await?;
+ // Use SQL to run simplify_smooth_it
+ let df = ctx
+ .sql(
+ "SELECT \
+ car, \
+ speed, \
+ simplify_smooth_it(speed) OVER (PARTITION BY car ORDER BY time)
AS smooth_speed,\
+ time \
+ from cars \
+ ORDER BY \
+ car",
+ )
+ .await?;
+
+ // print the results
+ df.show().await?;
+
Ok(())
}
diff --git a/datafusion-examples/examples/simplify_udwf_expression.rs
b/datafusion-examples/examples/simplify_udwf_expression.rs
deleted file mode 100644
index 117063df4e..0000000000
--- a/datafusion-examples/examples/simplify_udwf_expression.rs
+++ /dev/null
@@ -1,133 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use std::any::Any;
-
-use arrow_schema::{DataType, Field};
-
-use datafusion::execution::context::SessionContext;
-use datafusion::functions_aggregate::average::avg_udaf;
-use datafusion::{error::Result, execution::options::CsvReadOptions};
-use datafusion_expr::function::{WindowFunctionSimplification,
WindowUDFFieldArgs};
-use datafusion_expr::{
- expr::WindowFunction, simplify::SimplifyInfo, Expr, PartitionEvaluator,
Signature,
- Volatility, WindowUDF, WindowUDFImpl,
-};
-use datafusion_functions_window_common::partition::PartitionEvaluatorArgs;
-
-/// This UDWF will show how to use the WindowUDFImpl::simplify() API
-#[derive(Debug, Clone)]
-struct SimplifySmoothItUdf {
- signature: Signature,
-}
-
-impl SimplifySmoothItUdf {
- fn new() -> Self {
- Self {
- signature: Signature::exact(
- // this function will always take one arguments of type f64
- vec![DataType::Float64],
- // this function is deterministic and will always return the
same
- // result for the same input
- Volatility::Immutable,
- ),
- }
- }
-}
-impl WindowUDFImpl for SimplifySmoothItUdf {
- fn as_any(&self) -> &dyn Any {
- self
- }
-
- fn name(&self) -> &str {
- "simplify_smooth_it"
- }
-
- fn signature(&self) -> &Signature {
- &self.signature
- }
-
- fn partition_evaluator(
- &self,
- _partition_evaluator_args: PartitionEvaluatorArgs,
- ) -> Result<Box<dyn PartitionEvaluator>> {
- todo!()
- }
-
- /// this function will simplify `SimplifySmoothItUdf` to `SmoothItUdf`.
- fn simplify(&self) -> Option<WindowFunctionSimplification> {
- let simplify = |window_function: WindowFunction, _: &dyn SimplifyInfo|
{
- Ok(Expr::WindowFunction(WindowFunction {
- fun:
datafusion_expr::WindowFunctionDefinition::AggregateUDF(avg_udaf()),
- args: window_function.args,
- partition_by: window_function.partition_by,
- order_by: window_function.order_by,
- window_frame: window_function.window_frame,
- null_treatment: window_function.null_treatment,
- }))
- };
-
- Some(Box::new(simplify))
- }
-
- fn field(&self, field_args: WindowUDFFieldArgs) -> Result<Field> {
- Ok(Field::new(field_args.name(), DataType::Float64, true))
- }
-}
-
-// create local execution context with `cars.csv` registered as a table named
`cars`
-async fn create_context() -> Result<SessionContext> {
- // declare a new context. In spark API, this corresponds to a new spark
SQL session
- let ctx = SessionContext::new();
-
- // declare a table in memory. In spark API, this corresponds to
createDataFrame(...).
- println!("pwd: {}", std::env::current_dir().unwrap().display());
- let csv_path = "../../datafusion/core/tests/data/cars.csv".to_string();
- let read_options = CsvReadOptions::default().has_header(true);
-
- ctx.register_csv("cars", &csv_path, read_options).await?;
- Ok(ctx)
-}
-
-#[tokio::main]
-async fn main() -> Result<()> {
- let ctx = create_context().await?;
- let simplify_smooth_it = WindowUDF::from(SimplifySmoothItUdf::new());
- ctx.register_udwf(simplify_smooth_it.clone());
-
- // Use SQL to run the new window function
- let df = ctx.sql("SELECT * from cars").await?;
- // print the results
- df.show().await?;
-
- let df = ctx
- .sql(
- "SELECT \
- car, \
- speed, \
- simplify_smooth_it(speed) OVER (PARTITION BY car ORDER BY time)
AS smooth_speed,\
- time \
- from cars \
- ORDER BY \
- car",
- )
- .await?;
- // print the results
- df.show().await?;
-
- Ok(())
-}
diff --git a/datafusion/expr/src/udwf.rs b/datafusion/expr/src/udwf.rs
index 4bfc3f07bb..39e1e8f261 100644
--- a/datafusion/expr/src/udwf.rs
+++ b/datafusion/expr/src/udwf.rs
@@ -344,7 +344,7 @@ pub trait WindowUDFImpl: Debug + Send + Sync {
/// optimizations manually for specific UDFs.
///
/// Example:
- /// [`simplify_udwf_expression.rs`]:
<https://github.com/apache/arrow-datafusion/blob/main/datafusion-examples/examples/simplify_udwf_expression.rs>
+ /// [`advanced_udwf.rs`]:
<https://github.com/apache/arrow-datafusion/blob/main/datafusion-examples/examples/advanced_udwf.rs>
///
/// # Returns
/// [None] if simplify is not defined or,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]