This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new a1281aa79e Deprecate ScalarValue::and, ScalarValue::or (#6842) (#6844)
a1281aa79e is described below
commit a1281aa79ea104efdf3d07dd1948f2ae0b374634
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Thu Jul 6 14:18:41 2023 +0100
Deprecate ScalarValue::and, ScalarValue::or (#6842) (#6844)
* Deprecate ScalarValue::and, ScalarValue::or (#6842)
* Review feedback
---
datafusion/common/src/scalar.rs | 2 +
.../physical-expr/src/aggregate/bool_and_or.rs | 65 +++++++++-------------
2 files changed, 28 insertions(+), 39 deletions(-)
diff --git a/datafusion/common/src/scalar.rs b/datafusion/common/src/scalar.rs
index b0769df1e9..2011247346 100644
--- a/datafusion/common/src/scalar.rs
+++ b/datafusion/common/src/scalar.rs
@@ -2109,11 +2109,13 @@ impl ScalarValue {
impl_checked_op!(self, rhs, checked_sub, -)
}
+ #[deprecated(note = "Use arrow kernels or specialization (#6842)")]
pub fn and<T: Borrow<ScalarValue>>(&self, other: T) -> Result<ScalarValue>
{
let rhs = other.borrow();
impl_op!(self, rhs, &&)
}
+ #[deprecated(note = "Use arrow kernels or specialization (#6842)")]
pub fn or<T: Borrow<ScalarValue>>(&self, other: T) -> Result<ScalarValue> {
let rhs = other.borrow();
impl_op!(self, rhs, ||)
diff --git a/datafusion/physical-expr/src/aggregate/bool_and_or.rs
b/datafusion/physical-expr/src/aggregate/bool_and_or.rs
index bbab4dfce6..e444dc61ee 100644
--- a/datafusion/physical-expr/src/aggregate/bool_and_or.rs
+++ b/datafusion/physical-expr/src/aggregate/bool_and_or.rs
@@ -18,7 +18,6 @@
//! Defines physical expressions that can evaluated at runtime during query
execution
use std::any::Any;
-use std::convert::TryFrom;
use std::sync::Arc;
use crate::{AggregateExpr, PhysicalExpr};
@@ -161,7 +160,7 @@ impl AggregateExpr for BoolAnd {
}
fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
- Ok(Box::new(BoolAndAccumulator::try_new(&self.data_type)?))
+ Ok(Box::<BoolAndAccumulator>::default())
}
fn state_fields(&self) -> Result<Vec<Field>> {
@@ -199,7 +198,7 @@ impl AggregateExpr for BoolAnd {
}
fn create_sliding_accumulator(&self) -> Result<Box<dyn Accumulator>> {
- Ok(Box::new(BoolAndAccumulator::try_new(&self.data_type)?))
+ Ok(Box::<BoolAndAccumulator>::default())
}
}
@@ -217,25 +216,20 @@ impl PartialEq<dyn Any> for BoolAnd {
}
}
-#[derive(Debug)]
+#[derive(Debug, Default)]
struct BoolAndAccumulator {
- bool_and: ScalarValue,
-}
-
-impl BoolAndAccumulator {
- /// new bool_and accumulator
- pub fn try_new(data_type: &DataType) -> Result<Self> {
- Ok(Self {
- bool_and: ScalarValue::try_from(data_type)?,
- })
- }
+ acc: Option<bool>,
}
impl Accumulator for BoolAndAccumulator {
fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
let values = &values[0];
- let delta = &bool_and_batch(values)?;
- self.bool_and = self.bool_and.and(delta)?;
+ self.acc = match (self.acc, bool_and_batch(values)?) {
+ (None, ScalarValue::Boolean(v)) => v,
+ (Some(v), ScalarValue::Boolean(None)) => Some(v),
+ (Some(a), ScalarValue::Boolean(Some(b))) => Some(a && b),
+ _ => unreachable!(),
+ };
Ok(())
}
@@ -244,16 +238,15 @@ impl Accumulator for BoolAndAccumulator {
}
fn state(&self) -> Result<Vec<ScalarValue>> {
- Ok(vec![self.bool_and.clone()])
+ Ok(vec![ScalarValue::Boolean(self.acc)])
}
fn evaluate(&self) -> Result<ScalarValue> {
- Ok(self.bool_and.clone())
+ Ok(ScalarValue::Boolean(self.acc))
}
fn size(&self) -> usize {
- std::mem::size_of_val(self) - std::mem::size_of_val(&self.bool_and)
- + self.bool_and.size()
+ std::mem::size_of_val(self)
}
}
@@ -355,7 +348,7 @@ impl AggregateExpr for BoolOr {
}
fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
- Ok(Box::new(BoolOrAccumulator::try_new(&self.data_type)?))
+ Ok(Box::<BoolOrAccumulator>::default())
}
fn state_fields(&self) -> Result<Vec<Field>> {
@@ -393,7 +386,7 @@ impl AggregateExpr for BoolOr {
}
fn create_sliding_accumulator(&self) -> Result<Box<dyn Accumulator>> {
- Ok(Box::new(BoolOrAccumulator::try_new(&self.data_type)?))
+ Ok(Box::<BoolOrAccumulator>::default())
}
}
@@ -411,29 +404,24 @@ impl PartialEq<dyn Any> for BoolOr {
}
}
-#[derive(Debug)]
+#[derive(Debug, Default)]
struct BoolOrAccumulator {
- bool_or: ScalarValue,
-}
-
-impl BoolOrAccumulator {
- /// new bool_or accumulator
- pub fn try_new(data_type: &DataType) -> Result<Self> {
- Ok(Self {
- bool_or: ScalarValue::try_from(data_type)?,
- })
- }
+ acc: Option<bool>,
}
impl Accumulator for BoolOrAccumulator {
fn state(&self) -> Result<Vec<ScalarValue>> {
- Ok(vec![self.bool_or.clone()])
+ Ok(vec![ScalarValue::Boolean(self.acc)])
}
fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
let values = &values[0];
- let delta = bool_or_batch(values)?;
- self.bool_or = self.bool_or.or(&delta)?;
+ self.acc = match (self.acc, bool_or_batch(values)?) {
+ (None, ScalarValue::Boolean(v)) => v,
+ (Some(v), ScalarValue::Boolean(None)) => Some(v),
+ (Some(a), ScalarValue::Boolean(Some(b))) => Some(a || b),
+ _ => unreachable!(),
+ };
Ok(())
}
@@ -442,12 +430,11 @@ impl Accumulator for BoolOrAccumulator {
}
fn evaluate(&self) -> Result<ScalarValue> {
- Ok(self.bool_or.clone())
+ Ok(ScalarValue::Boolean(self.acc))
}
fn size(&self) -> usize {
- std::mem::size_of_val(self) - std::mem::size_of_val(&self.bool_or)
- + self.bool_or.size()
+ std::mem::size_of_val(self)
}
}