This is an automated email from the ASF dual-hosted git repository.

milenkovicm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-ballista.git


The following commit(s) were added to refs/heads/main by this push:
     new 60182c80 feat: Scheduler supports `substrait` logical plan and removes deprecated `sql` support (#1360)
60182c80 is described below

commit 60182c80c3b8d2ddb84a847333dffc4492a45571
Author: Matt Cuento <[email protected]>
AuthorDate: Tue Jan 6 12:51:04 2026 -0800

    feat: Scheduler supports `substrait` logical plan and removes deprecated `sql` support (#1360)
    
    * chore: Remove deprecated `Sql` field from `ExecuteQueryParams.Query`
    
    Reserve field id 2 and field name `sql` so they cannot be reused by future fields.
    
    * feat: Support passing Substrait plans to `ExecuteQuery` API
    
    Fix linting and use a non-reserved field id (6) for `substrait_plan`
    
    Add a test for Substrait compatibility with a local scheduler instance
    
    Add an inline expression to the Substrait test
    
    Gate Substrait support behind the `substrait` feature
    
    Adding protoc feature to datafusion-substrait package
    
    Removing protoc feature
    
    Bump rust base version
    
    Rebuild Cargo.lock
    
    spacing
    
    spacing
    
    Add cargo clean before test
    
    remove cargo clean
    
    Fix linting when the `substrait` feature is disabled
    
    * chore: Edit ci profile to reduce ./target dir size
---
 Cargo.lock                                      | 235 +++++++++++++++++++++++-
 Cargo.toml                                      |   4 +
 ballista/core/proto/ballista.proto              |   5 +-
 ballista/core/src/serde/generated/ballista.rs   |   7 +-
 ballista/scheduler/Cargo.toml                   |   4 +-
 ballista/scheduler/src/scheduler_server/grpc.rs | 137 ++++++++++++--
 6 files changed, 366 insertions(+), 26 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 2a9971bd..dd71c0fb 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -451,6 +451,17 @@ dependencies = [
  "zstd-safe",
 ]
 
+[[package]]
+name = "async-recursion"
+version = "1.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "3b43422f69d8ff38f95f1b2bb76517c91589a924d1559a0e935d7c8ce0274c11"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn 2.0.113",
+]
+
 [[package]]
 name = "async-stream"
 version = "0.3.6"
@@ -1053,6 +1064,7 @@ dependencies = [
  "dashmap",
  "datafusion",
  "datafusion-proto",
+ "datafusion-substrait",
  "futures",
  "graphviz-rust",
  "http 1.4.0",
@@ -2379,6 +2391,27 @@ dependencies = [
  "sqlparser",
 ]
 
+[[package]]
+name = "datafusion-substrait"
+version = "51.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "2505af06d103a55b4e8ded0c6aeb6c72a771948da939c0bd3f8eee67af475a9c"
+dependencies = [
+ "async-recursion",
+ "async-trait",
+ "chrono",
+ "datafusion",
+ "half",
+ "itertools",
+ "object_store",
+ "pbjson-types",
+ "prost",
+ "substrait",
+ "tokio",
+ "url",
+ "uuid",
+]
+
 [[package]]
 name = "deranged"
 version = "0.5.5"
@@ -2627,6 +2660,12 @@ version = "0.1.5"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2"
 
+[[package]]
+name = "foldhash"
+version = "0.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "77ce24cb58228fbb8aa041425bb1050850ac19177686ea6e0f41a70416f56fdb"
+
 [[package]]
 name = "form_urlencoded"
 version = "1.2.2"
@@ -2849,7 +2888,7 @@ version = "0.15.5"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "9229cfe53dfd69f0609a49f65461bd93001ea1ef889cd5529dd176593f5338a1"
 dependencies = [
- "foldhash",
+ "foldhash 0.1.5",
 ]
 
 [[package]]
@@ -2857,6 +2896,11 @@ name = "hashbrown"
 version = "0.16.1"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "841d1cc9bed7f9236f321df977030373f4a4163ae1a7dbfe1a51a2c1a51d9100"
+dependencies = [
+ "allocator-api2",
+ "equivalent",
+ "foldhash 0.2.0",
+]
 
 [[package]]
 name = "heck"
@@ -3872,6 +3916,43 @@ version = "1.0.15"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "57c0d7b74b563b49d38dae00a0c37d4d6de9b432382b2892f0574ddcae73fd0a"
 
+[[package]]
+name = "pbjson"
+version = "0.8.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "898bac3fa00d0ba57a4e8289837e965baa2dee8c3749f3b11d45a64b4223d9c3"
+dependencies = [
+ "base64 0.22.1",
+ "serde",
+]
+
+[[package]]
+name = "pbjson-build"
+version = "0.8.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "af22d08a625a2213a78dbb0ffa253318c5c79ce3133d32d296655a7bdfb02095"
+dependencies = [
+ "heck 0.5.0",
+ "itertools",
+ "prost",
+ "prost-types",
+]
+
+[[package]]
+name = "pbjson-types"
+version = "0.8.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "8e748e28374f10a330ee3bb9f29b828c0ac79831a32bab65015ad9b661ead526"
+dependencies = [
+ "bytes",
+ "chrono",
+ "pbjson",
+ "pbjson-build",
+ "prost",
+ "prost-build",
+ "serde",
+]
+
 [[package]]
 name = "percent-encoding"
 version = "2.3.2"
@@ -4462,6 +4543,16 @@ version = "0.8.8"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "7a2d987857b319362043e95f5353c0535c1f58eec5336fdfcf626430af7def58"
 
+[[package]]
+name = "regress"
+version = "0.10.5"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "2057b2325e68a893284d1538021ab90279adac1139957ca2a74426c6f118fb48"
+dependencies = [
+ "hashbrown 0.16.1",
+ "memchr",
+]
+
 [[package]]
 name = "relative-path"
 version = "1.9.3"
@@ -4705,6 +4796,18 @@ dependencies = [
  "windows-sys 0.61.2",
 ]
 
+[[package]]
+name = "schemars"
+version = "0.8.22"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "3fbf2ae1b8bc8e02df939598064d22402220cd5bbcca1c76f7d6a310974d5615"
+dependencies = [
+ "dyn-clone",
+ "schemars_derive",
+ "serde",
+ "serde_json",
+]
+
 [[package]]
 name = "schemars"
 version = "0.9.0"
@@ -4729,6 +4832,18 @@ dependencies = [
  "serde_json",
 ]
 
+[[package]]
+name = "schemars_derive"
+version = "0.8.22"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "32e265784ad618884abaea0600a9adf15393368d840e0222d101a072f3f7534d"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "serde_derive_internals",
+ "syn 2.0.113",
+]
+
 [[package]]
 name = "scopeguard"
 version = "1.2.0"
@@ -4763,6 +4878,10 @@ name = "semver"
 version = "1.0.27"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "d767eb0aabc880b29956c35734170f26ed551a859dbd361d140cdbeca61ab1e2"
+dependencies = [
+ "serde",
+ "serde_core",
+]
 
 [[package]]
 name = "seq-macro"
@@ -4810,6 +4929,17 @@ dependencies = [
  "syn 2.0.113",
 ]
 
+[[package]]
+name = "serde_derive_internals"
+version = "0.29.1"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "18d26a20a969b9e3fdf2fc2d9f21eda6c40e2de84c9408bb5d3b05d499aae711"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn 2.0.113",
+]
+
 [[package]]
 name = "serde_json"
 version = "1.0.148"
