This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git


The following commit(s) were added to refs/heads/master by this push:
     new 1f80a96f Bump DataFusion version (#453)
1f80a96f is described below

commit 1f80a96f077f1c97fae386c89d478d5f67eaa82a
Author: Andy Grove <[email protected]>
AuthorDate: Thu Oct 27 22:43:01 2022 -0600

    Bump DataFusion version (#453)
    
    * bump DataFusion version
    
    * fix
    
    * Fix python
    
    * Formatting
    
    * Python compile
    
    * Update rev
    
    * Compile
    
    * Compile
    
    * Clippy
    
    * Nullability
    
    * Format
    
    Co-authored-by: Heres, Daniel <[email protected]>
---
 ballista-cli/Cargo.toml                            |   4 +-
 ballista/client/Cargo.toml                         |   6 +-
 ballista/client/src/context.rs                     |   6 +-
 ballista/core/Cargo.toml                           |   6 +-
 ballista/core/proto/datafusion.proto               | 280 +++++++++++----------
 .../core/src/execution_plans/distributed_query.rs  |   2 +-
 ballista/core/src/serde/mod.rs                     |  35 ++-
 .../core/src/serde/physical_plan/from_proto.rs     |   5 +-
 ballista/core/src/serde/physical_plan/mod.rs       |  26 +-
 ballista/core/src/utils.rs                         |   4 +-
 ballista/executor/Cargo.toml                       |   8 +-
 ballista/scheduler/Cargo.toml                      |   6 +-
 ballista/scheduler/src/display.rs                  |   4 +-
 ballista/scheduler/src/planner.rs                  |   2 +-
 ballista/scheduler/src/scheduler_server/event.rs   |   2 +-
 ballista/scheduler/src/scheduler_server/mod.rs     |   4 +-
 ballista/scheduler/src/state/execution_graph.rs    |   2 +-
 .../scheduler/src/state/execution_graph_dot.rs     |   4 +-
 ballista/scheduler/src/state/mod.rs                |   2 +-
 benchmarks/Cargo.toml                              |   4 +-
 benchmarks/src/bin/tpch.rs                         |   9 +-
 examples/Cargo.toml                                |   2 +-
 python/Cargo.toml                                  |   2 +-
 python/ballista/tests/test_catalog.py              |   6 +-
 python/src/dataframe.rs                            |   2 +-
 python/src/dataset_exec.rs                         |   9 +-
 python/src/expression.rs                           |  11 +-
 python/src/functions.rs                            |   3 +-
 python/src/pyarrow_filter_expression.rs            |   6 +-
 29 files changed, 253 insertions(+), 209 deletions(-)

diff --git a/ballista-cli/Cargo.toml b/ballista-cli/Cargo.toml
index e36d03c5..f0fbaad6 100644
--- a/ballista-cli/Cargo.toml
+++ b/ballista-cli/Cargo.toml
@@ -33,8 +33,8 @@ ballista = { path = "../ballista/client", version = "0.9.0", 
features = [
     "standalone",
 ] }
 clap = { version = "3", features = ["derive", "cargo"] }
-datafusion = "13.0.0"
-datafusion-cli = "13.0.0"
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = 
"fa5cd7f30d785c5b3355e425e082a9d5a91bf567" }
+datafusion-cli = { git = "https://github.com/apache/arrow-datafusion", rev = 
"fa5cd7f30d785c5b3355e425e082a9d5a91bf567" }
 dirs = "4.0.0"
 env_logger = "0.9"
 mimalloc = { version = "0.1", default-features = false }
diff --git a/ballista/client/Cargo.toml b/ballista/client/Cargo.toml
index 8c030b99..33eb9a53 100644
--- a/ballista/client/Cargo.toml
+++ b/ballista/client/Cargo.toml
@@ -31,12 +31,12 @@ rust-version = "1.59"
 ballista-core = { path = "../core", version = "0.9.0" }
 ballista-executor = { path = "../executor", version = "0.9.0", optional = true 
}
 ballista-scheduler = { path = "../scheduler", version = "0.9.0", optional = 
true }
-datafusion = "13.0.0"
-datafusion-proto = "13.0.0"
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = 
"fa5cd7f30d785c5b3355e425e082a9d5a91bf567" }
+datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = 
"fa5cd7f30d785c5b3355e425e082a9d5a91bf567" }
 futures = "0.3"
 log = "0.4"
 parking_lot = "0.12"
-sqlparser = "0.25"
+sqlparser = "0.26"
 tempfile = "3"
 tokio = "1.0"
 
diff --git a/ballista/client/src/context.rs b/ballista/client/src/context.rs
index 797a527d..22c849c6 100644
--- a/ballista/client/src/context.rs
+++ b/ballista/client/src/context.rs
@@ -33,11 +33,9 @@ use datafusion_proto::protobuf::LogicalPlanNode;
 
 use datafusion::catalog::TableReference;
 use datafusion::dataframe::DataFrame;
-use datafusion::datasource::TableProvider;
+use datafusion::datasource::{source_as_provider, TableProvider};
 use datafusion::error::{DataFusionError, Result};
-use datafusion::logical_plan::{
-    source_as_provider, CreateExternalTable, LogicalPlan, TableScan,
-};
+use datafusion::logical_expr::{CreateExternalTable, LogicalPlan, TableScan};
 use datafusion::prelude::{
     AvroReadOptions, CsvReadOptions, ParquetReadOptions, SessionConfig, 
SessionContext,
 };
diff --git a/ballista/core/Cargo.toml b/ballista/core/Cargo.toml
index 955af54e..2d710da9 100644
--- a/ballista/core/Cargo.toml
+++ b/ballista/core/Cargo.toml
@@ -42,13 +42,13 @@ simd = ["datafusion/simd"]
 [dependencies]
 ahash = { version = "0.8", default-features = false }
 
-arrow-flight = { version = "24.0.0", features = ["flight-sql-experimental"] }
+arrow-flight = { version = "25.0.0", features = ["flight-sql-experimental"] }
 async-trait = "0.1.41"
 chrono = { version = "0.4", default-features = false }
 clap = { version = "3", features = ["derive", "cargo"] }
-datafusion = "13.0.0"
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = 
"fa5cd7f30d785c5b3355e425e082a9d5a91bf567" }
 datafusion-objectstore-hdfs = { version = "0.1.1", default-features = false, 
optional = true }
-datafusion-proto = "13.0.0"
+datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = 
"fa5cd7f30d785c5b3355e425e082a9d5a91bf567" }
 futures = "0.3"
 hashbrown = "0.12"
 
