alamb commented on code in PR #3570:
URL: https://github.com/apache/arrow-datafusion/pull/3570#discussion_r978642644


##########
datafusion/core/tests/sql/window.rs:
##########
@@ -471,3 +471,297 @@ async fn window_with_agg_in_expression() -> Result<()> {
     assert_batches_eq!(expected, &actual);
     Ok(())
 }
+
+#[tokio::test]
+async fn window_frame_empty() -> Result<()> {
+    let ctx = SessionContext::new();
+    register_aggregate_csv(&ctx).await?;
+    let sql = "SELECT \
+               SUM(c3) OVER(),\
+               COUNT(*) OVER ()\
+               FROM aggregate_test_100 \
+               ORDER BY c9 \
+               LIMIT 5";
+    let actual = execute_to_batches(&ctx, sql).await;
+    let expected = vec![
+        "+----------------------------+-----------------+",
+        "| SUM(aggregate_test_100.c3) | COUNT(UInt8(1)) |",
+        "+----------------------------+-----------------+",
+        "| 781                        | 100             |",
+        "| 781                        | 100             |",
+        "| 781                        | 100             |",
+        "| 781                        | 100             |",
+        "| 781                        | 100             |",
+        "+----------------------------+-----------------+",
+    ];
+    assert_batches_eq!(expected, &actual);
+    Ok(())
+}
+
+#[tokio::test]
+async fn window_frame_rows_preceding() -> Result<()> {
+    let ctx = SessionContext::new();
+    register_aggregate_csv(&ctx).await?;
+    let sql = "SELECT \
+               SUM(c4) OVER(ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING),\
+               COUNT(*) OVER(ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING)\
+               FROM aggregate_test_100 \
+               ORDER BY c9 \
+               LIMIT 5";
+    let actual = execute_to_batches(&ctx, sql).await;
+    let expected = vec![
+        "+----------------------------+-----------------+",
+        "| SUM(aggregate_test_100.c4) | COUNT(UInt8(1)) |",
+        "+----------------------------+-----------------+",
+        "| 4580                       | 2               |",
+        "| -21316                     | 3               |",
+        "| 2427                       | 3               |",
+        "| -27139                     | 3               |",
+        "| 17223                      | 3               |",
+        "+----------------------------+-----------------+",
+    ];
+    assert_batches_eq!(expected, &actual);
+    Ok(())
+}
+#[tokio::test]
+async fn window_frame_rows_preceding_with_partition_unique_order_by() -> 
Result<()> {
+    let ctx = SessionContext::new();
+    register_aggregate_csv(&ctx).await?;
+    let sql = "SELECT \
+               SUM(c4) OVER(PARTITION BY c1 ORDER BY c9 ROWS BETWEEN 1 
PRECEDING AND 1 FOLLOWING),\
+               COUNT(*) OVER(PARTITION BY c2 ORDER BY c9 ROWS BETWEEN 1 
PRECEDING AND 1 FOLLOWING)\
+               FROM aggregate_test_100 \
+               ORDER BY c9 \
+               LIMIT 5";
+    let actual = execute_to_batches(&ctx, sql).await;
+    let expected = vec![
+        "+----------------------------+-----------------+",
+        "| SUM(aggregate_test_100.c4) | COUNT(UInt8(1)) |",
+        "+----------------------------+-----------------+",
+        "| -38611                     | 2               |",
+        "| 17547                      | 2               |",
+        "| -1301                      | 2               |",
+        "| 26638                      | 3               |",
+        "| 26861                      | 3               |",
+        "+----------------------------+-----------------+",
+    ];
+    assert_batches_eq!(expected, &actual);
+    Ok(())
+}
+/// The partition by clause conducts sorting according to given partition 
column by default. If the
+/// sorting columns have non unique values, the unstable sorting may produce 
indeterminate results.
+/// Therefore, we are commenting out the following test for now.

Review Comment:
   One way to handle this type of nondeterminism would be to add a WHERE clause
   that filters out any non-unique values.



##########
datafusion/core/tests/sql/window.rs:
##########
@@ -471,3 +471,297 @@ async fn window_with_agg_in_expression() -> Result<()> {
     assert_batches_eq!(expected, &actual);
     Ok(())
 }
+
+#[tokio::test]
+async fn window_frame_empty() -> Result<()> {
+    let ctx = SessionContext::new();
+    register_aggregate_csv(&ctx).await?;
+    let sql = "SELECT \
+               SUM(c3) OVER(),\
+               COUNT(*) OVER ()\
+               FROM aggregate_test_100 \
+               ORDER BY c9 \
+               LIMIT 5";
+    let actual = execute_to_batches(&ctx, sql).await;
+    let expected = vec![
+        "+----------------------------+-----------------+",
+        "| SUM(aggregate_test_100.c3) | COUNT(UInt8(1)) |",
+        "+----------------------------+-----------------+",
+        "| 781                        | 100             |",
+        "| 781                        | 100             |",
+        "| 781                        | 100             |",
+        "| 781                        | 100             |",
+        "| 781                        | 100             |",
+        "+----------------------------+-----------------+",
+    ];
+    assert_batches_eq!(expected, &actual);
+    Ok(())
+}
+
+#[tokio::test]
+async fn window_frame_rows_preceding() -> Result<()> {
+    let ctx = SessionContext::new();
+    register_aggregate_csv(&ctx).await?;
+    let sql = "SELECT \
+               SUM(c4) OVER(ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING),\

Review Comment:
   I am surprised this query has deterministic results -- I would expect the
   OVER clause to need an ORDER BY as well (otherwise the rows over which c4 is
   summed could differ from run to run).
   
   Something like:
   
   ```suggestion
                  SUM(c4) OVER(ORDER BY c4 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING),\
   ```
   (and the same for the OVER below as well)
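
   To illustrate why the row order matters here, a minimal sketch in plain Rust
   (no DataFusion involved; `sliding_sum` is a hypothetical helper, not code
   from this PR). It computes the `ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING` sum
   for rows in whatever order they arrive, so the same values in a different
   order give different per-row results:

   ```rust
   /// Sum over a frame of one row before and one row after the current row,
   /// for rows taken in the order given (hypothetical helper).
   fn sliding_sum(rows: &[i64]) -> Vec<i64> {
       (0..rows.len())
           .map(|i| {
               // Frame start: one row back, clamped at the first row.
               let start = i.saturating_sub(1);
               // Frame end (exclusive): one row forward, clamped at the last row.
               let end = (i + 2).min(rows.len());
               rows[start..end].iter().sum::<i64>()
           })
           .collect()
   }
   ```

   With the rows `1, 2, 3` the row holding `1` sees a sum of 3, but if the same
   rows arrive as `3, 1, 2` it sees 6. Without an ORDER BY in the OVER clause
   nothing pins down which of these orders the engine uses.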



##########
datafusion/core/tests/sql/window.rs:
##########
@@ -471,3 +471,297 @@ async fn window_with_agg_in_expression() -> Result<()> {
     assert_batches_eq!(expected, &actual);
     Ok(())
 }
+
+#[tokio::test]
+async fn window_frame_empty() -> Result<()> {
+    let ctx = SessionContext::new();
+    register_aggregate_csv(&ctx).await?;
+    let sql = "SELECT \
+               SUM(c3) OVER(),\
+               COUNT(*) OVER ()\
+               FROM aggregate_test_100 \
+               ORDER BY c9 \
+               LIMIT 5";
+    let actual = execute_to_batches(&ctx, sql).await;
+    let expected = vec![
+        "+----------------------------+-----------------+",
+        "| SUM(aggregate_test_100.c3) | COUNT(UInt8(1)) |",
+        "+----------------------------+-----------------+",
+        "| 781                        | 100             |",
+        "| 781                        | 100             |",
+        "| 781                        | 100             |",
+        "| 781                        | 100             |",
+        "| 781                        | 100             |",
+        "+----------------------------+-----------------+",
+    ];
+    assert_batches_eq!(expected, &actual);
+    Ok(())
+}
+
+#[tokio::test]
+async fn window_frame_rows_preceding() -> Result<()> {
+    let ctx = SessionContext::new();
+    register_aggregate_csv(&ctx).await?;
+    let sql = "SELECT \
+               SUM(c4) OVER(ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING),\
+               COUNT(*) OVER(ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING)\
+               FROM aggregate_test_100 \
+               ORDER BY c9 \
+               LIMIT 5";
+    let actual = execute_to_batches(&ctx, sql).await;
+    let expected = vec![
+        "+----------------------------+-----------------+",
+        "| SUM(aggregate_test_100.c4) | COUNT(UInt8(1)) |",
+        "+----------------------------+-----------------+",
+        "| 4580                       | 2               |",
+        "| -21316                     | 3               |",
+        "| 2427                       | 3               |",
+        "| -27139                     | 3               |",
+        "| 17223                      | 3               |",
+        "+----------------------------+-----------------+",
+    ];
+    assert_batches_eq!(expected, &actual);
+    Ok(())
+}
+#[tokio::test]
+async fn window_frame_rows_preceding_with_partition_unique_order_by() -> 
Result<()> {
+    let ctx = SessionContext::new();
+    register_aggregate_csv(&ctx).await?;
+    let sql = "SELECT \
+               SUM(c4) OVER(PARTITION BY c1 ORDER BY c9 ROWS BETWEEN 1 
PRECEDING AND 1 FOLLOWING),\
+               COUNT(*) OVER(PARTITION BY c2 ORDER BY c9 ROWS BETWEEN 1 
PRECEDING AND 1 FOLLOWING)\
+               FROM aggregate_test_100 \
+               ORDER BY c9 \
+               LIMIT 5";
+    let actual = execute_to_batches(&ctx, sql).await;
+    let expected = vec![
+        "+----------------------------+-----------------+",
+        "| SUM(aggregate_test_100.c4) | COUNT(UInt8(1)) |",
+        "+----------------------------+-----------------+",
+        "| -38611                     | 2               |",
+        "| 17547                      | 2               |",
+        "| -1301                      | 2               |",
+        "| 26638                      | 3               |",
+        "| 26861                      | 3               |",
+        "+----------------------------+-----------------+",
+    ];
+    assert_batches_eq!(expected, &actual);
+    Ok(())
+}
+/// The partition by clause conducts sorting according to given partition 
column by default. If the
+/// sorting columns have non unique values, the unstable sorting may produce 
indeterminate results.
+/// Therefore, we are commenting out the following test for now.
+
+// #[tokio::test]
+// async fn window_frame_rows_preceding_with_non_unique_partition() -> 
Result<()> {
+//     let ctx = SessionContext::new();
+//     register_aggregate_csv(&ctx).await?;
+//     let sql = "SELECT \
+//                SUM(c4) OVER(PARTITION BY c1 ROWS BETWEEN 1 PRECEDING AND 1 
FOLLOWING),\
+//                COUNT(*) OVER(PARTITION BY c2 ROWS BETWEEN 1 PRECEDING AND 1 
FOLLOWING)\
+//                FROM aggregate_test_100 \
+//                ORDER BY c9 \
+//                LIMIT 5";
+//     let actual = execute_to_batches(&ctx, sql).await;
+//     let expected = vec![
+//         "+----------------------------+-----------------+",
+//         "| SUM(aggregate_test_100.c4) | COUNT(UInt8(1)) |",
+//         "+----------------------------+-----------------+",
+//         "| -33822                     | 3               |",
+//         "| 20808                      | 3               |",
+//         "| -29881                     | 3               |",
+//         "| -47613                     | 3               |",
+//         "| -13474                     | 3               |",
+//         "+----------------------------+-----------------+",
+//     ];
+//     assert_batches_eq!(expected, &actual);
+//     Ok(())
+// }
+
+#[tokio::test]
+async fn window_frame_rows_preceding_with_unique_partition() -> Result<()> {
+    let ctx = SessionContext::new();
+    register_aggregate_csv(&ctx).await?;
+    let sql = "SELECT \
+               SUM(c4) OVER(PARTITION BY c9 ROWS BETWEEN 1 PRECEDING AND 1 
FOLLOWING),\

Review Comment:
   Given that c9 is unique, every partition here contains a single row, so all
   the window sizes will be `1` and this test's coverage of PRECEDING and
   FOLLOWING is somewhat limited.
   
   ```
   ❯ select count(c9), count(distinct c9) from aggregate_test_100;
   +------------------------------+---------------------------------------+
   | COUNT(aggregate_test_100.c9) | COUNT(DISTINCT aggregate_test_100.c9) |
   +------------------------------+---------------------------------------+
   | 100                          | 100                                   |
   +------------------------------+---------------------------------------+
   1 row in set. Query took 0.004 seconds.
   ```
   
   



##########
datafusion/physical-expr/src/aggregate/covariance.rs:
##########
@@ -285,6 +285,40 @@ impl Accumulator for CovarianceAccumulator {
         Ok(())
     }
 
+    fn retract_batch(&mut self, values: &[ArrayRef]) -> Result<()> {

Review Comment:
   I haven't carefully reviewed this code



##########
datafusion/common/src/bisect.rs:
##########
@@ -0,0 +1,113 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This module provides the bisect function, which implements binary search.
+
+use crate::{DataFusionError, Result, ScalarValue};
+use arrow::array::ArrayRef;
+
+/// this function implements bisct_left and bisect_right since these functions

Review Comment:
   I recommend using `///` for comments that explain what the code does and
   `//` for rationale. The difference is that `///` comments end up in the
   generated docs (e.g. https://docs.rs/datafusion/latest/datafusion/).
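
   A small sketch of that split, using a made-up `midpoint` helper (not a
   function from this PR): the `///` lines describe behavior and are picked up
   by rustdoc, while the `//` line records why the code is written the way it
   is and stays in the source only.

   ```rust
   /// Returns the midpoint of `low` and `high`, rounded down.
   ///
   /// Assumes `low <= high`.
   fn midpoint(low: usize, high: usize) -> usize {
       // Rationale: `low + (high - low) / 2` avoids the overflow that
       // `(low + high) / 2` could hit for very large indices.
       low + (high - low) / 2
   }
   ```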



##########
datafusion/expr/src/accumulator.rs:
##########
@@ -38,6 +38,14 @@ pub trait Accumulator: Send + Sync + Debug {
     /// updates the accumulator's state from a vector of arrays.
     fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()>;
 
+    /// updates the accumulator's state from a vector of arrays.

Review Comment:
   Could we expand a bit on what retract means in this case? Specifically, I
   think it means "make the internal state such that calling
   `update_batch(_values)` would restore the current state".
   
   I think it would also help to explain that this is used to calculate
   aggregates across moving windows (this function is called if/when rows are
   removed from the window).
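
   As a rough illustration of that contract, here is a toy accumulator over
   plain `i64` slices. It is a sketch under simplifying assumptions, not the
   DataFusion `Accumulator` trait: `SumAccumulator` and its method signatures
   are hypothetical and take `&[i64]` instead of `&[ArrayRef]`.

   ```rust
   /// Toy running-sum state (hypothetical, not the DataFusion type).
   #[derive(Debug, Default, Clone, PartialEq)]
   struct SumAccumulator {
       sum: i64,
       count: u64,
   }

   impl SumAccumulator {
       /// Adds rows that entered the window.
       fn update_batch(&mut self, values: &[i64]) {
           self.sum += values.iter().sum::<i64>();
           self.count += values.len() as u64;
       }

       /// Removes rows that left the window. This is the inverse of
       /// `update_batch`: calling `update_batch` with the same values
       /// afterwards restores the previous state.
       fn retract_batch(&mut self, values: &[i64]) {
           self.sum -= values.iter().sum::<i64>();
           self.count -= values.len() as u64;
       }

       /// Current aggregate, or `None` when the window is empty.
       fn evaluate(&self) -> Option<i64> {
           if self.count == 0 {
               None
           } else {
               Some(self.sum)
           }
       }
   }
   ```

   Sliding a window from `[1, 2, 3]` to `[2, 3, 4]` then becomes one
   `retract_batch(&[1])` followed by one `update_batch(&[4])`, rather than
   re-summing the whole frame.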



##########
datafusion/physical-expr/src/aggregate/correlation.rs:
##########
@@ -151,6 +151,13 @@ impl Accumulator for CorrelationAccumulator {
         Ok(())
     }
 
+    fn retract_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
+        self.covar.retract_batch(values)?;
+        self.stddev1.retract_batch(&[values[0].clone()])?;

Review Comment:
   I think you can avoid these clones with something like
   
   ```rust
           self.stddev1.retract_batch(&values[0..1])?;
           self.stddev2.retract_batch(&values[1..2])?;
   ```
   
   (aka slice the slice!)
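
   A standalone sketch of the difference, assuming a stand-in
   `ArrayRef = Arc<Vec<f64>>` in place of arrow's `Arc<dyn Array>` (both helper
   functions below are hypothetical). Cloning an `Arc` only bumps a reference
   count, so it is cheap, but sub-slicing avoids even that and just borrows:

   ```rust
   use std::sync::Arc;

   // Stand-in for arrow's `ArrayRef` (an assumption made for this sketch).
   type ArrayRef = Arc<Vec<f64>>;

   /// Builds a one-element array by cloning; returns the reference count
   /// observed while the clone is alive.
   fn refs_with_clone(values: &[ArrayRef]) -> usize {
       let single = [values[0].clone()];
       Arc::strong_count(&single[0])
   }

   /// Borrows a one-element sub-slice instead; no new reference is created.
   fn refs_with_slice(values: &[ArrayRef]) -> usize {
       let single: &[ArrayRef] = &values[0..1];
       Arc::strong_count(&single[0])
   }
   ```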



##########
datafusion/core/tests/sql/window.rs:
##########
@@ -471,3 +471,297 @@ async fn window_with_agg_in_expression() -> Result<()> {
     assert_batches_eq!(expected, &actual);
     Ok(())
 }
+
+#[tokio::test]
+async fn window_frame_empty() -> Result<()> {
+    let ctx = SessionContext::new();
+    register_aggregate_csv(&ctx).await?;
+    let sql = "SELECT \
+               SUM(c3) OVER(),\
+               COUNT(*) OVER ()\
+               FROM aggregate_test_100 \
+               ORDER BY c9 \
+               LIMIT 5";
+    let actual = execute_to_batches(&ctx, sql).await;
+    let expected = vec![
+        "+----------------------------+-----------------+",
+        "| SUM(aggregate_test_100.c3) | COUNT(UInt8(1)) |",
+        "+----------------------------+-----------------+",
+        "| 781                        | 100             |",
+        "| 781                        | 100             |",
+        "| 781                        | 100             |",
+        "| 781                        | 100             |",
+        "| 781                        | 100             |",
+        "+----------------------------+-----------------+",
+    ];
+    assert_batches_eq!(expected, &actual);
+    Ok(())
+}
+
+#[tokio::test]
+async fn window_frame_rows_preceding() -> Result<()> {
+    let ctx = SessionContext::new();
+    register_aggregate_csv(&ctx).await?;
+    let sql = "SELECT \
+               SUM(c4) OVER(ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING),\
+               COUNT(*) OVER(ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING)\
+               FROM aggregate_test_100 \
+               ORDER BY c9 \
+               LIMIT 5";
+    let actual = execute_to_batches(&ctx, sql).await;
+    let expected = vec![
+        "+----------------------------+-----------------+",
+        "| SUM(aggregate_test_100.c4) | COUNT(UInt8(1)) |",
+        "+----------------------------+-----------------+",
+        "| 4580                       | 2               |",
+        "| -21316                     | 3               |",
+        "| 2427                       | 3               |",
+        "| -27139                     | 3               |",
+        "| 17223                      | 3               |",
+        "+----------------------------+-----------------+",
+    ];
+    assert_batches_eq!(expected, &actual);
+    Ok(())
+}
+#[tokio::test]
+async fn window_frame_rows_preceding_with_partition_unique_order_by() -> 
Result<()> {
+    let ctx = SessionContext::new();
+    register_aggregate_csv(&ctx).await?;
+    let sql = "SELECT \
+               SUM(c4) OVER(PARTITION BY c1 ORDER BY c9 ROWS BETWEEN 1 
PRECEDING AND 1 FOLLOWING),\
+               COUNT(*) OVER(PARTITION BY c2 ORDER BY c9 ROWS BETWEEN 1 
PRECEDING AND 1 FOLLOWING)\
+               FROM aggregate_test_100 \
+               ORDER BY c9 \
+               LIMIT 5";
+    let actual = execute_to_batches(&ctx, sql).await;
+    let expected = vec![
+        "+----------------------------+-----------------+",
+        "| SUM(aggregate_test_100.c4) | COUNT(UInt8(1)) |",
+        "+----------------------------+-----------------+",
+        "| -38611                     | 2               |",
+        "| 17547                      | 2               |",
+        "| -1301                      | 2               |",
+        "| 26638                      | 3               |",
+        "| 26861                      | 3               |",
+        "+----------------------------+-----------------+",
+    ];
+    assert_batches_eq!(expected, &actual);
+    Ok(())
+}
+/// The partition by clause conducts sorting according to given partition 
column by default. If the
+/// sorting columns have non unique values, the unstable sorting may produce 
indeterminate results.
+/// Therefore, we are commenting out the following test for now.
+
+// #[tokio::test]
+// async fn window_frame_rows_preceding_with_non_unique_partition() -> 
Result<()> {
+//     let ctx = SessionContext::new();
+//     register_aggregate_csv(&ctx).await?;
+//     let sql = "SELECT \
+//                SUM(c4) OVER(PARTITION BY c1 ROWS BETWEEN 1 PRECEDING AND 1 
FOLLOWING),\
+//                COUNT(*) OVER(PARTITION BY c2 ROWS BETWEEN 1 PRECEDING AND 1 
FOLLOWING)\
+//                FROM aggregate_test_100 \
+//                ORDER BY c9 \
+//                LIMIT 5";
+//     let actual = execute_to_batches(&ctx, sql).await;
+//     let expected = vec![
+//         "+----------------------------+-----------------+",
+//         "| SUM(aggregate_test_100.c4) | COUNT(UInt8(1)) |",
+//         "+----------------------------+-----------------+",
+//         "| -33822                     | 3               |",
+//         "| 20808                      | 3               |",
+//         "| -29881                     | 3               |",
+//         "| -47613                     | 3               |",
+//         "| -13474                     | 3               |",
+//         "+----------------------------+-----------------+",
+//     ];
+//     assert_batches_eq!(expected, &actual);
+//     Ok(())
+// }
+
+#[tokio::test]
+async fn window_frame_rows_preceding_with_unique_partition() -> Result<()> {
+    let ctx = SessionContext::new();
+    register_aggregate_csv(&ctx).await?;
+    let sql = "SELECT \
+               SUM(c4) OVER(PARTITION BY c9 ROWS BETWEEN 1 PRECEDING AND 1 
FOLLOWING),\
+               COUNT(*) OVER(PARTITION BY c9 ROWS BETWEEN 1 PRECEDING AND 1 
FOLLOWING)\
+               FROM aggregate_test_100 \
+               ORDER BY c9 \
+               LIMIT 5";
+    let actual = execute_to_batches(&ctx, sql).await;
+    let expected = vec![
+        "+----------------------------+-----------------+",
+        "| SUM(aggregate_test_100.c4) | COUNT(UInt8(1)) |",
+        "+----------------------------+-----------------+",
+        "| -16110                     | 1               |",
+        "| 3917                       | 1               |",
+        "| -16974                     | 1               |",
+        "| -1114                      | 1               |",
+        "| 15673                      | 1               |",
+        "+----------------------------+-----------------+",
+    ];
+    assert_batches_eq!(expected, &actual);
+    Ok(())
+}
+
+#[tokio::test]
+async fn window_frame_ranges_preceding_following() -> Result<()> {
+    let ctx = SessionContext::new();
+    register_aggregate_csv(&ctx).await?;
+    let sql = "SELECT \
+               SUM(c4) OVER(ORDER BY c2 RANGE BETWEEN 1 PRECEDING AND 1 
FOLLOWING),\
+               SUM(c3) OVER(ORDER BY c2 RANGE BETWEEN 10000 PRECEDING AND 
10000 FOLLOWING),\
+               COUNT(*) OVER(ORDER BY c2 RANGE BETWEEN 1 PRECEDING AND 1 
FOLLOWING) \
+               FROM aggregate_test_100 \
+               ORDER BY c9 \
+               LIMIT 5";
+    let actual = execute_to_batches(&ctx, sql).await;
+    let expected = vec![
+        
"+----------------------------+----------------------------+-----------------+",

Review Comment:
   I wonder if you ran the same queries in Postgres? If not, no worries, but I
   am having a hard time validating that these answers are correct (mostly
   because the data loaded by `register_aggregate_csv` isn't a great dataset lol)



##########
datafusion/core/src/physical_plan/planner.rs:
##########
@@ -1419,12 +1419,6 @@ pub fn create_window_expr_with_name(
                     )),
                 })
                 .collect::<Result<Vec<_>>>()?;
-            if window_frame.is_some() {

Review Comment:
   ❤️ 



##########
datafusion/expr/src/window_frame.rs:
##########
@@ -90,10 +90,7 @@ impl TryFrom<ast::WindowFrame> for WindowFrame {
                         | WindowFrameBound::Following(Some(v))
                             if *v > 0 =>
                         {
-                            Err(DataFusionError::NotImplemented(format!(
-                                "With WindowFrameUnits={}, the bound cannot be 
{} PRECEDING or FOLLOWING at the moment",
-                                units, v
-                            )))
+                            Ok::<(), DataFusionError>(())
                         }
                         _ => Ok(()),

Review Comment:
   This code now always returns `Ok`, right? It seems like the check on `bound`
   can be removed entirely, though perhaps I am missing something.
   
   
   ```rust
           } else {
               let units = value.units.into();
               Ok(Self {
                   units,
                   start_bound,
                   end_bound,
               })
           }
   ```
   
   



##########
datafusion/common/src/bisect.rs:
##########
@@ -0,0 +1,113 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This module provides the bisect function, which implements binary search.
+
+use crate::{DataFusionError, Result, ScalarValue};
+use arrow::array::ArrayRef;
+
+/// this function implements bisct_left and bisect_right since these functions
+/// are a lot of code in common we have decided to implement with single 
function where
+/// we separate left and right with compile time lookup.
+/// To use bisect_left give true in the template argument
+/// To use bisect_right give false in the template argument
+pub fn bisect<const SIDE: bool>(

Review Comment:
   While reading this I can't help but wonder if the same logic could be
   implemented using
   https://doc.rust-lang.org/std/primitive.slice.html#method.binary_search
   
   Though the subtlety of `left` and `right` would need some finagling.
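
   One possible shape for that finagling, sketched for a single sorted slice
   rather than the multi-column `ArrayRef` input the PR's `bisect` takes: the
   standard library's `slice::partition_point` (documented on the same page as
   `binary_search`) yields both variants directly, whereas `binary_search` may
   return any matching index when there are duplicates. The function names
   below are illustrative only.

   ```rust
   /// Index of the first element that is not less than `target`.
   fn bisect_left<T: Ord>(sorted: &[T], target: &T) -> usize {
       sorted.partition_point(|x| x < target)
   }

   /// Index just past the last element that is not greater than `target`.
   fn bisect_right<T: Ord>(sorted: &[T], target: &T) -> usize {
       sorted.partition_point(|x| x <= target)
   }
   ```

   Both functions assume the slice is sorted ascending. Extending this to
   several sort columns with mixed sort directions would still need a custom
   comparison, which is presumably what the hand-written version handles.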



##########
datafusion/physical-expr/src/aggregate/sum.rs:
##########
@@ -217,141 +205,6 @@ macro_rules! sum_row {
     }};
 }
 
-// TODO implement this in arrow-rs with simd

Review Comment:
   There is a long-standing tension around adding more functionality to
   `ScalarValue`.
   
   Basically, the theory is that doing any sort of arithmetic on more than a
   few `ScalarValue`s is likely to have terrible performance compared to using
   the optimized / vectorized kernels from arrow. So the thinking goes: if we
   make it easier to write code that operates directly on `ScalarValue`, we will
   end up with some very bad performance.
   
   However, this is at least the second PR I have seen that has found this sum
   code and moved it into `ScalarValue` (the first one added variance in the
   first place, see https://github.com/apache/arrow-datafusion/pull/1525 --
   https://github.com/apache/arrow-datafusion/pull/1525#pullrequestreview-847147545
   from @realno), and so my thinking has evolved on this matter.
   
   I now think we should add basic support in `ScalarValue` for arithmetic as an
   initial implementation, and then we can work on optimizing performance in
   follow-on PRs.
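
   For a sense of what "basic support for arithmetic" could look like, here is
   a toy sketch. The `Scalar` enum and `add_opt` helper are hypothetical
   stand-ins with only two variants; this is not the real `ScalarValue` API.
   NULL handling follows what SUM does (NULLs are skipped), which is an
   assumption made for this example, and integer overflow is not handled.

   ```rust
   /// Toy two-variant scalar (hypothetical stand-in for `ScalarValue`).
   #[derive(Debug, Clone, PartialEq)]
   enum Scalar {
       Int64(Option<i64>),
       Float64(Option<f64>),
   }

   /// Adds two nullable values, skipping NULLs the way SUM does.
   fn add_opt<T: std::ops::Add<Output = T>>(a: Option<T>, b: Option<T>) -> Option<T> {
       match (a, b) {
           (Some(x), Some(y)) => Some(x + y),
           (x, None) => x,
           (None, y) => y,
       }
   }

   impl Scalar {
       /// Adds two scalars of the same variant; mismatched types are an error.
       fn add(&self, other: &Scalar) -> Result<Scalar, String> {
           match (self, other) {
               (Scalar::Int64(a), Scalar::Int64(b)) => Ok(Scalar::Int64(add_opt(*a, *b))),
               (Scalar::Float64(a), Scalar::Float64(b)) => Ok(Scalar::Float64(add_opt(*a, *b))),
               _ => Err(format!("cannot add {:?} and {:?}", self, other)),
           }
       }
   }
   ```

   Each call matches on the variant for a single value, which is the
   per-element overhead the vectorized arrow kernels avoid.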



##########
datafusion/physical-expr/src/aggregate/sum.rs:
##########
@@ -511,28 +374,6 @@ mod tests {
 
     #[test]
     fn sum_decimal() -> Result<()> {
-        // test sum

Review Comment:
   Can you explain why these tests were moved?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
