This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 63ba5b6cd0 Consolidate Example: simplify_udwf_expression.rs into advanced_udwf.rs (#13883)
63ba5b6cd0 is described below

commit 63ba5b6cd0f06398b7ca4b235ea7ef66cece3c44
Author: Jack Park <[email protected]>
AuthorDate: Mon Dec 23 13:28:15 2024 -0800

    Consolidate Example: simplify_udwf_expression.rs into advanced_udwf.rs (#13883)
---
 datafusion-examples/examples/advanced_udwf.rs      |  94 ++++++++++++++-
 .../examples/simplify_udwf_expression.rs           | 133 ---------------------
 datafusion/expr/src/udwf.rs                        |   2 +-
 3 files changed, 90 insertions(+), 139 deletions(-)

diff --git a/datafusion-examples/examples/advanced_udwf.rs b/datafusion-examples/examples/advanced_udwf.rs
index 1c20e292f0..49e890467d 100644
--- a/datafusion-examples/examples/advanced_udwf.rs
+++ b/datafusion-examples/examples/advanced_udwf.rs
@@ -24,11 +24,14 @@ use arrow::{
 };
 use arrow_schema::Field;
 use datafusion::error::Result;
+use datafusion::functions_aggregate::average::avg_udaf;
 use datafusion::prelude::*;
 use datafusion_common::ScalarValue;
-use datafusion_expr::function::WindowUDFFieldArgs;
+use datafusion_expr::expr::WindowFunction;
+use datafusion_expr::function::{WindowFunctionSimplification, WindowUDFFieldArgs};
+use datafusion_expr::simplify::SimplifyInfo;
 use datafusion_expr::{
-    PartitionEvaluator, Signature, WindowFrame, WindowUDF, WindowUDFImpl,
+    Expr, PartitionEvaluator, Signature, WindowFrame, WindowUDF, WindowUDFImpl,
 };
 use datafusion_functions_window_common::partition::PartitionEvaluatorArgs;
 
@@ -142,6 +145,67 @@ impl PartitionEvaluator for MyPartitionEvaluator {
     }
 }
 
+/// This UDWF will show how to use the WindowUDFImpl::simplify() API
+#[derive(Debug, Clone)]
+struct SimplifySmoothItUdf {
+    signature: Signature,
+}
+
+impl SimplifySmoothItUdf {
+    fn new() -> Self {
+        Self {
+            signature: Signature::exact(
+                // this function will always take one arguments of type f64
+                vec![DataType::Float64],
+                // this function is deterministic and will always return the same
+                // result for the same input
+                Volatility::Immutable,
+            ),
+        }
+    }
+}
+impl WindowUDFImpl for SimplifySmoothItUdf {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn name(&self) -> &str {
+        "simplify_smooth_it"
+    }
+
+    fn signature(&self) -> &Signature {
+        &self.signature
+    }
+
+    fn partition_evaluator(
+        &self,
+        _partition_evaluator_args: PartitionEvaluatorArgs,
+    ) -> Result<Box<dyn PartitionEvaluator>> {
+        todo!()
+    }
+
+    /// this function will simplify `SimplifySmoothItUdf` to `AggregateUDF` for `Avg`
+    /// default implementation will not be called (left as `todo!()`)
+    fn simplify(&self) -> Option<WindowFunctionSimplification> {
+        let simplify = |window_function: WindowFunction, _: &dyn SimplifyInfo| {
+            Ok(Expr::WindowFunction(WindowFunction {
+                fun: datafusion_expr::WindowFunctionDefinition::AggregateUDF(avg_udaf()),
+                args: window_function.args,
+                partition_by: window_function.partition_by,
+                order_by: window_function.order_by,
+                window_frame: window_function.window_frame,
+                null_treatment: window_function.null_treatment,
+            }))
+        };
+
+        Some(Box::new(simplify))
+    }
+
+    fn field(&self, field_args: WindowUDFFieldArgs) -> Result<Field> {
+        Ok(Field::new(field_args.name(), DataType::Float64, true))
+    }
+}
+
 // create local execution context with `cars.csv` registered as a table named `cars`
 async fn create_context() -> Result<SessionContext> {
     // declare a new context. In spark API, this corresponds to a new spark SQL session
@@ -162,12 +226,15 @@ async fn main() -> Result<()> {
     let smooth_it = WindowUDF::from(SmoothItUdf::new());
     ctx.register_udwf(smooth_it.clone());
 
-    // Use SQL to run the new window function
+    let simplify_smooth_it = WindowUDF::from(SimplifySmoothItUdf::new());
+    ctx.register_udwf(simplify_smooth_it.clone());
+
+    // Use SQL to retrieve entire table
     let df = ctx.sql("SELECT * from cars").await?;
     // print the results
     df.show().await?;
 
-    // Use SQL to run the new window function:
+    // Use SQL to run smooth_it:
     //
     // `PARTITION BY car`:each distinct value of car (red, and green)
     // should be treated as a separate partition (and will result in
@@ -201,7 +268,7 @@ async fn main() -> Result<()> {
     // print the results
     df.show().await?;
 
-    // this time, call the new widow function with an explicit
+    // this time, call the function with an explicit
     // window so evaluate will be invoked with each window.
     //
     // `ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING`: each invocation
@@ -232,5 +299,22 @@ async fn main() -> Result<()> {
     // print the results
     df.show().await?;
 
+    // Use SQL to run simplify_smooth_it
+    let df = ctx
+        .sql(
+            "SELECT \
+               car, \
+               speed, \
+               simplify_smooth_it(speed) OVER (PARTITION BY car ORDER BY time) AS smooth_speed,\
+               time \
+               from cars \
+             ORDER BY \
+               car",
+        )
+        .await?;
+
+    // print the results
+    df.show().await?;
+
     Ok(())
 }
diff --git a/datafusion-examples/examples/simplify_udwf_expression.rs b/datafusion-examples/examples/simplify_udwf_expression.rs
deleted file mode 100644
index 117063df4e..0000000000
--- a/datafusion-examples/examples/simplify_udwf_expression.rs
+++ /dev/null
@@ -1,133 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use std::any::Any;
-
-use arrow_schema::{DataType, Field};
-
-use datafusion::execution::context::SessionContext;
-use datafusion::functions_aggregate::average::avg_udaf;
-use datafusion::{error::Result, execution::options::CsvReadOptions};
-use datafusion_expr::function::{WindowFunctionSimplification, WindowUDFFieldArgs};
-use datafusion_expr::{
-    expr::WindowFunction, simplify::SimplifyInfo, Expr, PartitionEvaluator, Signature,
-    Volatility, WindowUDF, WindowUDFImpl,
-};
-use datafusion_functions_window_common::partition::PartitionEvaluatorArgs;
-
-/// This UDWF will show how to use the WindowUDFImpl::simplify() API
-#[derive(Debug, Clone)]
-struct SimplifySmoothItUdf {
-    signature: Signature,
-}
-
-impl SimplifySmoothItUdf {
-    fn new() -> Self {
-        Self {
-            signature: Signature::exact(
-                // this function will always take one arguments of type f64
-                vec![DataType::Float64],
-                // this function is deterministic and will always return the same
-                // result for the same input
-                Volatility::Immutable,
-            ),
-        }
-    }
-}
-impl WindowUDFImpl for SimplifySmoothItUdf {
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
-    fn name(&self) -> &str {
-        "simplify_smooth_it"
-    }
-
-    fn signature(&self) -> &Signature {
-        &self.signature
-    }
-
-    fn partition_evaluator(
-        &self,
-        _partition_evaluator_args: PartitionEvaluatorArgs,
-    ) -> Result<Box<dyn PartitionEvaluator>> {
-        todo!()
-    }
-
-    /// this function will simplify `SimplifySmoothItUdf` to `SmoothItUdf`.
-    fn simplify(&self) -> Option<WindowFunctionSimplification> {
-        let simplify = |window_function: WindowFunction, _: &dyn SimplifyInfo| {
-            Ok(Expr::WindowFunction(WindowFunction {
-                fun: datafusion_expr::WindowFunctionDefinition::AggregateUDF(avg_udaf()),
-                args: window_function.args,
-                partition_by: window_function.partition_by,
-                order_by: window_function.order_by,
-                window_frame: window_function.window_frame,
-                null_treatment: window_function.null_treatment,
-            }))
-        };
-
-        Some(Box::new(simplify))
-    }
-
-    fn field(&self, field_args: WindowUDFFieldArgs) -> Result<Field> {
-        Ok(Field::new(field_args.name(), DataType::Float64, true))
-    }
-}
-
-// create local execution context with `cars.csv` registered as a table named `cars`
-async fn create_context() -> Result<SessionContext> {
-    // declare a new context. In spark API, this corresponds to a new spark SQL session
-    let ctx = SessionContext::new();
-
-    // declare a table in memory. In spark API, this corresponds to createDataFrame(...).
-    println!("pwd: {}", std::env::current_dir().unwrap().display());
-    let csv_path = "../../datafusion/core/tests/data/cars.csv".to_string();
-    let read_options = CsvReadOptions::default().has_header(true);
-
-    ctx.register_csv("cars", &csv_path, read_options).await?;
-    Ok(ctx)
-}
-
-#[tokio::main]
-async fn main() -> Result<()> {
-    let ctx = create_context().await?;
-    let simplify_smooth_it = WindowUDF::from(SimplifySmoothItUdf::new());
-    ctx.register_udwf(simplify_smooth_it.clone());
-
-    // Use SQL to run the new window function
-    let df = ctx.sql("SELECT * from cars").await?;
-    // print the results
-    df.show().await?;
-
-    let df = ctx
-        .sql(
-            "SELECT \
-               car, \
-               speed, \
-               simplify_smooth_it(speed) OVER (PARTITION BY car ORDER BY time) AS smooth_speed,\
-               time \
-               from cars \
-             ORDER BY \
-               car",
-        )
-        .await?;
-    // print the results
-    df.show().await?;
-
-    Ok(())
-}
diff --git a/datafusion/expr/src/udwf.rs b/datafusion/expr/src/udwf.rs
index 4bfc3f07bb..39e1e8f261 100644
--- a/datafusion/expr/src/udwf.rs
+++ b/datafusion/expr/src/udwf.rs
@@ -344,7 +344,7 @@ pub trait WindowUDFImpl: Debug + Send + Sync {
     /// optimizations manually for specific UDFs.
     ///
     /// Example:
-    /// [`simplify_udwf_expression.rs`]: <https://github.com/apache/arrow-datafusion/blob/main/datafusion-examples/examples/simplify_udwf_expression.rs>
+    /// [`advanced_udwf.rs`]: <https://github.com/apache/arrow-datafusion/blob/main/datafusion-examples/examples/advanced_udwf.rs>
     ///
     /// # Returns
     /// [None] if simplify is not defined or,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to