This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git


The following commit(s) were added to refs/heads/master by this push:
     new d34d9d5c Use DataFusion 12.0.0-rc1 (and add support for DateTimeIntervalExpr and more binary operators) (#200)
d34d9d5c is described below

commit d34d9d5cd0045c10644b4203b789d93dc9e3085c
Author: Andy Grove <[email protected]>
AuthorDate: Mon Sep 12 15:36:01 2022 -0600

    Use DataFusion 12.0.0-rc1 (and add support for DateTimeIntervalExpr and more binary operators) (#200)
    
    * Add serde support for DateTimeInterval
    
    * Use try_into for schema and types
    
    * bump versions again
    
    * use 12.0.0-rc1
    
    * Update Python module
---
 ballista-cli/Cargo.toml                            |  4 +--
 ballista/rust/client/Cargo.toml                    |  4 +--
 ballista/rust/core/Cargo.toml                      |  6 ++--
 ballista/rust/core/proto/ballista.proto            |  8 +++++
 .../core/src/serde/physical_plan/from_proto.rs     |  7 ++++
 ballista/rust/core/src/serde/physical_plan/mod.rs  | 40 ++++++++++++++++++----
 .../rust/core/src/serde/physical_plan/to_proto.rs  | 25 +++++++++++---
 ballista/rust/executor/Cargo.toml                  |  4 +--
 ballista/rust/scheduler/Cargo.toml                 |  6 ++--
 .../rust/scheduler/src/scheduler_server/grpc.rs    |  8 +++--
 benchmarks/Cargo.toml                              |  4 +--
 examples/Cargo.toml                                |  2 +-
 python/Cargo.toml                                  |  2 +-
 python/src/functions.rs                            |  1 +
 python/src/udaf.rs                                 |  2 +-
 15 files changed, 93 insertions(+), 30 deletions(-)

diff --git a/ballista-cli/Cargo.toml b/ballista-cli/Cargo.toml
index 7e363a5f..1b871db1 100644
--- a/ballista-cli/Cargo.toml
+++ b/ballista-cli/Cargo.toml
@@ -31,8 +31,8 @@ readme = "README.md"
 [dependencies]
 ballista = { path = "../ballista/rust/client", version = "0.7.0" }
 clap = { version = "3", features = ["derive", "cargo"] }
-datafusion = { git = "https://github.com/apache/arrow-datafusion";, rev = 
"8b59b207aaadd6f2c19c28d1f1431a0cb8d110ae" }
-datafusion-cli = { git = "https://github.com/apache/arrow-datafusion";, rev = 
"8b59b207aaadd6f2c19c28d1f1431a0cb8d110ae" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion";, rev = 
"12.0.0-rc1" }
+datafusion-cli = { git = "https://github.com/apache/arrow-datafusion";, rev = 
"12.0.0-rc1" }
 dirs = "4.0.0"
 env_logger = "0.9"
 mimalloc = { version = "0.1", default-features = false }
diff --git a/ballista/rust/client/Cargo.toml b/ballista/rust/client/Cargo.toml
index 7f870735..2bd34c7d 100644
--- a/ballista/rust/client/Cargo.toml
+++ b/ballista/rust/client/Cargo.toml
@@ -31,8 +31,8 @@ rust-version = "1.59"
 ballista-core = { path = "../core", version = "0.7.0" }
 ballista-executor = { path = "../executor", version = "0.7.0", optional = true 
}
 ballista-scheduler = { path = "../scheduler", version = "0.7.0", optional = 
true }
-datafusion = { git = "https://github.com/apache/arrow-datafusion";, rev = 
"8b59b207aaadd6f2c19c28d1f1431a0cb8d110ae" }
-datafusion-proto = { git = "https://github.com/apache/arrow-datafusion";, rev = 
"8b59b207aaadd6f2c19c28d1f1431a0cb8d110ae" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion";, rev = 
"12.0.0-rc1" }
+datafusion-proto = { git = "https://github.com/apache/arrow-datafusion";, rev = 
"12.0.0-rc1" }
 futures = "0.3"
 log = "0.4"
 parking_lot = "0.12"
diff --git a/ballista/rust/core/Cargo.toml b/ballista/rust/core/Cargo.toml
index 026f0560..d6c35cda 100644
--- a/ballista/rust/core/Cargo.toml
+++ b/ballista/rust/core/Cargo.toml
@@ -39,14 +39,14 @@ arrow-flight = { version = "22.0.0" }
 async-trait = "0.1.41"
 chrono = { version = "0.4", default-features = false }
 clap = { version = "3", features = ["derive", "cargo"] }
-datafusion = { git = "https://github.com/apache/arrow-datafusion";, rev = 
"8b59b207aaadd6f2c19c28d1f1431a0cb8d110ae" }
-datafusion-proto = { git = "https://github.com/apache/arrow-datafusion";, rev = 
"8b59b207aaadd6f2c19c28d1f1431a0cb8d110ae" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion";, rev = 
"12.0.0-rc1" }
+datafusion-proto = { git = "https://github.com/apache/arrow-datafusion";, rev = 
"12.0.0-rc1" }
 futures = "0.3"
 hashbrown = "0.12"
 
 libloading = "0.7.3"
 log = "0.4"
-object_store = "0.4.0"
+object_store = "0.5.0"
 once_cell = "1.9.0"
 
 parking_lot = "0.12"
diff --git a/ballista/rust/core/proto/ballista.proto 
b/ballista/rust/core/proto/ballista.proto
index cb42d0ae..7b7f748c 100644
--- a/ballista/rust/core/proto/ballista.proto
+++ b/ballista/rust/core/proto/ballista.proto
@@ -118,6 +118,8 @@ message PhysicalExprNode {
     PhysicalWindowExprNode window_expr = 15;
 
     PhysicalScalarUdfNode scalar_udf = 16;
+
+    PhysicalDateTimeIntervalExprNode date_time_interval_expr = 17;
   }
 }
 
@@ -164,6 +166,12 @@ message PhysicalBinaryExprNode {
   string op = 3;
 }
 
+message PhysicalDateTimeIntervalExprNode {
+  PhysicalExprNode l = 1;
+  PhysicalExprNode r = 2;
+  string op = 3;
+}
+
 message PhysicalSortExprNode {
   PhysicalExprNode expr = 1;
   bool asc = 2;
diff --git a/ballista/rust/core/src/serde/physical_plan/from_proto.rs 
b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
index 6abb5f12..4b4bea5c 100644
--- a/ballista/rust/core/src/serde/physical_plan/from_proto.rs
+++ b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
@@ -28,6 +28,7 @@ use datafusion::datasource::object_store::ObjectStoreUrl;
 use datafusion::execution::context::ExecutionProps;
 use datafusion::logical_expr::window_function::WindowFunction;
 use datafusion::logical_plan::FunctionRegistry;
+use datafusion::physical_expr::expressions::DateTimeIntervalExpr;
 use datafusion::physical_expr::ScalarFunctionExpr;
 use datafusion::physical_plan::file_format::FileScanConfig;
 use datafusion::physical_plan::{
@@ -84,6 +85,12 @@ pub(crate) fn parse_physical_expr(
                 input_schema,
             )?,
         )),
+        ExprType::DateTimeIntervalExpr(expr) => 
Arc::new(DateTimeIntervalExpr::try_new(
+            parse_required_physical_box_expr(&expr.l, registry, "left", 
input_schema)?,
+            from_proto_binary_op(&expr.op)?,
+            parse_required_physical_box_expr(&expr.r, registry, "right", 
input_schema)?,
+            input_schema,
+        )?),
         ExprType::AggregateExpr(_) => {
             return Err(BallistaError::General(
                 "Cannot convert aggregate expr node to physical 
expression".to_owned(),
diff --git a/ballista/rust/core/src/serde/physical_plan/mod.rs 
b/ballista/rust/core/src/serde/physical_plan/mod.rs
index 167a9f35..b260b5d9 100644
--- a/ballista/rust/core/src/serde/physical_plan/mod.rs
+++ b/ballista/rust/core/src/serde/physical_plan/mod.rs
@@ -685,7 +685,7 @@ impl AsExecutionPlan for PhysicalPlanNode {
             Ok(protobuf::PhysicalPlanNode {
                 physical_plan_type: Some(PhysicalPlanType::Explain(
                     protobuf::ExplainExecNode {
-                        schema: Some(exec.schema().as_ref().into()),
+                        schema: Some(exec.schema().as_ref().try_into()?),
                         stringified_plans: exec
                             .stringified_plans()
                             .iter()
@@ -797,7 +797,7 @@ impl AsExecutionPlan for PhysicalPlanNode {
                             }
                         })
                         .collect();
-                    let schema = f.schema().into();
+                    let schema = f.schema().try_into()?;
                     Ok(protobuf::JoinFilter {
                         expression: Some(expression),
                         column_indices,
@@ -910,14 +910,14 @@ impl AsExecutionPlan for PhysicalPlanNode {
                         aggr_expr_name: agg_names,
                         mode: agg_mode as i32,
                         input: Some(Box::new(input)),
-                        input_schema: Some(input_schema.as_ref().into()),
+                        input_schema: Some(input_schema.as_ref().try_into()?),
                         null_expr,
                         groups,
                     },
                 ))),
             })
         } else if let Some(empty) = plan.downcast_ref::<EmptyExec>() {
-            let schema = empty.schema().as_ref().into();
+            let schema = empty.schema().as_ref().try_into()?;
             Ok(protobuf::PhysicalPlanNode {
                 physical_plan_type: Some(PhysicalPlanType::Empty(
                     protobuf::EmptyExecNode {
@@ -985,7 +985,7 @@ impl AsExecutionPlan for PhysicalPlanNode {
                 physical_plan_type: Some(PhysicalPlanType::ShuffleReader(
                     protobuf::ShuffleReaderExecNode {
                         partition,
-                        schema: Some(exec.schema().as_ref().into()),
+                        schema: Some(exec.schema().as_ref().try_into()?),
                     },
                 )),
             })
@@ -1102,7 +1102,7 @@ impl AsExecutionPlan for PhysicalPlanNode {
                 physical_plan_type: Some(PhysicalPlanType::Unresolved(
                     protobuf::UnresolvedShuffleExecNode {
                         stage_id: exec.stage_id as u32,
-                        schema: Some(exec.schema().as_ref().into()),
+                        schema: Some(exec.schema().as_ref().try_into()?),
                         input_partition_count: exec.input_partition_count as 
u32,
                         output_partition_count: exec.output_partition_count as 
u32,
                     },
@@ -1197,10 +1197,12 @@ mod roundtrip_tests {
     use std::sync::Arc;
 
     use datafusion::arrow::array::ArrayRef;
+    use datafusion::arrow::datatypes::IntervalUnit;
     use datafusion::datasource::object_store::ObjectStoreUrl;
     use datafusion::execution::context::ExecutionProps;
     use datafusion::logical_expr::{BuiltinScalarFunction, Volatility};
     use datafusion::logical_plan::create_udf;
+    use datafusion::physical_expr::expressions::DateTimeIntervalExpr;
     use datafusion::physical_expr::ScalarFunctionExpr;
     use datafusion::physical_plan::aggregates::PhysicalGroupBy;
     use datafusion::physical_plan::functions;
@@ -1294,6 +1296,32 @@ mod roundtrip_tests {
         roundtrip_test(Arc::new(EmptyExec::new(false, 
Arc::new(Schema::empty()))))
     }
 
+    #[test]
+    fn roundtrip_date_time_interval() -> Result<()> {
+        let schema = Schema::new(vec![
+            Field::new("some_date", DataType::Date32, false),
+            Field::new(
+                "some_interval",
+                DataType::Interval(IntervalUnit::DayTime),
+                false,
+            ),
+        ]);
+        let input = Arc::new(EmptyExec::new(false, Arc::new(schema.clone())));
+        let date_expr = col("some_date", &schema)?;
+        let literal_expr = col("some_interval", &schema)?;
+        let date_time_interval_expr = Arc::new(DateTimeIntervalExpr::try_new(
+            date_expr,
+            Operator::Plus,
+            literal_expr,
+            &schema,
+        )?);
+        let plan = Arc::new(ProjectionExec::try_new(
+            vec![(date_time_interval_expr, "result".to_string())],
+            input,
+        )?);
+        roundtrip_test(plan)
+    }
+
     #[test]
     fn roundtrip_local_limit() -> Result<()> {
         roundtrip_test(Arc::new(LocalLimitExec::new(
diff --git a/ballista/rust/core/src/serde/physical_plan/to_proto.rs 
b/ballista/rust/core/src/serde/physical_plan/to_proto.rs
index 16195377..a6a7a694 100644
--- a/ballista/rust/core/src/serde/physical_plan/to_proto.rs
+++ b/ballista/rust/core/src/serde/physical_plan/to_proto.rs
@@ -46,6 +46,7 @@ use datafusion::physical_plan::{AggregateExpr, PhysicalExpr};
 use crate::serde::{protobuf, BallistaError};
 
 use datafusion::logical_expr::BuiltinScalarFunction;
+use datafusion::physical_expr::expressions::DateTimeIntervalExpr;
 use datafusion::physical_expr::ScalarFunctionExpr;
 
 impl TryInto<protobuf::PhysicalExprNode> for Arc<dyn AggregateExpr> {
@@ -279,7 +280,7 @@ impl TryFrom<Arc<dyn PhysicalExpr>> for 
protobuf::PhysicalExprNode {
                 expr_type: 
Some(protobuf::physical_expr_node::ExprType::Cast(Box::new(
                     protobuf::PhysicalCastNode {
                         expr: Some(Box::new(cast.expr().clone().try_into()?)),
-                        arrow_type: Some(cast.cast_type().into()),
+                        arrow_type: Some(cast.cast_type().try_into()?),
                     },
                 ))),
             })
@@ -288,7 +289,7 @@ impl TryFrom<Arc<dyn PhysicalExpr>> for 
protobuf::PhysicalExprNode {
                 expr_type: 
Some(protobuf::physical_expr_node::ExprType::TryCast(
                     Box::new(protobuf::PhysicalTryCastNode {
                         expr: Some(Box::new(cast.expr().clone().try_into()?)),
-                        arrow_type: Some(cast.cast_type().into()),
+                        arrow_type: Some(cast.cast_type().try_into()?),
                     }),
                 )),
             })
@@ -309,7 +310,7 @@ impl TryFrom<Arc<dyn PhysicalExpr>> for 
protobuf::PhysicalExprNode {
                                 name: expr.name().to_string(),
                                 fun: fun.into(),
                                 args,
-                                return_type: Some(expr.return_type().into()),
+                                return_type: 
Some(expr.return_type().try_into()?),
                             },
                         ),
                     ),
@@ -320,11 +321,25 @@ impl TryFrom<Arc<dyn PhysicalExpr>> for 
protobuf::PhysicalExprNode {
                         protobuf::PhysicalScalarUdfNode {
                             name: expr.name().to_string(),
                             args,
-                            return_type: Some(expr.return_type().into()),
+                            return_type: Some(expr.return_type().try_into()?),
                         },
                     )),
                 })
             }
+        } else if let Some(expr) = expr.downcast_ref::<DateTimeIntervalExpr>() 
{
+            let dti_expr = Box::new(protobuf::PhysicalDateTimeIntervalExprNode 
{
+                l: Some(Box::new(expr.lhs().to_owned().try_into()?)),
+                r: Some(Box::new(expr.rhs().to_owned().try_into()?)),
+                op: format!("{:?}", expr.op()),
+            });
+
+            Ok(protobuf::PhysicalExprNode {
+                expr_type: Some(
+                    
protobuf::physical_expr_node::ExprType::DateTimeIntervalExpr(
+                        dti_expr,
+                    ),
+                ),
+            })
         } else {
             Err(BallistaError::General(format!(
                 "physical_plan::to_proto() unsupported expression {:?}",
@@ -435,7 +450,7 @@ impl TryFrom<&FileScanConfig> for 
protobuf::FileScanExecConf {
                 .iter()
                 .map(|n| *n as u32)
                 .collect(),
-            schema: Some(conf.file_schema.as_ref().into()),
+            schema: Some(conf.file_schema.as_ref().try_into()?),
             table_partition_cols: conf.table_partition_cols.to_vec(),
             object_store_url: conf.object_store_url.to_string(),
         })
diff --git a/ballista/rust/executor/Cargo.toml 
b/ballista/rust/executor/Cargo.toml
index 7f0ffdbd..232a365c 100644
--- a/ballista/rust/executor/Cargo.toml
+++ b/ballista/rust/executor/Cargo.toml
@@ -40,8 +40,8 @@ async-trait = "0.1.41"
 ballista-core = { path = "../core", version = "0.7.0" }
 chrono = { version = "0.4", default-features = false }
 configure_me = "0.4.0"
-datafusion = { git = "https://github.com/apache/arrow-datafusion";, rev = 
"8b59b207aaadd6f2c19c28d1f1431a0cb8d110ae" }
-datafusion-proto = { git = "https://github.com/apache/arrow-datafusion";, rev = 
"8b59b207aaadd6f2c19c28d1f1431a0cb8d110ae" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion";, rev = 
"12.0.0-rc1" }
+datafusion-proto = { git = "https://github.com/apache/arrow-datafusion";, rev = 
"12.0.0-rc1" }
 futures = "0.3"
 hyper = "0.14.4"
 log = "0.4"
diff --git a/ballista/rust/scheduler/Cargo.toml 
b/ballista/rust/scheduler/Cargo.toml
index a6f5e062..24616ad8 100644
--- a/ballista/rust/scheduler/Cargo.toml
+++ b/ballista/rust/scheduler/Cargo.toml
@@ -45,8 +45,8 @@ ballista-core = { path = "../core", version = "0.7.0" }
 base64 = { version = "0.13", default-features = false }
 clap = { version = "3", features = ["derive", "cargo"] }
 configure_me = "0.4.0"
-datafusion = { git = "https://github.com/apache/arrow-datafusion";, rev = 
"8b59b207aaadd6f2c19c28d1f1431a0cb8d110ae" }
-datafusion-proto = { git = "https://github.com/apache/arrow-datafusion";, rev = 
"8b59b207aaadd6f2c19c28d1f1431a0cb8d110ae" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion";, rev = 
"12.0.0-rc1" }
+datafusion-proto = { git = "https://github.com/apache/arrow-datafusion";, rev = 
"12.0.0-rc1" }
 etcd-client = { version = "0.9", optional = true }
 flatbuffers = { version = "2.1.2" }
 futures = "0.3"
@@ -55,7 +55,7 @@ http-body = "0.4"
 hyper = "0.14.4"
 itertools = "0.10.3"
 log = "0.4"
-object_store = "0.4.0"
+object_store = "0.5.0"
 parking_lot = "0.12"
 parse_arg = "0.1.3"
 prost = "0.11"
diff --git a/ballista/rust/scheduler/src/scheduler_server/grpc.rs 
b/ballista/rust/scheduler/src/scheduler_server/grpc.rs
index 66ab3c93..f57262eb 100644
--- a/ballista/rust/scheduler/src/scheduler_server/grpc.rs
+++ b/ballista/rust/scheduler/src/scheduler_server/grpc.rs
@@ -40,9 +40,9 @@ use futures::TryStreamExt;
 use log::{debug, error, info, warn};
 
 // use http_body::Body;
+use std::convert::TryInto;
 use std::ops::Deref;
 use std::sync::Arc;
-
 use std::time::{SystemTime, UNIX_EPOCH};
 use tonic::{Request, Response, Status};
 
@@ -336,7 +336,11 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> SchedulerGrpc
             })?;
 
         Ok(Response::new(GetFileMetadataResult {
-            schema: Some(schema.as_ref().into()),
+            schema: Some(schema.as_ref().try_into().map_err(|e| {
+                let msg = format!("Error inferring schema: {}", e);
+                error!("{}", msg);
+                tonic::Status::internal(msg)
+            })?),
         }))
     }
 
diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml
index 003e1717..cc0d41ae 100644
--- a/benchmarks/Cargo.toml
+++ b/benchmarks/Cargo.toml
@@ -33,8 +33,8 @@ snmalloc = ["snmalloc-rs"]
 
 [dependencies]
 ballista = { path = "../ballista/rust/client" }
-datafusion = { git = "https://github.com/apache/arrow-datafusion";, rev = 
"8b59b207aaadd6f2c19c28d1f1431a0cb8d110ae" }
-datafusion-proto = { git = "https://github.com/apache/arrow-datafusion";, rev = 
"8b59b207aaadd6f2c19c28d1f1431a0cb8d110ae" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion";, rev = 
"12.0.0-rc1" }
+datafusion-proto = { git = "https://github.com/apache/arrow-datafusion";, rev = 
"12.0.0-rc1" }
 env_logger = "0.9"
 futures = "0.3"
 mimalloc = { version = "0.1", optional = true, default-features = false }
diff --git a/examples/Cargo.toml b/examples/Cargo.toml
index 440c33d2..14155ee6 100644
--- a/examples/Cargo.toml
+++ b/examples/Cargo.toml
@@ -35,7 +35,7 @@ required-features = ["ballista/standalone"]
 
 [dependencies]
 ballista = { path = "../ballista/rust/client", version = "0.7.0" }
-datafusion = { git = "https://github.com/apache/arrow-datafusion";, rev = 
"8b59b207aaadd6f2c19c28d1f1431a0cb8d110ae" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion";, rev = 
"12.0.0-rc1" }
 futures = "0.3"
 num_cpus = "1.13.0"
 prost = "0.11"
diff --git a/python/Cargo.toml b/python/Cargo.toml
index d2316d93..81bd6d1e 100644
--- a/python/Cargo.toml
+++ b/python/Cargo.toml
@@ -36,7 +36,7 @@ default = ["mimalloc"]
 [dependencies]
 async-trait = "0.1"
 ballista = { path = "../ballista/rust/client" }
-datafusion = { git = "https://github.com/apache/arrow-datafusion";, rev = 
"8b59b207aaadd6f2c19c28d1f1431a0cb8d110ae", features = ["pyarrow"] }
+datafusion = { git = "https://github.com/apache/arrow-datafusion";, rev = 
"12.0.0-rc1", features = ["pyarrow"] }
 futures = "0.3"
 mimalloc = { version = "*", optional = true, default-features = false }
 pyo3 = { version = "~0.17.1", features = ["extension-module", "abi3", 
"abi3-py37"] }
diff --git a/python/src/functions.rs b/python/src/functions.rs
index 2bb28016..2d0550d2 100644
--- a/python/src/functions.rs
+++ b/python/src/functions.rs
@@ -169,6 +169,7 @@ macro_rules! aggregate_function {
                 fun: AggregateFunction::$FUNC,
                 args: args.into_iter().map(|e| e.into()).collect(),
                 distinct,
+                filter: None,
             };
             expr.into()
         }
diff --git a/python/src/udaf.rs b/python/src/udaf.rs
index 3432b8f3..3b93048b 100644
--- a/python/src/udaf.rs
+++ b/python/src/udaf.rs
@@ -97,7 +97,7 @@ impl Accumulator for RustAccumulator {
 }
 
 pub fn to_rust_accumulator(accum: PyObject) -> 
AccumulatorFunctionImplementation {
-    Arc::new(move || -> Result<Box<dyn Accumulator>> {
+    Arc::new(move |_| -> Result<Box<dyn Accumulator>> {
         let accum = Python::with_gil(|py| {
             accum
                 .call0(py)

Reply via email to