This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 8f4078d  ShuffleReaderExec now supports multiple locations per partition (#541)
8f4078d is described below

commit 8f4078d83f7ea0348fa43906d26156bf8a95de4c
Author: Andy Grove <andygrov...@gmail.com>
AuthorDate: Sat Jun 12 06:45:06 2021 -0600

    ShuffleReaderExec now supports multiple locations per partition (#541)
    
    * ShuffleReaderExec now supports multiple locations per partition
    
    * Remove TODO
    
    * avoid clone
---
 ballista/rust/client/src/context.rs                | 39 ++-------
 ballista/rust/core/proto/ballista.proto            |  7 +-
 .../core/src/execution_plans/shuffle_reader.rs     | 94 +++++++++++++---------
 .../core/src/serde/physical_plan/from_proto.rs     | 12 ++-
 .../rust/core/src/serde/physical_plan/to_proto.rs  | 18 +++--
 ballista/rust/core/src/utils.rs                    | 40 ++++++++-
 ballista/rust/scheduler/src/planner.rs             |  2 +-
 ballista/rust/scheduler/src/state/mod.rs           |  6 +-
 8 files changed, 130 insertions(+), 88 deletions(-)
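
The core of this change is the shape of the partition metadata: ShuffleReaderExec previously held a flat Vec<PartitionLocation> (one location per output partition), and it now holds a Vec<Vec<PartitionLocation>>, where the outer vector still defines the output partitioning and the inner vector lists every location that feeds a single output partition. A minimal, self-contained Rust sketch of that shape (using a simplified stand-in Location struct rather than Ballista's real PartitionLocation type) might look like this:

    // Sketch only: Location is a simplified stand-in for
    // ballista_core::serde::scheduler::PartitionLocation.
    #[derive(Debug, Clone)]
    struct Location {
        executor: String,
        stage_id: usize,
        partition_id: usize,
    }

    fn main() {
        // One entry per *output* partition; each entry may name several
        // source locations whose streams are merged at execution time.
        let partition: Vec<Vec<Location>> = vec![
            vec![
                Location { executor: "executor-1".into(), stage_id: 1, partition_id: 0 },
                Location { executor: "executor-2".into(), stage_id: 1, partition_id: 0 },
            ],
            vec![Location { executor: "executor-1".into(), stage_id: 1, partition_id: 1 }],
        ];

        // Mirrors ShuffleReaderExec::output_partitioning(): the number of
        // output partitions equals the length of the outer vector.
        assert_eq!(partition.len(), 2);
    }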

diff --git a/ballista/rust/client/src/context.rs b/ballista/rust/client/src/context.rs
index 4e5cc1a..695045d 100644
--- a/ballista/rust/client/src/context.rs
+++ b/ballista/rust/client/src/context.rs
@@ -29,21 +29,18 @@ use ballista_core::serde::protobuf::{
     execute_query_params::Query, job_status, ExecuteQueryParams, GetJobStatusParams,
     GetJobStatusResult,
 };
+use ballista_core::utils::WrappedStream;
 use ballista_core::{
     client::BallistaClient, datasource::DfTableAdapter, utils::create_datafusion_context,
 };
 
 use datafusion::arrow::datatypes::Schema;
-use datafusion::arrow::datatypes::SchemaRef;
-use datafusion::arrow::error::Result as ArrowResult;
-use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::catalog::TableReference;
 use datafusion::error::{DataFusionError, Result};
 use datafusion::logical_plan::LogicalPlan;
 use datafusion::physical_plan::csv::CsvReadOptions;
 use datafusion::{dataframe::DataFrame, physical_plan::RecordBatchStream};
 use futures::future;
-use futures::Stream;
 use futures::StreamExt;
 use log::{error, info};
 
@@ -74,32 +71,6 @@ impl BallistaContextState {
     }
 }
 
-struct WrappedStream {
-    stream: Pin<Box<dyn Stream<Item = ArrowResult<RecordBatch>> + Send + Sync>>,
-    schema: SchemaRef,
-}
-
-impl RecordBatchStream for WrappedStream {
-    fn schema(&self) -> SchemaRef {
-        self.schema.clone()
-    }
-}
-
-impl Stream for WrappedStream {
-    type Item = ArrowResult<RecordBatch>;
-
-    fn poll_next(
-        mut self: Pin<&mut Self>,
-        cx: &mut std::task::Context<'_>,
-    ) -> std::task::Poll<Option<Self::Item>> {
-        self.stream.poll_next_unpin(cx)
-    }
-
-    fn size_hint(&self) -> (usize, Option<usize>) {
-        self.stream.size_hint()
-    }
-}
-
 #[allow(dead_code)]
 
 pub struct BallistaContext {
@@ -287,10 +258,10 @@ impl BallistaContext {
                     .into_iter()
                     .collect::<Result<Vec<_>>>()?;
 
-                    let result = WrappedStream {
-                        stream: Box::pin(futures::stream::iter(result).flatten()),
-                        schema: Arc::new(schema),
-                    };
+                    let result = WrappedStream::new(
+                        Box::pin(futures::stream::iter(result).flatten()),
+                        Arc::new(schema),
+                    );
                     break Ok(Box::pin(result));
                 }
             };
diff --git a/ballista/rust/core/proto/ballista.proto b/ballista/rust/core/proto/ballista.proto
index 85af902..5aafd00 100644
--- a/ballista/rust/core/proto/ballista.proto
+++ b/ballista/rust/core/proto/ballista.proto
@@ -489,10 +489,15 @@ message HashAggregateExecNode {
 }
 
 message ShuffleReaderExecNode {
-  repeated PartitionLocation partition_location = 1;
+  repeated ShuffleReaderPartition partition = 1;
   Schema schema = 2;
 }
 
+message ShuffleReaderPartition {
+  // each partition of a shuffle read can read data from multiple locations
+  repeated PartitionLocation location = 1;
+}
+
 message GlobalLimitExecNode {
   PhysicalPlanNode input = 1;
   uint32 limit = 2;
diff --git a/ballista/rust/core/src/execution_plans/shuffle_reader.rs b/ballista/rust/core/src/execution_plans/shuffle_reader.rs
index db29cf1..3a7f795 100644
--- a/ballista/rust/core/src/execution_plans/shuffle_reader.rs
+++ b/ballista/rust/core/src/execution_plans/shuffle_reader.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::fmt::Formatter;
 use std::sync::Arc;
 use std::{any::Any, pin::Pin};
 
@@ -22,35 +23,35 @@ use crate::client::BallistaClient;
 use crate::memory_stream::MemoryStream;
 use crate::serde::scheduler::PartitionLocation;
 
+use crate::utils::WrappedStream;
 use async_trait::async_trait;
 use datafusion::arrow::datatypes::SchemaRef;
+use datafusion::arrow::error::Result as ArrowResult;
+use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::physical_plan::{DisplayFormatType, ExecutionPlan, Partitioning};
 use datafusion::{
     error::{DataFusionError, Result},
     physical_plan::RecordBatchStream,
 };
+use futures::{future, Stream, StreamExt};
 use log::info;
-use std::fmt::Formatter;
 
-/// ShuffleReaderExec reads partitions that have already been materialized by an executor.
+/// ShuffleReaderExec reads partitions that have already been materialized by a query stage
+/// being executed by an executor
 #[derive(Debug, Clone)]
 pub struct ShuffleReaderExec {
-    // The query stage that is responsible for producing the shuffle partitions that
-    // this operator will read
-    pub(crate) partition_location: Vec<PartitionLocation>,
+    /// Each partition of a shuffle can read data from multiple locations
+    pub(crate) partition: Vec<Vec<PartitionLocation>>,
     pub(crate) schema: SchemaRef,
 }
 
 impl ShuffleReaderExec {
     /// Create a new ShuffleReaderExec
     pub fn try_new(
-        partition_meta: Vec<PartitionLocation>,
+        partition: Vec<Vec<PartitionLocation>>,
         schema: SchemaRef,
     ) -> Result<Self> {
-        Ok(Self {
-            partition_location: partition_meta,
-            schema,
-        })
+        Ok(Self { partition, schema })
     }
 }
 
@@ -65,7 +66,7 @@ impl ExecutionPlan for ShuffleReaderExec {
     }
 
     fn output_partitioning(&self) -> Partitioning {
-        Partitioning::UnknownPartitioning(self.partition_location.len())
+        Partitioning::UnknownPartitioning(self.partition.len())
     }
 
     fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
@@ -86,23 +87,18 @@ impl ExecutionPlan for ShuffleReaderExec {
         partition: usize,
     ) -> Result<Pin<Box<dyn RecordBatchStream + Send + Sync>>> {
         info!("ShuffleReaderExec::execute({})", partition);
-        let partition_location = &self.partition_location[partition];
-
-        let mut client = BallistaClient::try_new(
-            &partition_location.executor_meta.host,
-            partition_location.executor_meta.port,
-        )
-        .await
-        .map_err(|e| DataFusionError::Execution(format!("Ballista Error: {:?}", e)))?;
 
-        client
-            .fetch_partition(
-                &partition_location.partition_id.job_id,
-                partition_location.partition_id.stage_id,
-                partition,
-            )
+        let partition_locations = &self.partition[partition];
+        let result = future::join_all(partition_locations.iter().map(fetch_partition))
             .await
-            .map_err(|e| DataFusionError::Execution(format!("Ballista Error: {:?}", e)))
+            .into_iter()
+            .collect::<Result<Vec<_>>>()?;
+
+        let result = WrappedStream::new(
+            Box::pin(futures::stream::iter(result).flatten()),
+            Arc::new(self.schema.as_ref().clone()),
+        );
+        Ok(Box::pin(result))
     }
 
     fn fmt_as(
@@ -113,22 +109,46 @@ impl ExecutionPlan for ShuffleReaderExec {
         match t {
             DisplayFormatType::Default => {
                 let loc_str = self
-                    .partition_location
+                    .partition
                     .iter()
-                    .map(|l| {
-                        format!(
-                            "[executor={} part={}:{}:{} stats={:?}]",
-                            l.executor_meta.id,
-                            l.partition_id.job_id,
-                            l.partition_id.stage_id,
-                            l.partition_id.partition_id,
-                            l.partition_stats
-                        )
+                    .map(|x| {
+                        x.iter()
+                            .map(|l| {
+                                format!(
+                                    "[executor={} part={}:{}:{} stats={:?}]",
+                                    l.executor_meta.id,
+                                    l.partition_id.job_id,
+                                    l.partition_id.stage_id,
+                                    l.partition_id.partition_id,
+                                    l.partition_stats
+                                )
+                            })
+                            .collect::<Vec<String>>()
+                            .join(",")
                     })
                     .collect::<Vec<String>>()
-                    .join(",");
+                    .join("\n");
                 write!(f, "ShuffleReaderExec: partition_locations={}", loc_str)
             }
         }
     }
 }
+
+async fn fetch_partition(
+    location: &PartitionLocation,
+) -> Result<Pin<Box<dyn RecordBatchStream + Send + Sync>>> {
+    let metadata = &location.executor_meta;
+    let partition_id = &location.partition_id;
+    let mut ballista_client =
+        BallistaClient::try_new(metadata.host.as_str(), metadata.port as u16)
+            .await
+            .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;
+    Ok(ballista_client
+        .fetch_partition(
+            &partition_id.job_id,
+            partition_id.stage_id as usize,
+            partition_id.partition_id as usize,
+        )
+        .await
+        .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?)
+}
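
The new execute() body above fetches every location of the requested partition concurrently (future::join_all over fetch_partition) and then flattens the resulting record batch streams into one. A rough, standalone illustration of that fan-in pattern, with strings standing in for record batches and a hypothetical in-memory fetch in place of BallistaClient::fetch_partition (this sketch is not Ballista code and assumes the tokio and futures crates are available):

    use futures::StreamExt;

    #[tokio::main]
    async fn main() {
        // Stand-ins for the locations that feed one output partition.
        let locations = vec!["executor-1:50051", "executor-2:50051"];

        // Hypothetical per-location fetch; in Ballista each of these would be
        // a RecordBatchStream returned by BallistaClient::fetch_partition.
        let fetches = locations.into_iter().map(|loc| async move {
            futures::stream::iter(vec![format!("batch from {}", loc)])
        });

        // Fetch all locations concurrently, then chain the streams together,
        // just as execute() does before wrapping the result in WrappedStream.
        let streams = futures::future::join_all(fetches).await;
        let mut merged = futures::stream::iter(streams).flatten();

        while let Some(batch) = merged.next().await {
            println!("{}", batch);
        }
    }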
diff --git a/ballista/rust/core/src/serde/physical_plan/from_proto.rs b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
index d49d53c..a2c9db9 100644
--- a/ballista/rust/core/src/serde/physical_plan/from_proto.rs
+++ b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
@@ -25,6 +25,7 @@ use crate::error::BallistaError;
 use crate::execution_plans::{ShuffleReaderExec, UnresolvedShuffleExec};
 use crate::serde::protobuf::repartition_exec_node::PartitionMethod;
 use crate::serde::protobuf::LogicalExprNode;
+use crate::serde::protobuf::ShuffleReaderPartition;
 use crate::serde::scheduler::PartitionLocation;
 use crate::serde::{proto_error, protobuf};
 use crate::{convert_box_required, convert_required};
@@ -327,10 +328,15 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
             }
             PhysicalPlanType::ShuffleReader(shuffle_reader) => {
                 let schema = Arc::new(convert_required!(shuffle_reader.schema)?);
-                let partition_location: Vec<PartitionLocation> = shuffle_reader
-                    .partition_location
+                let partition_location: Vec<Vec<PartitionLocation>> = shuffle_reader
+                    .partition
                     .iter()
-                    .map(|p| p.clone().try_into())
+                    .map(|p| {
+                        p.location
+                            .iter()
+                            .map(|l| l.clone().try_into())
+                            .collect::<Result<Vec<_>, _>>()
+                    })
                     .collect::<Result<Vec<_>, BallistaError>>()?;
                 let shuffle_reader =
                     ShuffleReaderExec::try_new(partition_location, schema)?;
diff --git a/ballista/rust/core/src/serde/physical_plan/to_proto.rs b/ballista/rust/core/src/serde/physical_plan/to_proto.rs
index 26092e7..15d5d4b 100644
--- a/ballista/rust/core/src/serde/physical_plan/to_proto.rs
+++ b/ballista/rust/core/src/serde/physical_plan/to_proto.rs
@@ -57,6 +57,7 @@ use protobuf::physical_plan_node::PhysicalPlanType;
 
 use crate::execution_plans::{ShuffleReaderExec, UnresolvedShuffleExec};
 use crate::serde::protobuf::repartition_exec_node::PartitionMethod;
+use crate::serde::scheduler::PartitionLocation;
 use crate::serde::{protobuf, BallistaError};
 use datafusion::physical_plan::functions::{BuiltinScalarFunction, ScalarFunctionExpr};
 use datafusion::physical_plan::merge::MergeExec;
@@ -268,16 +269,19 @@ impl TryInto<protobuf::PhysicalPlanNode> for Arc<dyn ExecutionPlan> {
                 )),
             })
         } else if let Some(exec) = plan.downcast_ref::<ShuffleReaderExec>() {
-            let partition_location = exec
-                .partition_location
-                .iter()
-                .map(|l| l.clone().try_into())
-                .collect::<Result<_, _>>()?;
-
+            let mut partition = vec![];
+            for location in &exec.partition {
+                partition.push(protobuf::ShuffleReaderPartition {
+                    location: location
+                        .iter()
+                        .map(|l| l.clone().try_into())
+                        .collect::<Result<Vec<_>, _>>()?,
+                });
+            }
             Ok(protobuf::PhysicalPlanNode {
                 physical_plan_type: Some(PhysicalPlanType::ShuffleReader(
                     protobuf::ShuffleReaderExecNode {
-                        partition_location,
+                        partition,
                         schema: Some(exec.schema().as_ref().into()),
                     },
                 )),
diff --git a/ballista/rust/core/src/utils.rs b/ballista/rust/core/src/utils.rs
index 4ba6ec4..b58be28 100644
--- a/ballista/rust/core/src/utils.rs
+++ b/ballista/rust/core/src/utils.rs
@@ -27,11 +27,12 @@ use crate::execution_plans::{QueryStageExec, UnresolvedShuffleExec};
 use crate::memory_stream::MemoryStream;
 use crate::serde::scheduler::PartitionStats;
 
+use datafusion::arrow::error::Result as ArrowResult;
 use datafusion::arrow::{
     array::{
         ArrayBuilder, ArrayRef, StructArray, StructBuilder, UInt64Array, UInt64Builder,
     },
-    datatypes::{DataType, Field},
+    datatypes::{DataType, Field, SchemaRef},
     ipc::reader::FileReader,
     ipc::writer::FileWriter,
     record_batch::RecordBatch,
@@ -54,7 +55,7 @@ use datafusion::physical_plan::sort::SortExec;
 use datafusion::physical_plan::{
     AggregateExpr, ExecutionPlan, PhysicalExpr, RecordBatchStream,
 };
-use futures::StreamExt;
+use futures::{future, Stream, StreamExt};
 
 /// Stream data to disk in Arrow IPC format
 
@@ -234,3 +235,38 @@ pub fn create_datafusion_context() -> ExecutionContext {
         .with_physical_optimizer_rules(rules);
     ExecutionContext::with_config(config)
 }
+
+pub struct WrappedStream {
+    stream: Pin<Box<dyn Stream<Item = ArrowResult<RecordBatch>> + Send + Sync>>,
+    schema: SchemaRef,
+}
+
+impl WrappedStream {
+    pub fn new(
+        stream: Pin<Box<dyn Stream<Item = ArrowResult<RecordBatch>> + Send + Sync>>,
+        schema: SchemaRef,
+    ) -> Self {
+        Self { stream, schema }
+    }
+}
+
+impl RecordBatchStream for WrappedStream {
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+}
+
+impl Stream for WrappedStream {
+    type Item = ArrowResult<RecordBatch>;
+
+    fn poll_next(
+        mut self: Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Option<Self::Item>> {
+        self.stream.poll_next_unpin(cx)
+    }
+
+    fn size_hint(&self) -> (usize, Option<usize>) {
+        self.stream.size_hint()
+    }
+}
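
WrappedStream is now exported from ballista_core::utils so that both the client context and ShuffleReaderExec can reuse it: it pairs a stream of Arrow record batches with the SchemaRef that describes them, satisfying DataFusion's RecordBatchStream trait. A small usage sketch, not part of this commit, assuming the ballista-core, datafusion, futures, and tokio crates are on the path:

    use std::sync::Arc;

    use ballista_core::utils::WrappedStream;
    use datafusion::arrow::array::{ArrayRef, UInt64Array};
    use datafusion::arrow::datatypes::{DataType, Field, Schema};
    use datafusion::arrow::error::Result as ArrowResult;
    use datafusion::arrow::record_batch::RecordBatch;
    use datafusion::physical_plan::RecordBatchStream;
    use futures::StreamExt;

    #[tokio::main]
    async fn main() -> Result<(), Box<dyn std::error::Error>> {
        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::UInt64, false)]));
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![Arc::new(UInt64Array::from(vec![1u64, 2, 3])) as ArrayRef],
        )?;

        // Wrap a one-batch stream; the schema travels with the stream so that
        // consumers can inspect it before polling any batches.
        let batches: Vec<ArrowResult<RecordBatch>> = vec![Ok(batch)];
        let mut wrapped =
            WrappedStream::new(Box::pin(futures::stream::iter(batches)), schema.clone());

        assert_eq!(wrapped.schema(), schema);
        while let Some(batch) = wrapped.next().await {
            println!("{} rows", batch?.num_rows());
        }
        Ok(())
    }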
diff --git a/ballista/rust/scheduler/src/planner.rs b/ballista/rust/scheduler/src/planner.rs
index 445ef9a..2ac9f61 100644
--- a/ballista/rust/scheduler/src/planner.rs
+++ b/ballista/rust/scheduler/src/planner.rs
@@ -186,7 +186,7 @@ impl DistributedPlanner {
 
 pub fn remove_unresolved_shuffles(
     stage: &dyn ExecutionPlan,
-    partition_locations: &HashMap<usize, Vec<PartitionLocation>>,
+    partition_locations: &HashMap<usize, Vec<Vec<PartitionLocation>>>,
 ) -> Result<Arc<dyn ExecutionPlan>> {
     let mut new_children: Vec<Arc<dyn ExecutionPlan>> = vec![];
     for child in stage.children() {
diff --git a/ballista/rust/scheduler/src/state/mod.rs b/ballista/rust/scheduler/src/state/mod.rs
index a15efd6..506fd1c 100644
--- a/ballista/rust/scheduler/src/state/mod.rs
+++ b/ballista/rust/scheduler/src/state/mod.rs
@@ -234,7 +234,7 @@ impl SchedulerState {
                 let unresolved_shuffles = find_unresolved_shuffles(&plan)?;
                 let mut partition_locations: HashMap<
                     usize,
-                    Vec<ballista_core::serde::scheduler::PartitionLocation>,
+                    Vec<Vec<ballista_core::serde::scheduler::PartitionLocation>>,
                 > = HashMap::new();
                 for unresolved_shuffle in unresolved_shuffles {
                     for stage_id in unresolved_shuffle.query_stage_ids {
@@ -256,7 +256,7 @@ impl SchedulerState {
                                 let empty = vec![];
                                 let locations =
                                     partition_locations.entry(stage_id).or_insert(empty);
-                                locations.push(
+                                locations.push(vec![
                                     ballista_core::serde::scheduler::PartitionLocation {
                                         partition_id:
                                             ballista_core::serde::scheduler::PartitionId {
@@ -271,7 +271,7 @@ impl SchedulerState {
                                             .clone(),
                                         partition_stats: PartitionStats::default(),
                                     },
-                                );
+                                ]);
                             } else {
                                 continue 'tasks;
                             }