@@ -4845,6 +4975,18 @@ dependencies = [
  "syn 2.0.113",
 ]
 
+[[package]]
+name = "serde_tokenstream"
+version = "0.2.2"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "64060d864397305347a78851c51588fd283767e7e7589829e8121d65512340f1"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "serde",
+ "syn 2.0.113",
+]
+
 [[package]]
 name = "serde_urlencoded"
 version = "0.7.1"
@@ -4888,6 +5030,19 @@ dependencies = [
  "syn 2.0.113",
 ]
 
+[[package]]
+name = "serde_yaml"
+version = "0.9.34+deprecated"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "6a8b1a1a2ebf674015cc02edccce75287f1a0130d394307b36743c2f5d504b47"
+dependencies = [
+ "indexmap 2.12.1",
+ "itoa",
+ "ryu",
+ "serde",
+ "unsafe-libyaml",
+]
+
 [[package]]
 name = "sha2"
 version = "0.10.9"
@@ -5100,6 +5255,31 @@ dependencies = [
  "syn 2.0.113",
 ]
 
+[[package]]
+name = "substrait"
+version = "0.62.2"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "62fc4b483a129b9772ccb9c3f7945a472112fdd9140da87f8a4e7f1d44e045d0"
+dependencies = [
+ "heck 0.5.0",
+ "pbjson",
+ "pbjson-build",
+ "pbjson-types",
+ "prettyplease",
+ "prost",
+ "prost-build",
+ "prost-types",
+ "regress",
+ "schemars 0.8.22",
+ "semver",
+ "serde",
+ "serde_json",
+ "serde_yaml",
+ "syn 2.0.113",
+ "typify",
+ "walkdir",
+]
+
 [[package]]
 name = "subtle"
 version = "2.6.1"
