Re: [PR] Example for using a separate threadpool for CPU bound work (try 3) [datafusion]
alamb merged PR #16331: URL: https://github.com/apache/datafusion/pull/16331 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] - To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
Re: [PR] Example for using a separate threadpool for CPU bound work (try 3) [datafusion]
alamb commented on PR #16331:
URL: https://github.com/apache/datafusion/pull/16331#issuecomment-2995781080
Thanks @adriangb and @Omega359 for the help with this one (and to @tustvold / @ion-elgreco for the underlying feature). It's taken a while, but we have made it.
Re: [PR] Example for using a separate threadpool for CPU bound work (try 3) [datafusion]
alamb commented on code in PR #16331:
URL: https://github.com/apache/datafusion/pull/16331#discussion_r2159687190
##
datafusion-examples/examples/thread_pools.rs:
##
@@ -0,0 +1,349 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This example shows how to use separate thread pools (tokio [`Runtime`]s)
+//! to run the IO and CPU intensive parts of DataFusion plans.
+//!
+//! # Background
+//!
+//! DataFusion, by default, plans and executes all operations (both CPU and IO)
+//! on the same thread pool. This makes it fast and easy to get started, but
+//! can cause issues when running at scale, especially when fetching and
+//! operating on data directly from remote sources.
+//!
+//! Specifically, without configuration such as in this example, DataFusion
+//! plans and executes everything on the same thread pool (Tokio Runtime),
+//! including any I/O, such as reading Parquet files from remote object storage
+//! (e.g. AWS S3), catalog access, and CPU intensive work. Running this diverse
+//! workload can lead to issues described in the [Architecture section] such as
+//! throttled network bandwidth (due to congestion control) and increased
+//! latencies or timeouts while processing network messages.
+//!
+//! [Architecture section]: https://docs.rs/datafusion/latest/datafusion/index.html#thread-scheduling-cpu--io-thread-pools-and-tokio-runtimes
+
+use arrow::util::pretty::pretty_format_batches;
+use datafusion::common::runtime::JoinSet;
+use datafusion::error::Result;
+use datafusion::execution::SendableRecordBatchStream;
+use datafusion::prelude::*;
+use futures::stream::StreamExt;
+use object_store::client::SpawnedReqwestConnector;
+use object_store::http::HttpBuilder;
+use std::sync::Arc;
+use std::time::Duration;
+use tokio::runtime::Handle;
+use tokio::sync::Notify;
+use url::Url;
+
+/// Normally, you don't need to worry about the details of the tokio
+/// [`Runtime`], but for this example it is important to understand how the
+/// [`Runtime`]s work.
+///
+/// Each thread has a "current" runtime that is installed in a thread local
+/// variable which is used by the `tokio::spawn` function.
+///
+/// The `#[tokio::main]` macro creates a [`Runtime`] and installs it
+/// as the "current" runtime in a thread local variable, on which any `async`
+/// [`Future`]s, [`Stream`]s and [`Task`]s are run.
+///
+/// This example uses the runtime created by [`tokio::main`] to do I/O and
+/// spawn CPU intensive tasks on a separate [`Runtime`], mirroring the common
+/// pattern when using Rust libraries such as `tonic`. Using a separate
+/// `Runtime` for CPU bound tasks will often be simpler in larger applications,
+/// even though it makes this example slightly more complex.
+#[tokio::main]
+async fn main() -> Result<()> {
+    // The first two examples read local files. Enabling the URL table feature
+    // lets us treat filenames as tables in SQL.
+    let ctx = SessionContext::new().enable_url_table();
+    let sql = format!(
+        "SELECT * FROM '{}/alltypes_plain.parquet'",
+        datafusion::test_util::parquet_test_data()
+    );
+
+    // Run a query on the current runtime. Calling `await` means the future
+    // (in this case the `async` function and all spawned work in DataFusion
+    // plans) runs on the current runtime.
+    same_runtime(&ctx, &sql).await?;
+
+    // Run the same query but this time on a different runtime.
+    //
+    // Since we call `await` here, the `async` function itself runs on the
+    // current runtime, but internally `different_runtime_basic` executes the
+    // DataFusion plan on a different Runtime.
+    different_runtime_basic(ctx, sql).await?;
+
+    // Run the same query on a different runtime, including remote IO.
+    //
+    // NOTE: This is best practice for production systems
+    different_runtime_advanced().await?;
+
+    Ok(())
+}
+
+/// Run queries directly on the current tokio `Runtime`
+///
+/// This is how most examples in DataFusion are written and works well for
+/// development, local query processing, and non-latency-sensitive workloads.
+async fn same_runtime(ctx: &SessionContext, sql: &str) -> Result<()> {
+// Calling .sql is an async function as it may also
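
For readers skimming this thread, the idea in the doc comment above (keep CPU heavy work off the threads that service I/O) can be pictured with a std-only sketch. This is not the code from the PR and it uses neither tokio nor DataFusion: `CpuPool`, `run`, and `sum_of_squares_on_pool` are hypothetical names invented here, and a single named worker thread stands in, as an assumption, for the separate `Runtime`.

```rust
use std::sync::mpsc;
use std::thread;

/// A job is a boxed closure that runs on the CPU worker thread.
type Job = Box<dyn FnOnce() + Send + 'static>;

/// Hypothetical stand-in for a dedicated CPU runtime: one named worker
/// thread draining a queue of jobs.
struct CpuPool {
    sender: Option<mpsc::Sender<Job>>,
    worker: Option<thread::JoinHandle<()>>,
}

impl CpuPool {
    fn new() -> Self {
        let (sender, receiver) = mpsc::channel::<Job>();
        let worker = thread::Builder::new()
            .name("cpu-worker".to_string())
            .spawn(move || {
                // Runs until every sender has been dropped.
                for job in receiver {
                    job();
                }
            })
            .expect("failed to spawn cpu worker");
        Self {
            sender: Some(sender),
            worker: Some(worker),
        }
    }

    /// Run `work` on the worker thread and block until its result is ready.
    fn run<T, F>(&self, work: F) -> T
    where
        T: Send + 'static,
        F: FnOnce() -> T + Send + 'static,
    {
        let (tx, rx) = mpsc::channel();
        let job: Job = Box::new(move || {
            // Ignore the error if the caller stopped waiting.
            let _ = tx.send(work());
        });
        self.sender
            .as_ref()
            .expect("pool is shut down")
            .send(job)
            .expect("worker exited");
        rx.recv().expect("worker dropped the result")
    }
}

impl Drop for CpuPool {
    fn drop(&mut self) {
        // Closing the channel ends the worker loop; then wait for the thread.
        drop(self.sender.take());
        if let Some(worker) = self.worker.take() {
            let _ = worker.join();
        }
    }
}

/// Sum of squares of 1..=n, plus the name of the thread that computed it.
fn sum_of_squares_on_pool(pool: &CpuPool, n: u64) -> (u64, String) {
    pool.run(move || {
        let total: u64 = (1..=n).map(|x| x * x).sum();
        let name = thread::current().name().unwrap_or("unnamed").to_string();
        (total, name)
    })
}

fn main() {
    let pool = CpuPool::new();
    let (total, thread_name) = sum_of_squares_on_pool(&pool, 10);
    println!("{total} computed on {thread_name}");
}
```

The calling thread only submits the job and waits for the answer; the arithmetic itself happens on `cpu-worker`. In an async setting the wait would be an `await` rather than a blocking `recv`, which is the part the real example handles with tokio.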
Re: [PR] Example for using a separate threadpool for CPU bound work (try 3) [datafusion]
adriangb commented on code in PR #16331:
URL: https://github.com/apache/datafusion/pull/16331#discussion_r2159381725
##
datafusion-examples/examples/thread_pools.rs:
##
@@ -0,0 +1,349 @@
Re: [PR] Example for using a separate threadpool for CPU bound work (try 3) [datafusion]
Omega359 commented on PR #16331:
URL: https://github.com/apache/datafusion/pull/16331#issuecomment-2981034448
> Related to this work specifically, is there any guidance on using something like Tokio console with DataFusion? I was about to start experimenting with it myself, but if anyone else has written up some instructions or experiences that might save me some time.

I've used Tokio console in the past in my app to see if I could find the cause of some app slowdowns. It worked fine but didn't help me personally find the issue.
Re: [PR] Example for using a separate threadpool for CPU bound work (try 3) [datafusion]
alamb commented on PR #16331:
URL: https://github.com/apache/datafusion/pull/16331#issuecomment-2981030863
> @alamb thanks for the pointer.
>
> Related to this work specifically, is there any guidance on using something like Tokio console with DataFusion? I was about to start experimenting with it myself, but if anyone else has written up some instructions or experiences that might save me some time.

I know we have used it at InfluxData. I vaguely remember others reporting they have too (maybe @adriangb), but I don't know of any writeups (it would be another great doc contribution if you figure it out) :)
Re: [PR] Example for using a separate threadpool for CPU bound work (try 3) [datafusion]
pepijnve commented on PR #16331:
URL: https://github.com/apache/datafusion/pull/16331#issuecomment-2980997712
@alamb thanks for the pointer. I've been digging through all the work that has already been done, looking particularly at the rayon experiments. Rayon's idea of "yield now" is particularly interesting. Rather than unwinding the stack, it will actually try to run other work itself. I wonder if this approach could lead to stack overflows.

Related to this work specifically, is there any guidance on using something like Tokio console with DataFusion? I was about to start experimenting with it myself, but if anyone else has written up some instructions or experiences that might save me some time.
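
The stack-depth concern about running other work inline can be pictured with a toy model. This is only a sketch and not rayon's implementation: `ToyScheduler` and its methods are hypothetical names, and it assumes the worst case in which every task yields while more work is still queued.

```rust
use std::collections::VecDeque;

/// Toy scheduler: a queue of pending task ids plus bookkeeping of how deeply
/// task executions are nested on the call stack.
struct ToyScheduler {
    queue: VecDeque<u32>,
    depth: usize,
    max_depth: usize,
}

impl ToyScheduler {
    fn new(tasks: u32) -> Self {
        Self {
            queue: (0..tasks).collect(),
            depth: 0,
            max_depth: 0,
        }
    }

    /// Style A: a task that yields returns to the scheduler loop, so its
    /// stack frame is gone before the next task starts.
    fn run_unwinding(&mut self) {
        while let Some(_task) = self.queue.pop_front() {
            self.depth += 1;
            self.max_depth = self.max_depth.max(self.depth);
            // The task body would run here and then return to yield.
            self.depth -= 1;
        }
    }

    /// Style B: a task that yields runs the next queued task inline, on top
    /// of its own stack frame.
    fn run_inline(&mut self) {
        if let Some(_task) = self.queue.pop_front() {
            self.depth += 1;
            self.max_depth = self.max_depth.max(self.depth);
            // "Yield now": instead of returning, execute other work right here.
            self.run_inline();
            self.depth -= 1;
        }
    }
}

/// Deepest nesting observed when yielding unwinds back to the loop.
fn max_depth_unwinding(tasks: u32) -> usize {
    let mut scheduler = ToyScheduler::new(tasks);
    scheduler.run_unwinding();
    scheduler.max_depth
}

/// Deepest nesting observed when yielding runs other work inline.
fn max_depth_inline(tasks: u32) -> usize {
    let mut scheduler = ToyScheduler::new(tasks);
    scheduler.run_inline();
    scheduler.max_depth
}

fn main() {
    println!("unwinding: {}", max_depth_unwinding(100));
    println!("inline:    {}", max_depth_inline(100));
}
```

In this model the unwinding style never nests more than one task deep, while the inline style nests once per queued task. Whether a real scheduler can reach that worst case depends on how it bounds or orders the inline work, which this sketch does not attempt to model.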
Re: [PR] Example for using a separate threadpool for CPU bound work (try 3) [datafusion]
alamb commented on PR #16331:
URL: https://github.com/apache/datafusion/pull/16331#issuecomment-2980979266
FYI @pepijnve this is perhaps relevant to you as you have been working closely with tokio / tokio runtimes
Re: [PR] Example for using a separate threadpool for CPU bound work (try 3) [datafusion]
alamb commented on code in PR #16331:
URL: https://github.com/apache/datafusion/pull/16331#discussion_r2138052407
##
datafusion-examples/examples/thread_pools.rs:
##
@@ -0,0 +1,350 @@
Re: [PR] Example for using a separate threadpool for CPU bound work (try 3) [datafusion]
alamb commented on code in PR #16331:
URL: https://github.com/apache/datafusion/pull/16331#discussion_r2136243498
##
datafusion-examples/examples/thread_pools.rs:
##
@@ -0,0 +1,346 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This example shows how to use separate thread pools (tokio [`Runtime`]s)
+//! to run the IO and CPU intensive parts of DataFusion plans.
+//!
+//! # Background
+//!
+//! DataFusion, by default, plans and executes all operations (both CPU and IO)
+//! on the same thread pool. This makes it fast and easy to get started, but
+//! can cause issues when running at scale, especially when fetching and
+//! operating on data directly from remote sources.
+//!
+//! Specifically, without configuration such as in this example, DataFusion
+//! plans and executes everything on the same thread pool (Tokio Runtime),
+//! including any I/O, such as reading Parquet files from remote object storage
+//! (e.g. AWS S3), catalog access, and CPU intensive work. Running this diverse
+//! workload can lead to issues described in the [Architecture section] such as
+//! throttled network bandwidth (due to congestion control) and increased
+//! latencies or timeouts while processing network messages.
+//!
+//! [Architecture section]: https://docs.rs/datafusion/latest/datafusion/index.html#thread-scheduling-cpu--io-thread-pools-and-tokio-runtimes
+
+use arrow::util::pretty::pretty_format_batches;
+use datafusion::common::runtime::JoinSet;
+use datafusion::error::Result;
+use datafusion::execution::SendableRecordBatchStream;
+use datafusion::prelude::*;
+use futures::stream::StreamExt;
+use object_store::client::SpawnedReqwestConnector;
+use object_store::http::HttpBuilder;
+use std::sync::Arc;
+use std::time::Duration;
+use tokio::runtime::Handle;
+use tokio::sync::Notify;
+use url::Url;
+
+/// Normally, you don't need to worry about the details of the tokio
+/// [`Runtime`], but for this example it is important to understand how the
+/// [`Runtime`]s work.
+///
+/// Each thread has a "current" runtime that is installed in a thread local
+/// variable which is used by the `tokio::spawn` function.
+///
+/// The `#[tokio::main]` macro creates a [`Runtime`] and installs it
+/// as the "current" runtime in a thread local variable, on which any `async`
+/// [`Future`]s, [`Stream`]s and [`Task`]s are run.
+///
+/// This example uses the runtime created by [`tokio::main`] to do I/O
+#[tokio::main]
+async fn main() -> Result<()> {
+    // The first two examples read local files. Enabling the URL table feature
+    // lets us treat filenames as tables in SQL.
+    let ctx = SessionContext::new().enable_url_table();
+    let sql = format!(
+        "SELECT * FROM '{}/alltypes_plain.parquet'",
+        datafusion::test_util::parquet_test_data()
+    );
+
+    // Run a query on the current runtime. Calling `await` means the future
+    // (in this case the `async` function and all spawned work in DataFusion
+    // plans) runs on the current runtime.
+    same_runtime(&ctx, &sql).await?;
+
+    // Run the same query but this time on a different runtime.
+    //
+    // Since we call `await` here, the `async` function itself runs on the
+    // current runtime, but internally `different_runtime_basic` executes the
+    // DataFusion plan on a different Runtime.
+    different_runtime_basic(ctx, sql).await?;
+
+    // Run the same query on a different runtime, including remote IO.
+    //
+    // NOTE: This is best practice for production systems
+    different_runtime_advanced().await?;
+
+    Ok(())
+}
+
+/// Run queries directly on the current tokio `Runtime`
+///
+/// This is how most examples in DataFusion are written and works well for
+/// development, local query processing, and non-latency-sensitive workloads.
+async fn same_runtime(ctx: &SessionContext, sql: &str) -> Result<()> {
+    // `.sql` is an async function as it may also do network
+    // I/O, for example to contact a remote catalog or do an object store LIST
+    let df = ctx.sql(sql).await?;
+
+    // While many examples call `collect` or `show()`, those methods buffer
+    // the results. Internally DataFusion generates output a RecordBatch at a time
+
+//
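
The difference between buffering results and consuming them one batch at a time, which the comment above touches on, can be sketched without DataFusion. This is a std-only illustration under stated assumptions: `Batch`, `produce`, `buffered`, and `streamed` are hypothetical names, a `Vec<u32>` stands in for a `RecordBatch`, and a bounded channel stands in for the stream.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical stand-in for a record batch: just a vector of row values.
type Batch = Vec<u32>;

/// Produce `batches` batches of `rows` rows each on a separate thread,
/// handing them over through a bounded channel of capacity 1.
fn produce(batches: u32, rows: u32) -> mpsc::Receiver<Batch> {
    let (tx, rx) = mpsc::sync_channel::<Batch>(1);
    thread::spawn(move || {
        for b in 0..batches {
            let batch: Batch = (0..rows).map(|r| b * rows + r).collect();
            if tx.send(batch).is_err() {
                break; // the consumer went away
            }
        }
    });
    rx
}

/// Buffering style: hold every batch before doing anything with the data.
/// Returns (total rows, number of batches held at once by the consumer).
fn buffered(batches: u32, rows: u32) -> (usize, usize) {
    let all: Vec<Batch> = produce(batches, rows).into_iter().collect();
    let total: usize = all.iter().map(|b| b.len()).sum();
    (total, all.len())
}

/// Streaming style: handle each batch as it arrives, then drop it.
/// Returns (total rows, number of batches held at once by the consumer).
fn streamed(batches: u32, rows: u32) -> (usize, usize) {
    let mut total = 0;
    let mut peak = 0;
    for batch in produce(batches, rows) {
        // Only the current batch is alive in the consumer at this point.
        peak = peak.max(1);
        total += batch.len();
    }
    (total, peak)
}

fn main() {
    println!("buffered: {:?}", buffered(4, 3));
    println!("streamed: {:?}", streamed(4, 3));
}
```

Both styles see the same rows; the buffered consumer ends up holding all four batches, while the streaming consumer holds one at a time and lets the bounded channel apply backpressure to the producer.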
Re: [PR] Example for using a separate threadpool for CPU bound work (try 3) [datafusion]
Omega359 commented on code in PR #16331:
URL: https://github.com/apache/datafusion/pull/16331#discussion_r2136094718
##
datafusion-examples/examples/thread_pools.rs:
##
@@ -0,0 +1,350 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This example shows how to use separate thread pools (tokio [`Runtime`]s)
+//! to run the IO and CPU intensive parts of DataFusion plans.
+//!
+//! # Background
+//!
+//! DataFusion, by default, plans and executes all operations (both CPU and IO)
+//! on the same thread pool. This makes it fast and easy to get started, but
+//! can cause issues when running at scale, especially when fetching and
+//! operating on data directly from remote sources.
+//!
+//! Specifically, without configuration such as in this example, DataFusion
+//! plans and executes everything on the same thread pool (Tokio Runtime),
+//! including any I/O, such as reading Parquet files from remote object storage
+//! (e.g. AWS S3), catalog access, and CPU intensive work. Running this diverse
+//! workload can lead to issues described in the [Architecture section] such as
+//! throttled network bandwidth (due to congestion control) and increased
+//! latencies or timeouts while processing network messages.
+//!
+//! [Architecture section]: https://docs.rs/datafusion/latest/datafusion/index.html#thread-scheduling-cpu--io-thread-pools-and-tokio-runtimes
+
+use arrow::util::pretty::pretty_format_batches;
+use datafusion::common::runtime::JoinSet;
+use datafusion::error::Result;
+use datafusion::execution::SendableRecordBatchStream;
+use datafusion::prelude::*;
+use futures::stream::StreamExt;
+use object_store::client::SpawnedReqwestConnector;
+use object_store::http::HttpBuilder;
+use std::sync::Arc;
+use std::time::Duration;
+use tokio::runtime::Handle;
+use tokio::sync::Notify;
+use url::Url;
+
+/// Normally, you don't need to worry about the details of the tokio
+/// [`Runtime`], but for this example it is important to understand how the
+/// [`Runtime`]s work.
+///
+/// Each thread has a "current" runtime that is installed in a thread local
+/// variable which is used by the `tokio::spawn` function.
+///
+/// The `#[tokio::main]` macro creates a [`Runtime`] and installs it
+/// as the "current" runtime in a thread local variable, on which any `async`
+/// [`Future`], [`Stream`]s and [`Task`]s are run.
+///
+/// This example uses the runtime created by [`tokio::main`] to do I/O and spawn
+/// CPU intensive tasks on a separate [`Runtime`], mirroring the common pattern
+/// when using Rust libraries such as `tonic`. Using a separate `Runtime` for
+/// CPU bound tasks will often be simpler in larger applications, even though it
+/// makes this example slightly more complex.
+#[tokio::main]
+async fn main() -> Result<()> {
+// The first two examples read local files. Enabling the URL table feature
+// lets us treat filenames as tables in SQL.
+let ctx = SessionContext::new().enable_url_table();
+let sql = format!(
+"SELECT * FROM '{}/alltypes_plain.parquet'",
+datafusion::test_util::parquet_test_data()
+);
+
+// Run a query on the current runtime. Calling `await` means the future
+// (in this case the `async` function and all spawned work in DataFusion
+// plans) runs on the current runtime.
+same_runtime(&ctx, &sql).await?;
+
+// Run the same query but this time on a different runtime.
+//
+// Since we call `await` here, the `async` function itself runs on the
+// current runtime, but internally `different_runtime_basic` executes the
+// DataFusion plan on a different Runtime.
+different_runtime_basic(ctx, sql).await?;
+
+// Run the same query on a different runtime, including remote IO.
+//
+// NOTE: This is best practice for production systems
+different_runtime_advanced().await?;
+
+Ok(())
+}
+
+/// Run queries directly on the current tokio `Runtime`
+///
+/// This is how most examples in DataFusion are written and works well for
+/// development, local query processing, and non latency sensitive workloads.
+async fn same_runtime(ctx: &SessionContext, sql: &str) -> Result<()> {
+// Calling .sql is an async function as it may al
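The doc comment quoted above describes keeping I/O on the current runtime and pushing CPU intensive work to a separate pool. That idea can be sketched without tokio at all. The following is a minimal std-only analogy, not the code from this PR: `CpuPool`, its `spawn` method, and `sum_of_squares` are hypothetical names introduced for illustration, and a single named worker thread stands in for the dedicated CPU `Runtime`.

```rust
use std::sync::mpsc;
use std::thread;

/// A boxed unit of CPU-bound work (hypothetical type for this sketch).
type Job = Box<dyn FnOnce() + Send + 'static>;

/// Hypothetical stand-in for a dedicated CPU runtime: one worker thread
/// that runs jobs received over a channel.
struct CpuPool {
    sender: Option<mpsc::Sender<Job>>,
    worker: Option<thread::JoinHandle<()>>,
}

impl CpuPool {
    fn new() -> Self {
        let (sender, receiver) = mpsc::channel::<Job>();
        let worker = thread::Builder::new()
            .name("cpu-pool".to_string())
            .spawn(move || {
                // The loop ends once every sender has been dropped.
                for job in receiver {
                    job();
                }
            })
            .expect("failed to spawn worker thread");
        Self {
            sender: Some(sender),
            worker: Some(worker),
        }
    }

    /// Run `work` on the pool's thread and return a receiver for its result.
    fn spawn<T, F>(&self, work: F) -> mpsc::Receiver<T>
    where
        T: Send + 'static,
        F: FnOnce() -> T + Send + 'static,
    {
        let (tx, rx) = mpsc::channel();
        let job: Job = Box::new(move || {
            // Ignore the error if the caller stopped waiting for the result.
            let _ = tx.send(work());
        });
        let sender = self.sender.as_ref().expect("pool is running");
        if sender.send(job).is_err() {
            panic!("cpu-pool worker exited");
        }
        rx
    }
}

impl Drop for CpuPool {
    fn drop(&mut self) {
        // Closing the channel stops the worker loop; then wait for the thread.
        drop(self.sender.take());
        if let Some(worker) = self.worker.take() {
            let _ = worker.join();
        }
    }
}

/// CPU-heavy stand-in for executing a query plan.
fn sum_of_squares(n: u64) -> u64 {
    (1..=n).map(|x| x * x).sum()
}
```

A caller thread, playing the role of the I/O runtime, would create the pool with `CpuPool::new()`, submit work with `pool.spawn(|| sum_of_squares(10))`, and wait on the returned receiver with `recv()`, so the heavy computation never runs on the calling thread.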
Re: [PR] Example for using a separate threadpool for CPU bound work (try 3) [datafusion]
alamb commented on code in PR #16331:
URL: https://github.com/apache/datafusion/pull/16331#discussion_r2135821535
##
datafusion-examples/examples/thread_pools.rs:
##
@@ -0,0 +1,346 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This example shows how to use separate thread pools (tokio [`Runtime`]s) to
+//! run the IO and CPU intensive parts of DataFusion plans.
+//!
+//! # Background
+//!
+//! DataFusion, by default, plans and executes all operations (both CPU and IO)
+//! on the same thread pool. This makes it fast and easy to get started, but
+//! can cause issues when running at scale, especially when fetching and operating
+//! on data directly from remote sources.
+//!
+//! Specifically, without configuration such as in this example, DataFusion
+//! plans and executes everything on the same thread pool (Tokio Runtime), including
+//! any I/O, such as reading Parquet files from remote object storage
+//! (e.g. AWS S3), catalog access, and CPU intensive work. Running this diverse
+//! workload can lead to issues described in the [Architecture section] such as
+//! throttled network bandwidth (due to congestion control) and increased
+//! latencies or timeouts while processing network messages.
+//!
+//! [Architecture section]: https://docs.rs/datafusion/latest/datafusion/index.html#thread-scheduling-cpu--io-thread-pools-and-tokio-runtimes
+
+use arrow::util::pretty::pretty_format_batches;
+use datafusion::common::runtime::JoinSet;
+use datafusion::error::Result;
+use datafusion::execution::SendableRecordBatchStream;
+use datafusion::prelude::*;
+use futures::stream::StreamExt;
+use object_store::client::SpawnedReqwestConnector;
+use object_store::http::HttpBuilder;
+use std::sync::Arc;
+use std::time::Duration;
+use tokio::runtime::Handle;
+use tokio::sync::Notify;
+use url::Url;
+
+/// Normally, you don't need to worry about the details of the tokio
+/// [`Runtime`], but for this example it is important to understand how the
+/// [`Runtime`]s work.
+///
+/// Each thread has a "current" runtime that is installed in a thread local
+/// variable which is used by the `tokio::spawn` function.
+///
+/// The `#[tokio::main]` macro creates a [`Runtime`] and installs it
+/// as the "current" runtime in a thread local variable, on which any `async`
+/// [`Future`], [`Stream`]s and [`Task`]s are run.
+///
+/// This example uses the runtime created by [`tokio::main`] to do I/O
+#[tokio::main]
+async fn main() -> Result<()> {
+// The first two examples read local files. Enabling the URL table feature
+// lets us treat filenames as tables in SQL.
+let ctx = SessionContext::new().enable_url_table();
+let sql = format!(
+"SELECT * FROM '{}/alltypes_plain.parquet'",
+datafusion::test_util::parquet_test_data()
+);
+
+// Run a query on the current runtime. Calling `await` means the future
+// (in this case the `async` function and all spawned work in DataFusion
+// plans) runs on the current runtime.
+same_runtime(&ctx, &sql).await?;
+
+// Run the same query but this time on a different runtime.
+//
+// Since we call `await` here, the `async` function itself runs on the
+// current runtime, but internally `different_runtime_basic` executes the
+// DataFusion plan on a different Runtime.
+different_runtime_basic(ctx, sql).await?;
+
+// Run the same query on a different runtime, including remote IO.
+//
+// NOTE: This is best practice for production systems
+different_runtime_advanced().await?;
+
+Ok(())
+}
+
+/// Run queries directly on the current tokio `Runtime`
+///
+/// This is how most examples in DataFusion are written and works well for
+/// development, local query processing, and non latency sensitive workloads.
+async fn same_runtime(ctx: &SessionContext, sql: &str) -> Result<()> {
+// Calling .sql is an async function as it may also do network
+// I/O, for example to contact a remote catalog or do an object store LIST
+let df = ctx.sql(sql).await?;
+
+// While many examples call `collect` or `show()`, those methods buffer the
+// results. Internally DataFusion generates output one RecordBatch at a time
+
+//
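The comment quoted just above contrasts buffering all results with producing output one batch at a time. As a rough std-only illustration of that difference (not DataFusion's API), the sketch below uses a plain `Vec<i64>` as a hypothetical stand-in for a `RecordBatch` and an iterator as a stand-in for a record batch stream. The names `Batch`, `batches`, `count_rows_buffered`, and `count_rows_streaming` are assumptions made for this example.

```rust
/// Hypothetical stand-in for a RecordBatch: a chunk of rows.
type Batch = Vec<i64>;

/// Lazily produce `num_batches` batches with `rows_per_batch` rows each.
fn batches(num_batches: usize, rows_per_batch: usize) -> impl Iterator<Item = Batch> {
    (0..num_batches).map(move |b| {
        let start = (b * rows_per_batch) as i64;
        let rows: Batch = (start..start + rows_per_batch as i64).collect();
        rows
    })
}

/// Buffering: materialize every batch before looking at any of them,
/// so memory use grows with the total result size.
fn count_rows_buffered(input: impl Iterator<Item = Batch>) -> usize {
    let all: Vec<Batch> = input.collect();
    all.iter().map(|batch| batch.len()).sum()
}

/// Streaming: handle one batch at a time; each batch is dropped before the
/// next one is produced, so only one batch is held in memory at once.
fn count_rows_streaming(input: impl Iterator<Item = Batch>) -> usize {
    let mut rows = 0;
    for batch in input {
        rows += batch.len();
    }
    rows
}
```

Both functions return the same row count; they differ only in how many batches are alive at the same time.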
Re: [PR] Example for using a separate threadpool for CPU bound work (try 3) [datafusion]
Omega359 commented on code in PR #16331:
URL: https://github.com/apache/datafusion/pull/16331#discussion_r2135785917
Re: [PR] Example for using a separate threadpool for CPU bound work (try 3) [datafusion]
Omega359 commented on code in PR #16331:
URL: https://github.com/apache/datafusion/pull/16331#discussion_r2135759791
Re: [PR] Example for using a separate threadpool for CPU bound work (try 3) [datafusion]
alamb commented on code in PR #16331:
URL: https://github.com/apache/datafusion/pull/16331#discussion_r2135752123
Re: [PR] Example for using a separate threadpool for CPU bound work (try 3) [datafusion]
Omega359 commented on code in PR #16331:
URL: https://github.com/apache/datafusion/pull/16331#discussion_r2135742621
