[ https://issues.apache.org/jira/browse/ARROW-9888?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
ASF GitHub Bot updated ARROW-9888:
----------------------------------
Labels: pull-request-available (was: )
> [Rust] [DataFusion] Allow ExecutionContext to be shared between threads
> -----------------------------------------------------------------------
>
> Key: ARROW-9888
> URL: https://issues.apache.org/jira/browse/ARROW-9888
> Project: Apache Arrow
> Issue Type: New Feature
> Reporter: Andrew Lamb
> Priority: Major
> Labels: pull-request-available
> Time Spent: 10m
> Remaining Estimate: 0h
>
> As suggested by Jorge on https://github.com/apache/arrow/pull/8079
> The high-level idea is to allow ExecutionContext to be used in multi-threaded
> environments such as Python.
> There are two use cases:
> 1. When a project is planning a large number of plans that depend on a
> common set of sources and UDFs, it would be nice to be able to multi-thread
> the planning. This is particularly important when planning requires reading
> remote metadata to formulate the plans (e.g. when the source is in S3 with
> many partitions). Metadata reading is often slow and network-bound, which
> makes threads well suited to these workloads. If multi-threading is not
> possible, either each plan needs to read the metadata independently (one
> context per plan) or planning must be sequential (with lots of network
> waiting).
> 2. When creating bindings to programming languages that support
> multi-threading, it would be nice for the ExecutionContext to be thread-safe,
> so that we can more easily integrate with those languages.
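To make use case 1 concrete, here is a minimal, std-only sketch (not DataFusion code) of why a shared, thread-safe context helps: several planner threads share one metadata cache behind an `Arc`, so the slow remote read happens once instead of once per plan. `SharedCatalog`, `TableMeta`, and `table_meta` are hypothetical names, and the "remote read" is only simulated by incrementing a counter.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};
use std::thread;

/// Hypothetical stand-in for slow-to-fetch remote metadata
/// (e.g. the partition listing of a table stored in S3).
#[derive(Debug)]
struct TableMeta {
    partitions: usize,
}

/// Hypothetical shared catalog (NOT a DataFusion type): caches metadata so
/// that every planner thread reuses a single fetch per table.
#[derive(Default)]
struct SharedCatalog {
    cache: RwLock<HashMap<String, Arc<TableMeta>>>,
    /// Counts simulated network round trips.
    fetches: AtomicUsize,
}

impl SharedCatalog {
    fn table_meta(&self, name: &str) -> Arc<TableMeta> {
        {
            // Fast path: any number of threads may hold the read lock at once.
            let cache = self.cache.read().unwrap();
            if let Some(meta) = cache.get(name) {
                return Arc::clone(meta);
            }
        } // read lock released here

        // Slow path: take the write lock; `entry` re-checks in case another
        // thread inserted the metadata while this one was waiting.
        let mut cache = self.cache.write().unwrap();
        let meta = cache.entry(name.to_string()).or_insert_with(|| {
            self.fetches.fetch_add(1, Ordering::SeqCst); // simulated remote read
            Arc::new(TableMeta { partitions: 4 })
        });
        Arc::clone(meta)
    }
}

fn main() {
    let catalog = Arc::new(SharedCatalog::default());

    // Eight "planner" threads all need metadata for the same remote table.
    let handles: Vec<_> = (0..8)
        .map(|i| {
            let catalog = Arc::clone(&catalog);
            thread::spawn(move || {
                let meta = catalog.table_meta("s3://bucket/table");
                format!("plan {}: scan {} partitions", i, meta.partitions)
            })
        })
        .collect();

    for handle in handles {
        println!("{}", handle.join().unwrap());
    }
    // Reports a single simulated fetch, regardless of the thread count.
    println!("metadata fetches: {}", catalog.fetches.load(Ordering::SeqCst));
}
```

The final line reports one fetch. An `RwLock` is used rather than a `Mutex` so that threads only reading already-cached metadata do not serialize behind each other; a `Mutex` would be simpler but would let only one planner touch the cache at a time.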
> The code might look like:
> {code}
> alamb@MacBook-Pro rust % git diff
> diff --git a/rust/datafusion/src/execution/context.rs b/rust/datafusion/src/execution/context.rs
> index 5f8aa342e..7374b0a78 100644
> --- a/rust/datafusion/src/execution/context.rs
> +++ b/rust/datafusion/src/execution/context.rs
> @@ -460,7 +460,7 @@ mod tests {
> use arrow::array::{ArrayRef, Int32Array};
> use arrow::compute::add;
> use std::fs::File;
> - use std::io::prelude::*;
> + use std::{sync::Mutex, io::prelude::*};
> use tempdir::TempDir;
> use test::*;
>
> @@ -928,6 +928,28 @@ mod tests {
> Ok(())
> }
>
> + #[test]
> + fn send_context_to_threads() -> Result<()> {
> + // ensure that ExecutionContexts can be shared between threads
> + let tmp_dir = TempDir::new("send_context_to_threads")?;
> + let partition_count = 4;
> + let ctx = Arc::new(Mutex::new(create_ctx(&tmp_dir, partition_count)?));
> +
> + let threads: Vec<JoinHandle<Result<_>>> = (0..2)
> + .map(|_| ctx.clone())
> + .map(|ctx_clone| thread::spawn(move || {
> + let ctx = ctx_clone.lock().expect("Locked context");
> + // Ensure we can create logical plan code on a separate thread.
> + ctx.create_logical_plan("SELECT c1, c2 FROM test WHERE c1 > 0 AND c1 < 3")
> + }))
> + .collect();
> +
> + for thread in threads {
> + thread.join().expect("Failed to join thread")?;
> + }
> + Ok(())
> + }
> +
> #[test]
> fn scalar_udf() -> Result<()> {
> let schema = Schema::new(vec![
> {code}
> At the moment, Rust refuses to compile this example (and therefore refuses to
> share ExecutionContexts between threads) because several `dyn` trait objects
> inside the context are not marked as `Send + Sync`:
> {code}
> Compiling datafusion v2.0.0-SNAPSHOT (/Users/alamb/Software/arrow/rust/datafusion)
> error[E0277]: `(dyn execution::physical_plan::PhysicalPlanner + 'static)` cannot be sent between threads safely
> --> datafusion/src/execution/context.rs:940:30
> |
> 940 | .map(|ctx_clone| thread::spawn(move || {
> | ^^^^^^^^^^^^^ `(dyn execution::physical_plan::PhysicalPlanner + 'static)` cannot be sent between threads safely
> |
> ::: /Users/alamb/.rustup/toolchains/nightly-2020-04-22-x86_64-apple-darwin/lib/rustlib/src/rust/src/libstd/thread/mod.rs:616:8
> |
> 616 | F: Send + 'static,
> | ---- required by this bound in `std::thread::spawn`
> |
> = help: the trait `std::marker::Send` is not implemented for `(dyn execution::physical_plan::PhysicalPlanner + 'static)`
> = note: required because of the requirements on the impl of `std::marker::Send` for `std::sync::Arc<(dyn execution::physical_plan::PhysicalPlanner + 'static)>`
> = note: required because it appears within the type `std::option::Option<std::sync::Arc<(dyn execution::physical_plan::PhysicalPlanner + 'static)>>`
> = note: required because it appears within the type `execution::context::ExecutionConfig`
> = note: required because it appears within the type `execution::context::ExecutionContextState`
> = note: required because it appears within the type `execution::context::ExecutionContext`
> = note: required because of the requirements on the impl of `std::marker::Send` for `std::sync::Mutex<execution::context::ExecutionContext>`
> = note: required because of the requirements on the impl of `std::marker::Send` for `std::sync::Arc<std::sync::Mutex<execution::context::ExecutionContext>>`
> = note: required because it appears within the type `[closure@datafusion/src/execution/context.rs:940:44: 944:14 ctx_clone:std::sync::Arc<std::sync::Mutex<execution::context::ExecutionContext>>]`
> error[E0277]: `(dyn execution::physical_plan::PhysicalPlanner + 'static)` cannot be shared between threads safely
> --> datafusion/src/execution/context.rs:940:30
> |
> 940 | .map(|ctx_clone| thread::spawn(move || {
> | ^^^^^^^^^^^^^ `(dyn execution::physical_plan::PhysicalPlanner + 'static)` cannot be shared between threads safely
> |
> ::: /Users/alamb/.rustup/toolchains/nightly-2020-04-22-x86_64-apple-darwin/lib/rustlib/src/rust/src/libstd/thread/mod.rs:616:8
> |
> 616 | F: Send + 'static,
> | ---- required by this bound in `std::thread::spawn`
> |
> = help: the trait `std::marker::Sync` is not implemented for `(dyn execution::physical_plan::PhysicalPlanner + 'static)`
> = note: required because of the requirements on the impl of `std::marker::Send` for `std::sync::Arc<(dyn execution::physical_plan::PhysicalPlanner + 'static)>`
> = note: required because it appears within the type `std::option::Option<std::sync::Arc<(dyn execution::physical_plan::PhysicalPlanner + 'static)>>`
> = note: required because it appears within the type `execution::context::ExecutionConfig`
> = note: required because it appears within the type `execution::context::ExecutionContextState`
> = note: required because it appears within the type `execution::context::ExecutionContext`
> = note: required because of the requirements on the impl of `std::marker::Send` for `std::sync::Mutex<execution::context::ExecutionContext>`
> = note: required because of the requirements on the impl of `std::marker::Send` for `std::sync::Arc<std::sync::Mutex<execution::context::ExecutionContext>>`
> = note: required because it appears within the type `[closure@datafusion/src/execution/context.rs:940:44: 944:14 ctx_clone:std::sync::Arc<std::sync::Mutex<execution::context::ExecutionContext>>]`
> error[E0277]: `(dyn datasource::datasource::TableProvider + 'static)` cannot be sent between threads safely
> --> datafusion/src/execution/context.rs:940:30
> |
> 940 | .map(|ctx_clone| thread::spawn(move || {
> | ^^^^^^^^^^^^^ `(dyn datasource::datasource::TableProvider + 'static)` cannot be sent between threads safely
> |
> ::: /Users/alamb/.rustup/toolchains/nightly-2020-04-22-x86_64-apple-darwin/lib/rustlib/src/rust/src/libstd/thread/mod.rs:616:8
> |
> 616 | F: Send + 'static,
> | ---- required by this bound in `std::thread::spawn`
> |
> = help: the trait `std::marker::Send` is not implemented for `(dyn datasource::datasource::TableProvider + 'static)`
> = note: required because of the requirements on the impl of `std::marker::Send` for `std::sync::Arc<(dyn datasource::datasource::TableProvider + 'static)>`
> = note: required because it appears within the type `(std::string::String, std::sync::Arc<(dyn datasource::datasource::TableProvider + 'static)>)`
> = note: required because of the requirements on the impl of `std::marker::Send` for `hashbrown::raw::RawTable<(std::string::String, std::sync::Arc<(dyn datasource::datasource::TableProvider + 'static)>)>`
> = note: required because it appears within the type `hashbrown::map::HashMap<std::string::String, std::sync::Arc<(dyn datasource::datasource::TableProvider + 'static)>, std::collections::hash_map::RandomState>`
> = note: required because it appears within the type `std::collections::HashMap<std::string::String, std::sync::Arc<(dyn datasource::datasource::TableProvider + 'static)>>`
> = note: required because it appears within the type `execution::context::ExecutionContextState`
> = note: required because it appears within the type `execution::context::ExecutionContext`
> = note: required because of the requirements on the impl of `std::marker::Send` for `std::sync::Mutex<execution::context::ExecutionContext>`
> = note: required because of the requirements on the impl of `std::marker::Send` for `std::sync::Arc<std::sync::Mutex<execution::context::ExecutionContext>>`
> = note: required because it appears within the type `[closure@datafusion/src/execution/context.rs:940:44: 944:14 ctx_clone:std::sync::Arc<std::sync::Mutex<execution::context::ExecutionContext>>]`
> error[E0277]: `(dyn datasource::datasource::TableProvider + 'static)` cannot be shared between threads safely
> --> datafusion/src/execution/context.rs:940:30
> |
> 940 | .map(|ctx_clone| thread::spawn(move || {
> | ^^^^^^^^^^^^^ `(dyn datasource::datasource::TableProvider + 'static)` cannot be shared between threads safely
> |
> ::: /Users/alamb/.rustup/toolchains/nightly-2020-04-22-x86_64-apple-darwin/lib/rustlib/src/rust/src/libstd/thread/mod.rs:616:8
> |
> 616 | F: Send + 'static,
> | ---- required by this bound in `std::thread::spawn`
> |
> = help: the trait `std::marker::Sync` is not implemented for `(dyn datasource::datasource::TableProvider + 'static)`
> = note: required because of the requirements on the impl of `std::marker::Send` for `std::sync::Arc<(dyn datasource::datasource::TableProvider + 'static)>`
> = note: required because it appears within the type `(std::string::String, std::sync::Arc<(dyn datasource::datasource::TableProvider + 'static)>)`
> = note: required because of the requirements on the impl of `std::marker::Send` for `hashbrown::raw::RawTable<(std::string::String, std::sync::Arc<(dyn datasource::datasource::TableProvider + 'static)>)>`
> = note: required because it appears within the type `hashbrown::map::HashMap<std::string::String, std::sync::Arc<(dyn datasource::datasource::TableProvider + 'static)>, std::collections::hash_map::RandomState>`
> = note: required because it appears within the type `std::collections::HashMap<std::string::String, std::sync::Arc<(dyn datasource::datasource::TableProvider + 'static)>>`
> = note: required because it appears within the type `execution::context::ExecutionContextState`
> = note: required because it appears within the type `execution::context::ExecutionContext`
> = note: required because of the requirements on the impl of `std::marker::Send` for `std::sync::Mutex<execution::context::ExecutionContext>`
> = note: required because of the requirements on the impl of `std::marker::Send` for `std::sync::Arc<std::sync::Mutex<execution::context::ExecutionContext>>`
> = note: required because it appears within the type `[closure@datafusion/src/execution/context.rs:940:44: 944:14 ctx_clone:std::sync::Arc<std::sync::Mutex<execution::context::ExecutionContext>>]`
> Compiling arrow-benchmarks v2.0.0-SNAPSHOT (/Users/alamb/Software/arrow/rust/benchmarks)
> error: aborting due to 4 previous errors
> For more information about this error, try `rustc --explain E0277`.
> error: could not compile `datafusion`.
> To learn more, run the command again with --verbose.
> warning: build failed, waiting for other jobs to finish...
> error: build failed
> {code}
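The four errors in the build output of this issue share one root cause. `Arc<T>` is `Send` only when `T` is both `Send` and `Sync`, so each `Arc<dyn Trait>` field inside the context yields one `Send` error and one `Sync` error. This happens even though the test wraps the context in a `Mutex`: `Mutex<T>` only needs `T: Send`, but that requirement still reaches the `dyn` objects. One common remedy is to declare `Send + Sync` as supertraits of the traits behind those objects. The minimal sketch below shows that approach using hypothetical, simplified traits and structs that only mirror the names in the error notes. It is not DataFusion's code, and this issue does not say which fix was eventually merged.

```rust
use std::collections::HashMap;
use std::sync::Arc;
use std::thread;

// Hypothetical, heavily simplified stand-ins for the traits named in the
// errors. The `Send + Sync` supertraits make every `dyn TableProvider` and
// `dyn PhysicalPlanner` (and thus every `Arc` of them) thread-safe.
trait TableProvider: Send + Sync {
    fn name(&self) -> String;
}

trait PhysicalPlanner: Send + Sync {
    fn plan(&self, sql: &str) -> String;
}

struct MemTable(String);

impl TableProvider for MemTable {
    fn name(&self) -> String {
        self.0.clone()
    }
}

struct DefaultPlanner;

impl PhysicalPlanner for DefaultPlanner {
    fn plan(&self, sql: &str) -> String {
        format!("plan({})", sql)
    }
}

// Same shape as in the error notes: an optional planner in the config and a
// map of named table providers in the context.
struct ExecutionConfig {
    physical_planner: Option<Arc<dyn PhysicalPlanner>>,
}

struct ExecutionContext {
    config: ExecutionConfig,
    datasources: HashMap<String, Arc<dyn TableProvider>>,
}

impl ExecutionContext {
    // Takes `&self`, so threads can share the context without a Mutex.
    fn create_logical_plan(&self, table: &str) -> Option<String> {
        let provider = self.datasources.get(table)?;
        let planner = self.config.physical_planner.as_ref()?;
        Some(planner.plan(&format!("SELECT * FROM {}", provider.name())))
    }
}

// Compile-time check: building fails if `T` is not both `Send` and `Sync`.
fn assert_send_sync<T: Send + Sync>() {}

fn sample_context() -> ExecutionContext {
    let mut datasources: HashMap<String, Arc<dyn TableProvider>> = HashMap::new();
    datasources.insert("test".to_string(), Arc::new(MemTable("test".to_string())));
    ExecutionContext {
        config: ExecutionConfig {
            physical_planner: Some(Arc::new(DefaultPlanner)),
        },
        datasources,
    }
}

fn main() {
    assert_send_sync::<ExecutionContext>();

    let ctx = Arc::new(sample_context());
    let handles: Vec<_> = (0..2)
        .map(|_| {
            let ctx = Arc::clone(&ctx);
            thread::spawn(move || ctx.create_logical_plan("test"))
        })
        .collect();

    for handle in handles {
        println!("{:?}", handle.join().unwrap());
    }
}
```

Each thread prints `Some("plan(SELECT * FROM test)")`. Removing `Send + Sync` from either trait reproduces the same kind of E0277 errors at `assert_send_sync` and `thread::spawn`. The `assert_send_sync` helper is a cheap way to pin thread safety as a compile-time guarantee, which is what language bindings (use case 2) would rely on.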
--
This message was sent by Atlassian Jira
(v8.3.4#803005)