This is an automated email from the ASF dual-hosted git repository.

ozankabak pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 7afc99f3d2 Chore/fifo tests cleanup (#11616)
7afc99f3d2 is described below

commit 7afc99f3d2a321d853210af7bd0a4b49e5afa0c4
Author: Mehmet Ozan Kabak <[email protected]>
AuthorDate: Tue Jul 23 17:11:48 2024 +0300

    Chore/fifo tests cleanup (#11616)
    
    * Cleanup fifo tests
    
    * Resolve merge conflicts
---
 datafusion/core/src/physical_optimizer/mod.rs |   8 +-
 datafusion/core/tests/fifo/mod.rs             | 174 +++++++++++++-------------
 2 files changed, 90 insertions(+), 92 deletions(-)

diff --git a/datafusion/core/src/physical_optimizer/mod.rs b/datafusion/core/src/physical_optimizer/mod.rs
index a0c9c36977..7c508eeef8 100644
--- a/datafusion/core/src/physical_optimizer/mod.rs
+++ b/datafusion/core/src/physical_optimizer/mod.rs
@@ -33,12 +33,12 @@ pub mod projection_pushdown;
 pub mod pruning;
 pub mod replace_with_order_preserving_variants;
 pub mod sanity_checker;
-mod sort_pushdown;
+#[cfg(test)]
+pub mod test_utils;
 pub mod topk_aggregation;
 pub mod update_aggr_exprs;
-mod utils;
 
-#[cfg(test)]
-pub mod test_utils;
+mod sort_pushdown;
+mod utils;
 
 pub use datafusion_physical_optimizer::*;
diff --git a/datafusion/core/tests/fifo/mod.rs b/datafusion/core/tests/fifo/mod.rs
index 1df97b1636..6efbb9b029 100644
--- a/datafusion/core/tests/fifo/mod.rs
+++ b/datafusion/core/tests/fifo/mod.rs
@@ -6,7 +6,7 @@
 // "License"); you may not use this file except in compliance
 // with the License.  You may obtain a copy of the License at
 //
-//http://www.apache.org/licenses/LICENSE-2.0
+//   http://www.apache.org/licenses/LICENSE-2.0
 //
 // Unless required by applicable law or agreed to in writing,
 // software distributed under the License is distributed on an
@@ -16,38 +16,37 @@
 // under the License.
 
 //! This test demonstrates the DataFusion FIFO capabilities.
-//!
+
 #[cfg(target_family = "unix")]
 #[cfg(test)]
 mod unix_test {
-    use datafusion_common::instant::Instant;
-    use std::fs::{File, OpenOptions};
-    use std::io::Write;
+    use std::fs::File;
     use std::path::PathBuf;
     use std::sync::atomic::{AtomicBool, Ordering};
     use std::sync::Arc;
-    use std::thread;
     use std::time::Duration;
 
     use arrow::array::Array;
     use arrow::csv::ReaderBuilder;
     use arrow::datatypes::{DataType, Field, Schema};
     use arrow_schema::SchemaRef;
-    use futures::StreamExt;
-    use nix::sys::stat;
-    use nix::unistd;
-    use tempfile::TempDir;
-    use tokio::task::{spawn_blocking, JoinHandle};
-
     use datafusion::datasource::stream::{FileStreamProvider, StreamConfig, 
StreamTable};
     use datafusion::datasource::TableProvider;
     use datafusion::{
         prelude::{CsvReadOptions, SessionConfig, SessionContext},
         test_util::{aggr_test_schema, arrow_test_data},
     };
-    use datafusion_common::{exec_err, DataFusionError, Result};
+    use datafusion_common::instant::Instant;
+    use datafusion_common::{exec_err, Result};
     use datafusion_expr::Expr;
 
+    use futures::StreamExt;
+    use nix::sys::stat;
+    use nix::unistd;
+    use tempfile::TempDir;
+    use tokio::io::AsyncWriteExt;
+    use tokio::task::{spawn_blocking, JoinHandle};
+
     /// Makes a TableProvider for a fifo file
     fn fifo_table(
         schema: SchemaRef,
@@ -71,8 +70,8 @@ mod unix_test {
         }
     }
 
-    fn write_to_fifo(
-        mut file: &File,
+    async fn write_to_fifo(
+        file: &mut tokio::fs::File,
         line: &str,
         ref_time: Instant,
         broken_pipe_timeout: Duration,
@@ -80,11 +79,11 @@ mod unix_test {
         // We need to handle broken pipe error until the reader is ready. This
         // is why we use a timeout to limit the wait duration for the reader.
         // If the error is different than broken pipe, we fail immediately.
-        while let Err(e) = file.write_all(line.as_bytes()) {
+        while let Err(e) = file.write_all(line.as_bytes()).await {
             if e.raw_os_error().unwrap() == 32 {
                 let interval = Instant::now().duration_since(ref_time);
                 if interval < broken_pipe_timeout {
-                    thread::sleep(Duration::from_millis(100));
+                    tokio::time::sleep(Duration::from_millis(50)).await;
                     continue;
                 }
             }
@@ -93,28 +92,38 @@ mod unix_test {
         Ok(())
     }
 
-    fn create_writing_thread(
+    /// This function creates a writing task for the FIFO file. To verify
+    /// incremental processing, it waits for a signal to continue writing after
+    /// a certain number of lines are written.
+    #[allow(clippy::disallowed_methods)]
+    fn create_writing_task(
         file_path: PathBuf,
         header: String,
         lines: Vec<String>,
-        waiting_lock: Arc<AtomicBool>,
-        wait_until: usize,
+        waiting_signal: Arc<AtomicBool>,
+        send_before_waiting: usize,
     ) -> JoinHandle<()> {
         // Timeout for a long period of BrokenPipe error
         let broken_pipe_timeout = Duration::from_secs(10);
-        let sa = file_path.clone();
-        // Spawn a new thread to write to the FIFO file
-        #[allow(clippy::disallowed_methods)] // spawn allowed only in tests
-        spawn_blocking(move || {
-            let file = OpenOptions::new().write(true).open(sa).unwrap();
+        // Spawn a new task to write to the FIFO file
+        tokio::spawn(async move {
+            let mut file = tokio::fs::OpenOptions::new()
+                .write(true)
+                .open(file_path)
+                .await
+                .unwrap();
             // Reference time to use when deciding to fail the test
             let execution_start = Instant::now();
-            write_to_fifo(&file, &header, execution_start, 
broken_pipe_timeout).unwrap();
+            write_to_fifo(&mut file, &header, execution_start, 
broken_pipe_timeout)
+                .await
+                .unwrap();
             for (cnt, line) in lines.iter().enumerate() {
-                while waiting_lock.load(Ordering::SeqCst) && cnt > wait_until {
-                    thread::sleep(Duration::from_millis(50));
+                while waiting_signal.load(Ordering::SeqCst) && cnt > 
send_before_waiting {
+                    tokio::time::sleep(Duration::from_millis(50)).await;
                 }
-                write_to_fifo(&file, line, execution_start, 
broken_pipe_timeout).unwrap();
+                write_to_fifo(&mut file, line, execution_start, 
broken_pipe_timeout)
+                    .await
+                    .unwrap();
             }
             drop(file);
         })
@@ -125,6 +134,8 @@ mod unix_test {
     const TEST_BATCH_SIZE: usize = 20;
     // Number of lines written to FIFO
     const TEST_DATA_SIZE: usize = 20_000;
+    // Number of lines to write before waiting to verify incremental processing
+    const SEND_BEFORE_WAITING: usize = 2 * TEST_BATCH_SIZE;
     // Number of lines what can be joined. Each joinable key produced 20 lines 
with
     // aggregate_test_100 dataset. We will use these joinable keys for 
understanding
     // incremental execution.
@@ -132,7 +143,7 @@ mod unix_test {
 
     // This test provides a relatively realistic end-to-end scenario where
     // we swap join sides to accommodate a FIFO source.
-    #[tokio::test(flavor = "multi_thread", worker_threads = 8)]
+    #[tokio::test]
     async fn unbounded_file_with_swapped_join() -> Result<()> {
         // Create session context
         let config = SessionConfig::new()
@@ -162,8 +173,8 @@ mod unix_test {
             .zip(0..TEST_DATA_SIZE)
             .map(|(a1, a2)| format!("{a1},{a2}\n"))
             .collect::<Vec<_>>();
-        // Create writing threads for the left and right FIFO files
-        let task = create_writing_thread(
+        // Create writing tasks for the left and right FIFO files
+        let task = create_writing_task(
             fifo_path.clone(),
             "a1,a2\n".to_owned(),
             lines,
@@ -190,7 +201,16 @@ mod unix_test {
         )
         .await?;
         // Execute the query
-        let df = ctx.sql("SELECT t1.a2, t2.c1, t2.c4, t2.c5 FROM left as t1 
JOIN right as t2 ON t1.a1 = t2.c1").await?;
+        let df = ctx
+            .sql(
+                "SELECT
+                  t1.a2, t2.c1, t2.c4, t2.c5
+                FROM
+                  left as t1, right as t2
+                WHERE
+                  t1.a1 = t2.c1",
+            )
+            .await?;
         let mut stream = df.execute_stream().await?;
         while (stream.next().await).is_some() {
             waiting.store(false, Ordering::SeqCst);
@@ -199,16 +219,9 @@ mod unix_test {
         Ok(())
     }
 
-    #[derive(Debug, PartialEq)]
-    enum JoinOperation {
-        LeftUnmatched,
-        RightUnmatched,
-        Equal,
-    }
-
-    // This test provides a relatively realistic end-to-end scenario where
-    // we change the join into a [SymmetricHashJoin] to accommodate two
-    // unbounded (FIFO) sources.
+    /// This test provides a relatively realistic end-to-end scenario where
+    /// we change the join into a `SymmetricHashJoinExec` to accommodate two
+    /// unbounded (FIFO) sources.
     #[tokio::test]
     async fn unbounded_file_with_symmetric_join() -> Result<()> {
         // Create session context
@@ -247,19 +260,18 @@ mod unix_test {
         let df = ctx
             .sql(
                 "SELECT
-                  t1.a1,
-                  t1.a2,
-                  t2.a1,
-                  t2.a2
+                  t1.a1, t1.a2, t2.a1, t2.a2
                 FROM
-                  left as t1 FULL
-                  JOIN right as t2 ON t1.a2 = t2.a2
-                  AND t1.a1 > t2.a1 + 4
-                  AND t1.a1 < t2.a1 + 9",
+                  left as t1
+                FULL JOIN
+                  right as t2
+                ON
+                  t1.a2 = t2.a2 AND
+                  t1.a1 > t2.a1 + 4 AND
+                  t1.a1 < t2.a1 + 9",
             )
             .await?;
         let mut stream = df.execute_stream().await?;
-        let mut operations = vec![];
 
         // Tasks
         let mut tasks: Vec<JoinHandle<()>> = vec![];
@@ -273,54 +285,43 @@ mod unix_test {
             .map(|(a1, a2)| format!("{a1},{a2}\n"))
             .collect::<Vec<_>>();
 
-        // Create writing threads for the left and right FIFO files
-        tasks.push(create_writing_thread(
+        // Create writing tasks for the left and right FIFO files
+        tasks.push(create_writing_task(
             left_fifo,
             "a1,a2\n".to_owned(),
             lines.clone(),
             waiting.clone(),
-            TEST_BATCH_SIZE,
+            SEND_BEFORE_WAITING,
         ));
-        tasks.push(create_writing_thread(
+        tasks.push(create_writing_task(
             right_fifo,
             "a1,a2\n".to_owned(),
-            lines.clone(),
+            lines,
             waiting.clone(),
-            TEST_BATCH_SIZE,
+            SEND_BEFORE_WAITING,
         ));
-        // Partial.
+        // Collect output data:
+        let (mut equal, mut left, mut right) = (0, 0, 0);
         while let Some(Ok(batch)) = stream.next().await {
             waiting.store(false, Ordering::SeqCst);
             let left_unmatched = batch.column(2).null_count();
             let right_unmatched = batch.column(0).null_count();
-            let op = if left_unmatched == 0 && right_unmatched == 0 {
-                JoinOperation::Equal
-            } else if right_unmatched > left_unmatched {
-                JoinOperation::RightUnmatched
+            if left_unmatched == 0 && right_unmatched == 0 {
+                equal += 1;
+            } else if right_unmatched <= left_unmatched {
+                left += 1;
             } else {
-                JoinOperation::LeftUnmatched
+                right += 1;
             };
-            operations.push(op);
         }
         futures::future::try_join_all(tasks).await.unwrap();
 
-        // The SymmetricHashJoin executor produces FULL join results at every
-        // pruning, which happens before it reaches the end of input and more
-        // than once. In this test, we feed partially joinable data to both
-        // sides in order to ensure that left or right unmatched results are
-        // generated more than once during the test.
-        assert!(
-            operations
-                .iter()
-                .filter(|&n| JoinOperation::RightUnmatched.eq(n))
-                .count()
-                > 1
-                && operations
-                    .iter()
-                    .filter(|&n| JoinOperation::LeftUnmatched.eq(n))
-                    .count()
-                    > 1
-        );
+        // The symmetric hash join algorithm produces FULL join results at
+        // every pruning, which happens before it reaches the end of input and
+        // more than once. In this test, we feed partially joinable data to
+        // both sides in order to ensure that left or right unmatched results
+        // are generated as expected.
+        assert!(equal >= 0 && left > 1 && right > 1);
         Ok(())
     }
 
@@ -341,17 +342,14 @@ mod unix_test {
             (source_fifo_path.clone(), source_fifo_path.display());
         // Tasks
         let mut tasks: Vec<JoinHandle<()>> = vec![];
-        // TEST_BATCH_SIZE + 1 rows will be provided. However, after 
processing precisely
-        // TEST_BATCH_SIZE rows, the program will pause and wait for a batch 
to be read in another
-        // thread. This approach ensures that the pipeline remains unbroken.
-        tasks.push(create_writing_thread(
+        tasks.push(create_writing_task(
             source_fifo_path_thread,
             "a1,a2\n".to_owned(),
             (0..TEST_DATA_SIZE)
                 .map(|_| "a,1\n".to_string())
                 .collect::<Vec<_>>(),
             waiting,
-            TEST_BATCH_SIZE,
+            SEND_BEFORE_WAITING,
         ));
         // Create a new temporary FIFO file
         let sink_fifo_path = create_fifo_file(&tmp_dir, "sink.csv")?;
@@ -370,8 +368,8 @@ mod unix_test {
 
             let mut reader = ReaderBuilder::new(schema)
                 .with_batch_size(TEST_BATCH_SIZE)
+                .with_header(true)
                 .build(file)
-                .map_err(|e| DataFusionError::Internal(e.to_string()))
                 .unwrap();
 
             while let Some(Ok(_)) = reader.next() {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to