This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git
The following commit(s) were added to refs/heads/master by this push:
new 0a518ae6 Scheduler now verifies that `file://` ListingTable URLs are accessible (#414)
0a518ae6 is described below
commit 0a518ae6685d1b189fcc8c38a3bab78d3d2844b0
Author: Andy Grove <[email protected]>
AuthorDate: Sun Oct 23 11:05:50 2022 -0600
Scheduler now verifies that `file://` ListingTable URLs are accessible (#414)
---
ballista/scheduler/src/planner.rs | 1 +
ballista/scheduler/src/scheduler_server/grpc.rs | 4 +-
ballista/scheduler/src/state/mod.rs | 67 +++++++++++++++++++++++--
docker-compose.yml | 2 +-
4 files changed, 68 insertions(+), 6 deletions(-)
diff --git a/ballista/scheduler/src/planner.rs b/ballista/scheduler/src/planner.rs
index 474fd63d..8d6ec736 100644
--- a/ballista/scheduler/src/planner.rs
+++ b/ballista/scheduler/src/planner.rs
@@ -229,6 +229,7 @@ pub fn remove_unresolved_shuffles(
.iter()
.map(|c| c
.iter()
+ .filter(|l| !l.path.is_empty())
.map(|l| l.path.clone())
.collect::<Vec<_>>()
.join(", "))
diff --git a/ballista/scheduler/src/scheduler_server/grpc.rs b/ballista/scheduler/src/scheduler_server/grpc.rs
index 14835c90..7b5c96bd 100644
--- a/ballista/scheduler/src/scheduler_server/grpc.rs
+++ b/ballista/scheduler/src/scheduler_server/grpc.rs
@@ -38,7 +38,7 @@ use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::file_format::FileFormat;
use datafusion_proto::logical_plan::AsLogicalPlan;
use futures::TryStreamExt;
-use log::{debug, error, info, warn};
+use log::{debug, error, info, trace, warn};
use std::ops::Deref;
use std::sync::Arc;
@@ -71,7 +71,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
task_status,
} = request.into_inner()
{
- debug!("Received poll_work request for {:?}", metadata);
+ trace!("Received poll_work request for {:?}", metadata);
// We might receive buggy poll work requests from dead executors.
if self
.state
diff --git a/ballista/scheduler/src/state/mod.rs b/ballista/scheduler/src/state/mod.rs
index 6943cbd5..cb723c9a 100644
--- a/ballista/scheduler/src/state/mod.rs
+++ b/ballista/scheduler/src/state/mod.rs
@@ -15,6 +15,9 @@
// specific language governing permissions and limitations
// under the License.
+use datafusion::datasource::listing::{ListingTable, ListingTableUrl};
+use datafusion::datasource::source_as_provider;
+use datafusion::logical_expr::PlanVisitor;
use std::any::type_name;
use std::collections::HashMap;
use std::future::Future;
@@ -34,6 +37,7 @@ use ballista_core::error::{BallistaError, Result};
use ballista_core::serde::protobuf::TaskStatus;
use ballista_core::serde::{AsExecutionPlan, BallistaCodec};
use datafusion::logical_plan::LogicalPlan;
+use datafusion::physical_plan::display::DisplayableExecutionPlan;
use datafusion::prelude::SessionContext;
use datafusion_proto::logical_plan::AsLogicalPlan;
use log::{debug, error, info};
@@ -304,11 +308,68 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
plan: &LogicalPlan,
) -> Result<()> {
let start = Instant::now();
- let optimized_plan = session_ctx.optimize(plan)?;
- debug!("Calculated optimized plan: {:?}", optimized_plan);
+ if log::max_level() >= log::Level::Debug {
+ // optimizing the plan here is redundant because the physical
planner will do this again
+ // but it is helpful to see what the optimized plan will be
+ let optimized_plan = session_ctx.optimize(plan)?;
+ debug!("Optimized plan: {}", optimized_plan.display_indent());
+ }
+
+ struct VerifyPathsExist {}
+ impl PlanVisitor for VerifyPathsExist {
+ type Error = BallistaError;
+
+ fn pre_visit(
+ &mut self,
+ plan: &LogicalPlan,
+ ) -> std::result::Result<bool, Self::Error> {
+ if let LogicalPlan::TableScan(scan) = plan {
+ let provider = source_as_provider(&scan.source)?;
+ if let Some(table) =
provider.as_any().downcast_ref::<ListingTable>()
+ {
+ let local_paths: Vec<&ListingTableUrl> = table
+ .table_paths()
+ .iter()
+ .filter(|url| url.as_str().starts_with("file:///"))
+ .collect();
+ if !local_paths.is_empty() {
+ // These are local files rather than remote object
stores, so we
+ // need to check that they are accessible on the
scheduler (the client
+ // may not be on the same host, or the data path
may not be correctly
+ // mounted in the container). There could be
thousands of files so we
+ // just check the first one.
+ let url = &local_paths[0].as_str();
+ // the unwraps are safe here because we checked
that the url starts with file:///
+ // we need to check both versions here to support
Linux & Windows
+
ListingTableUrl::parse(url.strip_prefix("file://").unwrap())
+ .or_else(|_| {
+ ListingTableUrl::parse(
+ url.strip_prefix("file:///").unwrap(),
+ )
+ })
+ .map_err(|e| {
+ BallistaError::General(format!(
+ "logical plan refers to path on local file
system \
+ that is not accessible in the scheduler:
{}: {:?}",
+ url, e
+ ))
+ })?;
+ }
+ }
+ }
+ Ok(true)
+ }
+ }
+
+ let mut verify_paths_exist = VerifyPathsExist {};
+ plan.accept(&mut verify_paths_exist)?;
- let plan = session_ctx.create_physical_plan(&optimized_plan).await?;
+ let plan = session_ctx.create_physical_plan(plan).await?;
+ debug!(
+ "Physical plan: {}",
+ DisplayableExecutionPlan::new(plan.as_ref()).indent()
+ );
self.task_manager
.submit_job(job_id, job_name, &session_ctx.session_id(), plan)
diff --git a/docker-compose.yml b/docker-compose.yml
index f2459236..89b4fdb1 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -31,7 +31,7 @@ services:
- "80:80"
- "50050:50050"
environment:
- - RUST_LOG=info
+ - RUST_LOG=ballista=debug,info
volumes:
- ./benchmarks/data:/data
depends_on: