This is an automated email from the ASF dual-hosted git repository.

ozankabak pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 8216e32e87 [MINOR]: Fix some minor silent bugs (#11127)
8216e32e87 is described below

commit 8216e32e87b2238d8814fe16215c8770d6c327c8
Author: Mustafa Akur <[email protected]>
AuthorDate: Thu Jun 27 17:10:31 2024 +0300

    [MINOR]: Fix some minor silent bugs (#11127)
---
 datafusion/core/tests/fifo/mod.rs              | 59 +++++++++++++-------------
 datafusion/core/tests/tpcds_planning.rs        |  5 ++-
 datafusion/physical-expr/src/partitioning.rs   |  4 +-
 datafusion/physical-plan/src/aggregates/mod.rs |  2 +-
 4 files changed, 38 insertions(+), 32 deletions(-)

diff --git a/datafusion/core/tests/fifo/mod.rs 
b/datafusion/core/tests/fifo/mod.rs
index 2e21abffab..1df97b1636 100644
--- a/datafusion/core/tests/fifo/mod.rs
+++ b/datafusion/core/tests/fifo/mod.rs
@@ -217,17 +217,6 @@ mod unix_test {
             .set_bool("datafusion.execution.coalesce_batches", false)
             .with_target_partitions(1);
         let ctx = SessionContext::new_with_config(config);
-        // Tasks
-        let mut tasks: Vec<JoinHandle<()>> = vec![];
-
-        // Join filter
-        let a1_iter = 0..TEST_DATA_SIZE;
-        // Join key
-        let a2_iter = (0..TEST_DATA_SIZE).map(|x| x % 10);
-        let lines = a1_iter
-            .zip(a2_iter)
-            .map(|(a1, a2)| format!("{a1},{a2}\n"))
-            .collect::<Vec<_>>();
 
         // Create a new temporary FIFO file
         let tmp_dir = TempDir::new()?;
@@ -238,22 +227,6 @@ mod unix_test {
         // Create a mutex for tracking if the right input source is waiting 
for data.
         let waiting = Arc::new(AtomicBool::new(true));
 
-        // Create writing threads for the left and right FIFO files
-        tasks.push(create_writing_thread(
-            left_fifo.clone(),
-            "a1,a2\n".to_owned(),
-            lines.clone(),
-            waiting.clone(),
-            TEST_BATCH_SIZE,
-        ));
-        tasks.push(create_writing_thread(
-            right_fifo.clone(),
-            "a1,a2\n".to_owned(),
-            lines.clone(),
-            waiting.clone(),
-            TEST_BATCH_SIZE,
-        ));
-
         // Create schema
         let schema = Arc::new(Schema::new(vec![
             Field::new("a1", DataType::UInt32, false),
@@ -264,10 +237,10 @@ mod unix_test {
         let order = vec![vec![datafusion_expr::col("a1").sort(true, false)]];
 
         // Set unbounded sorted files read configuration
-        let provider = fifo_table(schema.clone(), left_fifo, order.clone());
+        let provider = fifo_table(schema.clone(), left_fifo.clone(), 
order.clone());
         ctx.register_table("left", provider)?;
 
-        let provider = fifo_table(schema.clone(), right_fifo, order);
+        let provider = fifo_table(schema.clone(), right_fifo.clone(), order);
         ctx.register_table("right", provider)?;
 
         // Execute the query, with no matching rows. (since key is modulus 10)
@@ -287,6 +260,34 @@ mod unix_test {
             .await?;
         let mut stream = df.execute_stream().await?;
         let mut operations = vec![];
+
+        // Tasks
+        let mut tasks: Vec<JoinHandle<()>> = vec![];
+
+        // Join filter
+        let a1_iter = 0..TEST_DATA_SIZE;
+        // Join key
+        let a2_iter = (0..TEST_DATA_SIZE).map(|x| x % 10);
+        let lines = a1_iter
+            .zip(a2_iter)
+            .map(|(a1, a2)| format!("{a1},{a2}\n"))
+            .collect::<Vec<_>>();
+
+        // Create writing threads for the left and right FIFO files
+        tasks.push(create_writing_thread(
+            left_fifo,
+            "a1,a2\n".to_owned(),
+            lines.clone(),
+            waiting.clone(),
+            TEST_BATCH_SIZE,
+        ));
+        tasks.push(create_writing_thread(
+            right_fifo,
+            "a1,a2\n".to_owned(),
+            lines.clone(),
+            waiting.clone(),
+            TEST_BATCH_SIZE,
+        ));
         // Partial.
         while let Some(Ok(batch)) = stream.next().await {
             waiting.store(false, Ordering::SeqCst);
diff --git a/datafusion/core/tests/tpcds_planning.rs 
b/datafusion/core/tests/tpcds_planning.rs
index 44fb0afff3..b99bc26800 100644
--- a/datafusion/core/tests/tpcds_planning.rs
+++ b/datafusion/core/tests/tpcds_planning.rs
@@ -1044,7 +1044,10 @@ async fn regression_test(query_no: u8, create_physical: 
bool) -> Result<()> {
     for table in &tables {
         ctx.register_table(
             table.name.as_str(),
-            Arc::new(MemTable::try_new(Arc::new(table.schema.clone()), 
vec![])?),
+            Arc::new(MemTable::try_new(
+                Arc::new(table.schema.clone()),
+                vec![vec![]],
+            )?),
         )?;
     }
 
diff --git a/datafusion/physical-expr/src/partitioning.rs 
b/datafusion/physical-expr/src/partitioning.rs
index fcb3278b60..273c77fb1d 100644
--- a/datafusion/physical-expr/src/partitioning.rs
+++ b/datafusion/physical-expr/src/partitioning.rs
@@ -152,6 +152,8 @@ impl Partitioning {
         match required {
             Distribution::UnspecifiedDistribution => true,
             Distribution::SinglePartition if self.partition_count() == 1 => 
true,
+            // When partition count is 1, hash requirement is satisfied.
+            Distribution::HashPartitioned(_) if self.partition_count() == 1 => 
true,
             Distribution::HashPartitioned(required_exprs) => {
                 match self {
                     // Here we do not check the partition count for hash 
partitioning and assumes the partition count
@@ -290,7 +292,7 @@ mod tests {
                     assert_eq!(result, (true, false, false, false, false))
                 }
                 Distribution::HashPartitioned(_) => {
-                    assert_eq!(result, (false, false, false, true, false))
+                    assert_eq!(result, (true, false, false, true, false))
                 }
             }
         }
diff --git a/datafusion/physical-plan/src/aggregates/mod.rs 
b/datafusion/physical-plan/src/aggregates/mod.rs
index 4c187f03f3..533d10357b 100644
--- a/datafusion/physical-plan/src/aggregates/mod.rs
+++ b/datafusion/physical-plan/src/aggregates/mod.rs
@@ -675,7 +675,7 @@ impl ExecutionPlan for AggregateExec {
                 vec![Distribution::UnspecifiedDistribution]
             }
             AggregateMode::FinalPartitioned | AggregateMode::SinglePartitioned 
=> {
-                vec![Distribution::HashPartitioned(self.output_group_expr())]
+                
vec![Distribution::HashPartitioned(self.group_by.input_exprs())]
             }
             AggregateMode::Final | AggregateMode::Single => {
                 vec![Distribution::SinglePartition]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to