This is an automated email from the ASF dual-hosted git repository.

akurmustafa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 205e315ed3 fix: RANGE frame for corner cases with empty ORDER BY 
clause should be treated as constant sort (#8445)
205e315ed3 is described below

commit 205e315ed3eafbb016ffc5ac62a3be07734a8885
Author: Liang-Chi Hsieh <vii...@gmail.com>
AuthorDate: Fri Dec 8 01:37:03 2023 -0800

    fix: RANGE frame for corner cases with empty ORDER BY clause should be 
treated as constant sort (#8445)
    
    * fix: RANGE frame for corner cases with empty ORDER BY clause should be 
treated as constant sort
    
    * fix
    
    * Make the test not flaky
    
    * fix clippy
---
 datafusion/expr/src/window_frame.rs             | 56 ++++++++++++++++---------
 datafusion/proto/src/logical_plan/from_proto.rs |  8 ++--
 datafusion/sql/src/expr/function.rs             | 10 +++--
 datafusion/sqllogictest/test_files/window.slt   | 16 ++++---
 4 files changed, 56 insertions(+), 34 deletions(-)

diff --git a/datafusion/expr/src/window_frame.rs 
b/datafusion/expr/src/window_frame.rs
index 2a64f21b85..2701ca1ecf 100644
--- a/datafusion/expr/src/window_frame.rs
+++ b/datafusion/expr/src/window_frame.rs
@@ -23,6 +23,8 @@
 //! - An ending frame boundary,
 //! - An EXCLUDE clause.
 
+use crate::expr::Sort;
+use crate::Expr;
 use datafusion_common::{plan_err, sql_err, DataFusionError, Result, 
ScalarValue};
 use sqlparser::ast;
 use sqlparser::parser::ParserError::ParserError;
@@ -142,41 +144,57 @@ impl WindowFrame {
     }
 }
 
-/// Construct equivalent explicit window frames for implicit corner cases.
-/// With this processing, we may assume in downstream code that RANGE/GROUPS
-/// frames contain an appropriate ORDER BY clause.
-pub fn regularize(mut frame: WindowFrame, order_bys: usize) -> 
Result<WindowFrame> {
-    if frame.units == WindowFrameUnits::Range && order_bys != 1 {
+/// Regularizes ORDER BY clause for window definition for implicit corner 
cases.
+pub fn regularize_window_order_by(
+    frame: &WindowFrame,
+    order_by: &mut Vec<Expr>,
+) -> Result<()> {
+    if frame.units == WindowFrameUnits::Range && order_by.len() != 1 {
         // Normally, RANGE frames require an ORDER BY clause with exactly one
         // column. However, an ORDER BY clause may be absent or present but 
with
         // more than one column in two edge cases:
         // 1. start bound is UNBOUNDED or CURRENT ROW
         // 2. end bound is CURRENT ROW or UNBOUNDED.
-        // In these cases, we regularize the RANGE frame to be equivalent to a 
ROWS
-        // frame with the UNBOUNDED bounds.
-        // Note that this follows Postgres behavior.
+        // In these cases, we regularize the ORDER BY clause if the ORDER BY 
clause
+        // is absent. If an ORDER BY clause is present but has more than one 
column,
+        // the ORDER BY clause is unchanged. Note that this follows Postgres 
behavior.
         if (frame.start_bound.is_unbounded()
             || frame.start_bound == WindowFrameBound::CurrentRow)
             && (frame.end_bound == WindowFrameBound::CurrentRow
                 || frame.end_bound.is_unbounded())
         {
-            // If an ORDER BY clause is absent, the frame is equivalent to a 
ROWS
-            // frame with the UNBOUNDED bounds.
-            // If an ORDER BY clause is present but has more than one column, 
the
-            // frame is unchanged.
-            if order_bys == 0 {
-                frame.units = WindowFrameUnits::Rows;
-                frame.start_bound =
-                    WindowFrameBound::Preceding(ScalarValue::UInt64(None));
-                frame.end_bound = 
WindowFrameBound::Following(ScalarValue::UInt64(None));
+            // If an ORDER BY clause is absent, it is equivalent to a ORDER BY 
clause
+            // with constant value as sort key.
+            // If an ORDER BY clause is present but has more than one column, 
it is
+            // unchanged.
+            if order_by.is_empty() {
+                order_by.push(Expr::Sort(Sort::new(
+                    Box::new(Expr::Literal(ScalarValue::UInt64(Some(1)))),
+                    true,
+                    false,
+                )));
             }
-        } else {
+        }
+    }
+    Ok(())
+}
+
+/// Checks if given window frame is valid. In particular, if the frame is RANGE
+/// with offset PRECEDING/FOLLOWING, it must have exactly one ORDER BY column.
+pub fn check_window_frame(frame: &WindowFrame, order_bys: usize) -> Result<()> 
{
+    if frame.units == WindowFrameUnits::Range && order_bys != 1 {
+        // See `regularize_window_order_by`.
+        if !(frame.start_bound.is_unbounded()
+            || frame.start_bound == WindowFrameBound::CurrentRow)
+            || !(frame.end_bound == WindowFrameBound::CurrentRow
+                || frame.end_bound.is_unbounded())
+        {
             plan_err!("RANGE requires exactly one ORDER BY column")?
         }
     } else if frame.units == WindowFrameUnits::Groups && order_bys == 0 {
         plan_err!("GROUPS requires an ORDER BY clause")?
     };
-    Ok(frame)
+    Ok(())
 }
 
 /// There are five ways to describe starting and ending frame boundaries:
diff --git a/datafusion/proto/src/logical_plan/from_proto.rs 
b/datafusion/proto/src/logical_plan/from_proto.rs
index 13576aaa08..22a3ed804a 100644
--- a/datafusion/proto/src/logical_plan/from_proto.rs
+++ b/datafusion/proto/src/logical_plan/from_proto.rs
@@ -39,6 +39,7 @@ use datafusion_common::{
     internal_err, plan_datafusion_err, Column, Constraint, Constraints, 
DFField,
     DFSchema, DFSchemaRef, DataFusionError, OwnedTableReference, Result, 
ScalarValue,
 };
+use datafusion_expr::window_frame::{check_window_frame, 
regularize_window_order_by};
 use datafusion_expr::{
     abs, acos, acosh, array, array_append, array_concat, array_dims, 
array_element,
     array_except, array_has, array_has_all, array_has_any, array_intersect, 
array_length,
@@ -59,7 +60,6 @@ use datafusion_expr::{
     sqrt, starts_with, string_to_array, strpos, struct_fun, substr, 
substr_index,
     substring, tan, tanh, to_hex, to_timestamp_micros, to_timestamp_millis,
     to_timestamp_nanos, to_timestamp_seconds, translate, trim, trunc, upper, 
uuid,
-    window_frame::regularize,
     AggregateFunction, Between, BinaryExpr, BuiltInWindowFunction, 
BuiltinScalarFunction,
     Case, Cast, Expr, GetFieldAccess, GetIndexedField, GroupingSet,
     GroupingSet::GroupingSets,
@@ -1072,7 +1072,7 @@ pub fn parse_expr(
                 .iter()
                 .map(|e| parse_expr(e, registry))
                 .collect::<Result<Vec<_>, _>>()?;
-            let order_by = expr
+            let mut order_by = expr
                 .order_by
                 .iter()
                 .map(|e| parse_expr(e, registry))
@@ -1082,7 +1082,8 @@ pub fn parse_expr(
                 .as_ref()
                 .map::<Result<WindowFrame, _>, _>(|window_frame| {
                     let window_frame = window_frame.clone().try_into()?;
-                    regularize(window_frame, order_by.len())
+                    check_window_frame(&window_frame, order_by.len())
+                        .map(|_| window_frame)
                 })
                 .transpose()?
                 .ok_or_else(|| {
@@ -1090,6 +1091,7 @@ pub fn parse_expr(
                         "missing window frame during 
deserialization".to_string(),
                     )
                 })?;
+            regularize_window_order_by(&window_frame, &mut order_by)?;
 
             match window_function {
                 window_expr_node::WindowFunction::AggrFunction(i) => {
diff --git a/datafusion/sql/src/expr/function.rs 
b/datafusion/sql/src/expr/function.rs
index 14ea20c3fa..73de4fa439 100644
--- a/datafusion/sql/src/expr/function.rs
+++ b/datafusion/sql/src/expr/function.rs
@@ -21,7 +21,7 @@ use datafusion_common::{
 };
 use datafusion_expr::expr::ScalarFunction;
 use datafusion_expr::function::suggest_valid_function;
-use datafusion_expr::window_frame::regularize;
+use datafusion_expr::window_frame::{check_window_frame, 
regularize_window_order_by};
 use datafusion_expr::{
     expr, window_function, AggregateFunction, BuiltinScalarFunction, Expr, 
WindowFrame,
     WindowFunction,
@@ -92,7 +92,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                 .into_iter()
                 .map(|e| self.sql_expr_to_logical_expr(e, schema, 
planner_context))
                 .collect::<Result<Vec<_>>>()?;
-            let order_by = self.order_by_to_sort_expr(
+            let mut order_by = self.order_by_to_sort_expr(
                 &window.order_by,
                 schema,
                 planner_context,
@@ -104,14 +104,18 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                 .as_ref()
                 .map(|window_frame| {
                     let window_frame = window_frame.clone().try_into()?;
-                    regularize(window_frame, order_by.len())
+                    check_window_frame(&window_frame, order_by.len())
+                        .map(|_| window_frame)
                 })
                 .transpose()?;
+
             let window_frame = if let Some(window_frame) = window_frame {
+                regularize_window_order_by(&window_frame, &mut order_by)?;
                 window_frame
             } else {
                 WindowFrame::new(!order_by.is_empty())
             };
+
             if let Ok(fun) = self.find_window_func(&name) {
                 let expr = match fun {
                     WindowFunction::AggregateFunction(aggregate_fun) => {
diff --git a/datafusion/sqllogictest/test_files/window.slt 
b/datafusion/sqllogictest/test_files/window.slt
index 5b69ead0ff..b660a9a0c2 100644
--- a/datafusion/sqllogictest/test_files/window.slt
+++ b/datafusion/sqllogictest/test_files/window.slt
@@ -3763,15 +3763,13 @@ select a,
 1 1
 2 2
 
-# TODO: this is different to Postgres which returns [1, 1] for `rnk`.
-# Comment it because it is flaky now as it depends on the order of the `a` 
column.
-# query II
-# select a,
-#       rank() over (RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED 
FOLLOWING) rnk
-#       from (select 1 a union select 2 a) q ORDER BY rnk
-# ----
-# 1 1
-# 2 2
+query II
+select a,
+      rank() over (RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) 
rnk
+      from (select 1 a union select 2 a) q ORDER BY a
+----
+1 1
+2 1
 
 # TODO: this works in Postgres which returns [1, 1].
 query error DataFusion error: Arrow error: Invalid argument error: must either 
specify a row count or at least one column

Reply via email to