@@ -5634,6 +5814,53 @@ version = "1.19.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "562d481066bde0658276a35467c4af00bdc6ee726305698a55b86e61d7ad82bb"
 
+[[package]]
+name = "typify"
+version = "0.5.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "e6d5bcc6f62eb1fa8aa4098f39b29f93dcb914e17158b76c50360911257aa629"
+dependencies = [
+ "typify-impl",
+ "typify-macro",
+]
+
+[[package]]
+name = "typify-impl"
+version = "0.5.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "a1eb359f7ffa4f9ebe947fa11a1b2da054564502968db5f317b7e37693cb2240"
+dependencies = [
+ "heck 0.5.0",
+ "log",
+ "proc-macro2",
+ "quote",
+ "regress",
+ "schemars 0.8.22",
+ "semver",
+ "serde",
+ "serde_json",
+ "syn 2.0.113",
+ "thiserror 2.0.17",
+ "unicode-ident",
+]
+
+[[package]]
+name = "typify-macro"
+version = "0.5.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "911c32f3c8514b048c1b228361bebb5e6d73aeec01696e8cc0e82e2ffef8ab7a"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "schemars 0.8.22",
+ "semver",
+ "serde",
+ "serde_json",
+ "serde_tokenstream",
+ "syn 2.0.113",
+ "typify-impl",
+]
+
 [[package]]
 name = "ucd-trie"
 version = "0.1.7"
@@ -5680,6 +5907,12 @@ version = "0.2.2"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "b4ac048d71ede7ee76d585517add45da530660ef4390e49b098733c6e897f254"
 
+[[package]]
+name = "unsafe-libyaml"
+version = "0.2.11"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "673aac59facbab8a9007c7f6108d11f63b603f7cabff99fabf650fea5c32b861"
+
 [[package]]
 name = "untrusted"
 version = "0.9.0"
diff --git a/Cargo.toml b/Cargo.toml
index a7c4574c..11ce13ae 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -39,6 +39,7 @@ datafusion = "51.0.0"
 datafusion-cli = "51.0.0"
 datafusion-proto = "51.0.0"
 datafusion-proto-common = "51.0.0"
+datafusion-substrait = "51.0.0"
 object_store = "0.12"
 prost = "0.14"
 prost-types = "0.14"
@@ -92,6 +93,9 @@ rpath = false
 [profile.ci]
 inherits = "dev"
 incremental = false
+debug = false
+debug-assertions = false
+strip = "debuginfo"
 
 # ci turns off debug info, etc. for dependencies to allow for smaller binaries 
making caching more effective
 [profile.ci.package."*"]
