This is an automated email from the ASF dual-hosted git repository.
milenkovicm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-ballista.git
The following commit(s) were added to refs/heads/main by this push:
new 60182c80 feat: Scheduler supports `substrait` logical plan and remove
deprecated `sql` support (#1360)
60182c80 is described below
commit 60182c80c3b8d2ddb84a847333dffc4492a45571
Author: Matt Cuento <[email protected]>
AuthorDate: Tue Jan 6 12:51:04 2026 -0800
feat: Scheduler supports `substrait` logical plan and remove deprecated
`sql` support (#1360)
* chore: Remove deprecated `Sql` field from `ExecuteQueryParams.Query`
reserving field id 2 and field name sql
* feat: Support passing Substrait plans to `ExecuteQuery` API
Linting + using non-reserved field id
Adding test for substrait compatibility with local scheduler instance
add inline expression to substrait test
protect substrait support under feature
Adding protoc feature to datafusion-substrait package
Removing protoc feature
Bump rust base version
Rebuild Cargo.lock
spacing
spacing
Add cargo clean before test
remove cargo clean
Fix linting for feature off
* chore: Edit ci profile to reduce ./target dir size
---
Cargo.lock | 235 +++++++++++++++++++++++-
Cargo.toml | 4 +
ballista/core/proto/ballista.proto | 5 +-
ballista/core/src/serde/generated/ballista.rs | 7 +-
ballista/scheduler/Cargo.toml | 4 +-
ballista/scheduler/src/scheduler_server/grpc.rs | 137 ++++++++++++--
6 files changed, 366 insertions(+), 26 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index 2a9971bd..dd71c0fb 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -451,6 +451,17 @@ dependencies = [
"zstd-safe",
]
+[[package]]
+name = "async-recursion"
+version = "1.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3b43422f69d8ff38f95f1b2bb76517c91589a924d1559a0e935d7c8ce0274c11"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn 2.0.113",
+]
+
[[package]]
name = "async-stream"
version = "0.3.6"
@@ -1053,6 +1064,7 @@ dependencies = [
"dashmap",
"datafusion",
"datafusion-proto",
+ "datafusion-substrait",
"futures",
"graphviz-rust",
"http 1.4.0",
@@ -2379,6 +2391,27 @@ dependencies = [
"sqlparser",
]
+[[package]]
+name = "datafusion-substrait"
+version = "51.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2505af06d103a55b4e8ded0c6aeb6c72a771948da939c0bd3f8eee67af475a9c"
+dependencies = [
+ "async-recursion",
+ "async-trait",
+ "chrono",
+ "datafusion",
+ "half",
+ "itertools",
+ "object_store",
+ "pbjson-types",
+ "prost",
+ "substrait",
+ "tokio",
+ "url",
+ "uuid",
+]
+
[[package]]
name = "deranged"
version = "0.5.5"
@@ -2627,6 +2660,12 @@ version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2"
+[[package]]
+name = "foldhash"
+version = "0.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "77ce24cb58228fbb8aa041425bb1050850ac19177686ea6e0f41a70416f56fdb"
+
[[package]]
name = "form_urlencoded"
version = "1.2.2"
@@ -2849,7 +2888,7 @@ version = "0.15.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9229cfe53dfd69f0609a49f65461bd93001ea1ef889cd5529dd176593f5338a1"
dependencies = [
- "foldhash",
+ "foldhash 0.1.5",
]
[[package]]
@@ -2857,6 +2896,11 @@ name = "hashbrown"
version = "0.16.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "841d1cc9bed7f9236f321df977030373f4a4163ae1a7dbfe1a51a2c1a51d9100"
+dependencies = [
+ "allocator-api2",
+ "equivalent",
+ "foldhash 0.2.0",
+]
[[package]]
name = "heck"
@@ -3872,6 +3916,43 @@ version = "1.0.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "57c0d7b74b563b49d38dae00a0c37d4d6de9b432382b2892f0574ddcae73fd0a"
+[[package]]
+name = "pbjson"
+version = "0.8.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "898bac3fa00d0ba57a4e8289837e965baa2dee8c3749f3b11d45a64b4223d9c3"
+dependencies = [
+ "base64 0.22.1",
+ "serde",
+]
+
+[[package]]
+name = "pbjson-build"
+version = "0.8.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "af22d08a625a2213a78dbb0ffa253318c5c79ce3133d32d296655a7bdfb02095"
+dependencies = [
+ "heck 0.5.0",
+ "itertools",
+ "prost",
+ "prost-types",
+]
+
+[[package]]
+name = "pbjson-types"
+version = "0.8.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8e748e28374f10a330ee3bb9f29b828c0ac79831a32bab65015ad9b661ead526"
+dependencies = [
+ "bytes",
+ "chrono",
+ "pbjson",
+ "pbjson-build",
+ "prost",
+ "prost-build",
+ "serde",
+]
+
[[package]]
name = "percent-encoding"
version = "2.3.2"
@@ -4462,6 +4543,16 @@ version = "0.8.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7a2d987857b319362043e95f5353c0535c1f58eec5336fdfcf626430af7def58"
+[[package]]
+name = "regress"
+version = "0.10.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2057b2325e68a893284d1538021ab90279adac1139957ca2a74426c6f118fb48"
+dependencies = [
+ "hashbrown 0.16.1",
+ "memchr",
+]
+
[[package]]
name = "relative-path"
version = "1.9.3"
@@ -4705,6 +4796,18 @@ dependencies = [
"windows-sys 0.61.2",
]
+[[package]]
+name = "schemars"
+version = "0.8.22"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3fbf2ae1b8bc8e02df939598064d22402220cd5bbcca1c76f7d6a310974d5615"
+dependencies = [
+ "dyn-clone",
+ "schemars_derive",
+ "serde",
+ "serde_json",
+]
+
[[package]]
name = "schemars"
version = "0.9.0"
@@ -4729,6 +4832,18 @@ dependencies = [
"serde_json",
]
+[[package]]
+name = "schemars_derive"
+version = "0.8.22"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "32e265784ad618884abaea0600a9adf15393368d840e0222d101a072f3f7534d"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "serde_derive_internals",
+ "syn 2.0.113",
+]
+
[[package]]
name = "scopeguard"
version = "1.2.0"
@@ -4763,6 +4878,10 @@ name = "semver"
version = "1.0.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d767eb0aabc880b29956c35734170f26ed551a859dbd361d140cdbeca61ab1e2"
+dependencies = [
+ "serde",
+ "serde_core",
+]
[[package]]
name = "seq-macro"
@@ -4810,6 +4929,17 @@ dependencies = [
"syn 2.0.113",
]
+[[package]]
+name = "serde_derive_internals"
+version = "0.29.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "18d26a20a969b9e3fdf2fc2d9f21eda6c40e2de84c9408bb5d3b05d499aae711"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn 2.0.113",
+]
+
[[package]]
name = "serde_json"
version = "1.0.148"
@@ -4845,6 +4975,18 @@ dependencies = [
"syn 2.0.113",
]
+[[package]]
+name = "serde_tokenstream"
+version = "0.2.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "64060d864397305347a78851c51588fd283767e7e7589829e8121d65512340f1"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "serde",
+ "syn 2.0.113",
+]
+
[[package]]
name = "serde_urlencoded"
version = "0.7.1"
@@ -4888,6 +5030,19 @@ dependencies = [
"syn 2.0.113",
]
+[[package]]
+name = "serde_yaml"
+version = "0.9.34+deprecated"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6a8b1a1a2ebf674015cc02edccce75287f1a0130d394307b36743c2f5d504b47"
+dependencies = [
+ "indexmap 2.12.1",
+ "itoa",
+ "ryu",
+ "serde",
+ "unsafe-libyaml",
+]
+
[[package]]
name = "sha2"
version = "0.10.9"
@@ -5100,6 +5255,31 @@ dependencies = [
"syn 2.0.113",
]
+[[package]]
+name = "substrait"
+version = "0.62.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "62fc4b483a129b9772ccb9c3f7945a472112fdd9140da87f8a4e7f1d44e045d0"
+dependencies = [
+ "heck 0.5.0",
+ "pbjson",
+ "pbjson-build",
+ "pbjson-types",
+ "prettyplease",
+ "prost",
+ "prost-build",
+ "prost-types",
+ "regress",
+ "schemars 0.8.22",
+ "semver",
+ "serde",
+ "serde_json",
+ "serde_yaml",
+ "syn 2.0.113",
+ "typify",
+ "walkdir",
+]
+
[[package]]
name = "subtle"
version = "2.6.1"
@@ -5634,6 +5814,53 @@ version = "1.19.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "562d481066bde0658276a35467c4af00bdc6ee726305698a55b86e61d7ad82bb"
+[[package]]
+name = "typify"
+version = "0.5.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e6d5bcc6f62eb1fa8aa4098f39b29f93dcb914e17158b76c50360911257aa629"
+dependencies = [
+ "typify-impl",
+ "typify-macro",
+]
+
+[[package]]
+name = "typify-impl"
+version = "0.5.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a1eb359f7ffa4f9ebe947fa11a1b2da054564502968db5f317b7e37693cb2240"
+dependencies = [
+ "heck 0.5.0",
+ "log",
+ "proc-macro2",
+ "quote",
+ "regress",
+ "schemars 0.8.22",
+ "semver",
+ "serde",
+ "serde_json",
+ "syn 2.0.113",
+ "thiserror 2.0.17",
+ "unicode-ident",
+]
+
+[[package]]
+name = "typify-macro"
+version = "0.5.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "911c32f3c8514b048c1b228361bebb5e6d73aeec01696e8cc0e82e2ffef8ab7a"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "schemars 0.8.22",
+ "semver",
+ "serde",
+ "serde_json",
+ "serde_tokenstream",
+ "syn 2.0.113",
+ "typify-impl",
+]
+
[[package]]
name = "ucd-trie"
version = "0.1.7"
@@ -5680,6 +5907,12 @@ version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b4ac048d71ede7ee76d585517add45da530660ef4390e49b098733c6e897f254"
+[[package]]
+name = "unsafe-libyaml"
+version = "0.2.11"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "673aac59facbab8a9007c7f6108d11f63b603f7cabff99fabf650fea5c32b861"
+
[[package]]
name = "untrusted"
version = "0.9.0"
diff --git a/Cargo.toml b/Cargo.toml
index a7c4574c..11ce13ae 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -39,6 +39,7 @@ datafusion = "51.0.0"
datafusion-cli = "51.0.0"
datafusion-proto = "51.0.0"
datafusion-proto-common = "51.0.0"
+datafusion-substrait = "51.0.0"
object_store = "0.12"
prost = "0.14"
prost-types = "0.14"
@@ -92,6 +93,9 @@ rpath = false
[profile.ci]
inherits = "dev"
incremental = false
+debug = false
+debug-assertions = false
+strip = "debuginfo"
# ci turns off debug info, etc. for dependencies to allow for smaller binaries
making caching more effective
[profile.ci.package."*"]
diff --git a/ballista/core/proto/ballista.proto
b/ballista/core/proto/ballista.proto
index 48e9487b..34fd4381 100644
--- a/ballista/core/proto/ballista.proto
+++ b/ballista/core/proto/ballista.proto
@@ -537,8 +537,11 @@ message UpdateTaskStatusResult {
message ExecuteQueryParams {
oneof query {
bytes logical_plan = 1;
- string sql = 2 [deprecated=true]; // I'd suggest to remove this, if SQL
needed use `flight-sql`
+ bytes substrait_plan = 6;
}
+ // reserved after removing deprecated `sql` query type.
+ reserved 2;
+ reserved "sql";
string session_id = 3;
repeated KeyValuePair settings = 4;
diff --git a/ballista/core/src/serde/generated/ballista.rs
b/ballista/core/src/serde/generated/ballista.rs
index f3a77a57..aeccb7d3 100644
--- a/ballista/core/src/serde/generated/ballista.rs
+++ b/ballista/core/src/serde/generated/ballista.rs
@@ -821,7 +821,7 @@ pub struct ExecuteQueryParams {
/// client and scheduler
#[prost(string, tag = "5")]
pub operation_id: ::prost::alloc::string::String,
- #[prost(oneof = "execute_query_params::Query", tags = "1, 2")]
+ #[prost(oneof = "execute_query_params::Query", tags = "1, 6")]
pub query: ::core::option::Option<execute_query_params::Query>,
}
/// Nested message and enum types in `ExecuteQueryParams`.
@@ -830,9 +830,8 @@ pub mod execute_query_params {
pub enum Query {
#[prost(bytes, tag = "1")]
LogicalPlan(::prost::alloc::vec::Vec<u8>),
- /// I'd suggest to remove this, if SQL needed use `flight-sql`
- #[prost(string, tag = "2")]
- Sql(::prost::alloc::string::String),
+ #[prost(bytes, tag = "6")]
+ SubstraitPlan(::prost::alloc::vec::Vec<u8>),
}
}
#[derive(Clone, PartialEq, ::prost::Message)]
diff --git a/ballista/scheduler/Cargo.toml b/ballista/scheduler/Cargo.toml
index 426e8281..d29c7c5c 100644
--- a/ballista/scheduler/Cargo.toml
+++ b/ballista/scheduler/Cargo.toml
@@ -34,7 +34,7 @@ required-features = ["build-binary"]
[features]
build-binary = ["clap", "tracing-subscriber", "tracing-appender", "tracing",
"ballista-core/build-binary"]
-default = ["build-binary"]
+default = ["build-binary", "substrait"]
# job info can cache stage plans, in some cases where
# task plans can be re-computed, cache behavior may need to be disabled.
disable-stage-plan-cache = []
@@ -42,6 +42,7 @@ graphviz-support = ["dep:graphviz-rust"]
keda-scaler = []
prometheus-metrics = ["prometheus", "once_cell"]
rest-api = []
+substrait = ["dep:datafusion-substrait"]
[dependencies]
arrow-flight = { workspace = true }
@@ -52,6 +53,7 @@ clap = { workspace = true, optional = true }
dashmap = { workspace = true }
datafusion = { workspace = true }
datafusion-proto = { workspace = true }
+datafusion-substrait = { workspace = true, optional = true }
futures = { workspace = true }
graphviz-rust = { version = "0.9", optional = true }
http = "1.1"
diff --git a/ballista/scheduler/src/scheduler_server/grpc.rs
b/ballista/scheduler/src/scheduler_server/grpc.rs
index 2860ff1b..4d2148ac 100644
--- a/ballista/scheduler/src/scheduler_server/grpc.rs
+++ b/ballista/scheduler/src/scheduler_server/grpc.rs
@@ -37,6 +37,12 @@ use datafusion_proto::physical_plan::AsExecutionPlan;
use log::{debug, error, info, trace, warn};
use std::net::SocketAddr;
+#[cfg(feature = "substrait")]
+use {
+ datafusion_substrait::logical_plan::consumer::from_substrait_plan,
+ datafusion_substrait::serializer::deserialize_bytes,
+};
+
use std::ops::Deref;
use crate::cluster::{bind_task_bias, bind_task_round_robin};
@@ -392,26 +398,35 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> SchedulerGrpc
}
}
}
- Query::Sql(sql) => {
- match session_ctx
- .sql(&sql)
+ #[cfg(not(feature = "substrait"))]
+ Query::SubstraitPlan(_) => {
+ let msg = "Received query type \"Substrait\", enable
\"substrait\" feature to support Substrait plans.".to_string();
+ error!("{msg}");
+ return Ok(Response::new(ExecuteQueryResult {
+ operation_id,
+ result: Some(execute_query_result::Result::Failure(
+ ExecuteQueryFailureResult {
+ failure:
Some(execute_query_failure_result::Failure::PlanParsingFailure(msg)),
+ }
+ ))
+ }));
+ }
+ #[cfg(feature = "substrait")]
+ Query::SubstraitPlan(bytes) => {
+ let plan = deserialize_bytes(bytes).await.map_err(|e| {
+ let msg = format!("Could not parse substrait plan:
{e}");
+ error!("{}", msg);
+ Status::internal(msg)
+ })?;
+
+ let ctx = session_ctx.as_ref().clone();
+ from_substrait_plan(&ctx.state(), &plan)
.await
- .and_then(|df| df.into_optimized_plan())
- {
- Ok(plan) => plan,
- Err(e) => {
- let msg = format!("Error parsing SQL: {e}");
- error!("{msg}");
- return Ok(Response::new(ExecuteQueryResult {
- operation_id,
- result:
Some(execute_query_result::Result::Failure(
- ExecuteQueryFailureResult {
- failure:
Some(execute_query_failure_result::Failure::PlanParsingFailure(msg)),
- },
- )),
- }));
- }
- }
+ .map_err(|e| {
+ let msg = format!("Could not parse substrait plan:
{e}");
+ error!("{}", msg);
+ Status::internal(msg)
+ })?
}
};
@@ -547,6 +562,14 @@ mod test {
use datafusion_proto::protobuf::PhysicalPlanNode;
use tonic::Request;
+ #[cfg(feature = "substrait")]
+ use {
+ ballista_core::serde::protobuf::ExecuteQueryParams,
+ ballista_core::serde::protobuf::execute_query_params::Query,
+ datafusion::prelude::{SessionConfig, SessionContext},
+ datafusion_substrait::serializer::serialize_bytes,
+ };
+
use crate::config::SchedulerConfig;
use crate::metrics::default_metrics_collector;
use ballista_core::error::BallistaError;
@@ -873,4 +896,80 @@ mod test {
assert!(active_executors.is_empty());
Ok(())
}
+ #[tokio::test]
+ #[cfg(feature = "substrait")]
+ async fn test_substrait_compatibility() -> Result<(), BallistaError> {
+ let cluster = test_cluster_context();
+
+ let config = SchedulerConfig::default();
+ let mut scheduler: SchedulerServer<LogicalPlanNode, PhysicalPlanNode> =
+ SchedulerServer::new(
+ "localhost:50050".to_owned(),
+ cluster.clone(),
+ BallistaCodec::default(),
+ Arc::new(config),
+ default_metrics_collector().unwrap(),
+ );
+ scheduler.init().await?;
+
+ let exec_meta = ExecutorRegistration {
+ id: "abc".to_owned(),
+ host: Some("http://localhost:8080".to_owned()),
+ port: 0,
+ grpc_port: 0,
+ specification: Some(ExecutorSpecification { task_slots: 2
}.into()),
+ };
+
+ let request: Request<RegisterExecutorParams> =
+ Request::new(RegisterExecutorParams {
+ metadata: Some(exec_meta.clone()),
+ });
+ let response = scheduler
+ .register_executor(request)
+ .await
+ .expect("Received error response")
+ .into_inner();
+
+ // registration should succeed
+ assert!(response.success);
+
+ let state = scheduler.state.clone();
+ // executor should be registered
+ let stored_executor = state
+ .executor_manager
+ .get_executor_metadata("abc")
+ .await
+ .expect("getting executor");
+
+ assert_eq!(stored_executor.grpc_port, 0);
+ assert_eq!(stored_executor.port, 0);
+ assert_eq!(stored_executor.specification.task_slots, 2);
+ assert_eq!(stored_executor.host, "http://localhost:8080".to_owned());
+
+ // Context strictly used for values-based query serialization to avoid
+ // needing to register tables and keep them in sync with the scheduler
instance.
+ // We only truly desire to test proper reception of a Substrait plan,
not explicit
+ // SubstraitPlan -> LogicalPlan conversions.
+ let config = SessionConfig::new();
+ let ctx = SessionContext::new_with_config(config);
+ let serialized_substrait_plan = serialize_bytes(
+ "SELECT a, b, ABS(a) + ABS(b) FROM (VALUES (1, 2), (3, 4)) AS t(a,
b)",
+ &ctx,
+ )
+ .await?;
+
+ let execute_query_request = Request::new(ExecuteQueryParams {
+ session_id: uuid::Uuid::new_v4().to_string(),
+ settings: vec![],
+ operation_id: uuid::Uuid::now_v7().to_string(),
+ query: Some(Query::SubstraitPlan(serialized_substrait_plan)),
+ });
+ let response = scheduler.execute_query(execute_query_request).await?;
+ response
+ .into_inner()
+ .result
+ .expect("Received error response");
+
+ Ok(())
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]