This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 89def2c6e5 Convert `bool_and` & `bool_or` to UDAF (#11009)
89def2c6e5 is described below

commit 89def2c6e5f32f15dd80dd19d11b3087e1647310
Author: jcsherin <[email protected]>
AuthorDate: Thu Jun 20 16:29:59 2024 +0530

    Convert `bool_and` & `bool_or` to UDAF (#11009)
    
    * Port `bool_and` and `bool_or` to `AggregateUDFImpl`
    
    * Remove trait methods with default implementation
    
    * Add `bool_or_udaf`
    
    * Register `bool_and` and `bool_or`
    
    * Remove from `physical-expr`
    
    * Add expressions to logical plan roundtrip test
    
    * minor: remove methods with default implementation
    
    * Removes redundant tests
    
    * Removes hard-coded function names
---
 datafusion/expr/src/aggregate_function.rs          |  15 -
 datafusion/expr/src/type_coercion/aggregates.rs    |  16 -
 datafusion/functions-aggregate/src/bool_and_or.rs  | 343 ++++++++++++++++++
 datafusion/functions-aggregate/src/lib.rs          |   6 +-
 .../physical-expr/src/aggregate/bool_and_or.rs     | 394 ---------------------
 datafusion/physical-expr/src/aggregate/build_in.rs |  56 +--
 .../src/aggregate/groups_accumulator/mod.rs        |   3 -
 datafusion/physical-expr/src/aggregate/mod.rs      |   1 -
 datafusion/physical-expr/src/expressions/mod.rs    |   1 -
 datafusion/proto/proto/datafusion.proto            |   4 +-
 datafusion/proto/src/generated/pbjson.rs           |   6 -
 datafusion/proto/src/generated/prost.rs            |   8 +-
 datafusion/proto/src/logical_plan/from_proto.rs    |   2 -
 datafusion/proto/src/logical_plan/to_proto.rs      |   4 -
 datafusion/proto/src/physical_plan/to_proto.rs     |  12 +-
 .../proto/tests/cases/roundtrip_logical_plan.rs    |   6 +-
 16 files changed, 362 insertions(+), 515 deletions(-)

diff --git a/datafusion/expr/src/aggregate_function.rs b/datafusion/expr/src/aggregate_function.rs
index 1cde1c5050..967ccc0b08 100644
--- a/datafusion/expr/src/aggregate_function.rs
+++ b/datafusion/expr/src/aggregate_function.rs
@@ -47,10 +47,6 @@ pub enum AggregateFunction {
     Correlation,
     /// Grouping
     Grouping,
-    /// Bool And
-    BoolAnd,
-    /// Bool Or
-    BoolOr,
 }
 
 impl AggregateFunction {
@@ -64,8 +60,6 @@ impl AggregateFunction {
             NthValue => "NTH_VALUE",
             Correlation => "CORR",
             Grouping => "GROUPING",
-            BoolAnd => "BOOL_AND",
-            BoolOr => "BOOL_OR",
         }
     }
 }
@@ -82,8 +76,6 @@ impl FromStr for AggregateFunction {
         Ok(match name {
             // general
             "avg" => AggregateFunction::Avg,
-            "bool_and" => AggregateFunction::BoolAnd,
-            "bool_or" => AggregateFunction::BoolOr,
             "max" => AggregateFunction::Max,
             "mean" => AggregateFunction::Avg,
             "min" => AggregateFunction::Min,
@@ -128,9 +120,6 @@ impl AggregateFunction {
                 // The coerced_data_types is same with input_types.
                 Ok(coerced_data_types[0].clone())
             }
-            AggregateFunction::BoolAnd | AggregateFunction::BoolOr => {
-                Ok(DataType::Boolean)
-            }
             AggregateFunction::Correlation => {
                 correlation_return_type(&coerced_data_types[0])
             }
@@ -179,10 +168,6 @@ impl AggregateFunction {
                     .collect::<Vec<_>>();
                 Signature::uniform(1, valid, Volatility::Immutable)
             }
-            AggregateFunction::BoolAnd | AggregateFunction::BoolOr => {
-                Signature::uniform(1, vec![DataType::Boolean], Volatility::Immutable)
-            }
-
             AggregateFunction::Avg => {
                 Signature::uniform(1, NUMERICS.to_vec(), Volatility::Immutable)
             }
diff --git a/datafusion/expr/src/type_coercion/aggregates.rs b/datafusion/expr/src/type_coercion/aggregates.rs
index abe6d8b182..428fc99070 100644
--- a/datafusion/expr/src/type_coercion/aggregates.rs
+++ b/datafusion/expr/src/type_coercion/aggregates.rs
@@ -121,18 +121,6 @@ pub fn coerce_types(
             };
             Ok(vec![v])
         }
-        AggregateFunction::BoolAnd | AggregateFunction::BoolOr => {
-            // Refer to https://www.postgresql.org/docs/8.2/functions-aggregate.html doc
-            // smallint, int, bigint, real, double precision, decimal, or interval.
-            if !is_bool_and_or_support_arg_type(&input_types[0]) {
-                return plan_err!(
-                    "The function {:?} does not support inputs of type {:?}.",
-                    agg_fun,
-                    input_types[0]
-                );
-            }
-            Ok(input_types.to_vec())
-        }
         AggregateFunction::Correlation => {
             if !is_correlation_support_arg_type(&input_types[0]) {
                 return plan_err!(
@@ -319,10 +307,6 @@ pub fn avg_sum_type(arg_type: &DataType) -> Result<DataType> {
     }
 }
 
-pub fn is_bool_and_or_support_arg_type(arg_type: &DataType) -> bool {
-    matches!(arg_type, DataType::Boolean)
-}
-
 pub fn is_sum_support_arg_type(arg_type: &DataType) -> bool {
     match arg_type {
         DataType::Dictionary(_, dict_value_type) => {
diff --git a/datafusion/functions-aggregate/src/bool_and_or.rs b/datafusion/functions-aggregate/src/bool_and_or.rs
new file mode 100644
index 0000000000..d002867274
--- /dev/null
+++ b/datafusion/functions-aggregate/src/bool_and_or.rs
@@ -0,0 +1,343 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines physical expressions that can evaluated at runtime during query execution
+
+use std::any::Any;
+
+use arrow::array::ArrayRef;
+use arrow::array::BooleanArray;
+use arrow::compute::bool_and as compute_bool_and;
+use arrow::compute::bool_or as compute_bool_or;
+use arrow::datatypes::DataType;
+use arrow::datatypes::Field;
+
+use datafusion_common::internal_err;
+use datafusion_common::{downcast_value, not_impl_err};
+use datafusion_common::{DataFusionError, Result, ScalarValue};
+use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs};
+use datafusion_expr::utils::{format_state_name, AggregateOrderSensitivity};
+use datafusion_expr::{
+    Accumulator, AggregateUDFImpl, GroupsAccumulator, ReversedUDAF, Signature, Volatility,
+};
+
+use datafusion_physical_expr_common::aggregate::groups_accumulator::bool_op::BooleanGroupsAccumulator;
+
+// returns the new value after bool_and/bool_or with the new values, taking nullability into account
+macro_rules! typed_bool_and_or_batch {
+    ($VALUES:expr, $ARRAYTYPE:ident, $SCALAR:ident, $OP:ident) => {{
+        let array = downcast_value!($VALUES, $ARRAYTYPE);
+        let delta = $OP(array);
+        Ok(ScalarValue::$SCALAR(delta))
+    }};
+}
+
+// bool_and/bool_or the array and returns a ScalarValue of its corresponding type.
+macro_rules! bool_and_or_batch {
+    ($VALUES:expr, $OP:ident) => {{
+        match $VALUES.data_type() {
+            DataType::Boolean => {
+                typed_bool_and_or_batch!($VALUES, BooleanArray, Boolean, $OP)
+            }
+            e => {
+                return internal_err!(
+                    "Bool and/Bool or is not expected to receive the type {e:?}"
+                );
+            }
+        }
+    }};
+}
+
+/// dynamically-typed bool_and(array) -> ScalarValue
+fn bool_and_batch(values: &ArrayRef) -> Result<ScalarValue> {
+    bool_and_or_batch!(values, compute_bool_and)
+}
+
+/// dynamically-typed bool_or(array) -> ScalarValue
+fn bool_or_batch(values: &ArrayRef) -> Result<ScalarValue> {
+    bool_and_or_batch!(values, compute_bool_or)
+}
+
+make_udaf_expr_and_func!(
+    BoolAnd,
+    bool_and,
+    expression,
+    "The values to combine with `AND`",
+    bool_and_udaf
+);
+
+make_udaf_expr_and_func!(
+    BoolOr,
+    bool_or,
+    expression,
+    "The values to combine with `OR`",
+    bool_or_udaf
+);
+
+/// BOOL_AND aggregate expression
+#[derive(Debug)]
+pub struct BoolAnd {
+    signature: Signature,
+}
+
+impl BoolAnd {
+    fn new() -> Self {
+        Self {
+            signature: Signature::uniform(
+                1,
+                vec![DataType::Boolean],
+                Volatility::Immutable,
+            ),
+        }
+    }
+}
+
+impl Default for BoolAnd {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl AggregateUDFImpl for BoolAnd {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn name(&self) -> &str {
+        "bool_and"
+    }
+
+    fn signature(&self) -> &Signature {
+        &self.signature
+    }
+
+    fn return_type(&self, _: &[DataType]) -> Result<DataType> {
+        Ok(DataType::Boolean)
+    }
+
+    fn accumulator(&self, _: AccumulatorArgs) -> Result<Box<dyn Accumulator>> {
+        Ok(Box::<BoolAndAccumulator>::default())
+    }
+
+    fn state_fields(&self, args: StateFieldsArgs) -> Result<Vec<Field>> {
+        Ok(vec![Field::new(
+            format_state_name(args.name, self.name()),
+            DataType::Boolean,
+            true,
+        )])
+    }
+
+    fn groups_accumulator_supported(&self, _args: AccumulatorArgs) -> bool {
+        true
+    }
+
+    fn create_groups_accumulator(
+        &self,
+        args: AccumulatorArgs,
+    ) -> Result<Box<dyn GroupsAccumulator>> {
+        match args.data_type {
+            DataType::Boolean => {
+                Ok(Box::new(BooleanGroupsAccumulator::new(|x, y| x && y)))
+            }
+            _ => not_impl_err!(
+                "GroupsAccumulator not supported for {} with {}",
+                args.name,
+                args.data_type
+            ),
+        }
+    }
+
+    fn aliases(&self) -> &[String] {
+        &[]
+    }
+
+    fn create_sliding_accumulator(
+        &self,
+        _: AccumulatorArgs,
+    ) -> Result<Box<dyn Accumulator>> {
+        Ok(Box::<BoolAndAccumulator>::default())
+    }
+
+    fn order_sensitivity(&self) -> AggregateOrderSensitivity {
+        AggregateOrderSensitivity::Insensitive
+    }
+
+    fn reverse_expr(&self) -> ReversedUDAF {
+        ReversedUDAF::Identical
+    }
+}
+
+#[derive(Debug, Default)]
+struct BoolAndAccumulator {
+    acc: Option<bool>,
+}
+
+impl Accumulator for BoolAndAccumulator {
+    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
+        let values = &values[0];
+        self.acc = match (self.acc, bool_and_batch(values)?) {
+            (None, ScalarValue::Boolean(v)) => v,
+            (Some(v), ScalarValue::Boolean(None)) => Some(v),
+            (Some(a), ScalarValue::Boolean(Some(b))) => Some(a && b),
+            _ => unreachable!(),
+        };
+        Ok(())
+    }
+
+    fn evaluate(&mut self) -> Result<ScalarValue> {
+        Ok(ScalarValue::Boolean(self.acc))
+    }
+
+    fn size(&self) -> usize {
+        std::mem::size_of_val(self)
+    }
+
+    fn state(&mut self) -> Result<Vec<ScalarValue>> {
+        Ok(vec![ScalarValue::Boolean(self.acc)])
+    }
+
+    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
+        self.update_batch(states)
+    }
+}
+
+/// BOOL_OR aggregate expression
+#[derive(Debug, Clone)]
+pub struct BoolOr {
+    signature: Signature,
+}
+
+impl BoolOr {
+    fn new() -> Self {
+        Self {
+            signature: Signature::uniform(
+                1,
+                vec![DataType::Boolean],
+                Volatility::Immutable,
+            ),
+        }
+    }
+}
+
+impl Default for BoolOr {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl AggregateUDFImpl for BoolOr {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn name(&self) -> &str {
+        "bool_or"
+    }
+
+    fn signature(&self) -> &Signature {
+        &self.signature
+    }
+
+    fn return_type(&self, _: &[DataType]) -> Result<DataType> {
+        Ok(DataType::Boolean)
+    }
+
+    fn accumulator(&self, _: AccumulatorArgs) -> Result<Box<dyn Accumulator>> {
+        Ok(Box::<BoolOrAccumulator>::default())
+    }
+
+    fn state_fields(&self, args: StateFieldsArgs) -> Result<Vec<Field>> {
+        Ok(vec![Field::new(
+            format_state_name(args.name, self.name()),
+            DataType::Boolean,
+            true,
+        )])
+    }
+
+    fn groups_accumulator_supported(&self, _args: AccumulatorArgs) -> bool {
+        true
+    }
+
+    fn create_groups_accumulator(
+        &self,
+        args: AccumulatorArgs,
+    ) -> Result<Box<dyn GroupsAccumulator>> {
+        match args.data_type {
+            DataType::Boolean => {
+                Ok(Box::new(BooleanGroupsAccumulator::new(|x, y| x || y)))
+            }
+            _ => not_impl_err!(
+                "GroupsAccumulator not supported for {} with {}",
+                args.name,
+                args.data_type
+            ),
+        }
+    }
+
+    fn aliases(&self) -> &[String] {
+        &[]
+    }
+
+    fn create_sliding_accumulator(
+        &self,
+        _: AccumulatorArgs,
+    ) -> Result<Box<dyn Accumulator>> {
+        Ok(Box::<BoolOrAccumulator>::default())
+    }
+
+    fn order_sensitivity(&self) -> AggregateOrderSensitivity {
+        AggregateOrderSensitivity::Insensitive
+    }
+
+    fn reverse_expr(&self) -> ReversedUDAF {
+        ReversedUDAF::Identical
+    }
+}
+
+#[derive(Debug, Default)]
+struct BoolOrAccumulator {
+    acc: Option<bool>,
+}
+
+impl Accumulator for BoolOrAccumulator {
+    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
+        let values = &values[0];
+        self.acc = match (self.acc, bool_or_batch(values)?) {
+            (None, ScalarValue::Boolean(v)) => v,
+            (Some(v), ScalarValue::Boolean(None)) => Some(v),
+            (Some(a), ScalarValue::Boolean(Some(b))) => Some(a || b),
+            _ => unreachable!(),
+        };
+        Ok(())
+    }
+
+    fn evaluate(&mut self) -> Result<ScalarValue> {
+        Ok(ScalarValue::Boolean(self.acc))
+    }
+
+    fn size(&self) -> usize {
+        std::mem::size_of_val(self)
+    }
+
+    fn state(&mut self) -> Result<Vec<ScalarValue>> {
+        Ok(vec![ScalarValue::Boolean(self.acc)])
+    }
+
+    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
+        self.update_batch(states)
+    }
+}
diff --git a/datafusion/functions-aggregate/src/lib.rs b/datafusion/functions-aggregate/src/lib.rs
index 20a8d2c159..260d6dab31 100644
--- a/datafusion/functions-aggregate/src/lib.rs
+++ b/datafusion/functions-aggregate/src/lib.rs
@@ -70,8 +70,8 @@ pub mod approx_median;
 pub mod approx_percentile_cont;
 pub mod approx_percentile_cont_with_weight;
 pub mod bit_and_or_xor;
+pub mod bool_and_or;
 pub mod string_agg;
-
 use crate::approx_percentile_cont::approx_percentile_cont_udaf;
 use crate::approx_percentile_cont_with_weight::approx_percentile_cont_with_weight_udaf;
 use datafusion_common::Result;
@@ -89,6 +89,8 @@ pub mod expr_fn {
     pub use super::bit_and_or_xor::bit_and;
     pub use super::bit_and_or_xor::bit_or;
     pub use super::bit_and_or_xor::bit_xor;
+    pub use super::bool_and_or::bool_and;
+    pub use super::bool_and_or::bool_or;
     pub use super::count::count;
     pub use super::count::count_distinct;
     pub use super::covariance::covar_pop;
@@ -143,6 +145,8 @@ pub fn all_default_aggregate_functions() -> Vec<Arc<AggregateUDF>> {
         bit_and_or_xor::bit_and_udaf(),
         bit_and_or_xor::bit_or_udaf(),
         bit_and_or_xor::bit_xor_udaf(),
+        bool_and_or::bool_and_udaf(),
+        bool_and_or::bool_or_udaf(),
     ]
 }
 
diff --git a/datafusion/physical-expr/src/aggregate/bool_and_or.rs b/datafusion/physical-expr/src/aggregate/bool_and_or.rs
deleted file mode 100644
index 341932bd77..0000000000
--- a/datafusion/physical-expr/src/aggregate/bool_and_or.rs
+++ /dev/null
@@ -1,394 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! Defines physical expressions that can evaluated at runtime during query execution
-
-use crate::{AggregateExpr, PhysicalExpr};
-use arrow::datatypes::DataType;
-use arrow::{
-    array::{ArrayRef, BooleanArray},
-    datatypes::Field,
-};
-use datafusion_common::{
-    downcast_value, internal_err, not_impl_err, DataFusionError, Result, ScalarValue,
-};
-use datafusion_expr::{Accumulator, GroupsAccumulator};
-use std::any::Any;
-use std::sync::Arc;
-
-use crate::aggregate::groups_accumulator::bool_op::BooleanGroupsAccumulator;
-use crate::aggregate::utils::down_cast_any_ref;
-use crate::expressions::format_state_name;
-use arrow::array::Array;
-use arrow::compute::{bool_and, bool_or};
-
-// returns the new value after bool_and/bool_or with the new values, taking nullability into account
-macro_rules! typed_bool_and_or_batch {
-    ($VALUES:expr, $ARRAYTYPE:ident, $SCALAR:ident, $OP:ident) => {{
-        let array = downcast_value!($VALUES, $ARRAYTYPE);
-        let delta = $OP(array);
-        Ok(ScalarValue::$SCALAR(delta))
-    }};
-}
-
-// bool_and/bool_or the array and returns a ScalarValue of its corresponding type.
-macro_rules! bool_and_or_batch {
-    ($VALUES:expr, $OP:ident) => {{
-        match $VALUES.data_type() {
-            DataType::Boolean => {
-                typed_bool_and_or_batch!($VALUES, BooleanArray, Boolean, $OP)
-            }
-            e => {
-                return internal_err!(
-                    "Bool and/Bool or is not expected to receive the type {e:?}"
-                );
-            }
-        }
-    }};
-}
-
-/// dynamically-typed bool_and(array) -> ScalarValue
-fn bool_and_batch(values: &ArrayRef) -> Result<ScalarValue> {
-    bool_and_or_batch!(values, bool_and)
-}
-
-/// dynamically-typed bool_or(array) -> ScalarValue
-fn bool_or_batch(values: &ArrayRef) -> Result<ScalarValue> {
-    bool_and_or_batch!(values, bool_or)
-}
-
-/// BOOL_AND aggregate expression
-#[derive(Debug, Clone)]
-pub struct BoolAnd {
-    name: String,
-    pub data_type: DataType,
-    expr: Arc<dyn PhysicalExpr>,
-    nullable: bool,
-}
-
-impl BoolAnd {
-    /// Create a new BOOL_AND aggregate function
-    pub fn new(
-        expr: Arc<dyn PhysicalExpr>,
-        name: impl Into<String>,
-        data_type: DataType,
-    ) -> Self {
-        Self {
-            name: name.into(),
-            expr,
-            data_type,
-            nullable: true,
-        }
-    }
-}
-
-impl AggregateExpr for BoolAnd {
-    /// Return a reference to Any that can be used for downcasting
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
-    fn field(&self) -> Result<Field> {
-        Ok(Field::new(
-            &self.name,
-            self.data_type.clone(),
-            self.nullable,
-        ))
-    }
-
-    fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
-        Ok(Box::<BoolAndAccumulator>::default())
-    }
-
-    fn state_fields(&self) -> Result<Vec<Field>> {
-        Ok(vec![Field::new(
-            format_state_name(&self.name, "bool_and"),
-            self.data_type.clone(),
-            self.nullable,
-        )])
-    }
-
-    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
-        vec![self.expr.clone()]
-    }
-
-    fn name(&self) -> &str {
-        &self.name
-    }
-
-    fn groups_accumulator_supported(&self) -> bool {
-        true
-    }
-
-    fn create_groups_accumulator(&self) -> Result<Box<dyn GroupsAccumulator>> {
-        match self.data_type {
-            DataType::Boolean => {
-                Ok(Box::new(BooleanGroupsAccumulator::new(|x, y| x && y)))
-            }
-            _ => not_impl_err!(
-                "GroupsAccumulator not supported for {} with {}",
-                self.name(),
-                self.data_type
-            ),
-        }
-    }
-
-    fn reverse_expr(&self) -> Option<Arc<dyn AggregateExpr>> {
-        Some(Arc::new(self.clone()))
-    }
-
-    fn create_sliding_accumulator(&self) -> Result<Box<dyn Accumulator>> {
-        Ok(Box::<BoolAndAccumulator>::default())
-    }
-}
-
-impl PartialEq<dyn Any> for BoolAnd {
-    fn eq(&self, other: &dyn Any) -> bool {
-        down_cast_any_ref(other)
-            .downcast_ref::<Self>()
-            .map(|x| {
-                self.name == x.name
-                    && self.data_type == x.data_type
-                    && self.nullable == x.nullable
-                    && self.expr.eq(&x.expr)
-            })
-            .unwrap_or(false)
-    }
-}
-
-#[derive(Debug, Default)]
-struct BoolAndAccumulator {
-    acc: Option<bool>,
-}
-
-impl Accumulator for BoolAndAccumulator {
-    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
-        let values = &values[0];
-        self.acc = match (self.acc, bool_and_batch(values)?) {
-            (None, ScalarValue::Boolean(v)) => v,
-            (Some(v), ScalarValue::Boolean(None)) => Some(v),
-            (Some(a), ScalarValue::Boolean(Some(b))) => Some(a && b),
-            _ => unreachable!(),
-        };
-        Ok(())
-    }
-
-    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
-        self.update_batch(states)
-    }
-
-    fn state(&mut self) -> Result<Vec<ScalarValue>> {
-        Ok(vec![ScalarValue::Boolean(self.acc)])
-    }
-
-    fn evaluate(&mut self) -> Result<ScalarValue> {
-        Ok(ScalarValue::Boolean(self.acc))
-    }
-
-    fn size(&self) -> usize {
-        std::mem::size_of_val(self)
-    }
-}
-
-/// BOOL_OR aggregate expression
-#[derive(Debug, Clone)]
-pub struct BoolOr {
-    name: String,
-    pub data_type: DataType,
-    expr: Arc<dyn PhysicalExpr>,
-    nullable: bool,
-}
-
-impl BoolOr {
-    /// Create a new BOOL_OR aggregate function
-    pub fn new(
-        expr: Arc<dyn PhysicalExpr>,
-        name: impl Into<String>,
-        data_type: DataType,
-    ) -> Self {
-        Self {
-            name: name.into(),
-            expr,
-            data_type,
-            nullable: true,
-        }
-    }
-}
-
-impl AggregateExpr for BoolOr {
-    /// Return a reference to Any that can be used for downcasting
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
-    fn field(&self) -> Result<Field> {
-        Ok(Field::new(
-            &self.name,
-            self.data_type.clone(),
-            self.nullable,
-        ))
-    }
-
-    fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
-        Ok(Box::<BoolOrAccumulator>::default())
-    }
-
-    fn state_fields(&self) -> Result<Vec<Field>> {
-        Ok(vec![Field::new(
-            format_state_name(&self.name, "bool_or"),
-            self.data_type.clone(),
-            self.nullable,
-        )])
-    }
-
-    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
-        vec![self.expr.clone()]
-    }
-
-    fn name(&self) -> &str {
-        &self.name
-    }
-
-    fn groups_accumulator_supported(&self) -> bool {
-        true
-    }
-
-    fn create_groups_accumulator(&self) -> Result<Box<dyn GroupsAccumulator>> {
-        match self.data_type {
-            DataType::Boolean => {
-                Ok(Box::new(BooleanGroupsAccumulator::new(|x, y| x || y)))
-            }
-            _ => not_impl_err!(
-                "GroupsAccumulator not supported for {} with {}",
-                self.name(),
-                self.data_type
-            ),
-        }
-    }
-
-    fn reverse_expr(&self) -> Option<Arc<dyn AggregateExpr>> {
-        Some(Arc::new(self.clone()))
-    }
-
-    fn create_sliding_accumulator(&self) -> Result<Box<dyn Accumulator>> {
-        Ok(Box::<BoolOrAccumulator>::default())
-    }
-}
-
-impl PartialEq<dyn Any> for BoolOr {
-    fn eq(&self, other: &dyn Any) -> bool {
-        down_cast_any_ref(other)
-            .downcast_ref::<Self>()
-            .map(|x| {
-                self.name == x.name
-                    && self.data_type == x.data_type
-                    && self.nullable == x.nullable
-                    && self.expr.eq(&x.expr)
-            })
-            .unwrap_or(false)
-    }
-}
-
-#[derive(Debug, Default)]
-struct BoolOrAccumulator {
-    acc: Option<bool>,
-}
-
-impl Accumulator for BoolOrAccumulator {
-    fn state(&mut self) -> Result<Vec<ScalarValue>> {
-        Ok(vec![ScalarValue::Boolean(self.acc)])
-    }
-
-    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
-        let values = &values[0];
-        self.acc = match (self.acc, bool_or_batch(values)?) {
-            (None, ScalarValue::Boolean(v)) => v,
-            (Some(v), ScalarValue::Boolean(None)) => Some(v),
-            (Some(a), ScalarValue::Boolean(Some(b))) => Some(a || b),
-            _ => unreachable!(),
-        };
-        Ok(())
-    }
-
-    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
-        self.update_batch(states)
-    }
-
-    fn evaluate(&mut self) -> Result<ScalarValue> {
-        Ok(ScalarValue::Boolean(self.acc))
-    }
-
-    fn size(&self) -> usize {
-        std::mem::size_of_val(self)
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-    use crate::expressions::col;
-    use crate::expressions::tests::aggregate;
-    use crate::generic_test_op;
-    use arrow::datatypes::*;
-    use arrow::record_batch::RecordBatch;
-
-    #[test]
-    fn test_bool_and() -> Result<()> {
-        let a: ArrayRef = Arc::new(BooleanArray::from(vec![true, true, false]));
-        generic_test_op!(a, DataType::Boolean, BoolAnd, ScalarValue::from(false))
-    }
-
-    #[test]
-    fn bool_and_with_nulls() -> Result<()> {
-        let a: ArrayRef = Arc::new(BooleanArray::from(vec![
-            Some(true),
-            None,
-            Some(true),
-            Some(true),
-        ]));
-        generic_test_op!(a, DataType::Boolean, BoolAnd, ScalarValue::from(true))
-    }
-
-    #[test]
-    fn bool_and_all_nulls() -> Result<()> {
-        let a: ArrayRef = Arc::new(BooleanArray::from(vec![None, None]));
-        generic_test_op!(a, DataType::Boolean, BoolAnd, ScalarValue::Boolean(None))
-    }
-
-    #[test]
-    fn test_bool_or() -> Result<()> {
-        let a: ArrayRef = Arc::new(BooleanArray::from(vec![true, true, false]));
-        generic_test_op!(a, DataType::Boolean, BoolOr, ScalarValue::from(true))
-    }
-
-    #[test]
-    fn bool_or_with_nulls() -> Result<()> {
-        let a: ArrayRef = Arc::new(BooleanArray::from(vec![
-            Some(false),
-            None,
-            Some(false),
-            Some(false),
-        ]));
-        generic_test_op!(a, DataType::Boolean, BoolOr, ScalarValue::from(false))
-    }
-
-    #[test]
-    fn bool_or_all_nulls() -> Result<()> {
-        let a: ArrayRef = Arc::new(BooleanArray::from(vec![None, None]));
-        generic_test_op!(a, DataType::Boolean, BoolOr, ScalarValue::Boolean(None))
-    }
-}
diff --git a/datafusion/physical-expr/src/aggregate/build_in.rs b/datafusion/physical-expr/src/aggregate/build_in.rs
index 1dfe9ffd69..53cfcfb033 100644
--- a/datafusion/physical-expr/src/aggregate/build_in.rs
+++ b/datafusion/physical-expr/src/aggregate/build_in.rs
@@ -66,16 +66,6 @@ pub fn create_aggregate_expr(
             name,
             data_type,
         )),
