This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 89def2c6e5 Convert `bool_and` & `bool_or` to UDAF (#11009)
89def2c6e5 is described below
commit 89def2c6e5f32f15dd80dd19d11b3087e1647310
Author: jcsherin <[email protected]>
AuthorDate: Thu Jun 20 16:29:59 2024 +0530
Convert `bool_and` & `bool_or` to UDAF (#11009)
* Port `bool_and` and `bool_or` to `AggregateUDFImpl`
* Remove trait methods with default implementation
* Add `bool_or_udaf`
* Register `bool_and` and `bool_or`
* Remove from `physical-expr`
* Add expressions to logical plan roundtrip test
* minor: remove methods with default implementation
* Removes redundant tests
* Removes hard-coded function names
---
datafusion/expr/src/aggregate_function.rs | 15 -
datafusion/expr/src/type_coercion/aggregates.rs | 16 -
datafusion/functions-aggregate/src/bool_and_or.rs | 343 ++++++++++++++++++
datafusion/functions-aggregate/src/lib.rs | 6 +-
.../physical-expr/src/aggregate/bool_and_or.rs | 394 ---------------------
datafusion/physical-expr/src/aggregate/build_in.rs | 56 +--
.../src/aggregate/groups_accumulator/mod.rs | 3 -
datafusion/physical-expr/src/aggregate/mod.rs | 1 -
datafusion/physical-expr/src/expressions/mod.rs | 1 -
datafusion/proto/proto/datafusion.proto | 4 +-
datafusion/proto/src/generated/pbjson.rs | 6 -
datafusion/proto/src/generated/prost.rs | 8 +-
datafusion/proto/src/logical_plan/from_proto.rs | 2 -
datafusion/proto/src/logical_plan/to_proto.rs | 4 -
datafusion/proto/src/physical_plan/to_proto.rs | 12 +-
.../proto/tests/cases/roundtrip_logical_plan.rs | 6 +-
16 files changed, 362 insertions(+), 515 deletions(-)
diff --git a/datafusion/expr/src/aggregate_function.rs
b/datafusion/expr/src/aggregate_function.rs
index 1cde1c5050..967ccc0b08 100644
--- a/datafusion/expr/src/aggregate_function.rs
+++ b/datafusion/expr/src/aggregate_function.rs
@@ -47,10 +47,6 @@ pub enum AggregateFunction {
Correlation,
/// Grouping
Grouping,
- /// Bool And
- BoolAnd,
- /// Bool Or
- BoolOr,
}
impl AggregateFunction {
@@ -64,8 +60,6 @@ impl AggregateFunction {
NthValue => "NTH_VALUE",
Correlation => "CORR",
Grouping => "GROUPING",
- BoolAnd => "BOOL_AND",
- BoolOr => "BOOL_OR",
}
}
}
@@ -82,8 +76,6 @@ impl FromStr for AggregateFunction {
Ok(match name {
// general
"avg" => AggregateFunction::Avg,
- "bool_and" => AggregateFunction::BoolAnd,
- "bool_or" => AggregateFunction::BoolOr,
"max" => AggregateFunction::Max,
"mean" => AggregateFunction::Avg,
"min" => AggregateFunction::Min,
@@ -128,9 +120,6 @@ impl AggregateFunction {
// The coerced_data_types is same with input_types.
Ok(coerced_data_types[0].clone())
}
- AggregateFunction::BoolAnd | AggregateFunction::BoolOr => {
- Ok(DataType::Boolean)
- }
AggregateFunction::Correlation => {
correlation_return_type(&coerced_data_types[0])
}
@@ -179,10 +168,6 @@ impl AggregateFunction {
.collect::<Vec<_>>();
Signature::uniform(1, valid, Volatility::Immutable)
}
- AggregateFunction::BoolAnd | AggregateFunction::BoolOr => {
- Signature::uniform(1, vec![DataType::Boolean],
Volatility::Immutable)
- }
-
AggregateFunction::Avg => {
Signature::uniform(1, NUMERICS.to_vec(), Volatility::Immutable)
}
diff --git a/datafusion/expr/src/type_coercion/aggregates.rs
b/datafusion/expr/src/type_coercion/aggregates.rs
index abe6d8b182..428fc99070 100644
--- a/datafusion/expr/src/type_coercion/aggregates.rs
+++ b/datafusion/expr/src/type_coercion/aggregates.rs
@@ -121,18 +121,6 @@ pub fn coerce_types(
};
Ok(vec![v])
}
- AggregateFunction::BoolAnd | AggregateFunction::BoolOr => {
- // Refer to
https://www.postgresql.org/docs/8.2/functions-aggregate.html doc
- // smallint, int, bigint, real, double precision, decimal, or
interval.
- if !is_bool_and_or_support_arg_type(&input_types[0]) {
- return plan_err!(
- "The function {:?} does not support inputs of type {:?}.",
- agg_fun,
- input_types[0]
- );
- }
- Ok(input_types.to_vec())
- }
AggregateFunction::Correlation => {
if !is_correlation_support_arg_type(&input_types[0]) {
return plan_err!(
@@ -319,10 +307,6 @@ pub fn avg_sum_type(arg_type: &DataType) ->
Result<DataType> {
}
}
-pub fn is_bool_and_or_support_arg_type(arg_type: &DataType) -> bool {
- matches!(arg_type, DataType::Boolean)
-}
-
pub fn is_sum_support_arg_type(arg_type: &DataType) -> bool {
match arg_type {
DataType::Dictionary(_, dict_value_type) => {
diff --git a/datafusion/functions-aggregate/src/bool_and_or.rs
b/datafusion/functions-aggregate/src/bool_and_or.rs
new file mode 100644
index 0000000000..d002867274
--- /dev/null
+++ b/datafusion/functions-aggregate/src/bool_and_or.rs
@@ -0,0 +1,343 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines the `bool_and` and `bool_or` aggregate functions (UDAFs) that can be evaluated at runtime during query execution
+
+use std::any::Any;
+
+use arrow::array::ArrayRef;
+use arrow::array::BooleanArray;
+use arrow::compute::bool_and as compute_bool_and;
+use arrow::compute::bool_or as compute_bool_or;
+use arrow::datatypes::DataType;
+use arrow::datatypes::Field;
+
+use datafusion_common::internal_err;
+use datafusion_common::{downcast_value, not_impl_err};
+use datafusion_common::{DataFusionError, Result, ScalarValue};
+use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs};
+use datafusion_expr::utils::{format_state_name, AggregateOrderSensitivity};
+use datafusion_expr::{
+ Accumulator, AggregateUDFImpl, GroupsAccumulator, ReversedUDAF, Signature,
Volatility,
+};
+
+use
datafusion_physical_expr_common::aggregate::groups_accumulator::bool_op::BooleanGroupsAccumulator;
+
+// Applies `$OP` (bool_and/bool_or) to the array downcast to `$ARRAYTYPE` and wraps the result in `ScalarValue::$SCALAR`; null entries are skipped, and an all-null array yields a null result
+macro_rules! typed_bool_and_or_batch {
+ ($VALUES:expr, $ARRAYTYPE:ident, $SCALAR:ident, $OP:ident) => {{
+ let array = downcast_value!($VALUES, $ARRAYTYPE);
+ let delta = $OP(array);
+ Ok(ScalarValue::$SCALAR(delta))
+ }};
+}
+
+// Applies bool_and/bool_or to the array and returns a ScalarValue of its corresponding type, or an internal error for non-Boolean input.
+macro_rules! bool_and_or_batch {
+ ($VALUES:expr, $OP:ident) => {{
+ match $VALUES.data_type() {
+ DataType::Boolean => {
+ typed_bool_and_or_batch!($VALUES, BooleanArray, Boolean, $OP)
+ }
+ e => {
+ return internal_err!(
+ "Bool and/Bool or is not expected to receive the type
{e:?}"
+ );
+ }
+ }
+ }};
+}
+
+/// dynamically-typed bool_and(array) -> ScalarValue
+fn bool_and_batch(values: &ArrayRef) -> Result<ScalarValue> {
+ bool_and_or_batch!(values, compute_bool_and)
+}
+
+/// dynamically-typed bool_or(array) -> ScalarValue
+fn bool_or_batch(values: &ArrayRef) -> Result<ScalarValue> {
+ bool_and_or_batch!(values, compute_bool_or)
+}
+
+make_udaf_expr_and_func!(
+ BoolAnd,
+ bool_and,
+ expression,
+ "The values to combine with `AND`",
+ bool_and_udaf
+);
+
+make_udaf_expr_and_func!(
+ BoolOr,
+ bool_or,
+ expression,
+ "The values to combine with `OR`",
+ bool_or_udaf
+);
+
+/// BOOL_AND aggregate expression
+#[derive(Debug)]
+pub struct BoolAnd {
+ signature: Signature,
+}
+
+impl BoolAnd {
+ fn new() -> Self {
+ Self {
+ signature: Signature::uniform(
+ 1,
+ vec![DataType::Boolean],
+ Volatility::Immutable,
+ ),
+ }
+ }
+}
+
+impl Default for BoolAnd {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+impl AggregateUDFImpl for BoolAnd {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn name(&self) -> &str {
+ "bool_and"
+ }
+
+ fn signature(&self) -> &Signature {
+ &self.signature
+ }
+
+ fn return_type(&self, _: &[DataType]) -> Result<DataType> {
+ Ok(DataType::Boolean)
+ }
+
+ fn accumulator(&self, _: AccumulatorArgs) -> Result<Box<dyn Accumulator>> {
+ Ok(Box::<BoolAndAccumulator>::default())
+ }
+
+ fn state_fields(&self, args: StateFieldsArgs) -> Result<Vec<Field>> {
+ Ok(vec![Field::new(
+ format_state_name(args.name, self.name()),
+ DataType::Boolean,
+ true,
+ )])
+ }
+
+ fn groups_accumulator_supported(&self, _args: AccumulatorArgs) -> bool {
+ true
+ }
+
+ fn create_groups_accumulator(
+ &self,
+ args: AccumulatorArgs,
+ ) -> Result<Box<dyn GroupsAccumulator>> {
+ match args.data_type {
+ DataType::Boolean => {
+ Ok(Box::new(BooleanGroupsAccumulator::new(|x, y| x && y)))
+ }
+ _ => not_impl_err!(
+ "GroupsAccumulator not supported for {} with {}",
+ args.name,
+ args.data_type
+ ),
+ }
+ }
+
+ fn aliases(&self) -> &[String] {
+ &[]
+ }
+
+ fn create_sliding_accumulator(
+ &self,
+ _: AccumulatorArgs,
+ ) -> Result<Box<dyn Accumulator>> {
+ Ok(Box::<BoolAndAccumulator>::default())
+ }
+
+ fn order_sensitivity(&self) -> AggregateOrderSensitivity {
+ AggregateOrderSensitivity::Insensitive
+ }
+
+ fn reverse_expr(&self) -> ReversedUDAF {
+ ReversedUDAF::Identical
+ }
+}
+
+#[derive(Debug, Default)]
+struct BoolAndAccumulator {
+ acc: Option<bool>,
+}
+
+impl Accumulator for BoolAndAccumulator {
+ fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
+ let values = &values[0];
+ self.acc = match (self.acc, bool_and_batch(values)?) {
+ (None, ScalarValue::Boolean(v)) => v,
+ (Some(v), ScalarValue::Boolean(None)) => Some(v),
+ (Some(a), ScalarValue::Boolean(Some(b))) => Some(a && b),
+ _ => unreachable!(),
+ };
+ Ok(())
+ }
+
+ fn evaluate(&mut self) -> Result<ScalarValue> {
+ Ok(ScalarValue::Boolean(self.acc))
+ }
+
+ fn size(&self) -> usize {
+ std::mem::size_of_val(self)
+ }
+
+ fn state(&mut self) -> Result<Vec<ScalarValue>> {
+ Ok(vec![ScalarValue::Boolean(self.acc)])
+ }
+
+ fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
+ self.update_batch(states)
+ }
+}
+
+/// BOOL_OR aggregate expression
+#[derive(Debug, Clone)]
+pub struct BoolOr {
+ signature: Signature,
+}
+
+impl BoolOr {
+ fn new() -> Self {
+ Self {
+ signature: Signature::uniform(
+ 1,
+ vec![DataType::Boolean],
+ Volatility::Immutable,
+ ),
+ }
+ }
+}
+
+impl Default for BoolOr {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+impl AggregateUDFImpl for BoolOr {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn name(&self) -> &str {
+ "bool_or"
+ }
+
+ fn signature(&self) -> &Signature {
+ &self.signature
+ }
+
+ fn return_type(&self, _: &[DataType]) -> Result<DataType> {
+ Ok(DataType::Boolean)
+ }
+
+ fn accumulator(&self, _: AccumulatorArgs) -> Result<Box<dyn Accumulator>> {
+ Ok(Box::<BoolOrAccumulator>::default())
+ }
+
+ fn state_fields(&self, args: StateFieldsArgs) -> Result<Vec<Field>> {
+ Ok(vec![Field::new(
+ format_state_name(args.name, self.name()),
+ DataType::Boolean,
+ true,
+ )])
+ }
+
+ fn groups_accumulator_supported(&self, _args: AccumulatorArgs) -> bool {
+ true
+ }
+
+ fn create_groups_accumulator(
+ &self,
+ args: AccumulatorArgs,
+ ) -> Result<Box<dyn GroupsAccumulator>> {
+ match args.data_type {
+ DataType::Boolean => {
+ Ok(Box::new(BooleanGroupsAccumulator::new(|x, y| x || y)))
+ }
+ _ => not_impl_err!(
+ "GroupsAccumulator not supported for {} with {}",
+ args.name,
+ args.data_type
+ ),
+ }
+ }
+
+ fn aliases(&self) -> &[String] {
+ &[]
+ }
+
+ fn create_sliding_accumulator(
+ &self,
+ _: AccumulatorArgs,
+ ) -> Result<Box<dyn Accumulator>> {
+ Ok(Box::<BoolOrAccumulator>::default())
+ }
+
+ fn order_sensitivity(&self) -> AggregateOrderSensitivity {
+ AggregateOrderSensitivity::Insensitive
+ }
+
+ fn reverse_expr(&self) -> ReversedUDAF {
+ ReversedUDAF::Identical
+ }
+}
+
+#[derive(Debug, Default)]
+struct BoolOrAccumulator {
+ acc: Option<bool>,
+}
+
+impl Accumulator for BoolOrAccumulator {
+ fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
+ let values = &values[0];
+ self.acc = match (self.acc, bool_or_batch(values)?) {
+ (None, ScalarValue::Boolean(v)) => v,
+ (Some(v), ScalarValue::Boolean(None)) => Some(v),
+ (Some(a), ScalarValue::Boolean(Some(b))) => Some(a || b),
+ _ => unreachable!(),
+ };
+ Ok(())
+ }
+
+ fn evaluate(&mut self) -> Result<ScalarValue> {
+ Ok(ScalarValue::Boolean(self.acc))
+ }
+
+ fn size(&self) -> usize {
+ std::mem::size_of_val(self)
+ }
+
+ fn state(&mut self) -> Result<Vec<ScalarValue>> {
+ Ok(vec![ScalarValue::Boolean(self.acc)])
+ }
+
+ fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
+ self.update_batch(states)
+ }
+}
diff --git a/datafusion/functions-aggregate/src/lib.rs
b/datafusion/functions-aggregate/src/lib.rs
index 20a8d2c159..260d6dab31 100644
--- a/datafusion/functions-aggregate/src/lib.rs
+++ b/datafusion/functions-aggregate/src/lib.rs
@@ -70,8 +70,8 @@ pub mod approx_median;
pub mod approx_percentile_cont;
pub mod approx_percentile_cont_with_weight;
pub mod bit_and_or_xor;
+pub mod bool_and_or;
pub mod string_agg;
-
use crate::approx_percentile_cont::approx_percentile_cont_udaf;
use
crate::approx_percentile_cont_with_weight::approx_percentile_cont_with_weight_udaf;
use datafusion_common::Result;
@@ -89,6 +89,8 @@ pub mod expr_fn {
pub use super::bit_and_or_xor::bit_and;
pub use super::bit_and_or_xor::bit_or;
pub use super::bit_and_or_xor::bit_xor;
+ pub use super::bool_and_or::bool_and;
+ pub use super::bool_and_or::bool_or;
pub use super::count::count;
pub use super::count::count_distinct;
pub use super::covariance::covar_pop;
@@ -143,6 +145,8 @@ pub fn all_default_aggregate_functions() ->
Vec<Arc<AggregateUDF>> {
bit_and_or_xor::bit_and_udaf(),
bit_and_or_xor::bit_or_udaf(),
bit_and_or_xor::bit_xor_udaf(),
+ bool_and_or::bool_and_udaf(),
+ bool_and_or::bool_or_udaf(),
]
}
diff --git a/datafusion/physical-expr/src/aggregate/bool_and_or.rs
b/datafusion/physical-expr/src/aggregate/bool_and_or.rs
deleted file mode 100644
index 341932bd77..0000000000
--- a/datafusion/physical-expr/src/aggregate/bool_and_or.rs
+++ /dev/null
@@ -1,394 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! Defines physical expressions that can evaluated at runtime during query
execution
-
-use crate::{AggregateExpr, PhysicalExpr};
-use arrow::datatypes::DataType;
-use arrow::{
- array::{ArrayRef, BooleanArray},
- datatypes::Field,
-};
-use datafusion_common::{
- downcast_value, internal_err, not_impl_err, DataFusionError, Result,
ScalarValue,
-};
-use datafusion_expr::{Accumulator, GroupsAccumulator};
-use std::any::Any;
-use std::sync::Arc;
-
-use crate::aggregate::groups_accumulator::bool_op::BooleanGroupsAccumulator;
-use crate::aggregate::utils::down_cast_any_ref;
-use crate::expressions::format_state_name;
-use arrow::array::Array;
-use arrow::compute::{bool_and, bool_or};
-
-// returns the new value after bool_and/bool_or with the new values, taking
nullability into account
-macro_rules! typed_bool_and_or_batch {
- ($VALUES:expr, $ARRAYTYPE:ident, $SCALAR:ident, $OP:ident) => {{
- let array = downcast_value!($VALUES, $ARRAYTYPE);
- let delta = $OP(array);
- Ok(ScalarValue::$SCALAR(delta))
- }};
-}
-
-// bool_and/bool_or the array and returns a ScalarValue of its corresponding
type.
-macro_rules! bool_and_or_batch {
- ($VALUES:expr, $OP:ident) => {{
- match $VALUES.data_type() {
- DataType::Boolean => {
- typed_bool_and_or_batch!($VALUES, BooleanArray, Boolean, $OP)
- }
- e => {
- return internal_err!(
- "Bool and/Bool or is not expected to receive the type
{e:?}"
- );
- }
- }
- }};
-}
-
-/// dynamically-typed bool_and(array) -> ScalarValue
-fn bool_and_batch(values: &ArrayRef) -> Result<ScalarValue> {
- bool_and_or_batch!(values, bool_and)
-}
-
-/// dynamically-typed bool_or(array) -> ScalarValue
-fn bool_or_batch(values: &ArrayRef) -> Result<ScalarValue> {
- bool_and_or_batch!(values, bool_or)
-}
-
-/// BOOL_AND aggregate expression
-#[derive(Debug, Clone)]
-pub struct BoolAnd {
- name: String,
- pub data_type: DataType,
- expr: Arc<dyn PhysicalExpr>,
- nullable: bool,
-}
-
-impl BoolAnd {
- /// Create a new BOOL_AND aggregate function
- pub fn new(
- expr: Arc<dyn PhysicalExpr>,
- name: impl Into<String>,
- data_type: DataType,
- ) -> Self {
- Self {
- name: name.into(),
- expr,
- data_type,
- nullable: true,
- }
- }
-}
-
-impl AggregateExpr for BoolAnd {
- /// Return a reference to Any that can be used for downcasting
- fn as_any(&self) -> &dyn Any {
- self
- }
-
- fn field(&self) -> Result<Field> {
- Ok(Field::new(
- &self.name,
- self.data_type.clone(),
- self.nullable,
- ))
- }
-
- fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
- Ok(Box::<BoolAndAccumulator>::default())
- }
-
- fn state_fields(&self) -> Result<Vec<Field>> {
- Ok(vec![Field::new(
- format_state_name(&self.name, "bool_and"),
- self.data_type.clone(),
- self.nullable,
- )])
- }
-
- fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
- vec![self.expr.clone()]
- }
-
- fn name(&self) -> &str {
- &self.name
- }
-
- fn groups_accumulator_supported(&self) -> bool {
- true
- }
-
- fn create_groups_accumulator(&self) -> Result<Box<dyn GroupsAccumulator>> {
- match self.data_type {
- DataType::Boolean => {
- Ok(Box::new(BooleanGroupsAccumulator::new(|x, y| x && y)))
- }
- _ => not_impl_err!(
- "GroupsAccumulator not supported for {} with {}",
- self.name(),
- self.data_type
- ),
- }
- }
-
- fn reverse_expr(&self) -> Option<Arc<dyn AggregateExpr>> {
- Some(Arc::new(self.clone()))
- }
-
- fn create_sliding_accumulator(&self) -> Result<Box<dyn Accumulator>> {
- Ok(Box::<BoolAndAccumulator>::default())
- }
-}
-
-impl PartialEq<dyn Any> for BoolAnd {
- fn eq(&self, other: &dyn Any) -> bool {
- down_cast_any_ref(other)
- .downcast_ref::<Self>()
- .map(|x| {
- self.name == x.name
- && self.data_type == x.data_type
- && self.nullable == x.nullable
- && self.expr.eq(&x.expr)
- })
- .unwrap_or(false)
- }
-}
-
-#[derive(Debug, Default)]
-struct BoolAndAccumulator {
- acc: Option<bool>,
-}
-
-impl Accumulator for BoolAndAccumulator {
- fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
- let values = &values[0];
- self.acc = match (self.acc, bool_and_batch(values)?) {
- (None, ScalarValue::Boolean(v)) => v,
- (Some(v), ScalarValue::Boolean(None)) => Some(v),
- (Some(a), ScalarValue::Boolean(Some(b))) => Some(a && b),
- _ => unreachable!(),
- };
- Ok(())
- }
-
- fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
- self.update_batch(states)
- }
-
- fn state(&mut self) -> Result<Vec<ScalarValue>> {
- Ok(vec![ScalarValue::Boolean(self.acc)])
- }
-
- fn evaluate(&mut self) -> Result<ScalarValue> {
- Ok(ScalarValue::Boolean(self.acc))
- }
-
- fn size(&self) -> usize {
- std::mem::size_of_val(self)
- }
-}
-
-/// BOOL_OR aggregate expression
-#[derive(Debug, Clone)]
-pub struct BoolOr {
- name: String,
- pub data_type: DataType,
- expr: Arc<dyn PhysicalExpr>,
- nullable: bool,
-}
-
-impl BoolOr {
- /// Create a new BOOL_OR aggregate function
- pub fn new(
- expr: Arc<dyn PhysicalExpr>,
- name: impl Into<String>,
- data_type: DataType,
- ) -> Self {
- Self {
- name: name.into(),
- expr,
- data_type,
- nullable: true,
- }
- }
-}
-
-impl AggregateExpr for BoolOr {
- /// Return a reference to Any that can be used for downcasting
- fn as_any(&self) -> &dyn Any {
- self
- }
-
- fn field(&self) -> Result<Field> {
- Ok(Field::new(
- &self.name,
- self.data_type.clone(),
- self.nullable,
- ))
- }
-
- fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
- Ok(Box::<BoolOrAccumulator>::default())
- }
-
- fn state_fields(&self) -> Result<Vec<Field>> {
- Ok(vec![Field::new(
- format_state_name(&self.name, "bool_or"),
- self.data_type.clone(),
- self.nullable,
- )])
- }
-
- fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
- vec![self.expr.clone()]
- }
-
- fn name(&self) -> &str {
- &self.name
- }
-
- fn groups_accumulator_supported(&self) -> bool {
- true
- }
-
- fn create_groups_accumulator(&self) -> Result<Box<dyn GroupsAccumulator>> {
- match self.data_type {
- DataType::Boolean => {
- Ok(Box::new(BooleanGroupsAccumulator::new(|x, y| x || y)))
- }
- _ => not_impl_err!(
- "GroupsAccumulator not supported for {} with {}",
- self.name(),
- self.data_type
- ),
- }
- }
-
- fn reverse_expr(&self) -> Option<Arc<dyn AggregateExpr>> {
- Some(Arc::new(self.clone()))
- }
-
- fn create_sliding_accumulator(&self) -> Result<Box<dyn Accumulator>> {
- Ok(Box::<BoolOrAccumulator>::default())
- }
-}
-
-impl PartialEq<dyn Any> for BoolOr {
- fn eq(&self, other: &dyn Any) -> bool {
- down_cast_any_ref(other)
- .downcast_ref::<Self>()
- .map(|x| {
- self.name == x.name
- && self.data_type == x.data_type
- && self.nullable == x.nullable
- && self.expr.eq(&x.expr)
- })
- .unwrap_or(false)
- }
-}
-
-#[derive(Debug, Default)]
-struct BoolOrAccumulator {
- acc: Option<bool>,
-}
-
-impl Accumulator for BoolOrAccumulator {
- fn state(&mut self) -> Result<Vec<ScalarValue>> {
- Ok(vec![ScalarValue::Boolean(self.acc)])
- }
-
- fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
- let values = &values[0];
- self.acc = match (self.acc, bool_or_batch(values)?) {
- (None, ScalarValue::Boolean(v)) => v,
- (Some(v), ScalarValue::Boolean(None)) => Some(v),
- (Some(a), ScalarValue::Boolean(Some(b))) => Some(a || b),
- _ => unreachable!(),
- };
- Ok(())
- }
-
- fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
- self.update_batch(states)
- }
-
- fn evaluate(&mut self) -> Result<ScalarValue> {
- Ok(ScalarValue::Boolean(self.acc))
- }
-
- fn size(&self) -> usize {
- std::mem::size_of_val(self)
- }
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
- use crate::expressions::col;
- use crate::expressions::tests::aggregate;
- use crate::generic_test_op;
- use arrow::datatypes::*;
- use arrow::record_batch::RecordBatch;
-
- #[test]
- fn test_bool_and() -> Result<()> {
- let a: ArrayRef = Arc::new(BooleanArray::from(vec![true, true,
false]));
- generic_test_op!(a, DataType::Boolean, BoolAnd,
ScalarValue::from(false))
- }
-
- #[test]
- fn bool_and_with_nulls() -> Result<()> {
- let a: ArrayRef = Arc::new(BooleanArray::from(vec![
- Some(true),
- None,
- Some(true),
- Some(true),
- ]));
- generic_test_op!(a, DataType::Boolean, BoolAnd,
ScalarValue::from(true))
- }
-
- #[test]
- fn bool_and_all_nulls() -> Result<()> {
- let a: ArrayRef = Arc::new(BooleanArray::from(vec![None, None]));
- generic_test_op!(a, DataType::Boolean, BoolAnd,
ScalarValue::Boolean(None))
- }
-
- #[test]
- fn test_bool_or() -> Result<()> {
- let a: ArrayRef = Arc::new(BooleanArray::from(vec![true, true,
false]));
- generic_test_op!(a, DataType::Boolean, BoolOr, ScalarValue::from(true))
- }
-
- #[test]
- fn bool_or_with_nulls() -> Result<()> {
- let a: ArrayRef = Arc::new(BooleanArray::from(vec![
- Some(false),
- None,
- Some(false),
- Some(false),
- ]));
- generic_test_op!(a, DataType::Boolean, BoolOr,
ScalarValue::from(false))
- }
-
- #[test]
- fn bool_or_all_nulls() -> Result<()> {
- let a: ArrayRef = Arc::new(BooleanArray::from(vec![None, None]));
- generic_test_op!(a, DataType::Boolean, BoolOr,
ScalarValue::Boolean(None))
- }
-}
diff --git a/datafusion/physical-expr/src/aggregate/build_in.rs
b/datafusion/physical-expr/src/aggregate/build_in.rs
index 1dfe9ffd69..53cfcfb033 100644
--- a/datafusion/physical-expr/src/aggregate/build_in.rs
+++ b/datafusion/physical-expr/src/aggregate/build_in.rs
@@ -66,16 +66,6 @@ pub fn create_aggregate_expr(
name,
data_type,
)),
- (AggregateFunction::BoolAnd, _) => Arc::new(expressions::BoolAnd::new(
- input_phy_exprs[0].clone(),
- name,
- data_type,
- )),
- (AggregateFunction::BoolOr, _) => Arc::new(expressions::BoolOr::new(
- input_phy_exprs[0].clone(),
- name,
- data_type,
- )),
(AggregateFunction::ArrayAgg, false) => {
let expr = input_phy_exprs[0].clone();
let nullable = expr.nullable(input_schema)?;
@@ -165,9 +155,7 @@ mod tests {
use datafusion_common::plan_err;
use datafusion_expr::{type_coercion, Signature};
- use crate::expressions::{
- try_cast, ArrayAgg, Avg, BoolAnd, BoolOr, DistinctArrayAgg, Max, Min,
- };
+ use crate::expressions::{try_cast, ArrayAgg, Avg, DistinctArrayAgg, Max,
Min};
use super::*;
#[test]
@@ -281,48 +269,6 @@ mod tests {
Ok(())
}
- #[test]
- fn test_bool_and_or_expr() -> Result<()> {
- let funcs = vec![AggregateFunction::BoolAnd,
AggregateFunction::BoolOr];
- let data_types = vec![DataType::Boolean];
- for fun in funcs {
- for data_type in &data_types {
- let input_schema =
- Schema::new(vec![Field::new("c1", data_type.clone(),
true)]);
- let input_phy_exprs: Vec<Arc<dyn PhysicalExpr>> =
vec![Arc::new(
- expressions::Column::new_with_schema("c1",
&input_schema).unwrap(),
- )];
- let result_agg_phy_exprs = create_physical_agg_expr_for_test(
- &fun,
- false,
- &input_phy_exprs[0..1],
- &input_schema,
- "c1",
- )?;
- match fun {
- AggregateFunction::BoolAnd => {
- assert!(result_agg_phy_exprs.as_any().is::<BoolAnd>());
- assert_eq!("c1", result_agg_phy_exprs.name());
- assert_eq!(
- Field::new("c1", data_type.clone(), true),
- result_agg_phy_exprs.field().unwrap()
- );
- }
- AggregateFunction::BoolOr => {
- assert!(result_agg_phy_exprs.as_any().is::<BoolOr>());
- assert_eq!("c1", result_agg_phy_exprs.name());
- assert_eq!(
- Field::new("c1", data_type.clone(), true),
- result_agg_phy_exprs.field().unwrap()
- );
- }
- _ => {}
- };
- }
- }
- Ok(())
- }
-
#[test]
fn test_sum_avg_expr() -> Result<()> {
let funcs = vec![AggregateFunction::Avg];
diff --git a/datafusion/physical-expr/src/aggregate/groups_accumulator/mod.rs
b/datafusion/physical-expr/src/aggregate/groups_accumulator/mod.rs
index a6946e739c..73d810ec05 100644
--- a/datafusion/physical-expr/src/aggregate/groups_accumulator/mod.rs
+++ b/datafusion/physical-expr/src/aggregate/groups_accumulator/mod.rs
@@ -25,9 +25,6 @@ pub(crate) mod accumulate {
pub use
datafusion_physical_expr_common::aggregate::groups_accumulator::accumulate::NullState;
-pub(crate) mod bool_op {
- pub use
datafusion_physical_expr_common::aggregate::groups_accumulator::bool_op::BooleanGroupsAccumulator;
-}
pub(crate) mod prim_op {
pub use
datafusion_physical_expr_common::aggregate::groups_accumulator::prim_op::PrimitiveGroupsAccumulator;
}
diff --git a/datafusion/physical-expr/src/aggregate/mod.rs
b/datafusion/physical-expr/src/aggregate/mod.rs
index 87c7deccc2..f64c5b1fb2 100644
--- a/datafusion/physical-expr/src/aggregate/mod.rs
+++ b/datafusion/physical-expr/src/aggregate/mod.rs
@@ -21,7 +21,6 @@ pub(crate) mod array_agg;
pub(crate) mod array_agg_distinct;
pub(crate) mod array_agg_ordered;
pub(crate) mod average;
-pub(crate) mod bool_and_or;
pub(crate) mod correlation;
pub(crate) mod covariance;
pub(crate) mod grouping;
diff --git a/datafusion/physical-expr/src/expressions/mod.rs
b/datafusion/physical-expr/src/expressions/mod.rs
index 3226104040..0020aa5f55 100644
--- a/datafusion/physical-expr/src/expressions/mod.rs
+++ b/datafusion/physical-expr/src/expressions/mod.rs
@@ -40,7 +40,6 @@ pub use
crate::aggregate::array_agg_distinct::DistinctArrayAgg;
pub use crate::aggregate::array_agg_ordered::OrderSensitiveArrayAgg;
pub use crate::aggregate::average::Avg;
pub use crate::aggregate::average::AvgAccumulator;
-pub use crate::aggregate::bool_and_or::{BoolAnd, BoolOr};
pub use crate::aggregate::build_in::create_aggregate_expr;
pub use crate::aggregate::correlation::Correlation;
pub use crate::aggregate::grouping::Grouping;
diff --git a/datafusion/proto/proto/datafusion.proto
b/datafusion/proto/proto/datafusion.proto
index 6375df721a..50356d5b60 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -494,8 +494,8 @@ enum AggregateFunction {
// BIT_AND = 19;
// BIT_OR = 20;
// BIT_XOR = 21;
- BOOL_AND = 22;
- BOOL_OR = 23;
+// BOOL_AND = 22;
+// BOOL_OR = 23;
// REGR_SLOPE = 26;
// REGR_INTERCEPT = 27;
// REGR_COUNT = 28;
diff --git a/datafusion/proto/src/generated/pbjson.rs
b/datafusion/proto/src/generated/pbjson.rs
index 5c483f70d1..8cca0fe4a8 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -538,8 +538,6 @@ impl serde::Serialize for AggregateFunction {
Self::ArrayAgg => "ARRAY_AGG",
Self::Correlation => "CORRELATION",
Self::Grouping => "GROUPING",
- Self::BoolAnd => "BOOL_AND",
- Self::BoolOr => "BOOL_OR",
Self::NthValueAgg => "NTH_VALUE_AGG",
};
serializer.serialize_str(variant)
@@ -558,8 +556,6 @@ impl<'de> serde::Deserialize<'de> for AggregateFunction {
"ARRAY_AGG",
"CORRELATION",
"GROUPING",
- "BOOL_AND",
- "BOOL_OR",
"NTH_VALUE_AGG",
];
@@ -607,8 +603,6 @@ impl<'de> serde::Deserialize<'de> for AggregateFunction {
"ARRAY_AGG" => Ok(AggregateFunction::ArrayAgg),
"CORRELATION" => Ok(AggregateFunction::Correlation),
"GROUPING" => Ok(AggregateFunction::Grouping),
- "BOOL_AND" => Ok(AggregateFunction::BoolAnd),
- "BOOL_OR" => Ok(AggregateFunction::BoolOr),
"NTH_VALUE_AGG" => Ok(AggregateFunction::NthValueAgg),
_ => Err(serde::de::Error::unknown_variant(value, FIELDS)),
}
diff --git a/datafusion/proto/src/generated/prost.rs
b/datafusion/proto/src/generated/prost.rs
index bc5b6be2ad..56f1498292 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -1948,8 +1948,8 @@ pub enum AggregateFunction {
/// BIT_AND = 19;
/// BIT_OR = 20;
/// BIT_XOR = 21;
- BoolAnd = 22,
- BoolOr = 23,
+ /// BOOL_AND = 22;
+ /// BOOL_OR = 23;
/// REGR_SLOPE = 26;
/// REGR_INTERCEPT = 27;
/// REGR_COUNT = 28;
@@ -1975,8 +1975,6 @@ impl AggregateFunction {
AggregateFunction::ArrayAgg => "ARRAY_AGG",
AggregateFunction::Correlation => "CORRELATION",
AggregateFunction::Grouping => "GROUPING",
- AggregateFunction::BoolAnd => "BOOL_AND",
- AggregateFunction::BoolOr => "BOOL_OR",
AggregateFunction::NthValueAgg => "NTH_VALUE_AGG",
}
}
@@ -1989,8 +1987,6 @@ impl AggregateFunction {
"ARRAY_AGG" => Some(Self::ArrayAgg),
"CORRELATION" => Some(Self::Correlation),
"GROUPING" => Some(Self::Grouping),
- "BOOL_AND" => Some(Self::BoolAnd),
- "BOOL_OR" => Some(Self::BoolOr),
"NTH_VALUE_AGG" => Some(Self::NthValueAgg),
_ => None,
}
diff --git a/datafusion/proto/src/logical_plan/from_proto.rs
b/datafusion/proto/src/logical_plan/from_proto.rs
index 5bec655bb1..ba0e708218 100644
--- a/datafusion/proto/src/logical_plan/from_proto.rs
+++ b/datafusion/proto/src/logical_plan/from_proto.rs
@@ -140,8 +140,6 @@ impl From<protobuf::AggregateFunction> for
AggregateFunction {
protobuf::AggregateFunction::Min => Self::Min,
protobuf::AggregateFunction::Max => Self::Max,
protobuf::AggregateFunction::Avg => Self::Avg,
- protobuf::AggregateFunction::BoolAnd => Self::BoolAnd,
- protobuf::AggregateFunction::BoolOr => Self::BoolOr,
protobuf::AggregateFunction::ArrayAgg => Self::ArrayAgg,
protobuf::AggregateFunction::Correlation => Self::Correlation,
protobuf::AggregateFunction::Grouping => Self::Grouping,
diff --git a/datafusion/proto/src/logical_plan/to_proto.rs
b/datafusion/proto/src/logical_plan/to_proto.rs
index 66b7c77799..08999effa4 100644
--- a/datafusion/proto/src/logical_plan/to_proto.rs
+++ b/datafusion/proto/src/logical_plan/to_proto.rs
@@ -111,8 +111,6 @@ impl From<&AggregateFunction> for
protobuf::AggregateFunction {
AggregateFunction::Min => Self::Min,
AggregateFunction::Max => Self::Max,
AggregateFunction::Avg => Self::Avg,
- AggregateFunction::BoolAnd => Self::BoolAnd,
- AggregateFunction::BoolOr => Self::BoolOr,
AggregateFunction::ArrayAgg => Self::ArrayAgg,
AggregateFunction::Correlation => Self::Correlation,
AggregateFunction::Grouping => Self::Grouping,
@@ -376,8 +374,6 @@ pub fn serialize_expr(
AggregateFunction::ArrayAgg =>
protobuf::AggregateFunction::ArrayAgg,
AggregateFunction::Min => protobuf::AggregateFunction::Min,
AggregateFunction::Max => protobuf::AggregateFunction::Max,
- AggregateFunction::BoolAnd =>
protobuf::AggregateFunction::BoolAnd,
- AggregateFunction::BoolOr =>
protobuf::AggregateFunction::BoolOr,
AggregateFunction::Avg => protobuf::AggregateFunction::Avg,
AggregateFunction::Correlation => {
protobuf::AggregateFunction::Correlation
diff --git a/datafusion/proto/src/physical_plan/to_proto.rs
b/datafusion/proto/src/physical_plan/to_proto.rs
index ed966509b8..a9d3736dee 100644
--- a/datafusion/proto/src/physical_plan/to_proto.rs
+++ b/datafusion/proto/src/physical_plan/to_proto.rs
@@ -23,10 +23,10 @@ use
datafusion::datasource::file_format::parquet::ParquetSink;
use datafusion::physical_expr::window::{NthValueKind,
SlidingAggregateWindowExpr};
use datafusion::physical_expr::{PhysicalSortExpr, ScalarFunctionExpr};
use datafusion::physical_plan::expressions::{
- ArrayAgg, Avg, BinaryExpr, BoolAnd, BoolOr, CaseExpr, CastExpr, Column,
Correlation,
- CumeDist, DistinctArrayAgg, Grouping, InListExpr, IsNotNullExpr,
IsNullExpr, Literal,
- Max, Min, NegativeExpr, NotExpr, NthValue, NthValueAgg, Ntile,
- OrderSensitiveArrayAgg, Rank, RankType, RowNumber, TryCastExpr,
WindowShift,
+ ArrayAgg, Avg, BinaryExpr, CaseExpr, CastExpr, Column, Correlation,
CumeDist,
+ DistinctArrayAgg, Grouping, InListExpr, IsNotNullExpr, IsNullExpr,
Literal, Max, Min,
+ NegativeExpr, NotExpr, NthValue, NthValueAgg, Ntile,
OrderSensitiveArrayAgg, Rank,
+ RankType, RowNumber, TryCastExpr, WindowShift,
};
use datafusion::physical_plan::udaf::AggregateFunctionExpr;
use datafusion::physical_plan::windows::{BuiltInWindowExpr,
PlainAggregateWindowExpr};
@@ -240,10 +240,6 @@ fn aggr_expr_to_aggr_fn(expr: &dyn AggregateExpr) ->
Result<AggrFn> {
let inner = if aggr_expr.downcast_ref::<Grouping>().is_some() {
protobuf::AggregateFunction::Grouping
- } else if aggr_expr.downcast_ref::<BoolAnd>().is_some() {
- protobuf::AggregateFunction::BoolAnd
- } else if aggr_expr.downcast_ref::<BoolOr>().is_some() {
- protobuf::AggregateFunction::BoolOr
} else if aggr_expr.downcast_ref::<ArrayAgg>().is_some() {
protobuf::AggregateFunction::ArrayAgg
} else if aggr_expr.downcast_ref::<DistinctArrayAgg>().is_some() {
diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
index 61764394ee..b3966c3f02 100644
--- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
@@ -59,7 +59,9 @@ use datafusion_expr::{
TryCast, Volatility, WindowFrame, WindowFrameBound, WindowFrameUnits,
WindowFunctionDefinition, WindowUDF, WindowUDFImpl,
};
-use datafusion_functions_aggregate::expr_fn::{bit_and, bit_or, bit_xor};
+use datafusion_functions_aggregate::expr_fn::{
+ bit_and, bit_or, bit_xor, bool_and, bool_or,
+};
use datafusion_functions_aggregate::string_agg::string_agg;
use datafusion_proto::bytes::{
logical_plan_from_bytes, logical_plan_from_bytes_with_extension_codec,
@@ -671,6 +673,8 @@ async fn roundtrip_expr_api() -> Result<()> {
bit_or(lit(2)),
bit_xor(lit(2)),
string_agg(col("a").cast_to(&DataType::Utf8, &schema)?, lit("|")),
+ bool_and(lit(true)),
+ bool_or(lit(true)),
];
// ensure expressions created with the expr api can be round tripped
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]