This is an automated email from the ASF dual-hosted git repository.

comphead pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 92a471e55f Fix undeterministic behaviour of schema nullability of lag window query (#9508)
92a471e55f is described below

commit 92a471e55faf9077e1bbb1b8cfad52b9ff89faba
Author: Mustafa Akur <[email protected]>
AuthorDate: Sat Mar 9 00:32:34 2024 +0300

    Fix undeterministic behaviour of schema nullability of lag window query (#9508)
    
    * Initial commit
    
    * Update comment, remove leftovers
    
    * Add test
    
    * Update datafusion/sqllogictest/test_files/window.slt
    
    Co-authored-by: comphead <[email protected]>
    
    ---------
    
    Co-authored-by: comphead <[email protected]>
---
 .../src/windows/bounded_window_agg_exec.rs         | 31 +++++++++++++++++-----
 datafusion/sqllogictest/test_files/window.slt      | 10 +++++++
 2 files changed, 35 insertions(+), 6 deletions(-)

diff --git a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
index c99ec59959..4cba571054 100644
--- a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
+++ b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
@@ -163,6 +163,7 @@ impl BoundedWindowAggExec {
     fn get_search_algo(&self) -> Result<Box<dyn PartitionSearcher>> {
         let partition_by_sort_keys = self.partition_by_sort_keys()?;
         let ordered_partition_by_indices = 
self.ordered_partition_by_indices.clone();
+        let input_schema = self.input().schema();
         Ok(match &self.input_order_mode {
             InputOrderMode::Sorted => {
                 // In Sorted mode, all partition by columns should be ordered.
@@ -174,11 +175,12 @@ impl BoundedWindowAggExec {
                 Box::new(SortedSearch {
                     partition_by_sort_keys,
                     ordered_partition_by_indices,
+                    input_schema,
                 })
             }
-            InputOrderMode::Linear | InputOrderMode::PartiallySorted(_) => {
-                Box::new(LinearSearch::new(ordered_partition_by_indices))
-            }
+            InputOrderMode::Linear | InputOrderMode::PartiallySorted(_) => 
Box::new(
+                LinearSearch::new(ordered_partition_by_indices, input_schema),
+            ),
         })
     }
 
@@ -378,12 +380,16 @@ trait PartitionSearcher: Send {
                 let partition_batch_state = partition_buffers
                     .entry(partition_row)
                     .or_insert_with(|| PartitionBatchState {
-                        record_batch: 
RecordBatch::new_empty(partition_batch.schema()),
+                        // Use input_schema, for the buffer schema.
+                        // record_batch.schema may not have necessary schema, 
in terms of
+                        // nullability constraints of the output.
+                        // See issue: 
https://github.com/apache/arrow-datafusion/issues/9320
+                        record_batch: 
RecordBatch::new_empty(self.input_schema().clone()),
                         is_end: false,
                         n_out_row: 0,
                     });
                 partition_batch_state.record_batch = concat_batches(
-                    &partition_batch.schema(),
+                    self.input_schema(),
                     [&partition_batch_state.record_batch, &partition_batch],
                 )?;
             }
@@ -398,6 +404,8 @@ trait PartitionSearcher: Send {
 
         Ok(())
     }
+
+    fn input_schema(&self) -> &SchemaRef;
 }
 
 /// This object encapsulates the algorithm state for a simple linear scan
@@ -423,6 +431,7 @@ pub struct LinearSearch {
     /// The third entry stores how many new outputs are calculated for the
     /// corresponding partition.
     row_map_out: RawTable<(u64, usize, usize)>,
+    input_schema: SchemaRef,
 }
 
 impl PartitionSearcher for LinearSearch {
@@ -561,17 +570,22 @@ impl PartitionSearcher for LinearSearch {
             }
         }
     }
+
+    fn input_schema(&self) -> &SchemaRef {
+        &self.input_schema
+    }
 }
 
 impl LinearSearch {
     /// Initialize a new [`LinearSearch`] partition searcher.
-    fn new(ordered_partition_by_indices: Vec<usize>) -> Self {
+    fn new(ordered_partition_by_indices: Vec<usize>, input_schema: SchemaRef) 
-> Self {
         LinearSearch {
             input_buffer_hashes: VecDeque::new(),
             random_state: Default::default(),
             ordered_partition_by_indices,
             row_map_batch: RawTable::with_capacity(256),
             row_map_out: RawTable::with_capacity(256),
+            input_schema,
         }
     }
 
@@ -693,6 +707,7 @@ pub struct SortedSearch {
     /// is ordered by a, b and the window expression contains a PARTITION BY 
b, a
     /// clause, this attribute stores [1, 0].
     ordered_partition_by_indices: Vec<usize>,
+    input_schema: SchemaRef,
 }
 
 impl PartitionSearcher for SortedSearch {
@@ -758,6 +773,10 @@ impl PartitionSearcher for SortedSearch {
             partition_batch_state.is_end |= idx < n_partitions - 1;
         }
     }
+
+    fn input_schema(&self) -> &SchemaRef {
+        &self.input_schema
+    }
 }
 
 impl SortedSearch {
diff --git a/datafusion/sqllogictest/test_files/window.slt b/datafusion/sqllogictest/test_files/window.slt
index 92d2208029..c7241cae30 100644
--- a/datafusion/sqllogictest/test_files/window.slt
+++ b/datafusion/sqllogictest/test_files/window.slt
@@ -4269,3 +4269,13 @@ LIMIT 5;
 3 53
 24 31
 14 94
+
+# Tests schema and data are in sync for mixed nulls and not nulls values for 
builtin window function
+query T rowsort
+select lag(a) over () as x1
+        from
+        (select 2 id, 'b' a union all select 1 id, null a union all select 3 
id, null);
+----
+NULL
+NULL
+b

Reply via email to