diff --git a/ballista/core/proto/ballista.proto 
b/ballista/core/proto/ballista.proto
index 48e9487b..34fd4381 100644
--- a/ballista/core/proto/ballista.proto
+++ b/ballista/core/proto/ballista.proto
@@ -537,8 +537,11 @@ message UpdateTaskStatusResult {
 message ExecuteQueryParams {
   oneof query {
     bytes logical_plan = 1;
-    string sql = 2 [deprecated=true]; // I'd suggest to remove this, if SQL 
needed use `flight-sql`
+    bytes substrait_plan = 6;
   }
+  // reserved after removing deprecated `sql` query type.
+  reserved 2;
+  reserved "sql";
 
   string session_id = 3;
   repeated KeyValuePair settings = 4;
diff --git a/ballista/core/src/serde/generated/ballista.rs 
b/ballista/core/src/serde/generated/ballista.rs
index f3a77a57..aeccb7d3 100644
--- a/ballista/core/src/serde/generated/ballista.rs
+++ b/ballista/core/src/serde/generated/ballista.rs
@@ -821,7 +821,7 @@ pub struct ExecuteQueryParams {
     /// client and scheduler
     #[prost(string, tag = "5")]
     pub operation_id: ::prost::alloc::string::String,
-    #[prost(oneof = "execute_query_params::Query", tags = "1, 2")]
+    #[prost(oneof = "execute_query_params::Query", tags = "1, 6")]
     pub query: ::core::option::Option<execute_query_params::Query>,
 }
 /// Nested message and enum types in `ExecuteQueryParams`.
@@ -830,9 +830,8 @@ pub mod execute_query_params {
     pub enum Query {
         #[prost(bytes, tag = "1")]
         LogicalPlan(::prost::alloc::vec::Vec<u8>),
-        /// I'd suggest to remove this, if SQL needed use `flight-sql`
-        #[prost(string, tag = "2")]
-        Sql(::prost::alloc::string::String),
+        #[prost(bytes, tag = "6")]
+        SubstraitPlan(::prost::alloc::vec::Vec<u8>),
     }
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
diff --git a/ballista/scheduler/Cargo.toml b/ballista/scheduler/Cargo.toml
index 426e8281..d29c7c5c 100644
--- a/ballista/scheduler/Cargo.toml
+++ b/ballista/scheduler/Cargo.toml
@@ -34,7 +34,7 @@ required-features = ["build-binary"]
 
 [features]
 build-binary = ["clap", "tracing-subscriber", "tracing-appender", "tracing", 
"ballista-core/build-binary"]
-default = ["build-binary"]
+default = ["build-binary", "substrait"]
 # job info can cache stage plans, in some cases where 
 # task plans can be re-computed, cache behavior may need to be disabled.
 disable-stage-plan-cache = []
@@ -42,6 +42,7 @@ graphviz-support = ["dep:graphviz-rust"]
 keda-scaler = []
 prometheus-metrics = ["prometheus", "once_cell"]
 rest-api = []
+substrait = ["dep:datafusion-substrait"]
 
 [dependencies]
 arrow-flight = { workspace = true }
@@ -52,6 +53,7 @@ clap = { workspace = true, optional = true }
 dashmap = { workspace = true }
 datafusion = { workspace = true }
 datafusion-proto = { workspace = true }
+datafusion-substrait = { workspace = true, optional = true }
 futures = { workspace = true }
 graphviz-rust = { version = "0.9", optional = true }
 http = "1.1"
diff --git a/ballista/scheduler/src/scheduler_server/grpc.rs 
b/ballista/scheduler/src/scheduler_server/grpc.rs
index 2860ff1b..4d2148ac 100644
--- a/ballista/scheduler/src/scheduler_server/grpc.rs
+++ b/ballista/scheduler/src/scheduler_server/grpc.rs
@@ -37,6 +37,12 @@ use datafusion_proto::physical_plan::AsExecutionPlan;
 use log::{debug, error, info, trace, warn};
 use std::net::SocketAddr;
 
+#[cfg(feature = "substrait")]
+use {
+    datafusion_substrait::logical_plan::consumer::from_substrait_plan,
+    datafusion_substrait::serializer::deserialize_bytes,
+};
+
 use std::ops::Deref;
 
 use crate::cluster::{bind_task_bias, bind_task_round_robin};
@@ -392,26 +398,35 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> SchedulerGrpc
                         }
                     }
                 }
