This is an automated email from the ASF dual-hosted git repository. alamb pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push: new 8f4078d ShuffleReaderExec now supports multiple locations per partition (#541) 8f4078d is described below commit 8f4078d83f7ea0348fa43906d26156bf8a95de4c Author: Andy Grove <andygrov...@gmail.com> AuthorDate: Sat Jun 12 06:45:06 2021 -0600 ShuffleReaderExec now supports multiple locations per partition (#541) * ShuffleReaderExec now supports multiple locations per partition * Remove TODO * avoid clone --- ballista/rust/client/src/context.rs | 39 ++------- ballista/rust/core/proto/ballista.proto | 7 +- .../core/src/execution_plans/shuffle_reader.rs | 94 +++++++++++++--------- .../core/src/serde/physical_plan/from_proto.rs | 12 ++- .../rust/core/src/serde/physical_plan/to_proto.rs | 18 +++-- ballista/rust/core/src/utils.rs | 40 ++++++++- ballista/rust/scheduler/src/planner.rs | 2 +- ballista/rust/scheduler/src/state/mod.rs | 6 +- 8 files changed, 130 insertions(+), 88 deletions(-) diff --git a/ballista/rust/client/src/context.rs b/ballista/rust/client/src/context.rs index 4e5cc1a..695045d 100644 --- a/ballista/rust/client/src/context.rs +++ b/ballista/rust/client/src/context.rs @@ -29,21 +29,18 @@ use ballista_core::serde::protobuf::{ execute_query_params::Query, job_status, ExecuteQueryParams, GetJobStatusParams, GetJobStatusResult, }; +use ballista_core::utils::WrappedStream; use ballista_core::{ client::BallistaClient, datasource::DfTableAdapter, utils::create_datafusion_context, }; use datafusion::arrow::datatypes::Schema; -use datafusion::arrow::datatypes::SchemaRef; -use datafusion::arrow::error::Result as ArrowResult; -use datafusion::arrow::record_batch::RecordBatch; use datafusion::catalog::TableReference; use datafusion::error::{DataFusionError, Result}; use datafusion::logical_plan::LogicalPlan; use datafusion::physical_plan::csv::CsvReadOptions; use datafusion::{dataframe::DataFrame, physical_plan::RecordBatchStream}; use futures::future; -use futures::Stream; use futures::StreamExt; use log::{error, info}; @@ -74,32 +71,6 @@ impl BallistaContextState { } } -struct WrappedStream { - stream: Pin<Box<dyn Stream<Item = ArrowResult<RecordBatch>> + Send + Sync>>, - schema: SchemaRef, -} - -impl RecordBatchStream for WrappedStream { - fn schema(&self) -> SchemaRef { - self.schema.clone() - } -} - -impl Stream for WrappedStream { - type Item = ArrowResult<RecordBatch>; - - fn poll_next( - mut self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll<Option<Self::Item>> { - self.stream.poll_next_unpin(cx) - } - - fn size_hint(&self) -> (usize, Option<usize>) { - self.stream.size_hint() - } -} - #[allow(dead_code)] pub struct BallistaContext { @@ -287,10 +258,10 @@ impl BallistaContext { .into_iter() .collect::<Result<Vec<_>>>()?; - let result = WrappedStream { - stream: Box::pin(futures::stream::iter(result).flatten()), - schema: Arc::new(schema), - }; + let result = WrappedStream::new( + Box::pin(futures::stream::iter(result).flatten()), + Arc::new(schema), + ); break Ok(Box::pin(result)); } }; diff --git a/ballista/rust/core/proto/ballista.proto b/ballista/rust/core/proto/ballista.proto index 85af902..5aafd00 100644 --- a/ballista/rust/core/proto/ballista.proto +++ b/ballista/rust/core/proto/ballista.proto @@ -489,10 +489,15 @@ message HashAggregateExecNode { } message ShuffleReaderExecNode { - repeated PartitionLocation partition_location = 1; + repeated ShuffleReaderPartition partition = 1; Schema schema = 2; } +message ShuffleReaderPartition { + // each partition of a shuffle read can read data from multiple locations + repeated PartitionLocation location = 1; +} + message GlobalLimitExecNode { PhysicalPlanNode input = 1; uint32 limit = 2; diff --git a/ballista/rust/core/src/execution_plans/shuffle_reader.rs b/ballista/rust/core/src/execution_plans/shuffle_reader.rs index db29cf1..3a7f795 100644 --- a/ballista/rust/core/src/execution_plans/shuffle_reader.rs +++ b/ballista/rust/core/src/execution_plans/shuffle_reader.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +use std::fmt::Formatter; use std::sync::Arc; use std::{any::Any, pin::Pin}; @@ -22,35 +23,35 @@ use crate::client::BallistaClient; use crate::memory_stream::MemoryStream; use crate::serde::scheduler::PartitionLocation; +use crate::utils::WrappedStream; use async_trait::async_trait; use datafusion::arrow::datatypes::SchemaRef; +use datafusion::arrow::error::Result as ArrowResult; +use datafusion::arrow::record_batch::RecordBatch; use datafusion::physical_plan::{DisplayFormatType, ExecutionPlan, Partitioning}; use datafusion::{ error::{DataFusionError, Result}, physical_plan::RecordBatchStream, }; +use futures::{future, Stream, StreamExt}; use log::info; -use std::fmt::Formatter; -/// ShuffleReaderExec reads partitions that have already been materialized by an executor. +/// ShuffleReaderExec reads partitions that have already been materialized by a query stage +/// being executed by an executor #[derive(Debug, Clone)] pub struct ShuffleReaderExec { - // The query stage that is responsible for producing the shuffle partitions that - // this operator will read - pub(crate) partition_location: Vec<PartitionLocation>, + /// Each partition of a shuffle can read data from multiple locations + pub(crate) partition: Vec<Vec<PartitionLocation>>, pub(crate) schema: SchemaRef, } impl ShuffleReaderExec { /// Create a new ShuffleReaderExec pub fn try_new( - partition_meta: Vec<PartitionLocation>, + partition: Vec<Vec<PartitionLocation>>, schema: SchemaRef, ) -> Result<Self> { - Ok(Self { - partition_location: partition_meta, - schema, - }) + Ok(Self { partition, schema }) } } @@ -65,7 +66,7 @@ impl ExecutionPlan for ShuffleReaderExec { } fn output_partitioning(&self) -> Partitioning { - Partitioning::UnknownPartitioning(self.partition_location.len()) + Partitioning::UnknownPartitioning(self.partition.len()) } fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> { @@ -86,23 +87,18 @@ impl ExecutionPlan for ShuffleReaderExec { partition: usize, ) -> Result<Pin<Box<dyn RecordBatchStream + Send + Sync>>> { info!("ShuffleReaderExec::execute({})", partition); - let partition_location = &self.partition_location[partition]; - - let mut client = BallistaClient::try_new( - &partition_location.executor_meta.host, - partition_location.executor_meta.port, - ) - .await - .map_err(|e| DataFusionError::Execution(format!("Ballista Error: {:?}", e)))?; - client - .fetch_partition( - &partition_location.partition_id.job_id, - partition_location.partition_id.stage_id, - partition, - ) + let partition_locations = &self.partition[partition]; + let result = future::join_all(partition_locations.iter().map(fetch_partition)) .await - .map_err(|e| DataFusionError::Execution(format!("Ballista Error: {:?}", e))) + .into_iter() + .collect::<Result<Vec<_>>>()?; + + let result = WrappedStream::new( + Box::pin(futures::stream::iter(result).flatten()), + Arc::new(self.schema.as_ref().clone()), + ); + Ok(Box::pin(result)) } fn fmt_as( @@ -113,22 +109,46 @@ impl ExecutionPlan for ShuffleReaderExec { match t { DisplayFormatType::Default => { let loc_str = self - .partition_location + .partition .iter() - .map(|l| { - format!( - "[executor={} part={}:{}:{} stats={:?}]", - l.executor_meta.id, - l.partition_id.job_id, - l.partition_id.stage_id, - l.partition_id.partition_id, - l.partition_stats - ) + .map(|x| { + x.iter() + .map(|l| { + format!( + "[executor={} part={}:{}:{} stats={:?}]", + l.executor_meta.id, + l.partition_id.job_id, + l.partition_id.stage_id, + l.partition_id.partition_id, + l.partition_stats + ) + }) + .collect::<Vec<String>>() + .join(",") }) .collect::<Vec<String>>() - .join(","); + .join("\n"); write!(f, "ShuffleReaderExec: partition_locations={}", loc_str) } } } } + +async fn fetch_partition( + location: &PartitionLocation, +) -> Result<Pin<Box<dyn RecordBatchStream + Send + Sync>>> { + let metadata = &location.executor_meta; + let partition_id = &location.partition_id; + let mut ballista_client = + BallistaClient::try_new(metadata.host.as_str(), metadata.port as u16) + .await + .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?; + Ok(ballista_client + .fetch_partition( + &partition_id.job_id, + partition_id.stage_id as usize, + partition_id.partition_id as usize, + ) + .await + .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?) +} diff --git a/ballista/rust/core/src/serde/physical_plan/from_proto.rs b/ballista/rust/core/src/serde/physical_plan/from_proto.rs index d49d53c..a2c9db9 100644 --- a/ballista/rust/core/src/serde/physical_plan/from_proto.rs +++ b/ballista/rust/core/src/serde/physical_plan/from_proto.rs @@ -25,6 +25,7 @@ use crate::error::BallistaError; use crate::execution_plans::{ShuffleReaderExec, UnresolvedShuffleExec}; use crate::serde::protobuf::repartition_exec_node::PartitionMethod; use crate::serde::protobuf::LogicalExprNode; +use crate::serde::protobuf::ShuffleReaderPartition; use crate::serde::scheduler::PartitionLocation; use crate::serde::{proto_error, protobuf}; use crate::{convert_box_required, convert_required}; @@ -327,10 +328,15 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode { } PhysicalPlanType::ShuffleReader(shuffle_reader) => { let schema = Arc::new(convert_required!(shuffle_reader.schema)?); - let partition_location: Vec<PartitionLocation> = shuffle_reader - .partition_location + let partition_location: Vec<Vec<PartitionLocation>> = shuffle_reader + .partition .iter() - .map(|p| p.clone().try_into()) + .map(|p| { + p.location + .iter() + .map(|l| l.clone().try_into()) + .collect::<Result<Vec<_>, _>>() + }) .collect::<Result<Vec<_>, BallistaError>>()?; let shuffle_reader = ShuffleReaderExec::try_new(partition_location, schema)?; diff --git a/ballista/rust/core/src/serde/physical_plan/to_proto.rs b/ballista/rust/core/src/serde/physical_plan/to_proto.rs index 26092e7..15d5d4b 100644 --- a/ballista/rust/core/src/serde/physical_plan/to_proto.rs +++ b/ballista/rust/core/src/serde/physical_plan/to_proto.rs @@ -57,6 +57,7 @@ use protobuf::physical_plan_node::PhysicalPlanType; use crate::execution_plans::{ShuffleReaderExec, UnresolvedShuffleExec}; use crate::serde::protobuf::repartition_exec_node::PartitionMethod; +use crate::serde::scheduler::PartitionLocation; use crate::serde::{protobuf, BallistaError}; use datafusion::physical_plan::functions::{BuiltinScalarFunction, ScalarFunctionExpr}; use datafusion::physical_plan::merge::MergeExec; @@ -268,16 +269,19 @@ impl TryInto<protobuf::PhysicalPlanNode> for Arc<dyn ExecutionPlan> { )), }) } else if let Some(exec) = plan.downcast_ref::<ShuffleReaderExec>() { - let partition_location = exec - .partition_location - .iter() - .map(|l| l.clone().try_into()) - .collect::<Result<_, _>>()?; - + let mut partition = vec![]; + for location in &exec.partition { + partition.push(protobuf::ShuffleReaderPartition { + location: location + .iter() + .map(|l| l.clone().try_into()) + .collect::<Result<Vec<_>, _>>()?, + }); + } Ok(protobuf::PhysicalPlanNode { physical_plan_type: Some(PhysicalPlanType::ShuffleReader( protobuf::ShuffleReaderExecNode { - partition_location, + partition, schema: Some(exec.schema().as_ref().into()), }, )), diff --git a/ballista/rust/core/src/utils.rs b/ballista/rust/core/src/utils.rs index 4ba6ec4..b58be28 100644 --- a/ballista/rust/core/src/utils.rs +++ b/ballista/rust/core/src/utils.rs @@ -27,11 +27,12 @@ use crate::execution_plans::{QueryStageExec, UnresolvedShuffleExec}; use crate::memory_stream::MemoryStream; use crate::serde::scheduler::PartitionStats; +use datafusion::arrow::error::Result as ArrowResult; use datafusion::arrow::{ array::{ ArrayBuilder, ArrayRef, StructArray, StructBuilder, UInt64Array, UInt64Builder, }, - datatypes::{DataType, Field}, + datatypes::{DataType, Field, SchemaRef}, ipc::reader::FileReader, ipc::writer::FileWriter, record_batch::RecordBatch, @@ -54,7 +55,7 @@ use datafusion::physical_plan::sort::SortExec; use datafusion::physical_plan::{ AggregateExpr, ExecutionPlan, PhysicalExpr, RecordBatchStream, }; -use futures::StreamExt; +use futures::{future, Stream, StreamExt}; /// Stream data to disk in Arrow IPC format @@ -234,3 +235,38 @@ pub fn create_datafusion_context() -> ExecutionContext { .with_physical_optimizer_rules(rules); ExecutionContext::with_config(config) } + +pub struct WrappedStream { + stream: Pin<Box<dyn Stream<Item = ArrowResult<RecordBatch>> + Send + Sync>>, + schema: SchemaRef, +} + +impl WrappedStream { + pub fn new( + stream: Pin<Box<dyn Stream<Item = ArrowResult<RecordBatch>> + Send + Sync>>, + schema: SchemaRef, + ) -> Self { + Self { stream, schema } + } +} + +impl RecordBatchStream for WrappedStream { + fn schema(&self) -> SchemaRef { + self.schema.clone() + } +} + +impl Stream for WrappedStream { + type Item = ArrowResult<RecordBatch>; + + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll<Option<Self::Item>> { + self.stream.poll_next_unpin(cx) + } + + fn size_hint(&self) -> (usize, Option<usize>) { + self.stream.size_hint() + } +} diff --git a/ballista/rust/scheduler/src/planner.rs b/ballista/rust/scheduler/src/planner.rs index 445ef9a..2ac9f61 100644 --- a/ballista/rust/scheduler/src/planner.rs +++ b/ballista/rust/scheduler/src/planner.rs @@ -186,7 +186,7 @@ impl DistributedPlanner { pub fn remove_unresolved_shuffles( stage: &dyn ExecutionPlan, - partition_locations: &HashMap<usize, Vec<PartitionLocation>>, + partition_locations: &HashMap<usize, Vec<Vec<PartitionLocation>>>, ) -> Result<Arc<dyn ExecutionPlan>> { let mut new_children: Vec<Arc<dyn ExecutionPlan>> = vec![]; for child in stage.children() { diff --git a/ballista/rust/scheduler/src/state/mod.rs b/ballista/rust/scheduler/src/state/mod.rs index a15efd6..506fd1c 100644 --- a/ballista/rust/scheduler/src/state/mod.rs +++ b/ballista/rust/scheduler/src/state/mod.rs @@ -234,7 +234,7 @@ impl SchedulerState { let unresolved_shuffles = find_unresolved_shuffles(&plan)?; let mut partition_locations: HashMap< usize, - Vec<ballista_core::serde::scheduler::PartitionLocation>, + Vec<Vec<ballista_core::serde::scheduler::PartitionLocation>>, > = HashMap::new(); for unresolved_shuffle in unresolved_shuffles { for stage_id in unresolved_shuffle.query_stage_ids { @@ -256,7 +256,7 @@ impl SchedulerState { let empty = vec![]; let locations = partition_locations.entry(stage_id).or_insert(empty); - locations.push( + locations.push(vec![ ballista_core::serde::scheduler::PartitionLocation { partition_id: ballista_core::serde::scheduler::PartitionId { @@ -271,7 +271,7 @@ impl SchedulerState { .clone(), partition_stats: PartitionStats::default(), }, - ); + ]); } else { continue 'tasks; }