diff --git a/ballista/core/proto/datafusion.proto 
b/ballista/core/proto/datafusion.proto
index 0a92d0d1..b7ae6360 100644
--- a/ballista/core/proto/datafusion.proto
+++ b/ballista/core/proto/datafusion.proto
@@ -69,6 +69,8 @@ message LogicalPlanNode {
     SubqueryAliasNode subquery_alias = 21;
     CreateViewNode create_view = 22;
     DistinctNode distinct = 23;
+    ViewTableScanNode view_scan = 24;
+    CustomTableScanNode custom_scan = 25;
   }
 }
 
@@ -109,6 +111,23 @@ message ListingTableScanNode {
   }
 }
 
+message ViewTableScanNode {
+  string table_name = 1;
+  LogicalPlanNode input = 2;
+  datafusion.Schema schema = 3;
+  ProjectionColumns projection = 4;
+  string definition = 5;
+}
+
+// Logical Plan to Scan a CustomTableProvider registered at runtime
+message CustomTableScanNode {
+  string table_name = 1;
+  ProjectionColumns projection = 2;
+  datafusion.Schema schema = 3;
+  repeated datafusion.LogicalExprNode filters = 4;
+  bytes custom_table_data = 5;
+}
+
 message ProjectionNode {
   LogicalPlanNode input = 1;
   repeated datafusion.LogicalExprNode expr = 2;
@@ -156,6 +175,7 @@ message CreateExternalTableNode {
   bool if_not_exists = 7;
   string delimiter = 8;
   string definition = 9;
+  string file_compression_type = 10;
 }
 
 message CreateCatalogSchemaNode {
@@ -653,53 +673,53 @@ message Field {
 }
 
 message FixedSizeBinary{
-    int32 length = 1;
+  int32 length = 1;
 }
 
 message Timestamp{
-    TimeUnit time_unit = 1;
-    string timezone = 2;
+  TimeUnit time_unit = 1;
+  string timezone = 2;
 }
 
 enum DateUnit{
-    Day = 0;
-    DateMillisecond = 1;
+  Day = 0;
+  DateMillisecond = 1;
 }
 
 enum TimeUnit{
-    Second = 0;
-    Millisecond = 1;
-    Microsecond = 2;
-    Nanosecond = 3;
+  Second = 0;
+  Millisecond = 1;
+  Microsecond = 2;
+  Nanosecond = 3;
 }
 
 enum IntervalUnit{
-    YearMonth = 0;
-    DayTime = 1;
-    MonthDayNano = 2;
+  YearMonth = 0;
+  DayTime = 1;
+  MonthDayNano = 2;
 }
 
 message Decimal{
-    uint64 whole = 1;
-    uint64 fractional = 2;
+  uint64 whole = 1;
+  uint64 fractional = 2;
 }
 
 message List{
-    Field field_type = 1;
+  Field field_type = 1;
 }
 
 message FixedSizeList{
-    Field field_type = 1;
-    int32 list_size = 2;
+  Field field_type = 1;
+  int32 list_size = 2;
 }
 
 message Dictionary{
-    ArrowType key = 1;
-    ArrowType value = 2;
+  ArrowType key = 1;
+  ArrowType value = 2;
 }
 
 message Struct{
-    repeated Field sub_field_types = 1;
+  repeated Field sub_field_types = 1;
 }
 
 enum UnionMode{
@@ -708,14 +728,17 @@ enum UnionMode{
 }
 
 message Union{
-    repeated Field union_types = 1;
-    UnionMode union_mode = 2;
-    repeated int32 type_ids = 3;
+  repeated Field union_types = 1;
+  UnionMode union_mode = 2;
+  repeated int32 type_ids = 3;
 }
 
 message ScalarListValue{
-    Field field = 1;
-    repeated ScalarValue values = 2;
+  // encode null explicitly to distinguish a list with a null value
+  // from a list with no values)
+  bool is_null = 3;
+  Field field = 1;
+  repeated ScalarValue values = 2;
 }
 
 message ScalarTimestampValue {
@@ -748,40 +771,40 @@ message StructValue {
 }
 
 message ScalarValue{
-    oneof value {
-        // Null value of any type (type is encoded)
-        PrimitiveScalarType null_value = 19;
-
-        bool   bool_value = 1;
-        string utf8_value = 2;
-        string large_utf8_value = 3;
-        int32  int8_value = 4;
-        int32  int16_value = 5;
-        int32  int32_value = 6;
-        int64  int64_value = 7;
-        uint32 uint8_value = 8;
-        uint32 uint16_value = 9;
-        uint32 uint32_value = 10;
-        uint64 uint64_value = 11;
-        float  float32_value = 12;
-        double float64_value = 13;
-        //Literal Date32 value always has a unit of day
-        int32  date_32_value = 14;
-        ScalarListValue list_value = 17;
-        ScalarType null_list_value = 18;
-
-        Decimal128 decimal128_value = 20;
-        int64 date_64_value = 21;
-        int32 interval_yearmonth_value = 24;
-        int64 interval_daytime_value = 25;
-        ScalarTimestampValue timestamp_value = 26;
-        ScalarDictionaryValue dictionary_value = 27;
-        bytes binary_value = 28;
-        bytes large_binary_value = 29;
-        int64 time64_value = 30;
-        IntervalMonthDayNanoValue interval_month_day_nano = 31;
-        StructValue struct_value = 32;
-    }
+  oneof value {
+    // Null value of any type (type is encoded)
+    PrimitiveScalarType null_value = 19;
+
+    bool   bool_value = 1;
+    string utf8_value = 2;
+    string large_utf8_value = 3;
+    int32  int8_value = 4;
+    int32  int16_value = 5;
+    int32  int32_value = 6;
+    int64  int64_value = 7;
+    uint32 uint8_value = 8;
+    uint32 uint16_value = 9;
+    uint32 uint32_value = 10;
+    uint64 uint64_value = 11;
+    float  float32_value = 12;
+    double float64_value = 13;
+    //Literal Date32 value always has a unit of day
+    int32  date_32_value = 14;
+    ScalarListValue list_value = 17;
+    //WAS: ScalarType null_list_value = 18;
+
+    Decimal128 decimal128_value = 20;
+    int64 date_64_value = 21;
+    int32 interval_yearmonth_value = 24;
+    int64 interval_daytime_value = 25;
+    ScalarTimestampValue timestamp_value = 26;
+    ScalarDictionaryValue dictionary_value = 27;
+    bytes binary_value = 28;
+    bytes large_binary_value = 29;
+    int64 time64_value = 30;
+    IntervalMonthDayNanoValue interval_month_day_nano = 31;
+    StructValue struct_value = 32;
+  }
 }
 
 message Decimal128{
@@ -794,88 +817,77 @@ message Decimal128{
 // List
 enum PrimitiveScalarType{
 
-    BOOL = 0;     // arrow::Type::BOOL
-    UINT8 = 1;    // arrow::Type::UINT8
-    INT8 = 2;     // arrow::Type::INT8
-    UINT16 = 3;   // represents arrow::Type fields in src/arrow/type.h
-    INT16 = 4;
-    UINT32 = 5;
-    INT32 = 6;
-    UINT64 = 7;
-    INT64 = 8;
-    FLOAT32 = 9;
-    FLOAT64 = 10;
-    UTF8 = 11;
-    LARGE_UTF8 = 12;
-    DATE32 = 13;
-    TIMESTAMP_MICROSECOND = 14;
-    TIMESTAMP_NANOSECOND = 15;
-    NULL = 16;
-    DECIMAL128 = 17;
-    DATE64 = 20;
-    TIMESTAMP_SECOND = 21;
-    TIMESTAMP_MILLISECOND = 22;
-    INTERVAL_YEARMONTH = 23;
-    INTERVAL_DAYTIME = 24;
-    INTERVAL_MONTHDAYNANO = 28;
-
-    BINARY = 25;
-    LARGE_BINARY = 26;
-
-    TIME64 = 27;
-}
-
-message ScalarType{
-    oneof datatype{
-        PrimitiveScalarType scalar = 1;
-        ScalarListType list = 2;
-    }
-}
-
-message ScalarListType{
-    repeated string field_names = 3;
-    PrimitiveScalarType deepest_type = 2;
+  BOOL = 0;     // arrow::Type::BOOL
+  UINT8 = 1;    // arrow::Type::UINT8
+  INT8 = 2;     // arrow::Type::INT8
+  UINT16 = 3;   // represents arrow::Type fields in src/arrow/type.h
+  INT16 = 4;
+  UINT32 = 5;
+  INT32 = 6;
+  UINT64 = 7;
+  INT64 = 8;
+  FLOAT32 = 9;
+  FLOAT64 = 10;
+  UTF8 = 11;
+  LARGE_UTF8 = 12;
+  DATE32 = 13;
+  TIMESTAMP_MICROSECOND = 14;
+  TIMESTAMP_NANOSECOND = 15;
+  NULL = 16;
+  DECIMAL128 = 17;
+  DATE64 = 20;
+  TIMESTAMP_SECOND = 21;
+  TIMESTAMP_MILLISECOND = 22;
+  INTERVAL_YEARMONTH = 23;
+  INTERVAL_DAYTIME = 24;
+  INTERVAL_MONTHDAYNANO = 28;
+
+  BINARY = 25;
+  LARGE_BINARY = 26;
+
+  TIME64 = 27;
 }
 
+
 // Broke out into multiple message types so that type
 // metadata did not need to be in separate message
 // All types that are of the empty message types contain no additional metadata
 // about the type
 message ArrowType{
-    oneof arrow_type_enum{
-        EmptyMessage NONE = 1;     // arrow::Type::NA
-        EmptyMessage BOOL =  2;     // arrow::Type::BOOL
-        EmptyMessage UINT8 = 3;    // arrow::Type::UINT8
-        EmptyMessage INT8 =  4;     // arrow::Type::INT8
-        EmptyMessage UINT16 =5;   // represents arrow::Type fields in 
src/arrow/type.h
-        EmptyMessage INT16 = 6;
-        EmptyMessage UINT32 =7;
-        EmptyMessage INT32 = 8;
-        EmptyMessage UINT64 =9;
-        EmptyMessage INT64 =10 ;
-        EmptyMessage FLOAT16 =11 ;
-        EmptyMessage FLOAT32 =12 ;
-        EmptyMessage FLOAT64 =13 ;
-        EmptyMessage UTF8 =14 ;
-        EmptyMessage LARGE_UTF8 = 32;
-        EmptyMessage BINARY =15 ;
-        int32 FIXED_SIZE_BINARY =16 ;
-        EmptyMessage LARGE_BINARY = 31;
-        EmptyMessage DATE32 =17 ;
-        EmptyMessage DATE64 =18 ;
-        TimeUnit DURATION = 19;
-        Timestamp TIMESTAMP =20 ;
-        TimeUnit TIME32 =21 ;
-        TimeUnit TIME64 =22 ;
-        IntervalUnit INTERVAL =23 ;
-        Decimal DECIMAL =24 ;
-        List LIST =25;
-        List LARGE_LIST = 26;
-        FixedSizeList FIXED_SIZE_LIST = 27;
-        Struct STRUCT =28;
-        Union UNION =29;
-        Dictionary DICTIONARY =30;
-    }
+  oneof arrow_type_enum{
+    EmptyMessage NONE = 1;     // arrow::Type::NA
+    EmptyMessage BOOL =  2;     // arrow::Type::BOOL
+    EmptyMessage UINT8 = 3;    // arrow::Type::UINT8
+    EmptyMessage INT8 =  4;     // arrow::Type::INT8
+    EmptyMessage UINT16 =5;   // represents arrow::Type fields in 
src/arrow/type.h
+    EmptyMessage INT16 = 6;
+    EmptyMessage UINT32 =7;
+    EmptyMessage INT32 = 8;
+    EmptyMessage UINT64 =9;
+    EmptyMessage INT64 =10 ;
+    EmptyMessage FLOAT16 =11 ;
+    EmptyMessage FLOAT32 =12 ;
+    EmptyMessage FLOAT64 =13 ;
+    EmptyMessage UTF8 =14 ;
+    EmptyMessage LARGE_UTF8 = 32;
+    EmptyMessage BINARY =15 ;
+    int32 FIXED_SIZE_BINARY =16 ;
+    EmptyMessage LARGE_BINARY = 31;
+    EmptyMessage DATE32 =17 ;
+    EmptyMessage DATE64 =18 ;
+    TimeUnit DURATION = 19;
+    Timestamp TIMESTAMP =20 ;
+    TimeUnit TIME32 =21 ;
+    TimeUnit TIME64 =22 ;
+    IntervalUnit INTERVAL =23 ;
+    Decimal DECIMAL =24 ;
+    List LIST =25;
+    List LARGE_LIST = 26;
+    FixedSizeList FIXED_SIZE_LIST = 27;
+    Struct STRUCT =28;
+    Union UNION =29;
+    Dictionary DICTIONARY =30;
+  }
 }
 
 //Useful for representing an empty enum variant in rust
diff --git a/ballista/core/src/execution_plans/distributed_query.rs 
b/ballista/core/src/execution_plans/distributed_query.rs
index bf5dc0cf..cd56cd36 100644
--- a/ballista/core/src/execution_plans/distributed_query.rs
+++ b/ballista/core/src/execution_plans/distributed_query.rs
@@ -29,7 +29,7 @@ use datafusion::arrow::error::{ArrowError, Result as 
ArrowResult};
 use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::error::{DataFusionError, Result};
 use datafusion::execution::context::TaskContext;
-use datafusion::logical_plan::LogicalPlan;
+use datafusion::logical_expr::LogicalPlan;
 use datafusion::physical_plan::expressions::PhysicalSortExpr;
 use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
 use datafusion::physical_plan::{
diff --git a/ballista/core/src/serde/mod.rs b/ballista/core/src/serde/mod.rs
index 3576d727..04043822 100644
--- a/ballista/core/src/serde/mod.rs
+++ b/ballista/core/src/serde/mod.rs
@@ -21,8 +21,8 @@
 use crate::{error::BallistaError, serde::scheduler::Action as BallistaAction};
 use arrow_flight::sql::ProstMessageExt;
 use datafusion::execution::runtime_env::RuntimeEnv;
-use datafusion::logical_plan::FunctionRegistry;
-use datafusion::physical_plan::join_utils::JoinSide;
+use datafusion::execution::FunctionRegistry;
+use datafusion::physical_plan::joins::utils::JoinSide;
 use datafusion::physical_plan::ExecutionPlan;
 use datafusion_proto::logical_plan::{
     AsLogicalPlan, DefaultLogicalExtensionCodec, LogicalExtensionCodec,
@@ -245,12 +245,15 @@ fn str_to_byte(s: &str) -> Result<u8, BallistaError> {
 mod tests {
     use async_trait::async_trait;
     use datafusion::arrow::datatypes::SchemaRef;
+    use datafusion::common::DFSchemaRef;
     use datafusion::error::DataFusionError;
-    use datafusion::execution::context::{QueryPlanner, SessionState, 
TaskContext};
-    use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
-    use datafusion::logical_plan::plan::Extension;
-    use datafusion::logical_plan::{
-        col, DFSchemaRef, Expr, FunctionRegistry, LogicalPlan, 
UserDefinedLogicalNode,
+    use datafusion::execution::{
+        context::{QueryPlanner, SessionState, TaskContext},
+        runtime_env::{RuntimeConfig, RuntimeEnv},
+        FunctionRegistry,
+    };
+    use datafusion::logical_expr::{
+        col, Expr, Extension, LogicalPlan, UserDefinedLogicalNode,
     };
     use datafusion::physical_plan::expressions::PhysicalSortExpr;
     use datafusion::physical_plan::planner::{DefaultPhysicalPlanner, 
ExtensionPlanner};
@@ -548,6 +551,24 @@ mod tests {
                 Err(DataFusionError::Plan("unsupported plan type".to_string()))
             }
         }
+
+        fn try_decode_table_provider(
+            &self,
+            _buf: &[u8],
+            _schema: SchemaRef,
+            _ctx: &SessionContext,
+        ) -> Result<Arc<dyn datafusion::datasource::TableProvider>, 
DataFusionError>
+        {
+            unimplemented!()
+        }
+
+        fn try_encode_table_provider(
+            &self,
+            _node: Arc<dyn datafusion::datasource::TableProvider>,
+            _buf: &mut Vec<u8>,
+        ) -> Result<(), DataFusionError> {
+            unimplemented!()
+        }
     }
 
     impl PhysicalExtensionCodec for TopKExtensionCodec {
diff --git a/ballista/core/src/serde/physical_plan/from_proto.rs 
b/ballista/core/src/serde/physical_plan/from_proto.rs
index 1e451adb..693037bd 100644
--- a/ballista/core/src/serde/physical_plan/from_proto.rs
+++ b/ballista/core/src/serde/physical_plan/from_proto.rs
@@ -23,11 +23,12 @@ use std::sync::Arc;
 
 use chrono::{TimeZone, Utc};
 use datafusion::arrow::datatypes::Schema;
+use datafusion::config::ConfigOptions;
 use datafusion::datasource::listing::{FileRange, PartitionedFile};
 use datafusion::datasource::object_store::ObjectStoreUrl;
 use datafusion::execution::context::ExecutionProps;
+use datafusion::execution::FunctionRegistry;
 use datafusion::logical_expr::window_function::WindowFunction;
-use datafusion::logical_plan::FunctionRegistry;
 use datafusion::physical_expr::expressions::DateTimeIntervalExpr;
 use datafusion::physical_expr::ScalarFunctionExpr;
 use datafusion::physical_plan::file_format::FileScanConfig;
@@ -42,6 +43,7 @@ use datafusion::physical_plan::{ColumnStatistics, 
PhysicalExpr, Statistics};
 use datafusion_proto::from_proto::from_proto_binary_op;
 use object_store::path::Path;
 use object_store::ObjectMeta;
+use parking_lot::RwLock;
 
 use crate::serde::protobuf::physical_expr_node::ExprType;
 
@@ -397,6 +399,7 @@ impl TryInto<FileScanConfig> for 
&protobuf::FileScanExecConf {
         let statistics = convert_required!(self.statistics)?;
 
         Ok(FileScanConfig {
+            config_options: Arc::new(RwLock::new(ConfigOptions::new())), // 
TODO add serde
             object_store_url: ObjectStoreUrl::parse(&self.object_store_url)?,
             file_schema: schema,
             file_groups: self
diff --git a/ballista/core/src/serde/physical_plan/mod.rs 
b/ballista/core/src/serde/physical_plan/mod.rs
index d081422a..610fe5fa 100644
--- a/ballista/core/src/serde/physical_plan/mod.rs
+++ b/ballista/core/src/serde/physical_plan/mod.rs
@@ -18,21 +18,23 @@
 use std::convert::TryInto;
 use std::sync::Arc;
 
+use datafusion::physical_plan::joins::utils::{ColumnIndex, JoinFilter};
 use prost::bytes::BufMut;
 use prost::Message;
 
 use datafusion::arrow::compute::SortOptions;
 use datafusion::arrow::datatypes::SchemaRef;
+use datafusion::config::ConfigOptions;
+use datafusion::datasource::file_format::file_type::FileCompressionType;
 use datafusion::datasource::listing::PartitionedFile;
 use datafusion::datasource::object_store::ObjectStoreUrl;
 use datafusion::execution::runtime_env::RuntimeEnv;
-use datafusion::logical_plan::window_frames::WindowFrame;
-use datafusion::logical_plan::FunctionRegistry;
+use datafusion::execution::FunctionRegistry;
+use datafusion::logical_expr::WindowFrame;
 use datafusion::physical_plan::aggregates::{create_aggregate_expr, 
AggregateMode};
 use datafusion::physical_plan::aggregates::{AggregateExec, PhysicalGroupBy};
 use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
 use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
-use datafusion::physical_plan::cross_join::CrossJoinExec;
 use datafusion::physical_plan::empty::EmptyExec;
 use datafusion::physical_plan::explain::ExplainExec;
 use datafusion::physical_plan::expressions::{Column, PhysicalSortExpr};
@@ -40,8 +42,8 @@ use datafusion::physical_plan::file_format::{
     AvroExec, CsvExec, FileScanConfig, ParquetExec,
 };
 use datafusion::physical_plan::filter::FilterExec;
-use datafusion::physical_plan::hash_join::{HashJoinExec, PartitionMode};
-use datafusion::physical_plan::join_utils::{ColumnIndex, JoinFilter};
+use datafusion::physical_plan::joins::CrossJoinExec;
+use datafusion::physical_plan::joins::{HashJoinExec, PartitionMode};
 use datafusion::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
 use datafusion::physical_plan::projection::ProjectionExec;
 use datafusion::physical_plan::repartition::RepartitionExec;
@@ -53,6 +55,7 @@ use datafusion::physical_plan::{
     AggregateExpr, ExecutionPlan, Partitioning, PhysicalExpr, WindowExpr,
 };
 use datafusion_proto::from_proto::parse_expr;
+use parking_lot::RwLock;
 
 use crate::error::BallistaError;
 use crate::execution_plans::{
@@ -159,6 +162,7 @@ impl AsExecutionPlan for PhysicalPlanNode {
                 decode_scan_config(scan.base_conf.as_ref().unwrap())?,
                 scan.has_header,
                 str_to_byte(&scan.delimiter)?,
+                FileCompressionType::UNCOMPRESSED,
             ))),
             PhysicalPlanType::ParquetScan(scan) => {
                 let predicate = scan
@@ -311,7 +315,7 @@ impl AsExecutionPlan for PhysicalPlanNode {
                                     &[window_node_expr],
                                     &[],
                                     &[],
-                                    Some(WindowFrame::default()),
+                                    Some(Arc::new(WindowFrame::default())),
                                     &physical_schema,
                                 )?)
                             }
@@ -1259,6 +1263,7 @@ fn decode_scan_config(
     };
 
     Ok(FileScanConfig {
+        config_options: Arc::new(RwLock::new(ConfigOptions::new())), // TODO 
add serde
         object_store_url,
         file_schema: schema,
         file_groups,
@@ -1289,10 +1294,11 @@ mod roundtrip_tests {
 
     use datafusion::arrow::array::ArrayRef;
     use datafusion::arrow::datatypes::IntervalUnit;
+    use datafusion::config::ConfigOptions;
     use datafusion::datasource::object_store::ObjectStoreUrl;
     use datafusion::execution::context::ExecutionProps;
+    use datafusion::logical_expr::create_udf;
     use datafusion::logical_expr::{BuiltinScalarFunction, Volatility};
-    use datafusion::logical_plan::create_udf;
     use datafusion::physical_expr::expressions::DateTimeIntervalExpr;
     use datafusion::physical_expr::ScalarFunctionExpr;
     use datafusion::physical_plan::aggregates::PhysicalGroupBy;
@@ -1305,7 +1311,7 @@ mod roundtrip_tests {
             datatypes::{DataType, Field, Schema},
         },
         datasource::listing::PartitionedFile,
-        logical_plan::{JoinType, Operator},
+        logical_expr::{JoinType, Operator},
         physical_plan::{
             aggregates::{AggregateExec, AggregateMode},
             empty::EmptyExec,
@@ -1313,7 +1319,7 @@ mod roundtrip_tests {
             expressions::{Avg, Column, DistinctCount, PhysicalSortExpr},
             file_format::{FileScanConfig, ParquetExec},
             filter::FilterExec,
-            hash_join::{HashJoinExec, PartitionMode},
+            joins::{HashJoinExec, PartitionMode},
             limit::{GlobalLimitExec, LocalLimitExec},
             sorts::sort::SortExec,
             AggregateExpr, ExecutionPlan, Partitioning, PhysicalExpr, 
Statistics,
@@ -1326,6 +1332,7 @@ mod roundtrip_tests {
     use crate::serde::protobuf::PhysicalPlanNode;
     use crate::serde::{AsExecutionPlan, BallistaCodec};
     use datafusion_proto::protobuf::LogicalPlanNode;
+    use parking_lot::RwLock;
 
     use super::super::super::error::Result;
     use super::super::protobuf;
@@ -1569,6 +1576,7 @@ mod roundtrip_tests {
     #[test]
     fn roundtrip_parquet_exec_with_pruning_predicate() -> Result<()> {
         let scan_config = FileScanConfig {
+            config_options: Arc::new(RwLock::new(ConfigOptions::new())), // 
TODO add serde
             object_store_url: ObjectStoreUrl::local_filesystem(),
             file_schema: Arc::new(Schema::new(vec![Field::new(
                 "col",
diff --git a/ballista/core/src/utils.rs b/ballista/core/src/utils.rs
index 33047992..436a0c45 100644
--- a/ballista/core/src/utils.rs
+++ b/ballista/core/src/utils.rs
@@ -30,7 +30,7 @@ use datafusion::execution::context::{
     QueryPlanner, SessionConfig, SessionContext, SessionState,
 };
 use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
-use datafusion::logical_plan::LogicalPlan;
+use datafusion::logical_expr::LogicalPlan;
 use datafusion::physical_plan::aggregates::AggregateExec;
 use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
 use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
@@ -38,7 +38,7 @@ use datafusion::physical_plan::common::batch_byte_size;
 use datafusion::physical_plan::empty::EmptyExec;
 use datafusion::physical_plan::file_format::{CsvExec, ParquetExec};
 use datafusion::physical_plan::filter::FilterExec;
-use datafusion::physical_plan::hash_join::HashJoinExec;
+use datafusion::physical_plan::joins::HashJoinExec;
 use datafusion::physical_plan::metrics::MetricsSet;
 use datafusion::physical_plan::projection::ProjectionExec;
 use datafusion::physical_plan::sorts::sort::SortExec;
diff --git a/ballista/executor/Cargo.toml b/ballista/executor/Cargo.toml
index bf09d2be..6db40c6f 100644
--- a/ballista/executor/Cargo.toml
+++ b/ballista/executor/Cargo.toml
@@ -35,15 +35,15 @@ default = ["mimalloc"]
 
 [dependencies]
 anyhow = "1"
-arrow = { version = "24.0.0" }
-arrow-flight = { version = "24.0.0" }
+arrow = { version = "25.0.0" }
+arrow-flight = { version = "25.0.0" }
 async-trait = "0.1.41"
 ballista-core = { path = "../core", version = "0.9.0" }
 chrono = { version = "0.4", default-features = false }
 configure_me = "0.4.0"
 dashmap = "5.4.0"
-datafusion = "13.0.0"
-datafusion-proto = "13.0.0"
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = 
"fa5cd7f30d785c5b3355e425e082a9d5a91bf567" }
+datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = 
"fa5cd7f30d785c5b3355e425e082a9d5a91bf567" }
 futures = "0.3"
 hyper = "0.14.4"
 log = "0.4"
diff --git a/ballista/scheduler/Cargo.toml b/ballista/scheduler/Cargo.toml
index ea996c31..82bc79d0 100644
--- a/ballista/scheduler/Cargo.toml
+++ b/ballista/scheduler/Cargo.toml
@@ -38,7 +38,7 @@ sled = ["sled_package", "tokio-stream"]
 
 [dependencies]
 anyhow = "1"
-arrow-flight = { version = "24.0.0", features = ["flight-sql-experimental"] }
+arrow-flight = { version = "25.0.0", features = ["flight-sql-experimental"] }
 async-recursion = "1.0.0"
 async-trait = "0.1.41"
 ballista-core = { path = "../core", version = "0.9.0" }
@@ -46,8 +46,8 @@ base64 = { version = "0.13", default-features = false }
 clap = { version = "3", features = ["derive", "cargo"] }
 configure_me = "0.4.0"
 dashmap = "5.4.0"
-datafusion = "13.0.0"
-datafusion-proto = "13.0.0"
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = 
"fa5cd7f30d785c5b3355e425e082a9d5a91bf567" }
+datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = 
"fa5cd7f30d785c5b3355e425e082a9d5a91bf567" }
 etcd-client = { version = "0.10", optional = true }
 flatbuffers = { version = "22.9.29" }
 futures = "0.3"
diff --git a/ballista/scheduler/src/display.rs 
b/ballista/scheduler/src/display.rs
index 23753889..7df23c22 100644
--- a/ballista/scheduler/src/display.rs
+++ b/ballista/scheduler/src/display.rs
@@ -20,7 +20,7 @@
 //! format
 
 use ballista_core::utils::collect_plan_metrics;
-use datafusion::logical_plan::{StringifiedPlan, ToStringifiedPlan};
+use datafusion::logical_expr::{StringifiedPlan, ToStringifiedPlan};
 use datafusion::physical_plan::metrics::MetricsSet;
 use datafusion::physical_plan::{
     accept, DisplayFormatType, ExecutionPlan, ExecutionPlanVisitor,
@@ -153,7 +153,7 @@ impl<'a, 'b> ExecutionPlanVisitor for IndentVisitor<'a, 'b> 
{
 impl<'a> ToStringifiedPlan for DisplayableBallistaExecutionPlan<'a> {
     fn to_stringified(
         &self,
-        plan_type: datafusion::logical_plan::PlanType,
+        plan_type: datafusion::logical_expr::PlanType,
     ) -> StringifiedPlan {
         StringifiedPlan::new(plan_type, self.indent().to_string())
     }
diff --git a/ballista/scheduler/src/planner.rs 
b/ballista/scheduler/src/planner.rs
index c91edd34..4265d5bd 100644
--- a/ballista/scheduler/src/planner.rs
+++ b/ballista/scheduler/src/planner.rs
@@ -326,7 +326,7 @@ mod test {
     use ballista_core::serde::{protobuf, AsExecutionPlan, BallistaCodec};
     use datafusion::physical_plan::aggregates::{AggregateExec, AggregateMode};
     use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
-    use datafusion::physical_plan::hash_join::HashJoinExec;
+    use datafusion::physical_plan::joins::HashJoinExec;
     use datafusion::physical_plan::sorts::sort::SortExec;
     use datafusion::physical_plan::{
         coalesce_partitions::CoalescePartitionsExec, 
projection::ProjectionExec,
diff --git a/ballista/scheduler/src/scheduler_server/event.rs 
b/ballista/scheduler/src/scheduler_server/event.rs
index f206594f..544976cd 100644
--- a/ballista/scheduler/src/scheduler_server/event.rs
+++ b/ballista/scheduler/src/scheduler_server/event.rs
@@ -17,7 +17,7 @@
 
 use crate::state::executor_manager::ExecutorReservation;
 
-use datafusion::logical_plan::LogicalPlan;
+use datafusion::logical_expr::LogicalPlan;
 
 use crate::state::execution_graph::RunningTaskInfo;
 use ballista_core::serde::protobuf::TaskStatus;
diff --git a/ballista/scheduler/src/scheduler_server/mod.rs 
b/ballista/scheduler/src/scheduler_server/mod.rs
index 44fed7db..661725af 100644
--- a/ballista/scheduler/src/scheduler_server/mod.rs
+++ b/ballista/scheduler/src/scheduler_server/mod.rs
@@ -26,7 +26,7 @@ use ballista_core::serde::{AsExecutionPlan, BallistaCodec};
 use ballista_core::utils::default_session_builder;
 
 use datafusion::execution::context::SessionState;
-use datafusion::logical_plan::LogicalPlan;
+use datafusion::logical_expr::LogicalPlan;
 use datafusion::prelude::{SessionConfig, SessionContext};
 use datafusion_proto::logical_plan::AsLogicalPlan;
 
@@ -303,7 +303,7 @@ mod test {
     use std::time::Duration;
 
     use datafusion::arrow::datatypes::{DataType, Field, Schema};
-    use datafusion::logical_plan::{col, sum, LogicalPlan};
+    use datafusion::logical_expr::{col, sum, LogicalPlan};
 
     use datafusion::test_util::scan_empty;
     use datafusion_proto::protobuf::LogicalPlanNode;
diff --git a/ballista/scheduler/src/state/execution_graph.rs 
b/ballista/scheduler/src/state/execution_graph.rs
index 5702bed1..dcb919b1 100644
--- a/ballista/scheduler/src/state/execution_graph.rs
+++ b/ballista/scheduler/src/state/execution_graph.rs
@@ -1581,8 +1581,8 @@ mod test {
     use std::sync::Arc;
 
     use datafusion::arrow::datatypes::{DataType, Field, Schema};
+    use datafusion::logical_expr::JoinType;
     use datafusion::logical_expr::{col, count, sum, Expr};
-    use datafusion::logical_plan::JoinType;
     use datafusion::physical_plan::display::DisplayableExecutionPlan;
     use datafusion::prelude::{SessionConfig, SessionContext};
     use datafusion::test_util::scan_empty;
diff --git a/ballista/scheduler/src/state/execution_graph_dot.rs 
b/ballista/scheduler/src/state/execution_graph_dot.rs
index d46e93f8..0cdbb39c 100644
--- a/ballista/scheduler/src/state/execution_graph_dot.rs
+++ b/ballista/scheduler/src/state/execution_graph_dot.rs
@@ -25,12 +25,12 @@ use datafusion::datasource::listing::PartitionedFile;
 use datafusion::physical_plan::aggregates::AggregateExec;
 use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
 use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
-use datafusion::physical_plan::cross_join::CrossJoinExec;
 use datafusion::physical_plan::file_format::{
     AvroExec, CsvExec, FileScanConfig, NdJsonExec, ParquetExec,
 };
 use datafusion::physical_plan::filter::FilterExec;
-use datafusion::physical_plan::hash_join::HashJoinExec;
+use datafusion::physical_plan::joins::CrossJoinExec;
+use datafusion::physical_plan::joins::HashJoinExec;
 use datafusion::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
 use datafusion::physical_plan::memory::MemoryExec;
 use datafusion::physical_plan::projection::ProjectionExec;
diff --git a/ballista/scheduler/src/state/mod.rs 
b/ballista/scheduler/src/state/mod.rs
index cb723c9a..de20dcdd 100644
--- a/ballista/scheduler/src/state/mod.rs
+++ b/ballista/scheduler/src/state/mod.rs
@@ -36,7 +36,7 @@ use crate::state::execution_graph::TaskDescription;
 use ballista_core::error::{BallistaError, Result};
 use ballista_core::serde::protobuf::TaskStatus;
 use ballista_core::serde::{AsExecutionPlan, BallistaCodec};
-use datafusion::logical_plan::LogicalPlan;
+use datafusion::logical_expr::LogicalPlan;
 use datafusion::physical_plan::display::DisplayableExecutionPlan;
 use datafusion::prelude::SessionContext;
 use datafusion_proto::logical_plan::AsLogicalPlan;
diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml
index 9c66c1bc..135a334c 100644
--- a/benchmarks/Cargo.toml
+++ b/benchmarks/Cargo.toml
@@ -34,8 +34,8 @@ snmalloc = ["snmalloc-rs"]
 
 [dependencies]
 ballista = { path = "../ballista/client", version = "0.9.0" }
-datafusion = "13.0.0"
-datafusion-proto = "13.0.0"
+datafusion = { git = "https://github.com/apache/arrow-datafusion";, rev = 
"fa5cd7f30d785c5b3355e425e082a9d5a91bf567" }
+datafusion-proto = { git = "https://github.com/apache/arrow-datafusion";, rev = 
"fa5cd7f30d785c5b3355e425e082a9d5a91bf567" }
 env_logger = "0.9"
 futures = "0.3"
 mimalloc = { version = "0.1", optional = true, default-features = false }
diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index 68d350be..c262c3b6 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -28,7 +28,7 @@ use datafusion::datasource::listing::ListingTableUrl;
 use datafusion::datasource::{MemTable, TableProvider};
 use datafusion::error::{DataFusionError, Result};
 use datafusion::execution::context::SessionState;
-use datafusion::logical_plan::LogicalPlan;
+use datafusion::logical_expr::LogicalPlan;
 use datafusion::parquet::basic::Compression;
 use datafusion::parquet::file::properties::WriterProperties;
 use datafusion::physical_plan::display::DisplayableExecutionPlan;
@@ -980,8 +980,7 @@ mod tests {
     use super::*;
     use datafusion::arrow::array::*;
     use datafusion::arrow::util::display::array_value_to_string;
-    use datafusion::logical_plan::Expr;
-    use datafusion::logical_plan::Expr::Cast;
+    use datafusion::logical_expr::{expr::Cast, Expr};
     use std::env;
     use std::sync::Arc;
 
@@ -1469,10 +1468,10 @@ mod tests {
                     .iter()
                     .map(|field| {
                         Expr::Alias(
-                            Box::new(Cast {
+                            Box::new(Expr::Cast(Cast {
                                 expr: Box::new(trim(col(Field::name(field)))),
                                 data_type: Field::data_type(field).to_owned(),
-                            }),
+                            })),
                             Field::name(field).to_string(),
                         )
                     })
diff --git a/examples/Cargo.toml b/examples/Cargo.toml
index 2a1c6871..0f551528 100644
--- a/examples/Cargo.toml
+++ b/examples/Cargo.toml
@@ -35,7 +35,7 @@ required-features = ["ballista/standalone"]
 
 [dependencies]
 ballista = { path = "../ballista/client", version = "0.9.0" }
-datafusion = "13.0.0"
+datafusion = { git = "https://github.com/apache/arrow-datafusion";, rev = 
"fa5cd7f30d785c5b3355e425e082a9d5a91bf567" }
 futures = "0.3"
 num_cpus = "1.13.0"
 prost = "0.11"
diff --git a/python/Cargo.toml b/python/Cargo.toml
index 1136f762..2c0d67c8 100644
--- a/python/Cargo.toml
+++ b/python/Cargo.toml
@@ -36,7 +36,7 @@ default = ["mimalloc"]
 [dependencies]
 async-trait = "0.1"
 ballista = { path = "../ballista/client", version = "0.9.0" }
-datafusion = { version = "13.0.0", features = ["pyarrow"] }
+datafusion = { git = "https://github.com/apache/arrow-datafusion";, rev = 
"fa5cd7f30d785c5b3355e425e082a9d5a91bf567", features = ["pyarrow"] }
 futures = "0.3"
 mimalloc = { version = "*", optional = true, default-features = false }
 pyo3 = { version = "~0.17.1", features = ["extension-module", "abi3", 
"abi3-py37"] }
diff --git a/python/ballista/tests/test_catalog.py 
b/python/ballista/tests/test_catalog.py
index e53dd7ac..d9043856 100644
--- a/python/ballista/tests/test_catalog.py
+++ b/python/ballista/tests/test_catalog.py
@@ -65,8 +65,8 @@ def test_basic(ctx, database):
     assert table.kind == "physical"
     assert table.schema == pa.schema(
         [
-            pa.field("int", pa.int64(), nullable=False),
-            pa.field("str", pa.string(), nullable=False),
-            pa.field("float", pa.float64(), nullable=False),
+            pa.field("int", pa.int64(), nullable=True),
+            pa.field("str", pa.string(), nullable=True),
+            pa.field("float", pa.float64(), nullable=True),
         ]
     )
diff --git a/python/src/dataframe.rs b/python/src/dataframe.rs
index bc1a9a68..3ee7680c 100644
--- a/python/src/dataframe.rs
+++ b/python/src/dataframe.rs
@@ -21,7 +21,7 @@ use datafusion::arrow::datatypes::Schema;
 use datafusion::arrow::pyarrow::{PyArrowConvert, PyArrowException, 
PyArrowType};
 use datafusion::arrow::util::pretty;
 use datafusion::dataframe::DataFrame;
-use datafusion::logical_plan::JoinType;
+use datafusion::logical_expr::JoinType;
 use pyo3::exceptions::PyTypeError;
 use pyo3::prelude::*;
 use pyo3::types::PyTuple;
diff --git a/python/src/dataset_exec.rs b/python/src/dataset_exec.rs
index 987b7ad1..f238d54a 100644
--- a/python/src/dataset_exec.rs
+++ b/python/src/dataset_exec.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use datafusion::optimizer::utils::conjunction;
 /// Implements a Datafusion physical ExecutionPlan that delegates to a PyArrow 
Dataset
 /// This actually performs the projection, filtering and scanning of a Dataset
 use pyo3::prelude::*;
@@ -32,7 +33,7 @@ use datafusion::arrow::pyarrow::PyArrowType;
 use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::error::{DataFusionError as InnerDataFusionError, Result as 
DFResult};
 use datafusion::execution::context::TaskContext;
-use datafusion::logical_plan::{combine_filters, Expr};
+use datafusion::logical_expr::Expr;
 use datafusion::physical_expr::PhysicalSortExpr;
 use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
 use datafusion::physical_plan::{
@@ -93,9 +94,9 @@ impl DatasetExec {
                 .collect()
         });
         let columns: Option<Vec<String>> = columns.transpose()?;
-        let filter_expr: Option<PyObject> = combine_filters(filters)
-            .map(|filters| {
-                PyArrowFilterExpression::try_from(&filters)
+        let filter_expr: Option<PyObject> = 
conjunction(filters.iter().cloned())
+            .map(|filter| {
+                PyArrowFilterExpression::try_from(&filter)
                     .map(|filter_expr| filter_expr.inner().clone_ref(py))
             })
             .transpose()?;
diff --git a/python/src/expression.rs b/python/src/expression.rs
index aa6e540c..03c32fe4 100644
--- a/python/src/expression.rs
+++ b/python/src/expression.rs
@@ -15,12 +15,13 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use datafusion::logical_expr::expr::Cast;
 use pyo3::{basic::CompareOp, prelude::*};
 use std::convert::{From, Into};
 
 use datafusion::arrow::datatypes::DataType;
 use datafusion::arrow::pyarrow::PyArrowType;
-use datafusion::logical_plan::{col, lit, Expr};
+use datafusion::logical_expr::{col, lit, Expr, GetIndexedField};
 use datafusion::scalar::ScalarValue;
 
 /// An PyExpr that can be used on a DataFrame
@@ -93,10 +94,10 @@ impl PyExpr {
     }
 
     fn __getitem__(&self, key: &str) -> PyResult<PyExpr> {
-        Ok(Expr::GetIndexedField {
+        Ok(Expr::GetIndexedField(GetIndexedField {
             expr: Box::new(self.expr.clone()),
             key: ScalarValue::Utf8(Some(key.to_string())),
-        }
+        })
         .into())
     }
 
@@ -128,10 +129,10 @@ impl PyExpr {
     pub fn cast(&self, to: PyArrowType<DataType>) -> PyExpr {
         // self.expr.cast_to() requires DFSchema to validate that the cast
         // is supported, omit that for now
-        let expr = Expr::Cast {
+        let expr = Expr::Cast(Cast {
             expr: Box::new(self.expr.clone()),
             data_type: to.0,
-        };
+        });
         expr.into()
     }
 }
diff --git a/python/src/functions.rs b/python/src/functions.rs
index 2d0550d2..be6fff25 100644
--- a/python/src/functions.rs
+++ b/python/src/functions.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use datafusion::prelude::lit;
 use pyo3::{prelude::*, wrap_pyfunction};
 
 use datafusion::logical_expr::{self, BuiltinScalarFunction, WindowFunction};
@@ -80,7 +81,7 @@ fn concat(args: Vec<PyExpr>) -> PyResult<PyExpr> {
 #[pyfunction(sep, args = "*")]
 fn concat_ws(sep: String, args: Vec<PyExpr>) -> PyResult<PyExpr> {
     let args = args.into_iter().map(|e| e.expr).collect::<Vec<_>>();
-    Ok(logical_expr::concat_ws(sep, &args).into())
+    Ok(logical_expr::concat_ws(lit(sep), args).into())
 }
 
 /// Creates a new Sort expression
diff --git a/python/src/pyarrow_filter_expression.rs 
b/python/src/pyarrow_filter_expression.rs
index cbe2b801..99047c14 100644
--- a/python/src/pyarrow_filter_expression.rs
+++ b/python/src/pyarrow_filter_expression.rs
@@ -150,7 +150,7 @@ impl TryFrom<&Expr> for PyArrowFilterExpression {
                         v
                     ))),
                 },
-                Expr::BinaryExpr { left, op, right } => {
+                Expr::BinaryExpr(BinaryExpr { left, op, right }) => {
                     let operator = operator_to_py(op, op_module)?;
                     let left = 
PyArrowFilterExpression::try_from(left.as_ref())?.0;
                     let right = 
PyArrowFilterExpression::try_from(right.as_ref())?.0;
@@ -173,12 +173,12 @@ impl TryFrom<&Expr> for PyArrowFilterExpression {
                         .into_ref(py);
                     Ok(expr.call_method1("is_null", (expr,))?)
                 }
-                Expr::Between {
+                Expr::Between(Between {
                     expr,
                     negated,
                     low,
                     high,
-                } => {
+                }) => {
                     let expr = 
PyArrowFilterExpression::try_from(expr.as_ref())?.0;
                     let low = 
PyArrowFilterExpression::try_from(low.as_ref())?.0;
                     let high = 
PyArrowFilterExpression::try_from(high.as_ref())?.0;


Reply via email to