This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 17f069df4 Add `InList` support for timestamp type. (#3449) (#3450)
17f069df4 is described below

commit 17f069df4227b837cf2741a545c39a8b68d5fd76
Author: Yang Jiang <[email protected]>
AuthorDate: Tue Sep 13 04:09:18 2022 +0800

    Add `InList` support for timestamp type. (#3449) (#3450)
    
    * Add `InList` support for timestamp type. (#3449)
    
    * Update datafusion/physical-expr/src/expressions/in_list.rs
    
    Co-authored-by: Andrew Lamb <[email protected]>
    
    Co-authored-by: Andrew Lamb <[email protected]>
---
 .../physical-expr/src/expressions/in_list.rs       | 209 ++++++++++++++++++++-
 1 file changed, 203 insertions(+), 6 deletions(-)

diff --git a/datafusion/physical-expr/src/expressions/in_list.rs b/datafusion/physical-expr/src/expressions/in_list.rs
index fcaa3c3f2..184847316 100644
--- a/datafusion/physical-expr/src/expressions/in_list.rs
+++ b/datafusion/physical-expr/src/expressions/in_list.rs
@@ -24,8 +24,9 @@ use std::sync::Arc;
 use arrow::array::GenericStringArray;
 use arrow::array::{
     ArrayRef, BooleanArray, Float32Array, Float64Array, Int16Array, Int32Array,
-    Int64Array, Int8Array, OffsetSizeTrait, UInt16Array, UInt32Array, 
UInt64Array,
-    UInt8Array,
+    Int64Array, Int8Array, OffsetSizeTrait, TimestampMicrosecondArray,
+    TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray,
+    UInt16Array, UInt32Array, UInt64Array, UInt8Array,
 };
 use arrow::{
     datatypes::{DataType, Schema},
@@ -34,10 +35,12 @@ use arrow::{
 
 use crate::PhysicalExpr;
 use arrow::array::*;
+use arrow::datatypes::TimeUnit;
 use datafusion_common::ScalarValue;
 use datafusion_common::ScalarValue::{
     Binary, Boolean, Date32, Date64, Decimal128, Int16, Int32, Int64, Int8, 
LargeBinary,
-    LargeUtf8, UInt16, UInt32, UInt64, UInt8, Utf8,
+    LargeUtf8, TimestampMicrosecond, TimestampMillisecond, TimestampNanosecond,
+    TimestampSecond, UInt16, UInt32, UInt64, UInt8, Utf8,
 };
 use datafusion_common::{DataFusionError, Result};
 use datafusion_expr::ColumnarValue;
@@ -111,8 +114,8 @@ macro_rules! make_contains_primitive {
             .iter()
             .flat_map(|expr| match expr {
                 ColumnarValue::Scalar(s) => match s {
-                    ScalarValue::$SCALAR_VALUE(Some(v)) => Some(*v),
-                    ScalarValue::$SCALAR_VALUE(None) => None,
+                    ScalarValue::$SCALAR_VALUE(Some(v), ..) => Some(*v),
+                    ScalarValue::$SCALAR_VALUE(None, ..) => None,
                     datatype => unreachable!("InList can't reach other data 
type {} for {}.", datatype, s),
                 },
                 ColumnarValue::Array(_) => {
@@ -175,7 +178,7 @@ macro_rules! set_contains_for_primitive {
         let native_set = $SET_VALUES
             .iter()
             .flat_map(|v| match v {
-                $SCALAR_VALUE(value) => *value,
+                $SCALAR_VALUE(value, ..) => *value,
                 datatype => {
                     unreachable!(
                         "InList can't reach other data type {} for {}.",
@@ -691,6 +694,60 @@ impl PhysicalExpr for InListExpr {
                     let array = 
array.as_any().downcast_ref::<Decimal128Array>().unwrap();
                     Ok(make_set_contains_decimal(array, set, self.negated))
                 }
+                DataType::Timestamp(unit, _) => match unit {
+                    TimeUnit::Second => {
+                        let array = array
+                            .as_any()
+                            .downcast_ref::<TimestampSecondArray>()
+                            .unwrap();
+                        Ok(set_contains_for_primitive!(
+                            array,
+                            set,
+                            TimestampSecond,
+                            self.negated,
+                            i64
+                        ))
+                    }
+                    TimeUnit::Millisecond => {
+                        let array = array
+                            .as_any()
+                            .downcast_ref::<TimestampMillisecondArray>()
+                            .unwrap();
+                        Ok(set_contains_for_primitive!(
+                            array,
+                            set,
+                            TimestampMillisecond,
+                            self.negated,
+                            i64
+                        ))
+                    }
+                    TimeUnit::Microsecond => {
+                        let array = array
+                            .as_any()
+                            .downcast_ref::<TimestampMicrosecondArray>()
+                            .unwrap();
+                        Ok(set_contains_for_primitive!(
+                            array,
+                            set,
+                            TimestampMicrosecond,
+                            self.negated,
+                            i64
+                        ))
+                    }
+                    TimeUnit::Nanosecond => {
+                        let array = array
+                            .as_any()
+                            .downcast_ref::<TimestampNanosecondArray>()
+                            .unwrap();
+                        Ok(set_contains_for_primitive!(
+                            array,
+                            set,
+                            TimestampNanosecond,
+                            self.negated,
+                            i64
+                        ))
+                    }
+                },
                 datatype => 
Result::Err(DataFusionError::NotImplemented(format!(
                     "InSet does not support datatype {:?}.",
                     datatype
@@ -849,6 +906,44 @@ impl PhysicalExpr for InListExpr {
                         self.negated,
                     ))
                 }
+                DataType::Timestamp(unit, _) => match unit {
+                    TimeUnit::Second => {
+                        make_contains_primitive!(
+                            array,
+                            list_values,
+                            self.negated,
+                            TimestampSecond,
+                            TimestampSecondArray
+                        )
+                    }
+                    TimeUnit::Millisecond => {
+                        make_contains_primitive!(
+                            array,
+                            list_values,
+                            self.negated,
+                            TimestampMillisecond,
+                            TimestampMillisecondArray
+                        )
+                    }
+                    TimeUnit::Microsecond => {
+                        make_contains_primitive!(
+                            array,
+                            list_values,
+                            self.negated,
+                            TimestampMicrosecond,
+                            TimestampMicrosecondArray
+                        )
+                    }
+                    TimeUnit::Nanosecond => {
+                        make_contains_primitive!(
+                            array,
+                            list_values,
+                            self.negated,
+                            TimestampNanosecond,
+                            TimestampNanosecondArray
+                        )
+                    }
+                },
                 datatype => 
Result::Err(DataFusionError::NotImplemented(format!(
                     "InList does not support datatype {:?}.",
                     datatype
@@ -1659,4 +1754,106 @@ mod tests {
 
         Ok(())
     }
+
+    #[test]
+    fn in_list_set_timestamp() -> Result<()> {
+        let schema = Schema::new(vec![Field::new(
+            "a",
+            DataType::Timestamp(TimeUnit::Microsecond, None),
+            true,
+        )]);
+        let a = TimestampMicrosecondArray::from(vec![
+            Some(1388588401000000000),
+            Some(1288588501000000000),
+            None,
+        ]);
+        let col_a = col("a", &schema)?;
+        let batch = RecordBatch::try_new(Arc::new(schema.clone()), 
vec![Arc::new(a)])?;
+
+        let mut list = vec![
+            lit(ScalarValue::TimestampMicrosecond(
+                Some(1388588401000000000),
+                None,
+            )),
+            lit(ScalarValue::TimestampMicrosecond(None, None)),
+            lit(ScalarValue::TimestampMicrosecond(
+                Some(1388588401000000001),
+                None,
+            )),
+        ];
+        let start_ts = 1388588401000000001;
+        for v in start_ts..(start_ts + OPTIMIZER_INSET_THRESHOLD + 4) {
+            list.push(lit(ScalarValue::TimestampMicrosecond(Some(v as i64), 
None)));
+        }
+
+        in_list!(
+            batch,
+            list.clone(),
+            &false,
+            vec![Some(true), None, None],
+            col_a.clone(),
+            &schema
+        );
+
+        in_list!(
+            batch,
+            list.clone(),
+            &true,
+            vec![Some(false), None, None],
+            col_a.clone(),
+            &schema
+        );
+
+        Ok(())
+    }
+
+    #[test]
+    fn in_list_timestamp() -> Result<()> {
+        let schema = Schema::new(vec![Field::new(
+            "a",
+            DataType::Timestamp(TimeUnit::Microsecond, None),
+            true,
+        )]);
+        let a = TimestampMicrosecondArray::from(vec![
+            Some(1388588401000000000),
+            Some(1288588501000000000),
+            None,
+        ]);
+        let col_a = col("a", &schema)?;
+        let batch = RecordBatch::try_new(Arc::new(schema.clone()), 
vec![Arc::new(a)])?;
+
+        let list = vec![
+            lit(ScalarValue::TimestampMicrosecond(
+                Some(1388588401000000000),
+                None,
+            )),
+            lit(ScalarValue::TimestampMicrosecond(
+                Some(1388588401000000001),
+                None,
+            )),
+            lit(ScalarValue::TimestampMicrosecond(
+                Some(1388588401000000002),
+                None,
+            )),
+        ];
+
+        in_list!(
+            batch,
+            list.clone(),
+            &false,
+            vec![Some(true), Some(false), None],
+            col_a.clone(),
+            &schema
+        );
+
+        in_list!(
+            batch,
+            list.clone(),
+            &true,
+            vec![Some(false), Some(true), None],
+            col_a.clone(),
+            &schema
+        );
+        Ok(())
+    }
 }

Reply via email to