This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git


The following commit(s) were added to refs/heads/master by this push:
     new 0a518ae6 Scheduler now verifies that `file://` ListingTable URLs are accessible (#414)
0a518ae6 is described below

commit 0a518ae6685d1b189fcc8c38a3bab78d3d2844b0
Author: Andy Grove <[email protected]>
AuthorDate: Sun Oct 23 11:05:50 2022 -0600

    Scheduler now verifies that `file://` ListingTable URLs are accessible (#414)
---
 ballista/scheduler/src/planner.rs               |  1 +
 ballista/scheduler/src/scheduler_server/grpc.rs |  4 +-
 ballista/scheduler/src/state/mod.rs             | 67 +++++++++++++++++++++++--
 docker-compose.yml                              |  2 +-
 4 files changed, 68 insertions(+), 6 deletions(-)

diff --git a/ballista/scheduler/src/planner.rs b/ballista/scheduler/src/planner.rs
index 474fd63d..8d6ec736 100644
--- a/ballista/scheduler/src/planner.rs
+++ b/ballista/scheduler/src/planner.rs
@@ -229,6 +229,7 @@ pub fn remove_unresolved_shuffles(
                     .iter()
                     .map(|c| c
                         .iter()
+                        .filter(|l| !l.path.is_empty())
                         .map(|l| l.path.clone())
                         .collect::<Vec<_>>()
                         .join(", "))
diff --git a/ballista/scheduler/src/scheduler_server/grpc.rs b/ballista/scheduler/src/scheduler_server/grpc.rs
index 14835c90..7b5c96bd 100644
--- a/ballista/scheduler/src/scheduler_server/grpc.rs
+++ b/ballista/scheduler/src/scheduler_server/grpc.rs
@@ -38,7 +38,7 @@ use datafusion::datasource::file_format::parquet::ParquetFormat;
 use datafusion::datasource::file_format::FileFormat;
 use datafusion_proto::logical_plan::AsLogicalPlan;
 use futures::TryStreamExt;
-use log::{debug, error, info, warn};
+use log::{debug, error, info, trace, warn};
 
 use std::ops::Deref;
 use std::sync::Arc;
@@ -71,7 +71,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
             task_status,
         } = request.into_inner()
         {
-            debug!("Received poll_work request for {:?}", metadata);
+            trace!("Received poll_work request for {:?}", metadata);
             // We might receive buggy poll work requests from dead executors.
             if self
                 .state
diff --git a/ballista/scheduler/src/state/mod.rs b/ballista/scheduler/src/state/mod.rs
index 6943cbd5..cb723c9a 100644
--- a/ballista/scheduler/src/state/mod.rs
+++ b/ballista/scheduler/src/state/mod.rs
@@ -15,6 +15,9 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use datafusion::datasource::listing::{ListingTable, ListingTableUrl};
+use datafusion::datasource::source_as_provider;
+use datafusion::logical_expr::PlanVisitor;
 use std::any::type_name;
 use std::collections::HashMap;
 use std::future::Future;
@@ -34,6 +37,7 @@ use ballista_core::error::{BallistaError, Result};
 use ballista_core::serde::protobuf::TaskStatus;
 use ballista_core::serde::{AsExecutionPlan, BallistaCodec};
 use datafusion::logical_plan::LogicalPlan;
+use datafusion::physical_plan::display::DisplayableExecutionPlan;
 use datafusion::prelude::SessionContext;
 use datafusion_proto::logical_plan::AsLogicalPlan;
 use log::{debug, error, info};
@@ -304,11 +308,68 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
         plan: &LogicalPlan,
     ) -> Result<()> {
         let start = Instant::now();
-        let optimized_plan = session_ctx.optimize(plan)?;
 
-        debug!("Calculated optimized plan: {:?}", optimized_plan);
+        if log::max_level() >= log::Level::Debug {
+            // optimizing the plan here is redundant because the physical planner will do this again
+            // but it is helpful to see what the optimized plan will be
+            let optimized_plan = session_ctx.optimize(plan)?;
+            debug!("Optimized plan: {}", optimized_plan.display_indent());
+        }
+
+        struct VerifyPathsExist {}
+        impl PlanVisitor for VerifyPathsExist {
+            type Error = BallistaError;
+
+            fn pre_visit(
+                &mut self,
+                plan: &LogicalPlan,
+            ) -> std::result::Result<bool, Self::Error> {
+                if let LogicalPlan::TableScan(scan) = plan {
+                    let provider = source_as_provider(&scan.source)?;
+                    if let Some(table) = 
provider.as_any().downcast_ref::<ListingTable>()
+                    {
+                        let local_paths: Vec<&ListingTableUrl> = table
+                            .table_paths()
+                            .iter()
+                            .filter(|url| url.as_str().starts_with("file:///"))
+                            .collect();
+                        if !local_paths.is_empty() {
+                            // These are local files rather than remote object stores, so we
+                            // need to check that they are accessible on the scheduler (the client
+                            // may not be on the same host, or the data path may not be correctly
+                            // mounted in the container). There could be thousands of files so we
+                            // just check the first one.
+                            let url = &local_paths[0].as_str();
+                            // the unwraps are safe here because we checked that the url starts with file:///
+                            // we need to check both versions here to support Linux & Windows
+                            
ListingTableUrl::parse(url.strip_prefix("file://").unwrap())
+                                .or_else(|_| {
+                                    ListingTableUrl::parse(
+                                        url.strip_prefix("file:///").unwrap(),
+                                    )
+                                })
+                                .map_err(|e| {
+                                    BallistaError::General(format!(
+                                    "logical plan refers to path on local file 
system \
+                                    that is not accessible in the scheduler: 
{}: {:?}",
+                                    url, e
+                                ))
+                                })?;
+                        }
+                    }
+                }
+                Ok(true)
+            }
+        }
+
+        let mut verify_paths_exist = VerifyPathsExist {};
+        plan.accept(&mut verify_paths_exist)?;
 
-        let plan = session_ctx.create_physical_plan(&optimized_plan).await?;
+        let plan = session_ctx.create_physical_plan(plan).await?;
+        debug!(
+            "Physical plan: {}",
+            DisplayableExecutionPlan::new(plan.as_ref()).indent()
+        );
 
         self.task_manager
             .submit_job(job_id, job_name, &session_ctx.session_id(), plan)
diff --git a/docker-compose.yml b/docker-compose.yml
index f2459236..89b4fdb1 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -31,7 +31,7 @@ services:
       - "80:80"
       - "50050:50050"
     environment:
-      - RUST_LOG=info
+      - RUST_LOG=ballista=debug,info
     volumes:
       - ./benchmarks/data:/data
     depends_on:

Reply via email to