This is an automated email from the ASF dual-hosted git repository.
ozankabak pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 7afc99f3d2 Chore/fifo tests cleanup (#11616)
7afc99f3d2 is described below
commit 7afc99f3d2a321d853210af7bd0a4b49e5afa0c4
Author: Mehmet Ozan Kabak <[email protected]>
AuthorDate: Tue Jul 23 17:11:48 2024 +0300
Chore/fifo tests cleanup (#11616)
* Cleanup fifo tests
* Resolve merge conflicts
---
datafusion/core/src/physical_optimizer/mod.rs | 8 +-
datafusion/core/tests/fifo/mod.rs | 174 +++++++++++++-------------
2 files changed, 90 insertions(+), 92 deletions(-)
diff --git a/datafusion/core/src/physical_optimizer/mod.rs
b/datafusion/core/src/physical_optimizer/mod.rs
index a0c9c36977..7c508eeef8 100644
--- a/datafusion/core/src/physical_optimizer/mod.rs
+++ b/datafusion/core/src/physical_optimizer/mod.rs
@@ -33,12 +33,12 @@ pub mod projection_pushdown;
pub mod pruning;
pub mod replace_with_order_preserving_variants;
pub mod sanity_checker;
-mod sort_pushdown;
+#[cfg(test)]
+pub mod test_utils;
pub mod topk_aggregation;
pub mod update_aggr_exprs;
-mod utils;
-#[cfg(test)]
-pub mod test_utils;
+mod sort_pushdown;
+mod utils;
pub use datafusion_physical_optimizer::*;
diff --git a/datafusion/core/tests/fifo/mod.rs
b/datafusion/core/tests/fifo/mod.rs
index 1df97b1636..6efbb9b029 100644
--- a/datafusion/core/tests/fifo/mod.rs
+++ b/datafusion/core/tests/fifo/mod.rs
@@ -6,7 +6,7 @@
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
-//http://www.apache.org/licenses/LICENSE-2.0
+// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
@@ -16,38 +16,37 @@
// under the License.
//! This test demonstrates the DataFusion FIFO capabilities.
-//!
+
#[cfg(target_family = "unix")]
#[cfg(test)]
mod unix_test {
- use datafusion_common::instant::Instant;
- use std::fs::{File, OpenOptions};
- use std::io::Write;
+ use std::fs::File;
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
- use std::thread;
use std::time::Duration;
use arrow::array::Array;
use arrow::csv::ReaderBuilder;
use arrow::datatypes::{DataType, Field, Schema};
use arrow_schema::SchemaRef;
- use futures::StreamExt;
- use nix::sys::stat;
- use nix::unistd;
- use tempfile::TempDir;
- use tokio::task::{spawn_blocking, JoinHandle};
-
use datafusion::datasource::stream::{FileStreamProvider, StreamConfig,
StreamTable};
use datafusion::datasource::TableProvider;
use datafusion::{
prelude::{CsvReadOptions, SessionConfig, SessionContext},
test_util::{aggr_test_schema, arrow_test_data},
};
- use datafusion_common::{exec_err, DataFusionError, Result};
+ use datafusion_common::instant::Instant;
+ use datafusion_common::{exec_err, Result};
use datafusion_expr::Expr;
+ use futures::StreamExt;
+ use nix::sys::stat;
+ use nix::unistd;
+ use tempfile::TempDir;
+ use tokio::io::AsyncWriteExt;
+ use tokio::task::{spawn_blocking, JoinHandle};
+
/// Makes a TableProvider for a fifo file
fn fifo_table(
schema: SchemaRef,
@@ -71,8 +70,8 @@ mod unix_test {
}
}
- fn write_to_fifo(
- mut file: &File,
+ async fn write_to_fifo(
+ file: &mut tokio::fs::File,
line: &str,
ref_time: Instant,
broken_pipe_timeout: Duration,
@@ -80,11 +79,11 @@ mod unix_test {
// We need to handle broken pipe error until the reader is ready. This
// is why we use a timeout to limit the wait duration for the reader.
// If the error is different than broken pipe, we fail immediately.
- while let Err(e) = file.write_all(line.as_bytes()) {
+ while let Err(e) = file.write_all(line.as_bytes()).await {
if e.raw_os_error().unwrap() == 32 {
let interval = Instant::now().duration_since(ref_time);
if interval < broken_pipe_timeout {
- thread::sleep(Duration::from_millis(100));
+ tokio::time::sleep(Duration::from_millis(50)).await;
continue;
}
}
@@ -93,28 +92,38 @@ mod unix_test {
Ok(())
}
- fn create_writing_thread(
+ /// This function creates a writing task for the FIFO file. To verify
+ /// incremental processing, it waits for a signal to continue writing after
+ /// a certain number of lines are written.
+ #[allow(clippy::disallowed_methods)]
+ fn create_writing_task(
file_path: PathBuf,
header: String,
lines: Vec<String>,
- waiting_lock: Arc<AtomicBool>,
- wait_until: usize,
+ waiting_signal: Arc<AtomicBool>,
+ send_before_waiting: usize,
) -> JoinHandle<()> {
// Timeout for a long period of BrokenPipe error
let broken_pipe_timeout = Duration::from_secs(10);
- let sa = file_path.clone();
- // Spawn a new thread to write to the FIFO file
- #[allow(clippy::disallowed_methods)] // spawn allowed only in tests
- spawn_blocking(move || {
- let file = OpenOptions::new().write(true).open(sa).unwrap();
+ // Spawn a new task to write to the FIFO file
+ tokio::spawn(async move {
+ let mut file = tokio::fs::OpenOptions::new()
+ .write(true)
+ .open(file_path)
+ .await
+ .unwrap();
// Reference time to use when deciding to fail the test
let execution_start = Instant::now();
- write_to_fifo(&file, &header, execution_start,
broken_pipe_timeout).unwrap();
+ write_to_fifo(&mut file, &header, execution_start,
broken_pipe_timeout)
+ .await
+ .unwrap();
for (cnt, line) in lines.iter().enumerate() {
- while waiting_lock.load(Ordering::SeqCst) && cnt > wait_until {
- thread::sleep(Duration::from_millis(50));
+ while waiting_signal.load(Ordering::SeqCst) && cnt >
send_before_waiting {
+ tokio::time::sleep(Duration::from_millis(50)).await;
}
- write_to_fifo(&file, line, execution_start,
broken_pipe_timeout).unwrap();
+ write_to_fifo(&mut file, line, execution_start,
broken_pipe_timeout)
+ .await
+ .unwrap();
}
drop(file);
})
@@ -125,6 +134,8 @@ mod unix_test {
const TEST_BATCH_SIZE: usize = 20;
// Number of lines written to FIFO
const TEST_DATA_SIZE: usize = 20_000;
+ // Number of lines to write before waiting to verify incremental processing
+ const SEND_BEFORE_WAITING: usize = 2 * TEST_BATCH_SIZE;
// Number of lines that can be joined. Each joinable key produces 20 lines with
// the aggregate_test_100 dataset. We will use these joinable keys for understanding
// incremental execution.
@@ -132,7 +143,7 @@ mod unix_test {
// This test provides a relatively realistic end-to-end scenario where
// we swap join sides to accommodate a FIFO source.
- #[tokio::test(flavor = "multi_thread", worker_threads = 8)]
+ #[tokio::test]
async fn unbounded_file_with_swapped_join() -> Result<()> {
// Create session context
let config = SessionConfig::new()
@@ -162,8 +173,8 @@ mod unix_test {
.zip(0..TEST_DATA_SIZE)
.map(|(a1, a2)| format!("{a1},{a2}\n"))
.collect::<Vec<_>>();
- // Create writing threads for the left and right FIFO files
- let task = create_writing_thread(
+ // Create writing tasks for the left and right FIFO files
+ let task = create_writing_task(
fifo_path.clone(),
"a1,a2\n".to_owned(),
lines,
@@ -190,7 +201,16 @@ mod unix_test {
)
.await?;
// Execute the query
- let df = ctx.sql("SELECT t1.a2, t2.c1, t2.c4, t2.c5 FROM left as t1
JOIN right as t2 ON t1.a1 = t2.c1").await?;
+ let df = ctx
+ .sql(
+ "SELECT
+ t1.a2, t2.c1, t2.c4, t2.c5
+ FROM
+ left as t1, right as t2
+ WHERE
+ t1.a1 = t2.c1",
+ )
+ .await?;
let mut stream = df.execute_stream().await?;
while (stream.next().await).is_some() {
waiting.store(false, Ordering::SeqCst);
@@ -199,16 +219,9 @@ mod unix_test {
Ok(())
}
- #[derive(Debug, PartialEq)]
- enum JoinOperation {
- LeftUnmatched,
- RightUnmatched,
- Equal,
- }
-
- // This test provides a relatively realistic end-to-end scenario where
- // we change the join into a [SymmetricHashJoin] to accommodate two
- // unbounded (FIFO) sources.
+ /// This test provides a relatively realistic end-to-end scenario where
+ /// we change the join into a `SymmetricHashJoinExec` to accommodate two
+ /// unbounded (FIFO) sources.
#[tokio::test]
async fn unbounded_file_with_symmetric_join() -> Result<()> {
// Create session context
@@ -247,19 +260,18 @@ mod unix_test {
let df = ctx
.sql(
"SELECT
- t1.a1,
- t1.a2,
- t2.a1,
- t2.a2
+ t1.a1, t1.a2, t2.a1, t2.a2
FROM
- left as t1 FULL
- JOIN right as t2 ON t1.a2 = t2.a2
- AND t1.a1 > t2.a1 + 4
- AND t1.a1 < t2.a1 + 9",
+ left as t1
+ FULL JOIN
+ right as t2
+ ON
+ t1.a2 = t2.a2 AND
+ t1.a1 > t2.a1 + 4 AND
+ t1.a1 < t2.a1 + 9",
)
.await?;
let mut stream = df.execute_stream().await?;
- let mut operations = vec![];
// Tasks
let mut tasks: Vec<JoinHandle<()>> = vec![];
@@ -273,54 +285,43 @@ mod unix_test {
.map(|(a1, a2)| format!("{a1},{a2}\n"))
.collect::<Vec<_>>();
- // Create writing threads for the left and right FIFO files
- tasks.push(create_writing_thread(
+ // Create writing tasks for the left and right FIFO files
+ tasks.push(create_writing_task(
left_fifo,
"a1,a2\n".to_owned(),
lines.clone(),
waiting.clone(),
- TEST_BATCH_SIZE,
+ SEND_BEFORE_WAITING,
));
- tasks.push(create_writing_thread(
+ tasks.push(create_writing_task(
right_fifo,
"a1,a2\n".to_owned(),
- lines.clone(),
+ lines,
waiting.clone(),
- TEST_BATCH_SIZE,
+ SEND_BEFORE_WAITING,
));
- // Partial.
+ // Collect output data:
+ let (mut equal, mut left, mut right) = (0, 0, 0);
while let Some(Ok(batch)) = stream.next().await {
waiting.store(false, Ordering::SeqCst);
let left_unmatched = batch.column(2).null_count();
let right_unmatched = batch.column(0).null_count();
- let op = if left_unmatched == 0 && right_unmatched == 0 {
- JoinOperation::Equal
- } else if right_unmatched > left_unmatched {
- JoinOperation::RightUnmatched
+ if left_unmatched == 0 && right_unmatched == 0 {
+ equal += 1;
+ } else if right_unmatched <= left_unmatched {
+ left += 1;
} else {
- JoinOperation::LeftUnmatched
+ right += 1;
};
- operations.push(op);
}
futures::future::try_join_all(tasks).await.unwrap();
- // The SymmetricHashJoin executor produces FULL join results at every
- // pruning, which happens before it reaches the end of input and more
- // than once. In this test, we feed partially joinable data to both
- // sides in order to ensure that left or right unmatched results are
- // generated more than once during the test.
- assert!(
- operations
- .iter()
- .filter(|&n| JoinOperation::RightUnmatched.eq(n))
- .count()
- > 1
- && operations
- .iter()
- .filter(|&n| JoinOperation::LeftUnmatched.eq(n))
- .count()
- > 1
- );
+ // The symmetric hash join algorithm produces FULL join results at
+ // every pruning, which happens before it reaches the end of input and
+ // more than once. In this test, we feed partially joinable data to
+ // both sides in order to ensure that left or right unmatched results
+ // are generated as expected.
+ assert!(equal >= 0 && left > 1 && right > 1);
Ok(())
}
@@ -341,17 +342,14 @@ mod unix_test {
(source_fifo_path.clone(), source_fifo_path.display());
// Tasks
let mut tasks: Vec<JoinHandle<()>> = vec![];
- // TEST_BATCH_SIZE + 1 rows will be provided. However, after
processing precisely
- // TEST_BATCH_SIZE rows, the program will pause and wait for a batch
to be read in another
- // thread. This approach ensures that the pipeline remains unbroken.
- tasks.push(create_writing_thread(
+ tasks.push(create_writing_task(
source_fifo_path_thread,
"a1,a2\n".to_owned(),
(0..TEST_DATA_SIZE)
.map(|_| "a,1\n".to_string())
.collect::<Vec<_>>(),
waiting,
- TEST_BATCH_SIZE,
+ SEND_BEFORE_WAITING,
));
// Create a new temporary FIFO file
let sink_fifo_path = create_fifo_file(&tmp_dir, "sink.csv")?;
@@ -370,8 +368,8 @@ mod unix_test {
let mut reader = ReaderBuilder::new(schema)
.with_batch_size(TEST_BATCH_SIZE)
+ .with_header(true)
.build(file)
- .map_err(|e| DataFusionError::Internal(e.to_string()))
.unwrap();
while let Some(Ok(_)) = reader.next() {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]