This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new ea0ba99d94 Fix `file_stream_provider` example compilation failure on windows (#10975)
ea0ba99d94 is described below
commit ea0ba99d94172780ea6e84010362bc1967d39748
Author: 张林伟 <[email protected]>
AuthorDate: Wed Jun 19 21:58:45 2024 +0800
Fix `file_stream_provider` example compilation failure on windows (#10975)
* Fix file_stream_provider on windows
* Add .await & fmt
---
.../examples/file_stream_provider.rs | 326 +++++++++++----------
1 file changed, 171 insertions(+), 155 deletions(-)
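
The fix works by moving every Unix-only piece (the `nix`-based FIFO setup
and the code that uses it) into a module that is compiled out on Windows,
with `main` dispatching per platform. A minimal sketch of the pattern,
distilled from the diff below (the module body is elided here):

    #[cfg(not(target_os = "windows"))]
    mod non_windows {
        // Unix-only dependencies (e.g. nix::unistd::mkfifo) are safe to
        // use inside this module: it is never compiled on Windows.
        pub async fn main() -> datafusion_common::Result<()> {
            // ... FIFO setup and streaming query, as in the diff ...
            Ok(())
        }
    }

    #[tokio::main]
    async fn main() -> datafusion_common::Result<()> {
        // Exactly one of these cfg-gated blocks survives compilation and
        // becomes the tail expression of the function.
        #[cfg(target_os = "windows")]
        {
            println!("file_stream_provider example does not work on windows");
            Ok(())
        }
        #[cfg(not(target_os = "windows"))]
        {
            non_windows::main().await
        }
    }
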
diff --git a/datafusion-examples/examples/file_stream_provider.rs b/datafusion-examples/examples/file_stream_provider.rs
index 4e79f9afc2..b8549bd6b6 100644
--- a/datafusion-examples/examples/file_stream_provider.rs
+++ b/datafusion-examples/examples/file_stream_provider.rs
@@ -15,172 +15,188 @@
// specific language governing permissions and limitations
// under the License.
-use datafusion::assert_batches_eq;
-use datafusion_common::instant::Instant;
-use std::fs::{File, OpenOptions};
-use std::io::Write;
-use std::path::PathBuf;
-use std::sync::atomic::{AtomicBool, Ordering};
-use std::sync::Arc;
-use std::thread;
-use std::time::Duration;
-
-use arrow::datatypes::{DataType, Field, Schema};
-use arrow_schema::SchemaRef;
-use futures::StreamExt;
-use nix::sys::stat;
-use nix::unistd;
-use tempfile::TempDir;
-use tokio::task::JoinSet;
-
-use datafusion::datasource::stream::{FileStreamProvider, StreamConfig, StreamTable};
-use datafusion::datasource::TableProvider;
-use datafusion::prelude::{SessionConfig, SessionContext};
-use datafusion_common::{exec_err, Result};
-use datafusion_expr::Expr;
-
-// Number of lines written to FIFO
-const TEST_BATCH_SIZE: usize = 5;
-const TEST_DATA_SIZE: usize = 5;
-
-/// Makes a TableProvider for a fifo file using `StreamTable` with the `StreamProvider` trait
-fn fifo_table(
- schema: SchemaRef,
- path: impl Into<PathBuf>,
- sort: Vec<Vec<Expr>>,
-) -> Arc<dyn TableProvider> {
- let source = FileStreamProvider::new_file(schema, path.into())
- .with_batch_size(TEST_BATCH_SIZE)
- .with_header(true);
- let config = StreamConfig::new(Arc::new(source)).with_order(sort);
- Arc::new(StreamTable::new(Arc::new(config)))
-}
+#[cfg(not(target_os = "windows"))]
+mod non_windows {
+ use datafusion::assert_batches_eq;
+ use datafusion_common::instant::Instant;
+ use std::fs::{File, OpenOptions};
+ use std::io::Write;
+ use std::path::PathBuf;
+ use std::sync::atomic::{AtomicBool, Ordering};
+ use std::sync::Arc;
+ use std::thread;
+ use std::time::Duration;
+
+ use arrow::datatypes::{DataType, Field, Schema};
+ use arrow_schema::SchemaRef;
+ use futures::StreamExt;
+ use nix::sys::stat;
+ use nix::unistd;
+ use tempfile::TempDir;
+ use tokio::task::JoinSet;
+
+ use datafusion::datasource::stream::{FileStreamProvider, StreamConfig, StreamTable};
+ use datafusion::datasource::TableProvider;
+ use datafusion::prelude::{SessionConfig, SessionContext};
+ use datafusion_common::{exec_err, Result};
+ use datafusion_expr::Expr;
+
+ // Number of lines written to FIFO
+ const TEST_BATCH_SIZE: usize = 5;
+ const TEST_DATA_SIZE: usize = 5;
+
+ /// Makes a TableProvider for a fifo file using `StreamTable` with the `StreamProvider` trait
+ fn fifo_table(
+ schema: SchemaRef,
+ path: impl Into<PathBuf>,
+ sort: Vec<Vec<Expr>>,
+ ) -> Arc<dyn TableProvider> {
+ let source = FileStreamProvider::new_file(schema, path.into())
+ .with_batch_size(TEST_BATCH_SIZE)
+ .with_header(true);
+ let config = StreamConfig::new(Arc::new(source)).with_order(sort);
+ Arc::new(StreamTable::new(Arc::new(config)))
+ }
-fn create_fifo_file(tmp_dir: &TempDir, file_name: &str) -> Result<PathBuf> {
- let file_path = tmp_dir.path().join(file_name);
- // Simulate an infinite environment via a FIFO file
- if let Err(e) = unistd::mkfifo(&file_path, stat::Mode::S_IRWXU) {
- exec_err!("{}", e)
- } else {
- Ok(file_path)
+ fn create_fifo_file(tmp_dir: &TempDir, file_name: &str) -> Result<PathBuf> {
+ let file_path = tmp_dir.path().join(file_name);
+ // Simulate an infinite environment via a FIFO file
+ if let Err(e) = unistd::mkfifo(&file_path, stat::Mode::S_IRWXU) {
+ exec_err!("{}", e)
+ } else {
+ Ok(file_path)
+ }
}
-}
-fn write_to_fifo(
- mut file: &File,
- line: &str,
- ref_time: Instant,
- broken_pipe_timeout: Duration,
-) -> Result<()> {
- // We need to handle broken pipe error until the reader is ready. This
- // is why we use a timeout to limit the wait duration for the reader.
- // If the error is different than broken pipe, we fail immediately.
- while let Err(e) = file.write_all(line.as_bytes()) {
- if e.raw_os_error().unwrap() == 32 {
- let interval = Instant::now().duration_since(ref_time);
- if interval < broken_pipe_timeout {
- thread::sleep(Duration::from_millis(100));
- continue;
+ fn write_to_fifo(
+ mut file: &File,
+ line: &str,
+ ref_time: Instant,
+ broken_pipe_timeout: Duration,
+ ) -> Result<()> {
+ // We need to handle broken pipe error until the reader is ready. This
+ // is why we use a timeout to limit the wait duration for the reader.
+ // If the error is different than broken pipe, we fail immediately.
+ while let Err(e) = file.write_all(line.as_bytes()) {
+ if e.raw_os_error().unwrap() == 32 {
+ let interval = Instant::now().duration_since(ref_time);
+ if interval < broken_pipe_timeout {
+ thread::sleep(Duration::from_millis(100));
+ continue;
+ }
}
+ return exec_err!("{}", e);
}
- return exec_err!("{}", e);
+ Ok(())
}
- Ok(())
-}
-fn create_writing_thread(
- file_path: PathBuf,
- maybe_header: Option<String>,
- lines: Vec<String>,
- waiting_lock: Arc<AtomicBool>,
- wait_until: usize,
- tasks: &mut JoinSet<()>,
-) {
- // Timeout for a long period of BrokenPipe error
- let broken_pipe_timeout = Duration::from_secs(10);
- let sa = file_path.clone();
- // Spawn a new thread to write to the FIFO file
- #[allow(clippy::disallowed_methods)] // spawn allowed only in tests
- tasks.spawn_blocking(move || {
- let file = OpenOptions::new().write(true).open(sa).unwrap();
- // Reference time to use when deciding to fail the test
- let execution_start = Instant::now();
- if let Some(header) = maybe_header {
- write_to_fifo(&file, &header, execution_start, broken_pipe_timeout).unwrap();
- }
- for (cnt, line) in lines.iter().enumerate() {
- while waiting_lock.load(Ordering::SeqCst) && cnt > wait_until {
- thread::sleep(Duration::from_millis(50));
+ fn create_writing_thread(
+ file_path: PathBuf,
+ maybe_header: Option<String>,
+ lines: Vec<String>,
+ waiting_lock: Arc<AtomicBool>,
+ wait_until: usize,
+ tasks: &mut JoinSet<()>,
+ ) {
+ // Timeout for a long period of BrokenPipe error
+ let broken_pipe_timeout = Duration::from_secs(10);
+ let sa = file_path.clone();
+ // Spawn a new thread to write to the FIFO file
+ #[allow(clippy::disallowed_methods)] // spawn allowed only in tests
+ tasks.spawn_blocking(move || {
+ let file = OpenOptions::new().write(true).open(sa).unwrap();
+ // Reference time to use when deciding to fail the test
+ let execution_start = Instant::now();
+ if let Some(header) = maybe_header {
+ write_to_fifo(&file, &header, execution_start, broken_pipe_timeout)
+ .unwrap();
+ }
+ for (cnt, line) in lines.iter().enumerate() {
+ while waiting_lock.load(Ordering::SeqCst) && cnt > wait_until {
+ thread::sleep(Duration::from_millis(50));
+ }
+ write_to_fifo(&file, line, execution_start, broken_pipe_timeout).unwrap();
}
- write_to_fifo(&file, line, execution_start, broken_pipe_timeout).unwrap();
+ drop(file);
+ });
+ }
+
+ /// This example demonstrates scanning against an Arrow data source (CSV) and
+ /// fetching results
+ pub async fn main() -> Result<()> {
+ // Create session context
+ let config = SessionConfig::new()
+ .with_batch_size(TEST_BATCH_SIZE)
+ .with_collect_statistics(false)
+ .with_target_partitions(1);
+ let ctx = SessionContext::new_with_config(config);
+ let tmp_dir = TempDir::new()?;
+ let fifo_path = create_fifo_file(&tmp_dir, "fifo_unbounded.csv")?;
+
+ let mut tasks: JoinSet<()> = JoinSet::new();
+ let waiting = Arc::new(AtomicBool::new(true));
+
+ let data_iter = 0..TEST_DATA_SIZE;
+ let lines = data_iter
+ .map(|i| format!("{},{}\n", i, i + 1))
+ .collect::<Vec<_>>();
+
+ create_writing_thread(
+ fifo_path.clone(),
+ Some("a1,a2\n".to_owned()),
+ lines.clone(),
+ waiting.clone(),
+ TEST_DATA_SIZE,
+ &mut tasks,
+ );
+
+ // Create schema
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("a1", DataType::UInt32, false),
+ Field::new("a2", DataType::UInt32, false),
+ ]));
+
+ // Specify the ordering:
+ let order = vec![vec![datafusion_expr::col("a1").sort(true, false)]];
+
+ let provider = fifo_table(schema.clone(), fifo_path, order.clone());
+ ctx.register_table("fifo", provider)?;
+
+ let df = ctx.sql("SELECT * FROM fifo").await.unwrap();
+ let mut stream = df.execute_stream().await.unwrap();
+
+ let mut batches = Vec::new();
+ if let Some(Ok(batch)) = stream.next().await {
+ batches.push(batch)
}
- drop(file);
- });
+
+ let expected = vec![
+ "+----+----+",
+ "| a1 | a2 |",
+ "+----+----+",
+ "| 0 | 1 |",
+ "| 1 | 2 |",
+ "| 2 | 3 |",
+ "| 3 | 4 |",
+ "| 4 | 5 |",
+ "+----+----+",
+ ];
+
+ assert_batches_eq!(&expected, &batches);
+
+ Ok(())
+ }
}
-/// This example demonstrates a scanning against an Arrow data source (JSON) and
-/// fetching results
#[tokio::main]
-async fn main() -> Result<()> {
- // Create session context
- let config = SessionConfig::new()
- .with_batch_size(TEST_BATCH_SIZE)
- .with_collect_statistics(false)
- .with_target_partitions(1);
- let ctx = SessionContext::new_with_config(config);
- let tmp_dir = TempDir::new()?;
- let fifo_path = create_fifo_file(&tmp_dir, "fifo_unbounded.csv")?;
-
- let mut tasks: JoinSet<()> = JoinSet::new();
- let waiting = Arc::new(AtomicBool::new(true));
-
- let data_iter = 0..TEST_DATA_SIZE;
- let lines = data_iter
- .map(|i| format!("{},{}\n", i, i + 1))
- .collect::<Vec<_>>();
-
- create_writing_thread(
- fifo_path.clone(),
- Some("a1,a2\n".to_owned()),
- lines.clone(),
- waiting.clone(),
- TEST_DATA_SIZE,
- &mut tasks,
- );
-
- // Create schema
- let schema = Arc::new(Schema::new(vec![
- Field::new("a1", DataType::UInt32, false),
- Field::new("a2", DataType::UInt32, false),
- ]));
-
- // Specify the ordering:
- let order = vec![vec![datafusion_expr::col("a1").sort(true, false)]];
-
- let provider = fifo_table(schema.clone(), fifo_path, order.clone());
- ctx.register_table("fifo", provider)?;
-
- let df = ctx.sql("SELECT * FROM fifo").await.unwrap();
- let mut stream = df.execute_stream().await.unwrap();
-
- let mut batches = Vec::new();
- if let Some(Ok(batch)) = stream.next().await {
- batches.push(batch)
+async fn main() -> datafusion_common::Result<()> {
+ #[cfg(target_os = "windows")]
+ {
+ println!("file_stream_provider example does not work on windows");
+ Ok(())
+ }
+ #[cfg(not(target_os = "windows"))]
+ {
+ non_windows::main().await
}
-
- let expected = vec![
- "+----+----+",
- "| a1 | a2 |",
- "+----+----+",
- "| 0 | 1 |",
- "| 1 | 2 |",
- "| 2 | 3 |",
- "| 3 | 4 |",
- "| 4 | 5 |",
- "+----+----+",
- ];
-
- assert_batches_eq!(&expected, &batches);
-
- Ok(())
}
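
For reference, the DataFusion API usage is unchanged by this commit: the
example registers the FIFO file as an unbounded `StreamTable` and queries it
with SQL. A condensed sketch of that registration, mirroring the calls in
the diff (the helper name is illustrative):

    use std::sync::Arc;
    use arrow::datatypes::{DataType, Field, Schema};
    use datafusion::datasource::stream::{FileStreamProvider, StreamConfig, StreamTable};
    use datafusion::prelude::SessionContext;

    fn register_fifo_table(
        ctx: &SessionContext,
        path: std::path::PathBuf,
    ) -> datafusion_common::Result<()> {
        // Schema matching the "a1,a2" CSV header written to the FIFO
        let schema = Arc::new(Schema::new(vec![
            Field::new("a1", DataType::UInt32, false),
            Field::new("a2", DataType::UInt32, false),
        ]));
        // Stream record batches from the FIFO file as lines arrive
        let source = FileStreamProvider::new_file(schema, path)
            .with_batch_size(5)
            .with_header(true);
        let config = StreamConfig::new(Arc::new(source));
        ctx.register_table("fifo", Arc::new(StreamTable::new(Arc::new(config))))?;
        Ok(())
    }

Once registered, `ctx.sql("SELECT * FROM fifo")` produces a stream that
yields batches as the writer thread appends lines, which is what the
example's `assert_batches_eq!` verifies.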