This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git
The following commit(s) were added to refs/heads/master by this push:
new abeec01c update latest datafusion. (#175)
abeec01c is described below
commit abeec01cf6040c1f097a77d8d0ea99280b2be68e
Author: Yang Jiang <[email protected]>
AuthorDate: Wed Aug 31 00:20:31 2022 +0800
update latest datafusion. (#175)
---
.github/workflows/python_build.yml | 11 ++++++
.github/workflows/python_test.yaml | 13 ++++++
.github/workflows/rust.yml | 46 ++++++++++++++++++++++
ballista-cli/Cargo.toml | 4 +-
ballista/rust/client/Cargo.toml | 6 +--
ballista/rust/core/Cargo.toml | 18 ++++-----
ballista/rust/core/proto/datafusion.proto | 8 +++-
.../core/src/execution_plans/shuffle_writer.rs | 12 +++---
.../core/src/serde/physical_plan/from_proto.rs | 5 +--
ballista/rust/core/src/serde/physical_plan/mod.rs | 7 +++-
ballista/rust/core/src/serde/scheduler/mod.rs | 14 +++----
ballista/rust/executor/Cargo.toml | 10 ++---
ballista/rust/scheduler/Cargo.toml | 14 ++++---
ballista/rust/scheduler/src/lib.rs | 1 +
ballista/rust/scheduler/src/main.rs | 20 ++++++----
benchmarks/Cargo.toml | 4 +-
examples/Cargo.toml | 6 +--
python/Cargo.toml | 2 +-
python/src/functions.rs | 2 +-
python/src/udaf.rs | 12 ++++--
20 files changed, 153 insertions(+), 62 deletions(-)
diff --git a/.github/workflows/python_build.yml
b/.github/workflows/python_build.yml
index 45f3adde..89b101bc 100644
--- a/.github/workflows/python_build.yml
+++ b/.github/workflows/python_build.yml
@@ -94,6 +94,16 @@ jobs:
steps:
- uses: actions/checkout@v2
- run: rm LICENSE.txt
+ - name: Install protobuf compiler
+ shell: bash
+ run: |
+ mkdir -p $HOME/d/protoc
+ cd $HOME/d/protoc
+ export PROTO_ZIP="protoc-21.4-linux-x86_64.zip"
+ curl -LO
https://github.com/protocolbuffers/protobuf/releases/download/v21.4/$PROTO_ZIP
+ unzip $PROTO_ZIP
+ export PATH=$PATH:$HOME/d/protoc/bin
+ protoc --version
- name: Download LICENSE.txt
uses: actions/download-artifact@v2
with:
@@ -102,6 +112,7 @@ jobs:
- run: cat LICENSE.txt
- name: Build wheels
run: |
+ export PATH=$PATH:$HOME/d/protoc/bin
export RUSTFLAGS='-C target-cpu=skylake'
docker run --rm -v $(pwd)/..:/io \
--workdir /io/python \
diff --git a/.github/workflows/python_test.yaml
b/.github/workflows/python_test.yaml
index f3768e1e..73302da8 100644
--- a/.github/workflows/python_test.yaml
+++ b/.github/workflows/python_test.yaml
@@ -23,6 +23,16 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
+ - name: Install protobuf compiler
+ shell: bash
+ run: |
+ mkdir -p $HOME/d/protoc
+ cd $HOME/d/protoc
+ export PROTO_ZIP="protoc-21.4-linux-x86_64.zip"
+ curl -LO
https://github.com/protocolbuffers/protobuf/releases/download/v21.4/$PROTO_ZIP
+ unzip $PROTO_ZIP
+ export PATH=$PATH:$HOME/d/protoc/bin
+ protoc --version
- name: Setup Rust toolchain
run: |
rustup component add rustfmt
@@ -41,6 +51,7 @@ jobs:
python-version: "3.10"
- name: Create Virtualenv
run: |
+ export PATH=$PATH:$HOME/d/protoc/bin
python -m venv venv
source venv/bin/activate
pip install -r python/requirements.txt
@@ -51,6 +62,8 @@ jobs:
black --line-length 79 --diff --check python
- name: Run tests
run: |
+ export PATH=$PATH:$HOME/d/protoc/bin
+ protoc --version
source venv/bin/activate
cd python
maturin develop
diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml
index e8ffab29..b6f9ac73 100644
--- a/.github/workflows/rust.yml
+++ b/.github/workflows/rust.yml
@@ -89,6 +89,16 @@ jobs:
- uses: actions/checkout@v2
with:
submodules: true
+ - name: Install protobuf compiler
+ shell: bash
+ run: |
+ mkdir -p $HOME/d/protoc
+ cd $HOME/d/protoc
+ export PROTO_ZIP="protoc-21.4-linux-x86_64.zip"
+ curl -LO
https://github.com/protocolbuffers/protobuf/releases/download/v21.4/$PROTO_ZIP
+ unzip $PROTO_ZIP
+ export PATH=$PATH:$HOME/d/protoc/bin
+ protoc --version
- name: Cache Cargo
uses: actions/cache@v2
with:
@@ -107,6 +117,7 @@ jobs:
rust-version: ${{ matrix.rust }}
- name: Run tests
run: |
+ export PATH=$PATH:$HOME/d/protoc/bin
export ARROW_TEST_DATA=$(pwd)/testing/data
export PARQUET_TEST_DATA=$(pwd)/parquet-testing/data
cargo test
@@ -135,6 +146,16 @@ jobs:
- uses: actions/checkout@v2
with:
submodules: true
+ - name: Install protobuf compiler
+ shell: bash
+ run: |
+ mkdir -p $HOME/d/protoc
+ cd $HOME/d/protoc
+ export PROTO_ZIP="protoc-21.4-linux-x86_64.zip"
+ curl -LO
https://github.com/protocolbuffers/protobuf/releases/download/v21.4/$PROTO_ZIP
+ unzip $PROTO_ZIP
+ export PATH=$PATH:$HOME/d/protoc/bin
+ protoc --version
- name: Cache Cargo
uses: actions/cache@v2
with:
@@ -154,6 +175,7 @@ jobs:
# Ballista is currently not part of the main workspace so requires a
separate test step
- name: Run Ballista tests
run: |
+ export PATH=$PATH:$HOME/d/protoc/bin
export ARROW_TEST_DATA=$(pwd)/testing/data
export PARQUET_TEST_DATA=$(pwd)/parquet-testing/data
cd ballista/rust
@@ -176,6 +198,29 @@ jobs:
- uses: actions/checkout@v2
with:
submodules: true
+ - name: Install protobuf macos compiler
+ shell: bash
+ run: |
+ mkdir -p $HOME/d/protoc
+ cd $HOME/d/protoc
+ export PROTO_ZIP="protoc-21.4-osx-x86_64.zip"
+ curl -LO
https://github.com/protocolbuffers/protobuf/releases/download/v21.4/$PROTO_ZIP
+ unzip $PROTO_ZIP
+ echo "$HOME/d/protoc/bin" >> $GITHUB_PATH
+ export PATH=$PATH:$HOME/d/protoc/bin
+ protoc --version
+ if: ${{matrix.os == 'macos-latest'}}
+ - name: Install protobuf windows compiler
+ shell: bash
+ run: |
+ mkdir -p $HOME/d/protoc
+ cd $HOME/d/protoc
+ export PROTO_ZIP="protoc-21.4-win64.zip"
+ curl -LO
https://github.com/protocolbuffers/protobuf/releases/download/v21.4/$PROTO_ZIP
+ unzip $PROTO_ZIP
+ export PATH=$PATH:$HOME/d/protoc/bin
+ protoc.exe --version
+ if: ${{matrix.os == 'windows-latest'}}
# TODO: this won't cache anything, which is expensive. Setup this action
# with a OS-dependent path.
- name: Setup Rust toolchain
@@ -186,6 +231,7 @@ jobs:
- name: Run tests
shell: bash
run: |
+ export PATH=$PATH:$HOME/d/protoc/bin
export ARROW_TEST_DATA=$(pwd)/testing/data
export PARQUET_TEST_DATA=$(pwd)/parquet-testing/data
cargo test
diff --git a/ballista-cli/Cargo.toml b/ballista-cli/Cargo.toml
index 11c2e772..8fdad9eb 100644
--- a/ballista-cli/Cargo.toml
+++ b/ballista-cli/Cargo.toml
@@ -31,8 +31,8 @@ readme = "README.md"
[dependencies]
ballista = { path = "../ballista/rust/client", version = "0.7.0" }
clap = { version = "3", features = ["derive", "cargo"] }
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev =
"6a5de4fe08597896ab6375e3e4b76c5744dcfba7" }
-datafusion-cli = { git = "https://github.com/apache/arrow-datafusion", rev =
"6a5de4fe08597896ab6375e3e4b76c5744dcfba7" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev =
"7aed4d697fa24053d515babfd7678855451c6736" }
+datafusion-cli = { git = "https://github.com/apache/arrow-datafusion", rev =
"7aed4d697fa24053d515babfd7678855451c6736" }
dirs = "4.0.0"
env_logger = "0.9"
mimalloc = { version = "0.1", default-features = false }
diff --git a/ballista/rust/client/Cargo.toml b/ballista/rust/client/Cargo.toml
index 9cf4eb33..ff9165b4 100644
--- a/ballista/rust/client/Cargo.toml
+++ b/ballista/rust/client/Cargo.toml
@@ -31,12 +31,12 @@ rust-version = "1.59"
ballista-core = { path = "../core", version = "0.7.0" }
ballista-executor = { path = "../executor", version = "0.7.0", optional = true
}
ballista-scheduler = { path = "../scheduler", version = "0.7.0", optional =
true }
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev =
"6a5de4fe08597896ab6375e3e4b76c5744dcfba7" }
-datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev =
"6a5de4fe08597896ab6375e3e4b76c5744dcfba7" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev =
"7aed4d697fa24053d515babfd7678855451c6736" }
+datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev =
"7aed4d697fa24053d515babfd7678855451c6736" }
futures = "0.3"
log = "0.4"
parking_lot = "0.12"
-sqlparser = "0.18"
+sqlparser = "0.22"
tempfile = "3"
tokio = "1.0"
diff --git a/ballista/rust/core/Cargo.toml b/ballista/rust/core/Cargo.toml
index 820eb163..c32cac07 100644
--- a/ballista/rust/core/Cargo.toml
+++ b/ballista/rust/core/Cargo.toml
@@ -35,28 +35,28 @@ simd = ["datafusion/simd"]
[dependencies]
ahash = { version = "0.8", default-features = false }
-arrow-flight = { version = "18.0.0" }
+arrow-flight = { version = "20.0.0" }
async-trait = "0.1.41"
chrono = { version = "0.4", default-features = false }
clap = { version = "3", features = ["derive", "cargo"] }
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev =
"6a5de4fe08597896ab6375e3e4b76c5744dcfba7" }
-datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev =
"6a5de4fe08597896ab6375e3e4b76c5744dcfba7" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev =
"7aed4d697fa24053d515babfd7678855451c6736" }
+datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev =
"7aed4d697fa24053d515babfd7678855451c6736" }
futures = "0.3"
hashbrown = "0.12"
libloading = "0.7.3"
log = "0.4"
-object_store = "0.3.0"
+object_store = "0.4.0"
once_cell = "1.9.0"
parking_lot = "0.12"
parse_arg = "0.1.3"
-prost = "0.10"
-prost-types = "0.10"
+prost = "0.11"
+prost-types = "0.11"
serde = { version = "1", features = ["derive"] }
-sqlparser = "0.18"
+sqlparser = "0.22"
tokio = "1.0"
-tonic = "0.7"
+tonic = "0.8"
uuid = { version = "1.0", features = ["v4"] }
walkdir = "2.3.2"
@@ -65,4 +65,4 @@ tempfile = "3"
[build-dependencies]
rustc_version = "0.4.0"
-tonic-build = { version = "0.7" }
+tonic-build = { version = "0.8", default-features = false, features =
["transport", "prost"] }
diff --git a/ballista/rust/core/proto/datafusion.proto
b/ballista/rust/core/proto/datafusion.proto
index 09c97029..10d012e9 100644
--- a/ballista/rust/core/proto/datafusion.proto
+++ b/ballista/rust/core/proto/datafusion.proto
@@ -438,6 +438,10 @@ enum ScalarFunction {
Coalesce=63;
Power=64;
StructFun=65;
+ FromUnixtime=66;
+ Atan2=67;
+ DateBin=68;
+ ArrowTypeof=69;
}
message ScalarFunctionNode {
@@ -464,11 +468,13 @@ enum AggregateFunction {
APPROX_MEDIAN=15;
APPROX_PERCENTILE_CONT_WITH_WEIGHT = 16;
GROUPING = 17;
+ MEDIAN=18;
}
message AggregateExprNode {
AggregateFunction aggr_function = 1;
repeated LogicalExprNode expr = 2;
+ bool distinct = 3;
}
message AggregateUDFExprNode {
@@ -654,7 +660,7 @@ message Union{
}
message ScalarListValue{
- ScalarType datatype = 1;
+ Field field = 1;
repeated ScalarValue values = 2;
}
diff --git a/ballista/rust/core/src/execution_plans/shuffle_writer.rs
b/ballista/rust/core/src/execution_plans/shuffle_writer.rs
index 45a10218..a1302111 100644
--- a/ballista/rust/core/src/execution_plans/shuffle_writer.rs
+++ b/ballista/rust/core/src/execution_plans/shuffle_writer.rs
@@ -354,11 +354,11 @@ impl ExecutionPlan for ShuffleWriterExec {
let mut num_bytes_builder = UInt64Builder::new(num_writers);
for loc in &part_loc {
- path_builder.append_value(loc.path.clone())?;
- partition_builder.append_value(loc.partition_id as u32)?;
- num_rows_builder.append_value(loc.num_rows)?;
- num_batches_builder.append_value(loc.num_batches)?;
- num_bytes_builder.append_value(loc.num_bytes)?;
+ path_builder.append_value(loc.path.clone());
+ partition_builder.append_value(loc.partition_id as u32);
+ num_rows_builder.append_value(loc.num_rows);
+ num_batches_builder.append_value(loc.num_batches);
+ num_bytes_builder.append_value(loc.num_bytes);
}
// build arrays
@@ -374,7 +374,7 @@ impl ExecutionPlan for ShuffleWriterExec {
field_builders,
);
for _ in 0..num_writers {
- stats_builder.append(true)?;
+ stats_builder.append(true);
}
let stats = Arc::new(stats_builder.finish());
diff --git a/ballista/rust/core/src/serde/physical_plan/from_proto.rs
b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
index 6268ab28..6abb5f12 100644
--- a/ballista/rust/core/src/serde/physical_plan/from_proto.rs
+++ b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
@@ -68,9 +68,7 @@ pub(crate) fn parse_physical_expr(
let pcol: Column = c.into();
Arc::new(pcol)
}
- ExprType::Literal(scalar) => {
- Arc::new(Literal::new(convert_required!(scalar.value)?))
- }
+ ExprType::Literal(scalar) =>
Arc::new(Literal::new(scalar.try_into()?)),
ExprType::BinaryExpr(binary_expr) => Arc::new(BinaryExpr::new(
parse_required_physical_box_expr(
&binary_expr.l,
@@ -312,6 +310,7 @@ impl TryFrom<&protobuf::PartitionedFile> for
PartitionedFile {
.map(|v| v.try_into())
.collect::<Result<Vec<_>, _>>()?,
range: val.range.as_ref().map(|v| v.try_into()).transpose()?,
+ extensions: None,
})
}
}
diff --git a/ballista/rust/core/src/serde/physical_plan/mod.rs
b/ballista/rust/core/src/serde/physical_plan/mod.rs
index b80e64b1..95627f6d 100644
--- a/ballista/rust/core/src/serde/physical_plan/mod.rs
+++ b/ballista/rust/core/src/serde/physical_plan/mod.rs
@@ -168,6 +168,7 @@ impl AsExecutionPlan for PhysicalPlanNode {
Ok(Arc::new(ParquetExec::new(
decode_scan_config(scan.base_conf.as_ref().unwrap())?,
predicate,
+ None,
)))
}
PhysicalPlanType::AvroScan(scan) => Ok(Arc::new(AvroExec::new(
@@ -1462,7 +1463,11 @@ mod roundtrip_tests {
};
let predicate =
datafusion::prelude::col("col").eq(datafusion::prelude::lit("1"));
- roundtrip_test(Arc::new(ParquetExec::new(scan_config,
Some(predicate))))
+ roundtrip_test(Arc::new(ParquetExec::new(
+ scan_config,
+ Some(predicate),
+ None,
+ )))
}
#[test]
diff --git a/ballista/rust/core/src/serde/scheduler/mod.rs
b/ballista/rust/core/src/serde/scheduler/mod.rs
index 38b350d6..6f5a61d5 100644
--- a/ballista/rust/core/src/serde/scheduler/mod.rs
+++ b/ballista/rust/core/src/serde/scheduler/mod.rs
@@ -157,28 +157,28 @@ impl PartitionStats {
let mut num_rows_builder = UInt64Builder::new(1);
match self.num_rows {
- Some(n) => num_rows_builder.append_value(n)?,
- None => num_rows_builder.append_null()?,
+ Some(n) => num_rows_builder.append_value(n),
+ None => num_rows_builder.append_null(),
}
field_builders.push(Box::new(num_rows_builder) as Box<dyn
ArrayBuilder>);
let mut num_batches_builder = UInt64Builder::new(1);
match self.num_batches {
- Some(n) => num_batches_builder.append_value(n)?,
- None => num_batches_builder.append_null()?,
+ Some(n) => num_batches_builder.append_value(n),
+ None => num_batches_builder.append_null(),
}
field_builders.push(Box::new(num_batches_builder) as Box<dyn
ArrayBuilder>);
let mut num_bytes_builder = UInt64Builder::new(1);
match self.num_bytes {
- Some(n) => num_bytes_builder.append_value(n)?,
- None => num_bytes_builder.append_null()?,
+ Some(n) => num_bytes_builder.append_value(n),
+ None => num_bytes_builder.append_null(),
}
field_builders.push(Box::new(num_bytes_builder) as Box<dyn
ArrayBuilder>);
let mut struct_builder =
StructBuilder::new(self.arrow_struct_fields(), field_builders);
- struct_builder.append(true)?;
+ struct_builder.append(true);
Ok(Arc::new(struct_builder.finish()))
}
diff --git a/ballista/rust/executor/Cargo.toml
b/ballista/rust/executor/Cargo.toml
index cd031b20..f35b3543 100644
--- a/ballista/rust/executor/Cargo.toml
+++ b/ballista/rust/executor/Cargo.toml
@@ -34,14 +34,14 @@ snmalloc = ["snmalloc-rs"]
[dependencies]
anyhow = "1"
-arrow = { version = "18.0.0" }
-arrow-flight = { version = "18.0.0" }
+arrow = { version = "20.0.0" }
+arrow-flight = { version = "20.0.0" }
async-trait = "0.1.41"
ballista-core = { path = "../core", version = "0.7.0" }
chrono = { version = "0.4", default-features = false }
configure_me = "0.4.0"
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev =
"6a5de4fe08597896ab6375e3e4b76c5744dcfba7" }
-datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev =
"6a5de4fe08597896ab6375e3e4b76c5744dcfba7" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev =
"7aed4d697fa24053d515babfd7678855451c6736" }
+datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev =
"7aed4d697fa24053d515babfd7678855451c6736" }
futures = "0.3"
hyper = "0.14.4"
log = "0.4"
@@ -50,7 +50,7 @@ snmalloc-rs = { version = "0.3", optional = true }
tempfile = "3"
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread",
"parking_lot", "signal"] }
tokio-stream = { version = "0.1", features = ["net"] }
-tonic = "0.7"
+tonic = "0.8"
tracing = "0.1.36"
tracing-appender = "0.2.2"
tracing-subscriber = { version = "0.3.15", features = ["fmt", "env-filter",
"ansi"] }
diff --git a/ballista/rust/scheduler/Cargo.toml
b/ballista/rust/scheduler/Cargo.toml
index 9f098038..8b4f6bd9 100644
--- a/ballista/rust/scheduler/Cargo.toml
+++ b/ballista/rust/scheduler/Cargo.toml
@@ -32,8 +32,10 @@ scheduler = "scheduler_config_spec.toml"
[features]
default = ["etcd", "sled"]
etcd = ["etcd-client"]
+flight-sql = []
sled = ["sled_package", "tokio-stream"]
+
[dependencies]
anyhow = "1"
arrow-flight = { version = "18.0.0", features = ["flight-sql-experimental"] }
@@ -42,8 +44,8 @@ async-trait = "0.1.41"
ballista-core = { path = "../core", version = "0.7.0" }
clap = { version = "3", features = ["derive", "cargo"] }
configure_me = "0.4.0"
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev =
"6a5de4fe08597896ab6375e3e4b76c5744dcfba7" }
-datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev =
"6a5de4fe08597896ab6375e3e4b76c5744dcfba7" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev =
"7aed4d697fa24053d515babfd7678855451c6736" }
+datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev =
"7aed4d697fa24053d515babfd7678855451c6736" }
etcd-client = { version = "0.9", optional = true }
flatbuffers = { version = "2.1.2" }
futures = "0.3"
@@ -52,16 +54,16 @@ http-body = "0.4"
hyper = "0.14.4"
itertools = "0.10.3"
log = "0.4"
-object_store = "0.3.0"
+object_store = "0.4.0"
parking_lot = "0.12"
parse_arg = "0.1.3"
-prost = "0.10"
+prost = "0.11"
rand = "0.8"
serde = { version = "1", features = ["derive"] }
sled_package = { package = "sled", version = "0.34", optional = true }
tokio = { version = "1.0", features = ["full"] }
tokio-stream = { version = "0.1", features = ["net"], optional = true }
-tonic = "0.7"
+tonic = "0.8"
tower = { version = "0.4" }
tracing = "0.1.36"
tracing-appender = "0.2.2"
@@ -74,4 +76,4 @@ ballista-core = { path = "../core", version = "0.7.0" }
[build-dependencies]
configure_me_codegen = "0.4.1"
-tonic-build = { version = "0.7" }
+tonic-build = { version = "0.8", default-features = false, features =
["transport", "prost"] }
diff --git a/ballista/rust/scheduler/src/lib.rs
b/ballista/rust/scheduler/src/lib.rs
index 838eb562..d755bc68 100644
--- a/ballista/rust/scheduler/src/lib.rs
+++ b/ballista/rust/scheduler/src/lib.rs
@@ -25,6 +25,7 @@ pub mod scheduler_server;
pub mod standalone;
pub mod state;
+#[cfg(feature = "flight-sql")]
pub mod flight_sql;
#[cfg(test)]
pub mod test_utils;
diff --git a/ballista/rust/scheduler/src/main.rs
b/ballista/rust/scheduler/src/main.rs
index d393e4eb..50bfe85c 100644
--- a/ballista/rust/scheduler/src/main.rs
+++ b/ballista/rust/scheduler/src/main.rs
@@ -18,6 +18,7 @@
//! Ballista Rust scheduler binary.
use anyhow::{Context, Result};
+#[cfg(feature = "flight-sql")]
use arrow_flight::flight_service_server::FlightServiceServer;
use
ballista_scheduler::scheduler_server::externalscaler::external_scaler_server::ExternalScalerServer;
use futures::future::{self, Either, TryFutureExt};
@@ -60,6 +61,7 @@ mod config {
}
use ballista_core::utils::create_grpc_server;
+#[cfg(feature = "flight-sql")]
use ballista_scheduler::flight_sql::FlightSqlServiceImpl;
use config::prelude::*;
use datafusion::execution::context::default_session_builder;
@@ -102,17 +104,19 @@ async fn start_server(
let scheduler_grpc_server =
SchedulerGrpcServer::new(scheduler_server.clone());
- let flight_sql_server =
FlightServiceServer::new(FlightSqlServiceImpl::new(
- scheduler_server.clone(),
- ));
-
let keda_scaler =
ExternalScalerServer::new(scheduler_server.clone());
- let mut tonic = create_grpc_server()
+ let tonic_builder = create_grpc_server()
.add_service(scheduler_grpc_server)
- .add_service(flight_sql_server)
- .add_service(keda_scaler)
- .into_service();
+ .add_service(keda_scaler);
+
+ #[cfg(feature = "flight-sql")]
+ let tonic_builder = tonic_builder.add_service(FlightServiceServer::new(
+ FlightSqlServiceImpl::new(scheduler_server.clone()),
+ ));
+
+ let mut tonic = tonic_builder.into_service();
+
let mut warp = warp::service(get_routes(scheduler_server.clone()));
let connect_info = request.connect_info();
diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml
index d2797322..937a0df6 100644
--- a/benchmarks/Cargo.toml
+++ b/benchmarks/Cargo.toml
@@ -33,8 +33,8 @@ snmalloc = ["snmalloc-rs"]
[dependencies]
ballista = { path = "../ballista/rust/client" }
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev =
"6a5de4fe08597896ab6375e3e4b76c5744dcfba7" }
-datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev =
"6a5de4fe08597896ab6375e3e4b76c5744dcfba7" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev =
"7aed4d697fa24053d515babfd7678855451c6736" }
+datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev =
"7aed4d697fa24053d515babfd7678855451c6736" }
env_logger = "0.9"
futures = "0.3"
mimalloc = { version = "0.1", optional = true, default-features = false }
diff --git a/examples/Cargo.toml b/examples/Cargo.toml
index c20194ac..10528a28 100644
--- a/examples/Cargo.toml
+++ b/examples/Cargo.toml
@@ -35,9 +35,9 @@ required-features = ["ballista/standalone"]
[dependencies]
ballista = { path = "../ballista/rust/client", version = "0.7.0" }
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev =
"6a5de4fe08597896ab6375e3e4b76c5744dcfba7" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev =
"7aed4d697fa24053d515babfd7678855451c6736" }
futures = "0.3"
num_cpus = "1.13.0"
-prost = "0.10"
+prost = "0.11"
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread",
"sync", "parking_lot"] }
-tonic = "0.7"
+tonic = "0.8"
diff --git a/python/Cargo.toml b/python/Cargo.toml
index 01e3b5b6..fd5779ea 100644
--- a/python/Cargo.toml
+++ b/python/Cargo.toml
@@ -36,7 +36,7 @@ default = ["mimalloc"]
[dependencies]
async-trait = "0.1"
ballista = { path = "../ballista/rust/client" }
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev =
"6a5de4fe08597896ab6375e3e4b76c5744dcfba7", features = ["pyarrow"] }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev =
"7aed4d697fa24053d515babfd7678855451c6736", features = ["pyarrow"] }
futures = "0.3"
mimalloc = { version = "*", optional = true, default-features = false }
pyo3 = { version = "~0.16.5", features = ["extension-module", "abi3",
"abi3-py37"] }
diff --git a/python/src/functions.rs b/python/src/functions.rs
index 44b294d6..3d86867d 100644
--- a/python/src/functions.rs
+++ b/python/src/functions.rs
@@ -46,7 +46,7 @@ fn in_list(expr: PyExpr, value: Vec<PyExpr>, negated: bool)
-> PyExpr {
fn now() -> PyExpr {
PyExpr {
// here lit(0) is a stub for conform to arity
- expr: logical_plan::now(logical_plan::lit(0)),
+ expr: logical_plan::now(),
}
}
diff --git a/python/src/udaf.rs b/python/src/udaf.rs
index 1fdf0d0d..ff26078e 100644
--- a/python/src/udaf.rs
+++ b/python/src/udaf.rs
@@ -25,7 +25,7 @@ use datafusion::arrow::pyarrow::PyArrowConvert;
use datafusion::common::ScalarValue;
use datafusion::error::{DataFusionError, Result};
use datafusion::logical_expr::{
- Accumulator, AccumulatorFunctionImplementation, AggregateUDF,
+ Accumulator, AccumulatorFunctionImplementation, AggregateState,
AggregateUDF,
};
use datafusion::logical_plan;
@@ -44,9 +44,13 @@ impl RustAccumulator {
}
impl Accumulator for RustAccumulator {
- fn state(&self) -> Result<Vec<ScalarValue>> {
- Python::with_gil(|py|
self.accum.as_ref(py).call_method0("state")?.extract())
- .map_err(|e| DataFusionError::Execution(format!("{}", e)))
+ fn state(&self) -> Result<Vec<AggregateState>> {
+ let py_result: PyResult<Vec<ScalarValue>> =
+ Python::with_gil(|py|
self.accum.as_ref(py).call_method0("state")?.extract());
+ match py_result {
+ Ok(r) => Ok(r.into_iter().map(AggregateState::Scalar).collect()),
+ Err(e) => Err(DataFusionError::Execution(format!("{}", e))),
+ }
}
fn evaluate(&self) -> Result<ScalarValue> {