andygrove commented on code in PR #347:
URL: https://github.com/apache/arrow-ballista/pull/347#discussion_r996337833
##########
ballista/rust/core/src/execution_plans/shuffle_reader.rs:
##########
@@ -232,35 +267,53 @@ impl Stream for AbortableReceiverStream {
}
}
-fn send_fetch_partitions<R: PartitionReader + 'static>(
+fn send_fetch_partitions(
partition_locations: Vec<PartitionLocation>,
max_request_num: usize,
- partition_reader: R,
) -> AbortableReceiverStream {
let (response_sender, response_receiver) = mpsc::channel(max_request_num);
let semaphore = Arc::new(Semaphore::new(max_request_num));
let mut join_handles = vec![];
for p in partition_locations.into_iter() {
- let semaphore = semaphore.clone();
- let response_sender = response_sender.clone();
- let partition_reader_clone = partition_reader.clone();
- let join_handle = tokio::spawn(async move {
- // Block if exceeds max request number
- let permit = semaphore.acquire_owned().await.unwrap();
- let r = partition_reader_clone.fetch_partition(&p).await;
- // Block if the channel buffer is full
- if let Err(e) = response_sender.send(r).await {
- error!("Fail to send response event to the channel due to {}",
e);
- }
- // Increase semaphore by dropping existing permits.
- drop(permit);
- });
- join_handles.push(join_handle);
+ if check_is_local_location(&p.executor_meta.host) {
+ // local shuffle reader should not be restrict
+ debug!("Get local partition file from {}", &p.executor_meta.host);
+ let response_sender = response_sender.clone();
+ let join_handle = tokio::spawn(async move {
+ let r = PartitionReaderEnum::Local.fetch_partition(&p).await;
+ if let Err(e) = response_sender.send(r).await {
+ error!("Fail to send response event to the channel due to
{}", e);
+ }
+ });
+ join_handles.push(join_handle);
+ } else {
+ debug!("Get remote partition file from {}", &p.executor_meta.host);
+ let semaphore = semaphore.clone();
+ let response_sender = response_sender.clone();
+ let join_handle = tokio::spawn(async move {
+ // Block if exceeds max request number
+ let permit = semaphore.acquire_owned().await.unwrap();
+ let r =
PartitionReaderEnum::FlightRemote.fetch_partition(&p).await;
+ // Block if the channel buffer is ful
+ if let Err(e) = response_sender.send(r).await {
+ error!("Fail to send response event to the channel due to
{}", e);
+ }
+ // Increase semaphore by dropping existing permits.
+ drop(permit);
+ });
+ join_handles.push(join_handle);
+ }
}
AbortableReceiverStream::create(response_receiver, join_handles)
}
+fn check_is_local_location(host: &str) -> bool {
Review Comment:
Filed https://github.com/apache/arrow-ballista/issues/356 so we can discuss
this as a follow-on.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]