-                Query::Sql(sql) => {
-                    match session_ctx
-                        .sql(&sql)
+                #[cfg(not(feature = "substrait"))]
+                Query::SubstraitPlan(_) => {
+                    let msg = "Received query type \"Substrait\", enable 
\"substrait\" feature to support Substrait plans.".to_string();
+                    error!("{msg}");
+                    return Ok(Response::new(ExecuteQueryResult {
+                        operation_id,
+                        result: Some(execute_query_result::Result::Failure(
+                            ExecuteQueryFailureResult {
+                                failure: 
Some(execute_query_failure_result::Failure::PlanParsingFailure(msg)),
+                            }
+                        ))
+                    }));
+                }
+                #[cfg(feature = "substrait")]
+                Query::SubstraitPlan(bytes) => {
+                    let plan = deserialize_bytes(bytes).await.map_err(|e| {
+                        let msg = format!("Could not parse substrait plan: 
{e}");
+                        error!("{}", msg);
+                        Status::internal(msg)
+                    })?;
+
+                    let ctx = session_ctx.as_ref().clone();
+                    from_substrait_plan(&ctx.state(), &plan)
                         .await
-                        .and_then(|df| df.into_optimized_plan())
-                    {
-                        Ok(plan) => plan,
-                        Err(e) => {
-                            let msg = format!("Error parsing SQL: {e}");
-                            error!("{msg}");
-                            return Ok(Response::new(ExecuteQueryResult {
-                                operation_id,
-                                result: 
Some(execute_query_result::Result::Failure(
-                                    ExecuteQueryFailureResult {
-                                        failure: 
Some(execute_query_failure_result::Failure::PlanParsingFailure(msg)),
-                                    },
-                                )),
-                            }));
-                        }
-                    }
+                        .map_err(|e| {
+                            let msg = format!("Could not parse substrait plan: 
{e}");
+                            error!("{}", msg);
+                            Status::internal(msg)
+                        })?
                 }
             };
 
@@ -547,6 +562,14 @@ mod test {
     use datafusion_proto::protobuf::PhysicalPlanNode;
     use tonic::Request;
 
+    #[cfg(feature = "substrait")]
+    use {
+        ballista_core::serde::protobuf::ExecuteQueryParams,
+        ballista_core::serde::protobuf::execute_query_params::Query,
+        datafusion::prelude::{SessionConfig, SessionContext},
+        datafusion_substrait::serializer::serialize_bytes,
+    };
+
     use crate::config::SchedulerConfig;
     use crate::metrics::default_metrics_collector;
     use ballista_core::error::BallistaError;
@@ -873,4 +896,80 @@ mod test {
         assert!(active_executors.is_empty());
         Ok(())
     }
+    #[tokio::test]
+    #[cfg(feature = "substrait")]
+    async fn test_substrait_compatibility() -> Result<(), BallistaError> {
+        let cluster = test_cluster_context();
+
+        let config = SchedulerConfig::default();
+        let mut scheduler: SchedulerServer<LogicalPlanNode, PhysicalPlanNode> =
+            SchedulerServer::new(
+                "localhost:50050".to_owned(),
+                cluster.clone(),
+                BallistaCodec::default(),
+                Arc::new(config),
+                default_metrics_collector().unwrap(),
+            );
+        scheduler.init().await?;
+
+        let exec_meta = ExecutorRegistration {
+            id: "abc".to_owned(),
+            host: Some("http://localhost:8080".to_owned()),
+            port: 0,
+            grpc_port: 0,
+            specification: Some(ExecutorSpecification { task_slots: 2 
}.into()),
+        };
+
+        let request: Request<RegisterExecutorParams> =
+            Request::new(RegisterExecutorParams {
+                metadata: Some(exec_meta.clone()),
+            });
+        let response = scheduler
+            .register_executor(request)
+            .await
+            .expect("Received error response")
+            .into_inner();
+
+        // registration should success
+        assert!(response.success);
+
+        let state = scheduler.state.clone();
+        // executor should be registered
+        let stored_executor = state
+            .executor_manager
+            .get_executor_metadata("abc")
+            .await
+            .expect("getting executor");
+
+        assert_eq!(stored_executor.grpc_port, 0);
+        assert_eq!(stored_executor.port, 0);
+        assert_eq!(stored_executor.specification.task_slots, 2);
+        assert_eq!(stored_executor.host, "http://localhost:8080".to_owned());
+
+        // Context strictly used for values-based query serialization to avoid
+        // needing to register tables and keep them in sync with the scheduler 
instance.
+        // We only truly desire to test proper reception of a Substrait plan, 
not explicit
+        // SubstraitPlan -> LogicalPlan conversions.
+        let config = SessionConfig::new();
+        let ctx = SessionContext::new_with_config(config);
+        let serialized_substrait_plan = serialize_bytes(
+            "SELECT a, b, ABS(a) + ABS(b) FROM (VALUES (1, 2), (3, 4)) AS t(a, 
b)",
+            &ctx,
+        )
+        .await?;
+
+        let execute_query_request = Request::new(ExecuteQueryParams {
+            session_id: uuid::Uuid::new_v4().to_string(),
+            settings: vec![],
+            operation_id: uuid::Uuid::now_v7().to_string(),
+            query: Some(Query::SubstraitPlan(serialized_substrait_plan)),
+        });
+        let response = scheduler.execute_query(execute_query_request).await?;
+        response
+            .into_inner()
+            .result
+            .expect("Received error response");
+
+        Ok(())
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to