-        (AggregateFunction::BoolAnd, _) => Arc::new(expressions::BoolAnd::new(
-            input_phy_exprs[0].clone(),
-            name,
-            data_type,
-        )),
-        (AggregateFunction::BoolOr, _) => Arc::new(expressions::BoolOr::new(
-            input_phy_exprs[0].clone(),
-            name,
-            data_type,
-        )),
         (AggregateFunction::ArrayAgg, false) => {
             let expr = input_phy_exprs[0].clone();
             let nullable = expr.nullable(input_schema)?;
@@ -165,9 +155,7 @@ mod tests {
     use datafusion_common::plan_err;
     use datafusion_expr::{type_coercion, Signature};
 
-    use crate::expressions::{
-        try_cast, ArrayAgg, Avg, BoolAnd, BoolOr, DistinctArrayAgg, Max, Min,
-    };
+    use crate::expressions::{try_cast, ArrayAgg, Avg, DistinctArrayAgg, Max, Min};
 
     use super::*;
     #[test]
@@ -281,48 +269,6 @@ mod tests {
         Ok(())
     }
 
-    #[test]
-    fn test_bool_and_or_expr() -> Result<()> {
-        let funcs = vec![AggregateFunction::BoolAnd, AggregateFunction::BoolOr];
-        let data_types = vec![DataType::Boolean];
-        for fun in funcs {
-            for data_type in &data_types {
-                let input_schema =
-                    Schema::new(vec![Field::new("c1", data_type.clone(), true)]);
-                let input_phy_exprs: Vec<Arc<dyn PhysicalExpr>> = vec![Arc::new(
-                    expressions::Column::new_with_schema("c1", &input_schema).unwrap(),
-                )];
-                let result_agg_phy_exprs = create_physical_agg_expr_for_test(
-                    &fun,
-                    false,
-                    &input_phy_exprs[0..1],
-                    &input_schema,
-                    "c1",
-                )?;
-                match fun {
-                    AggregateFunction::BoolAnd => {
-                        assert!(result_agg_phy_exprs.as_any().is::<BoolAnd>());
-                        assert_eq!("c1", result_agg_phy_exprs.name());
-                        assert_eq!(
-                            Field::new("c1", data_type.clone(), true),
-                            result_agg_phy_exprs.field().unwrap()
-                        );
-                    }
-                    AggregateFunction::BoolOr => {
-                        assert!(result_agg_phy_exprs.as_any().is::<BoolOr>());
-                        assert_eq!("c1", result_agg_phy_exprs.name());
-                        assert_eq!(
-                            Field::new("c1", data_type.clone(), true),
-                            result_agg_phy_exprs.field().unwrap()
-                        );
-                    }
-                    _ => {}
-                };
-            }
-        }
-        Ok(())
-    }
-
     #[test]
     fn test_sum_avg_expr() -> Result<()> {
         let funcs = vec![AggregateFunction::Avg];
diff --git a/datafusion/physical-expr/src/aggregate/groups_accumulator/mod.rs b/datafusion/physical-expr/src/aggregate/groups_accumulator/mod.rs
index a6946e739c..73d810ec05 100644
--- a/datafusion/physical-expr/src/aggregate/groups_accumulator/mod.rs
+++ b/datafusion/physical-expr/src/aggregate/groups_accumulator/mod.rs
@@ -25,9 +25,6 @@ pub(crate) mod accumulate {
 
 pub use 
datafusion_physical_expr_common::aggregate::groups_accumulator::accumulate::NullState;
 
-pub(crate) mod bool_op {
-    pub use 
datafusion_physical_expr_common::aggregate::groups_accumulator::bool_op::BooleanGroupsAccumulator;
-}
 pub(crate) mod prim_op {
     pub use 
datafusion_physical_expr_common::aggregate::groups_accumulator::prim_op::PrimitiveGroupsAccumulator;
 }
diff --git a/datafusion/physical-expr/src/aggregate/mod.rs 
b/datafusion/physical-expr/src/aggregate/mod.rs
index 87c7deccc2..f64c5b1fb2 100644
--- a/datafusion/physical-expr/src/aggregate/mod.rs
+++ b/datafusion/physical-expr/src/aggregate/mod.rs
@@ -21,7 +21,6 @@ pub(crate) mod array_agg;
 pub(crate) mod array_agg_distinct;
 pub(crate) mod array_agg_ordered;
 pub(crate) mod average;
-pub(crate) mod bool_and_or;
 pub(crate) mod correlation;
 pub(crate) mod covariance;
 pub(crate) mod grouping;
diff --git a/datafusion/physical-expr/src/expressions/mod.rs 
b/datafusion/physical-expr/src/expressions/mod.rs
index 3226104040..0020aa5f55 100644
--- a/datafusion/physical-expr/src/expressions/mod.rs
+++ b/datafusion/physical-expr/src/expressions/mod.rs
@@ -40,7 +40,6 @@ pub use 
crate::aggregate::array_agg_distinct::DistinctArrayAgg;
 pub use crate::aggregate::array_agg_ordered::OrderSensitiveArrayAgg;
 pub use crate::aggregate::average::Avg;
 pub use crate::aggregate::average::AvgAccumulator;
-pub use crate::aggregate::bool_and_or::{BoolAnd, BoolOr};
 pub use crate::aggregate::build_in::create_aggregate_expr;
 pub use crate::aggregate::correlation::Correlation;
 pub use crate::aggregate::grouping::Grouping;
diff --git a/datafusion/proto/proto/datafusion.proto 
b/datafusion/proto/proto/datafusion.proto
index 6375df721a..50356d5b60 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -494,8 +494,8 @@ enum AggregateFunction {
   // BIT_AND = 19;
   // BIT_OR = 20;
   // BIT_XOR = 21;
-  BOOL_AND = 22;
-  BOOL_OR = 23;
+//  BOOL_AND = 22;
+//  BOOL_OR = 23;
   // REGR_SLOPE = 26;
   // REGR_INTERCEPT = 27;
   // REGR_COUNT = 28;
diff --git a/datafusion/proto/src/generated/pbjson.rs 
b/datafusion/proto/src/generated/pbjson.rs
index 5c483f70d1..8cca0fe4a8 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -538,8 +538,6 @@ impl serde::Serialize for AggregateFunction {
             Self::ArrayAgg => "ARRAY_AGG",
             Self::Correlation => "CORRELATION",
             Self::Grouping => "GROUPING",
-            Self::BoolAnd => "BOOL_AND",
-            Self::BoolOr => "BOOL_OR",
             Self::NthValueAgg => "NTH_VALUE_AGG",
         };
         serializer.serialize_str(variant)
@@ -558,8 +556,6 @@ impl<'de> serde::Deserialize<'de> for AggregateFunction {
             "ARRAY_AGG",
             "CORRELATION",
             "GROUPING",
-            "BOOL_AND",
-            "BOOL_OR",
             "NTH_VALUE_AGG",
         ];
 
@@ -607,8 +603,6 @@ impl<'de> serde::Deserialize<'de> for AggregateFunction {
                     "ARRAY_AGG" => Ok(AggregateFunction::ArrayAgg),
                     "CORRELATION" => Ok(AggregateFunction::Correlation),
                     "GROUPING" => Ok(AggregateFunction::Grouping),
-                    "BOOL_AND" => Ok(AggregateFunction::BoolAnd),
-                    "BOOL_OR" => Ok(AggregateFunction::BoolOr),
                     "NTH_VALUE_AGG" => Ok(AggregateFunction::NthValueAgg),
                     _ => Err(serde::de::Error::unknown_variant(value, FIELDS)),
                 }
diff --git a/datafusion/proto/src/generated/prost.rs 
b/datafusion/proto/src/generated/prost.rs
index bc5b6be2ad..56f1498292 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -1948,8 +1948,8 @@ pub enum AggregateFunction {
     /// BIT_AND = 19;
     /// BIT_OR = 20;
     /// BIT_XOR = 21;
-    BoolAnd = 22,
-    BoolOr = 23,
+    ///   BOOL_AND = 22;
+    ///   BOOL_OR = 23;
     /// REGR_SLOPE = 26;
     /// REGR_INTERCEPT = 27;
     /// REGR_COUNT = 28;
@@ -1975,8 +1975,6 @@ impl AggregateFunction {
             AggregateFunction::ArrayAgg => "ARRAY_AGG",
             AggregateFunction::Correlation => "CORRELATION",
             AggregateFunction::Grouping => "GROUPING",
-            AggregateFunction::BoolAnd => "BOOL_AND",
-            AggregateFunction::BoolOr => "BOOL_OR",
             AggregateFunction::NthValueAgg => "NTH_VALUE_AGG",
         }
     }
@@ -1989,8 +1987,6 @@ impl AggregateFunction {
             "ARRAY_AGG" => Some(Self::ArrayAgg),
             "CORRELATION" => Some(Self::Correlation),
             "GROUPING" => Some(Self::Grouping),
-            "BOOL_AND" => Some(Self::BoolAnd),
-            "BOOL_OR" => Some(Self::BoolOr),
             "NTH_VALUE_AGG" => Some(Self::NthValueAgg),
             _ => None,
         }
diff --git a/datafusion/proto/src/logical_plan/from_proto.rs 
b/datafusion/proto/src/logical_plan/from_proto.rs
index 5bec655bb1..ba0e708218 100644
--- a/datafusion/proto/src/logical_plan/from_proto.rs
+++ b/datafusion/proto/src/logical_plan/from_proto.rs
@@ -140,8 +140,6 @@ impl From<protobuf::AggregateFunction> for 
AggregateFunction {
             protobuf::AggregateFunction::Min => Self::Min,
             protobuf::AggregateFunction::Max => Self::Max,
             protobuf::AggregateFunction::Avg => Self::Avg,
-            protobuf::AggregateFunction::BoolAnd => Self::BoolAnd,
-            protobuf::AggregateFunction::BoolOr => Self::BoolOr,
             protobuf::AggregateFunction::ArrayAgg => Self::ArrayAgg,
             protobuf::AggregateFunction::Correlation => Self::Correlation,
             protobuf::AggregateFunction::Grouping => Self::Grouping,
diff --git a/datafusion/proto/src/logical_plan/to_proto.rs 
b/datafusion/proto/src/logical_plan/to_proto.rs
index 66b7c77799..08999effa4 100644
--- a/datafusion/proto/src/logical_plan/to_proto.rs
+++ b/datafusion/proto/src/logical_plan/to_proto.rs
@@ -111,8 +111,6 @@ impl From<&AggregateFunction> for 
protobuf::AggregateFunction {
             AggregateFunction::Min => Self::Min,
             AggregateFunction::Max => Self::Max,
             AggregateFunction::Avg => Self::Avg,
-            AggregateFunction::BoolAnd => Self::BoolAnd,
-            AggregateFunction::BoolOr => Self::BoolOr,
             AggregateFunction::ArrayAgg => Self::ArrayAgg,
             AggregateFunction::Correlation => Self::Correlation,
             AggregateFunction::Grouping => Self::Grouping,
@@ -376,8 +374,6 @@ pub fn serialize_expr(
                     AggregateFunction::ArrayAgg => 
protobuf::AggregateFunction::ArrayAgg,
                     AggregateFunction::Min => protobuf::AggregateFunction::Min,
                     AggregateFunction::Max => protobuf::AggregateFunction::Max,
-                    AggregateFunction::BoolAnd => 
protobuf::AggregateFunction::BoolAnd,
-                    AggregateFunction::BoolOr => 
protobuf::AggregateFunction::BoolOr,
                     AggregateFunction::Avg => protobuf::AggregateFunction::Avg,
                     AggregateFunction::Correlation => {
                         protobuf::AggregateFunction::Correlation
diff --git a/datafusion/proto/src/physical_plan/to_proto.rs 
b/datafusion/proto/src/physical_plan/to_proto.rs
index ed966509b8..a9d3736dee 100644
--- a/datafusion/proto/src/physical_plan/to_proto.rs
+++ b/datafusion/proto/src/physical_plan/to_proto.rs
@@ -23,10 +23,10 @@ use 
datafusion::datasource::file_format::parquet::ParquetSink;
 use datafusion::physical_expr::window::{NthValueKind, 
SlidingAggregateWindowExpr};
 use datafusion::physical_expr::{PhysicalSortExpr, ScalarFunctionExpr};
 use datafusion::physical_plan::expressions::{
-    ArrayAgg, Avg, BinaryExpr, BoolAnd, BoolOr, CaseExpr, CastExpr, Column, 
Correlation,
-    CumeDist, DistinctArrayAgg, Grouping, InListExpr, IsNotNullExpr, 
IsNullExpr, Literal,
-    Max, Min, NegativeExpr, NotExpr, NthValue, NthValueAgg, Ntile,
-    OrderSensitiveArrayAgg, Rank, RankType, RowNumber, TryCastExpr, 
WindowShift,
+    ArrayAgg, Avg, BinaryExpr, CaseExpr, CastExpr, Column, Correlation, 
CumeDist,
+    DistinctArrayAgg, Grouping, InListExpr, IsNotNullExpr, IsNullExpr, 
Literal, Max, Min,
+    NegativeExpr, NotExpr, NthValue, NthValueAgg, Ntile, 
OrderSensitiveArrayAgg, Rank,
+    RankType, RowNumber, TryCastExpr, WindowShift,
 };
 use datafusion::physical_plan::udaf::AggregateFunctionExpr;
 use datafusion::physical_plan::windows::{BuiltInWindowExpr, 
PlainAggregateWindowExpr};
@@ -240,10 +240,6 @@ fn aggr_expr_to_aggr_fn(expr: &dyn AggregateExpr) -> 
Result<AggrFn> {
 
     let inner = if aggr_expr.downcast_ref::<Grouping>().is_some() {
         protobuf::AggregateFunction::Grouping
-    } else if aggr_expr.downcast_ref::<BoolAnd>().is_some() {
-        protobuf::AggregateFunction::BoolAnd
-    } else if aggr_expr.downcast_ref::<BoolOr>().is_some() {
-        protobuf::AggregateFunction::BoolOr
     } else if aggr_expr.downcast_ref::<ArrayAgg>().is_some() {
         protobuf::AggregateFunction::ArrayAgg
     } else if aggr_expr.downcast_ref::<DistinctArrayAgg>().is_some() {
diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs 
b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
index 61764394ee..b3966c3f02 100644
--- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
@@ -59,7 +59,9 @@ use datafusion_expr::{
     TryCast, Volatility, WindowFrame, WindowFrameBound, WindowFrameUnits,
     WindowFunctionDefinition, WindowUDF, WindowUDFImpl,
 };
-use datafusion_functions_aggregate::expr_fn::{bit_and, bit_or, bit_xor};
+use datafusion_functions_aggregate::expr_fn::{
+    bit_and, bit_or, bit_xor, bool_and, bool_or,
+};
 use datafusion_functions_aggregate::string_agg::string_agg;
 use datafusion_proto::bytes::{
     logical_plan_from_bytes, logical_plan_from_bytes_with_extension_codec,
@@ -671,6 +673,8 @@ async fn roundtrip_expr_api() -> Result<()> {
         bit_or(lit(2)),
         bit_xor(lit(2)),
         string_agg(col("a").cast_to(&DataType::Utf8, &schema)?, lit("|")),
+        bool_and(lit(true)),
+        bool_or(lit(true)),
     ];
 
     // ensure expressions created with the expr api can be round tripped


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to