[ 
https://issues.apache.org/jira/browse/ARROW-9707?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17201435#comment-17201435
 ] 

Jorge edited comment on ARROW-9707 at 9/24/20, 10:33 AM:
---------------------------------------------------------

I thought a bit about this, and I have a hypothesis: the core pattern in 
DataFusion today is:
 # {{ExecutionPlan}} is an iterator of {{RecordBatchReader}} via the function 
{{build}}
 # {{RecordBatchReader}} is an iterator of {{RecordBatch}} via {{next_batch}}
 # {{ExecutionPlan}}'s {{size_hint}} is given by {{output_partitioning}}
 # {{RecordBatchReader}}'s {{size_hint}} is unknown, as it typically comes 
from scanning through a file
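
The pattern above can be sketched as two nested iterators. This is only an illustration: {{ToyPlan}}, {{PartitionReader}} and {{Batch}} are hypothetical stand-ins, not DataFusion types. The outer iterator knows how many partitions it has, while the inner one deliberately reports an unknown size, as a file scan would.

```rust
// Hypothetical stand-in for a RecordBatch: just a vector of values.
type Batch = Vec<i32>;

// Hypothetical reader over one partition. The number of batches is
// treated as unknown up front, as it would be when scanning a file.
struct PartitionReader {
    remaining: std::vec::IntoIter<Batch>,
}

impl Iterator for PartitionReader {
    type Item = Batch;

    fn next(&mut self) -> Option<Batch> {
        self.remaining.next()
    }

    // Deliberately report "unknown" instead of forwarding the exact size.
    fn size_hint(&self) -> (usize, Option<usize>) {
        (0, None)
    }
}

// Hypothetical stand-in for an execution plan: one Vec<Batch> per partition.
struct ToyPlan {
    partitions: Vec<Vec<Batch>>,
}

impl IntoIterator for ToyPlan {
    type Item = PartitionReader;
    type IntoIter = std::vec::IntoIter<PartitionReader>;

    // The outer iterator is backed by a Vec, so its size_hint is exact
    // and equals the number of partitions.
    fn into_iter(self) -> Self::IntoIter {
        self.partitions
            .into_iter()
            .map(|p| PartitionReader { remaining: p.into_iter() })
            .collect::<Vec<_>>()
            .into_iter()
    }
}

// Single-threaded consumption: flatten partitions into one stream of batches.
fn count_rows(plan: ToyPlan) -> usize {
    plan.into_iter().flatten().map(|batch| batch.len()).sum()
}
```

With this shape, a consumer can inspect the outer {{size_hint}} to decide how many partitions to hand out before pulling any batch.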

If this hypothesis holds, IMO we could convert {{ExecutionPlan}} to expose an 
{{IntoIterator<Item=IntoIterator<Item=RecordBatch>>}},
which would make it easy to flatten the iterator over a set of threads. Threads 
switch context during I/O, e.g. when a thread finishes reading a block of 
batches from a CSV file, it can switch to executing whatever is pending 
elsewhere.

This will also give more visibility to a scheduler, as it has all the necessary 
information it needs to schedule tasks.
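
As a rough sketch of how a nested iterator could be flattened over a fixed number of threads, the outer iterator can double as a shared work queue. The function name {{drain_with_workers}} and the {{n_workers}} parameter are made up for this example, and a real scheduler would likely need more than a mutex around an iterator.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

// Drains every partition using `n_workers` threads. Each worker takes the
// next partition from the shared outer iterator, consumes it fully, and
// then asks for another one until none are left.
fn drain_with_workers<Outer, Inner, T>(partitions: Outer, n_workers: usize) -> Vec<T>
where
    Outer: IntoIterator<Item = Inner>,
    Outer::IntoIter: Send + 'static,
    Inner: IntoIterator<Item = T> + 'static,
    T: Send + 'static,
{
    let queue = Arc::new(Mutex::new(partitions.into_iter()));
    let handles: Vec<_> = (0..n_workers)
        .map(|_| {
            let queue = Arc::clone(&queue);
            thread::spawn(move || {
                let mut local = Vec::new();
                loop {
                    // The lock is held only while taking the next partition,
                    // not while the partition itself is being consumed.
                    let next = queue.lock().unwrap().next();
                    match next {
                        Some(partition) => local.extend(partition),
                        None => break,
                    }
                }
                local
            })
        })
        .collect();
    // Collect the per-worker results; order across partitions is not preserved.
    handles
        .into_iter()
        .flat_map(|handle| handle.join().unwrap())
        .collect()
}
```

For example, {{drain_with_workers(vec![vec![3, 1], vec![2], vec![5, 4]], 2)}} returns all five values, in an order that depends on thread timing.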

[This answer on SO|https://stackoverflow.com/a/53176418/931303] uses an 
{{IntoIterator}} of {{IntoIterator}} to implement a generic (single-threaded) 
{{merge_sort}}. This would also allow other architectures, as they could 
offload whole partitions to different processes. In our context, it could look 
something like this (all static for now):


{code:java}
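use std::fmt;
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

// Spawns one thread per partition, sorts each partition locally, and
// appends the results to a shared vector.
fn merge_sorted<IterT, IterIterT, T>(arrays: IterIterT) -> Vec<T>
where
    T: 'static + Ord + Clone + fmt::Debug + Send,
    IterT: 'static + Send + IntoIterator<Item = T>,
    IterIterT: IntoIterator<Item = IterT>,
{
    let all_values = Arc::new(Mutex::new(vec![]));
    let threads: Vec<JoinHandle<()>> = arrays.into_iter().map(|array| {
        let mutex_clone = Arc::clone(&all_values);
        thread::spawn(move || {
            let mut values: Vec<T> = array.into_iter().collect();
            // This is wrong: it only sorts within each partition, so the
            // combined vector is not globally sorted. All the information
            // needed to do it properly is available, though.
            values.sort();
            mutex_clone.lock().unwrap().extend_from_slice(&values);
        })
    }).collect();
    // Wait for every partition to finish before reading the result.
    for thread in threads {
        thread.join().unwrap();
    }
    let result = all_values.lock().unwrap().clone();
    result
}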
{code}
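
The per-partition sort above leaves the combined vector unsorted across partitions. One possible way to finish the job, assuming each partition has already been sorted and collected into its own {{Vec}}, is a k-way merge over a min-heap. This is a swapped-in standard technique rather than anything DataFusion does, and {{kway_merge}} is a hypothetical name.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

// Merges already-sorted partitions into a single sorted vector.
// The heap holds at most one pending element per partition, tagged
// with the index of the partition it came from.
fn kway_merge<T: Ord>(sorted_parts: Vec<Vec<T>>) -> Vec<T> {
    let total: usize = sorted_parts.iter().map(|p| p.len()).sum();
    let mut iters: Vec<_> = sorted_parts.into_iter().map(|p| p.into_iter()).collect();

    // BinaryHeap is a max-heap, so wrap entries in Reverse to pop the smallest.
    let mut heap = BinaryHeap::new();
    for (idx, it) in iters.iter_mut().enumerate() {
        if let Some(value) = it.next() {
            heap.push(Reverse((value, idx)));
        }
    }

    let mut out = Vec::with_capacity(total);
    while let Some(Reverse((value, idx))) = heap.pop() {
        out.push(value);
        // Refill the heap from the partition that just yielded a value.
        if let Some(next) = iters[idx].next() {
            heap.push(Reverse((next, idx)));
        }
    }
    out
}
```

Each element is pushed and popped once, so the merge costs O(n log k) comparisons for n values spread over k partitions.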



> [Rust] [DataFusion] Re-implement threading model
> ------------------------------------------------
>
>                 Key: ARROW-9707
>                 URL: https://issues.apache.org/jira/browse/ARROW-9707
>             Project: Apache Arrow
>          Issue Type: Sub-task
>          Components: Rust, Rust - DataFusion
>            Reporter: Andy Grove
>            Assignee: Andy Grove
>            Priority: Major
>             Fix For: 2.0.0
>
>
> The current threading model is very simple and does not scale. We currently 
> use 1-2 dedicated threads per partition and they all run simultaneously, 
> which is a huge problem if you have more partitions than logical or physical 
> cores.
> This task is to re-implement the threading model so that query execution uses 
> a fixed (configurable) number of threads. Work will be broken down into 
> stages and tasks and each in-process executor (running on a dedicated thread) 
> will process its queue of tasks.
> This process will be driven by a scheduler.



