This is an automated email from the ASF dual-hosted git repository.

ueshin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new dfd8a8d  [SPARK-35341][PYTHON] Introduce BooleanExtensionOps
dfd8a8d is described below

commit dfd8a8dc676c388c0c1bb7e4cb8d55eab10504ab
Author: Xinrong Meng <xinrong.m...@databricks.com>
AuthorDate: Mon Jun 7 15:43:52 2021 -0700

    [SPARK-35341][PYTHON] Introduce BooleanExtensionOps
    
    ### What changes were proposed in this pull request?
    
    - Introduce BooleanExtensionOps in order to make boolean operators `and` 
and `or` data-type-based.
    - Improve error messages for operators `and` and `or`.
    
    ### Why are the changes needed?
    
    Boolean operators __and__, __or__, __rand__, and __ror__ should be 
data-type-based
    
    Boolean extension dtypes process these boolean operators differently from 
bool, so BooleanExtensionOps is introduced.
    
    These boolean operators themselves are also bitwise operators, which should 
be applicable to other data type classes later. However, this is not the 
goal of this PR.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. Error messages for operators `and` and `or` are improved.
    Before:
    ```
    >>> psser = ps.Series([1, "x", "y"], dtype="category")
    >>> psser | True
    Traceback (most recent call last):
    ...
    pyspark.sql.utils.AnalysisException: cannot resolve '(`0` OR true)' due to 
data type mismatch: differing types in '(`0` OR true)' (tinyint and boolean).;
    'Project [unresolvedalias(CASE WHEN (isnull(0#9) OR isnull((0#9 OR true))) 
THEN false ELSE (0#9 OR true) END, 
Some(org.apache.spark.sql.Column$$Lambda$1442/17254916406fb8afba))]
    +- Project [__index_level_0__#8L, 0#9, monotonically_increasing_id() AS 
__natural_order__#12L]
       +- LogicalRDD [__index_level_0__#8L, 0#9], false
    
    ```
    
    After:
    ```
    >>> psser = ps.Series([1, "x", "y"], dtype="category")
    >>> psser | True
    Traceback (most recent call last):
    ...
    TypeError: Bitwise or can not be applied to categoricals.
    ```
    
    ### How was this patch tested?
    
    Unit tests.
    
    Closes #32698 from xinrong-databricks/datatypeops_extension.
    
    Authored-by: Xinrong Meng <xinrong.m...@databricks.com>
    Signed-off-by: Takuya UESHIN <ues...@databricks.com>
---
 python/pyspark/pandas/base.py                      |  56 +----
 python/pyspark/pandas/data_type_ops/base.py        |  28 ++-
 python/pyspark/pandas/data_type_ops/boolean_ops.py |  66 ++++-
 .../pandas/tests/data_type_ops/test_binary_ops.py  |  18 ++
 .../pandas/tests/data_type_ops/test_boolean_ops.py | 278 ++++++++++++++++++++-
 .../tests/data_type_ops/test_categorical_ops.py    |  18 ++
 .../pandas/tests/data_type_ops/test_complex_ops.py |  18 ++
 .../pandas/tests/data_type_ops/test_date_ops.py    |  18 ++
 .../tests/data_type_ops/test_datetime_ops.py       |  18 ++
 .../pandas/tests/data_type_ops/test_num_ops.py     |  22 ++
 .../pandas/tests/data_type_ops/test_string_ops.py  |  18 ++
 .../pandas/tests/data_type_ops/testing_utils.py    |   9 +
 12 files changed, 509 insertions(+), 58 deletions(-)

diff --git a/python/pyspark/pandas/base.py b/python/pyspark/pandas/base.py
index 49d3cf3..dfae9a8 100644
--- a/python/pyspark/pandas/base.py
+++ b/python/pyspark/pandas/base.py
@@ -399,65 +399,21 @@ class IndexOpsMixin(object, metaclass=ABCMeta):
     __ge__ = column_op(Column.__ge__)
     __gt__ = column_op(Column.__gt__)
 
+    __invert__ = column_op(Column.__invert__)
+
     # `and`, `or`, `not` cannot be overloaded in Python,
     # so use bitwise operators as boolean operators
     def __and__(self, other) -> Union["Series", "Index"]:
-        if isinstance(self.dtype, extension_dtypes) or (
-            isinstance(other, IndexOpsMixin) and isinstance(other.dtype, 
extension_dtypes)
-        ):
-
-            def and_func(left, right):
-                if not isinstance(right, spark.Column):
-                    if pd.isna(right):
-                        right = F.lit(None)
-                    else:
-                        right = F.lit(right)
-                return left & right
-
-        else:
-
-            def and_func(left, right):
-                if not isinstance(right, spark.Column):
-                    if pd.isna(right):
-                        right = F.lit(None)
-                    else:
-                        right = F.lit(right)
-                scol = left & right
-                return F.when(scol.isNull(), False).otherwise(scol)
-
-        return column_op(and_func)(self, other)
+        return self._dtype_op.__and__(self, other)
 
     def __or__(self, other) -> Union["Series", "Index"]:
-        if isinstance(self.dtype, extension_dtypes) or (
-            isinstance(other, IndexOpsMixin) and isinstance(other.dtype, 
extension_dtypes)
-        ):
-
-            def or_func(left, right):
-                if not isinstance(right, spark.Column):
-                    if pd.isna(right):
-                        right = F.lit(None)
-                    else:
-                        right = F.lit(right)
-                return left | right
-
-        else:
-
-            def or_func(left, right):
-                if not isinstance(right, spark.Column) and pd.isna(right):
-                    return F.lit(False)
-                else:
-                    scol = left | F.lit(right)
-                    return F.when(left.isNull() | scol.isNull(), 
False).otherwise(scol)
-
-        return column_op(or_func)(self, other)
-
-    __invert__ = column_op(Column.__invert__)
+        return self._dtype_op.__or__(self, other)
 
     def __rand__(self, other) -> Union["Series", "Index"]:
-        return self.__and__(other)
+        return self._dtype_op.rand(self, other)
 
     def __ror__(self, other) -> Union["Series", "Index"]:
-        return self.__or__(other)
+        return self._dtype_op.ror(self, other)
 
     def __len__(self):
         return len(self._psdf)
diff --git a/python/pyspark/pandas/data_type_ops/base.py 
b/python/pyspark/pandas/data_type_ops/base.py
index 1c25821..d15fd46 100644
--- a/python/pyspark/pandas/data_type_ops/base.py
+++ b/python/pyspark/pandas/data_type_ops/base.py
@@ -42,6 +42,10 @@ from pyspark.sql.types import (
 
 import pyspark.sql.types as types
 from pyspark.pandas.typedef import Dtype
+from pyspark.pandas.typedef.typehints import extension_object_dtypes_available
+
+if extension_object_dtypes_available:
+    from pandas import BooleanDtype
 
 if TYPE_CHECKING:
     from pyspark.pandas.indexes import Index  # noqa: F401 (SPARK-34943)
@@ -84,16 +88,13 @@ class DataTypeOps(object, metaclass=ABCMeta):
 
     def __new__(cls, dtype: Dtype, spark_type: DataType):
         from pyspark.pandas.data_type_ops.binary_ops import BinaryOps
-        from pyspark.pandas.data_type_ops.boolean_ops import BooleanOps
+        from pyspark.pandas.data_type_ops.boolean_ops import BooleanOps, 
BooleanExtensionOps
         from pyspark.pandas.data_type_ops.categorical_ops import CategoricalOps
         from pyspark.pandas.data_type_ops.complex_ops import ArrayOps, MapOps, 
StructOps
         from pyspark.pandas.data_type_ops.date_ops import DateOps
         from pyspark.pandas.data_type_ops.datetime_ops import DatetimeOps
         from pyspark.pandas.data_type_ops.null_ops import NullOps
-        from pyspark.pandas.data_type_ops.num_ops import (
-            IntegralOps,
-            FractionalOps,
-        )
+        from pyspark.pandas.data_type_ops.num_ops import IntegralOps, 
FractionalOps
         from pyspark.pandas.data_type_ops.string_ops import StringOps
         from pyspark.pandas.data_type_ops.udt_ops import UDTOps
 
@@ -106,7 +107,10 @@ class DataTypeOps(object, metaclass=ABCMeta):
         elif isinstance(spark_type, StringType):
             return object.__new__(StringOps)
         elif isinstance(spark_type, BooleanType):
-            return object.__new__(BooleanOps)
+            if extension_object_dtypes_available and isinstance(dtype, 
BooleanDtype):
+                return object.__new__(BooleanExtensionOps)
+            else:
+                return object.__new__(BooleanOps)
         elif isinstance(spark_type, TimestampType):
             return object.__new__(DatetimeOps)
         elif isinstance(spark_type, DateType):
@@ -176,6 +180,18 @@ class DataTypeOps(object, metaclass=ABCMeta):
     def rpow(self, left, right) -> Union["Series", "Index"]:
         raise TypeError("Exponentiation can not be applied to %s." % 
self.pretty_name)
 
+    def __and__(self, left, right) -> Union["Series", "Index"]:
+        raise TypeError("Bitwise and can not be applied to %s." % 
self.pretty_name)
+
+    def __or__(self, left, right) -> Union["Series", "Index"]:
+        raise TypeError("Bitwise or can not be applied to %s." % 
self.pretty_name)
+
+    def rand(self, left, right) -> Union["Series", "Index"]:
+        return left.__and__(right)
+
+    def ror(self, left, right) -> Union["Series", "Index"]:
+        return left.__or__(right)
+
     def restore(self, col: pd.Series) -> pd.Series:
         """Restore column when to_pandas."""
         return col
diff --git a/python/pyspark/pandas/data_type_ops/boolean_ops.py 
b/python/pyspark/pandas/data_type_ops/boolean_ops.py
index a93841f..75ec7fa 100644
--- a/python/pyspark/pandas/data_type_ops/boolean_ops.py
+++ b/python/pyspark/pandas/data_type_ops/boolean_ops.py
@@ -18,13 +18,18 @@
 import numbers
 from typing import TYPE_CHECKING, Union
 
-from pyspark.pandas.base import IndexOpsMixin
+import pandas as pd
+
+from pyspark import sql as spark
+from pyspark.pandas.base import column_op, IndexOpsMixin
 from pyspark.pandas.data_type_ops.base import (
     is_valid_operand_for_numeric_arithmetic,
     DataTypeOps,
     transform_boolean_operand_to_numeric,
 )
+from pyspark.pandas.typedef import extension_dtypes
 from pyspark.pandas.typedef.typehints import as_spark_type
+from pyspark.sql import functions as F
 
 if TYPE_CHECKING:
     from pyspark.pandas.indexes import Index  # noqa: F401 (SPARK-34943)
@@ -194,3 +199,62 @@ class BooleanOps(DataTypeOps):
             raise TypeError(
                 "Modulo can not be applied to %s and the given type." % 
self.pretty_name
             )
+
+    def __and__(self, left, right) -> Union["Series", "Index"]:
+        if isinstance(right, IndexOpsMixin) and isinstance(right.dtype, 
extension_dtypes):
+            return right.__and__(left)
+        else:
+
+            def and_func(left, right):
+                if not isinstance(right, spark.Column):
+                    if pd.isna(right):
+                        right = F.lit(None)
+                    else:
+                        right = F.lit(right)
+                scol = left & right
+                return F.when(scol.isNull(), False).otherwise(scol)
+
+            return column_op(and_func)(left, right)
+
+    def __or__(self, left, right) -> Union["Series", "Index"]:
+        if isinstance(right, IndexOpsMixin) and isinstance(right.dtype, 
extension_dtypes):
+            return right.__or__(left)
+        else:
+
+            def or_func(left, right):
+                if not isinstance(right, spark.Column) and pd.isna(right):
+                    return F.lit(False)
+                else:
+                    scol = left | F.lit(right)
+                    return F.when(left.isNull() | scol.isNull(), 
False).otherwise(scol)
+
+            return column_op(or_func)(left, right)
+
+
+class BooleanExtensionOps(BooleanOps):
+    """
+    The class for binary operations of pandas-on-Spark objects with spark type 
BooleanType,
+    and dtype BooleanDtype.
+    """
+
+    def __and__(self, left, right) -> Union["Series", "Index"]:
+        def and_func(left, right):
+            if not isinstance(right, spark.Column):
+                if pd.isna(right):
+                    right = F.lit(None)
+                else:
+                    right = F.lit(right)
+            return left & right
+
+        return column_op(and_func)(left, right)
+
+    def __or__(self, left, right) -> Union["Series", "Index"]:
+        def or_func(left, right):
+            if not isinstance(right, spark.Column):
+                if pd.isna(right):
+                    right = F.lit(None)
+                else:
+                    right = F.lit(right)
+            return left | right
+
+        return column_op(or_func)(left, right)
diff --git a/python/pyspark/pandas/tests/data_type_ops/test_binary_ops.py 
b/python/pyspark/pandas/tests/data_type_ops/test_binary_ops.py
index e047a5c..12e3eb2 100644
--- a/python/pyspark/pandas/tests/data_type_ops/test_binary_ops.py
+++ b/python/pyspark/pandas/tests/data_type_ops/test_binary_ops.py
@@ -122,6 +122,24 @@ class BinaryOpsTest(PandasOnSparkTestCase, TestCasesUtils):
         self.assertRaises(TypeError, lambda: "x" ** self.psser)
         self.assertRaises(TypeError, lambda: 1 ** self.psser)
 
+    def test_and(self):
+        self.assertRaises(TypeError, lambda: self.psser & True)
+        self.assertRaises(TypeError, lambda: self.psser & False)
+        self.assertRaises(TypeError, lambda: self.psser & self.psser)
+
+    def test_rand(self):
+        self.assertRaises(TypeError, lambda: True & self.psser)
+        self.assertRaises(TypeError, lambda: False & self.psser)
+
+    def test_or(self):
+        self.assertRaises(TypeError, lambda: self.psser | True)
+        self.assertRaises(TypeError, lambda: self.psser | False)
+        self.assertRaises(TypeError, lambda: self.psser | self.psser)
+
+    def test_ror(self):
+        self.assertRaises(TypeError, lambda: True | self.psser)
+        self.assertRaises(TypeError, lambda: False | self.psser)
+
     def test_from_to_pandas(self):
         data = [b"1", b"2", b"3"]
         pser = pd.Series(data)
diff --git a/python/pyspark/pandas/tests/data_type_ops/test_boolean_ops.py 
b/python/pyspark/pandas/tests/data_type_ops/test_boolean_ops.py
index 7f3ffdb..d43e4c9 100644
--- a/python/pyspark/pandas/tests/data_type_ops/test_boolean_ops.py
+++ b/python/pyspark/pandas/tests/data_type_ops/test_boolean_ops.py
@@ -16,6 +16,7 @@
 #
 
 import datetime
+import unittest
 from distutils.version import LooseVersion
 
 import pandas as pd
@@ -24,6 +25,7 @@ import numpy as np
 from pyspark import pandas as ps
 from pyspark.pandas.config import option_context
 from pyspark.pandas.tests.data_type_ops.testing_utils import TestCasesUtils
+from pyspark.pandas.typedef.typehints import extension_dtypes_available
 from pyspark.testing.pandasutils import PandasOnSparkTestCase
 
 
@@ -229,6 +231,281 @@ class BooleanOpsTest(PandasOnSparkTestCase, 
TestCasesUtils):
         self.assertRaises(TypeError, lambda: datetime.date(1994, 1, 1) % 
self.psser)
         self.assertRaises(TypeError, lambda: True % self.psser)
 
+    def test_and(self):
+        pser = pd.Series([True, False, None], dtype="bool")
+        psser = ps.from_pandas(pser)
+        self.assert_eq(pser & True, psser & True)
+        self.assert_eq(pser & False, psser & False)
+        self.assert_eq(pser & pser, psser & psser)
+
+        other_pser = pd.Series([False, None, True], dtype="bool")
+        other_psser = ps.from_pandas(other_pser)
+        with option_context("compute.ops_on_diff_frames", True):
+            self.assert_eq(pser & other_pser, psser & other_psser)
+            self.check_extension(
+                pser & other_pser.astype("boolean"), psser & 
other_psser.astype("boolean")
+            )
+            self.assert_eq(other_pser & pser, other_psser & psser)
+
+    def test_rand(self):
+        pser = pd.Series([True, False, None], dtype="bool")
+        psser = ps.from_pandas(pser)
+        self.assert_eq(True & pser, True & psser)
+        self.assert_eq(False & pser, False & psser)
+
+    def test_or(self):
+        pser = pd.Series([True, False, None], dtype="bool")
+        psser = ps.from_pandas(pser)
+        self.assert_eq(pser | True, psser | True)
+        self.assert_eq(pser | False, psser | False)
+        self.assert_eq(pser | pser, psser | psser)
+        self.assert_eq(True | pser, True | psser)
+        self.assert_eq(False | pser, False | psser)
+
+        other_pser = pd.Series([False, None, True], dtype="bool")
+        other_psser = ps.from_pandas(other_pser)
+        with option_context("compute.ops_on_diff_frames", True):
+            self.assert_eq(pser | other_pser, psser | other_psser)
+            self.check_extension(
+                pser | other_pser.astype("boolean"), psser | 
other_psser.astype("boolean")
+            )
+            self.assert_eq(other_pser | pser, other_psser | psser)
+
+    def test_ror(self):
+        pser = pd.Series([True, False, None], dtype="bool")
+        psser = ps.from_pandas(pser)
+        self.assert_eq(True | pser, True | psser)
+        self.assert_eq(False | pser, False | psser)
+
+
+@unittest.skipIf(not extension_dtypes_available, "pandas extension dtypes are 
not available")
+class BooleanExtensionOpsTest(PandasOnSparkTestCase, TestCasesUtils):
+    @property
+    def pser(self):
+        return pd.Series([True, False, None], dtype="boolean")
+
+    @property
+    def psser(self):
+        return ps.from_pandas(self.pser)
+
+    @property
+    def other_pser(self):
+        return pd.Series([False, None, True], dtype="boolean")
+
+    @property
+    def other_psser(self):
+        return ps.from_pandas(self.other_pser)
+
+    @property
+    def float_pser(self):
+        return pd.Series([1, 2, 3], dtype=float)
+
+    @property
+    def float_psser(self):
+        return ps.from_pandas(self.float_pser)
+
+    def test_add(self):
+        pser = self.pser
+        psser = self.psser
+        self.assert_eq((pser + 1).astype(float), psser + 1)
+        self.assert_eq((pser + 0.1).astype(float), psser + 0.1)
+        self.assertRaises(TypeError, lambda: psser + psser)
+        self.assertRaises(TypeError, lambda: psser + True)
+
+        with option_context("compute.ops_on_diff_frames", True):
+            for pser, psser in self.numeric_pser_psser_pairs:
+                self.assert_eq(self.pser + pser, (self.psser + 
psser).sort_index(), almost=True)
+            for psser in self.non_numeric_pssers.values():
+                self.assertRaises(TypeError, lambda: self.psser + psser)
+
+    def test_sub(self):
+        pser = self.pser
+        psser = self.psser
+        self.assert_eq((pser - 1).astype(float), psser - 1)
+        self.assert_eq((pser - 0.1).astype(float), psser - 0.1)
+        self.assertRaises(TypeError, lambda: psser - psser)
+        self.assertRaises(TypeError, lambda: psser - True)
+
+        with option_context("compute.ops_on_diff_frames", True):
+            for pser, psser in self.numeric_pser_psser_pairs:
+                self.assert_eq(self.pser - pser, (self.psser - 
psser).sort_index(), almost=True)
+            for psser in self.non_numeric_pssers.values():
+                self.assertRaises(TypeError, lambda: self.psser - psser)
+
+    def test_mul(self):
+        pser = self.pser
+        psser = self.psser
+        self.assert_eq((pser * 1).astype(float), psser * 1)
+        self.assert_eq((pser * 0.1).astype(float), psser * 0.1)
+        self.assertRaises(TypeError, lambda: psser * psser)
+        self.assertRaises(TypeError, lambda: psser * True)
+
+        with option_context("compute.ops_on_diff_frames", True):
+            for pser, psser in self.numeric_pser_psser_pairs:
+                self.assert_eq(self.pser * pser, (self.psser * 
psser).sort_index(), almost=True)
+            for psser in self.non_numeric_pssers.values():
+                self.assertRaises(TypeError, lambda: self.psser * psser)
+
+    def test_truediv(self):
+        pser = self.pser
+        psser = self.psser
+        self.assert_eq((pser / 1).astype(float), psser / 1)
+        self.assert_eq((pser / 0.1).astype(float), psser / 0.1)
+        self.assertRaises(TypeError, lambda: psser / psser)
+        self.assertRaises(TypeError, lambda: psser / True)
+
+        with option_context("compute.ops_on_diff_frames", True):
+            self.assert_eq(
+                self.pser / self.float_pser,
+                (self.psser / self.float_psser).sort_index(),
+                almost=True,
+            )
+            for psser in self.non_numeric_pssers.values():
+                self.assertRaises(TypeError, lambda: self.psser / psser)
+
+    def test_floordiv(self):
+        pser = self.pser
+        psser = self.psser
+
+        # float is always returned in pandas-on-Spark
+        self.assert_eq((pser // 1).astype("float"), psser // 1)
+
+        # in pandas, 1 // 0.1 = 9.0; in pandas-on-Spark, 1 // 0.1 = 10.0
+        # self.assert_eq(pser // 0.1, psser // 0.1)
+
+        self.assertRaises(TypeError, lambda: psser // psser)
+        self.assertRaises(TypeError, lambda: psser // True)
+
+        with option_context("compute.ops_on_diff_frames", True):
+            self.assert_eq(
+                self.pser // self.float_pser,
+                (self.psser // self.float_psser).sort_index(),
+                almost=True,
+            )
+            for psser in self.non_numeric_pssers.values():
+                self.assertRaises(TypeError, lambda: self.psser // psser)
+
+    def test_mod(self):
+        pser = self.pser
+        psser = self.psser
+        self.assert_eq((pser % 1).astype(float), psser % 1)
+        self.assert_eq((pser % 0.1).astype(float), psser % 0.1)
+        self.assertRaises(TypeError, lambda: psser % psser)
+        self.assertRaises(TypeError, lambda: psser % True)
+
+        with option_context("compute.ops_on_diff_frames", True):
+            for pser, psser in self.numeric_pser_psser_pairs:
+                self.assert_eq(self.pser % pser, (self.psser % 
psser).sort_index(), almost=True)
+            for psser in self.non_numeric_pssers.values():
+                self.assertRaises(TypeError, lambda: self.psser % psser)
+
+    def test_pow(self):
+        pser = self.pser
+        psser = self.psser
+        # float is always returned in pandas-on-Spark
+        self.assert_eq((pser ** 1).astype("float"), psser ** 1)
+        self.assert_eq((pser ** 0.1).astype("float"), self.psser ** 0.1)
+        self.assert_eq((pser ** pser.astype(float)).astype("float"), psser ** 
psser.astype(float))
+        self.assertRaises(TypeError, lambda: psser ** psser)
+        self.assertRaises(TypeError, lambda: psser ** True)
+
+        with option_context("compute.ops_on_diff_frames", True):
+            self.assert_eq(
+                self.pser ** self.float_pser,
+                (self.psser ** self.float_psser).sort_index(),
+                almost=True,
+            )
+
+            for psser in self.non_numeric_pssers.values():
+                self.assertRaises(TypeError, lambda: self.psser ** psser)
+
+    def test_radd(self):
+        self.assert_eq((1 + self.pser).astype(float), 1 + self.psser)
+        self.assert_eq((0.1 + self.pser).astype(float), 0.1 + self.psser)
+        self.assertRaises(TypeError, lambda: "x" + self.psser)
+        self.assertRaises(TypeError, lambda: True + self.psser)
+        self.assertRaises(TypeError, lambda: datetime.date(1994, 1, 1) + 
self.psser)
+        self.assertRaises(TypeError, lambda: datetime.datetime(1994, 1, 1) + 
self.psser)
+
+    def test_rsub(self):
+        self.assert_eq((1 - self.pser).astype(float), 1 - self.psser)
+        self.assert_eq((0.1 - self.pser).astype(float), 0.1 - self.psser)
+        self.assertRaises(TypeError, lambda: "x" - self.psser)
+        self.assertRaises(TypeError, lambda: True - self.psser)
+        self.assertRaises(TypeError, lambda: datetime.date(1994, 1, 1) - 
self.psser)
+        self.assertRaises(TypeError, lambda: datetime.datetime(1994, 1, 1) - 
self.psser)
+
+    def test_rmul(self):
+        self.assert_eq((1 * self.pser).astype(float), 1 * self.psser)
+        self.assert_eq((0.1 * self.pser).astype(float), 0.1 * self.psser)
+        self.assertRaises(TypeError, lambda: "x" * self.psser)
+        self.assertRaises(TypeError, lambda: True * self.psser)
+        self.assertRaises(TypeError, lambda: datetime.date(1994, 1, 1) * 
self.psser)
+        self.assertRaises(TypeError, lambda: datetime.datetime(1994, 1, 1) * 
self.psser)
+
+    def test_rtruediv(self):
+        self.assert_eq((1 / self.pser).astype(float), 1 / self.psser)
+        self.assert_eq((0.1 / self.pser).astype(float), 0.1 / self.psser)
+        self.assertRaises(TypeError, lambda: "x" / self.psser)
+        self.assertRaises(TypeError, lambda: True / self.psser)
+        self.assertRaises(TypeError, lambda: datetime.date(1994, 1, 1) / 
self.psser)
+        self.assertRaises(TypeError, lambda: datetime.datetime(1994, 1, 1) / 
self.psser)
+
+    def test_rfloordiv(self):
+        self.assert_eq((1 // self.psser).astype(float), ps.Series([1.0, 
np.inf, np.nan]))
+        self.assert_eq((0.1 // self.psser).astype(float), ps.Series([0.0, 
np.inf, np.nan]))
+        self.assertRaises(TypeError, lambda: "x" + self.psser)
+        self.assertRaises(TypeError, lambda: True + self.psser)
+        self.assertRaises(TypeError, lambda: datetime.date(1994, 1, 1) // 
self.psser)
+        self.assertRaises(TypeError, lambda: datetime.datetime(1994, 1, 1) // 
self.psser)
+
+    def test_rpow(self):
+        self.assert_eq(1 ** self.psser, ps.Series([1, 1, 1], dtype=float))
+        self.assert_eq((0.1 ** self.pser).astype(float), 0.1 ** self.psser)
+        self.assertRaises(TypeError, lambda: "x" ** self.psser)
+        self.assertRaises(TypeError, lambda: True ** self.psser)
+        self.assertRaises(TypeError, lambda: datetime.date(1994, 1, 1) ** 
self.psser)
+        self.assertRaises(TypeError, lambda: datetime.datetime(1994, 1, 1) ** 
self.psser)
+
+    def test_rmod(self):
+        self.assert_eq(ps.Series([0, np.nan, np.nan], dtype=float), 1 % 
self.psser)
+        self.assert_eq(
+            ps.Series([0.10000000000000009, np.nan, np.nan], dtype=float),
+            0.1 % self.psser,
+        )
+        self.assertRaises(TypeError, lambda: datetime.date(1994, 1, 1) % 
self.psser)
+        self.assertRaises(TypeError, lambda: True % self.psser)
+
+    def test_and(self):
+        pser = self.pser
+        psser = self.psser
+        self.check_extension(pser & True, psser & True)
+        self.check_extension(pser & False, psser & False)
+        self.check_extension(pser & pser, psser & psser)
+
+        with option_context("compute.ops_on_diff_frames", True):
+            self.check_extension(pser & self.other_pser, psser & 
self.other_psser)
+            self.check_extension(self.other_pser & pser, self.other_psser & 
psser)
+
+    def test_rand(self):
+        self.check_extension(True & self.pser, True & self.psser)
+        self.check_extension(False & self.pser, False & self.psser)
+
+    def test_or(self):
+        pser = self.pser
+        psser = self.psser
+        self.check_extension(pser | True, psser | True)
+        self.check_extension(pser | False, psser | False)
+        self.check_extension(pser | pser, psser | psser)
+
+        with option_context("compute.ops_on_diff_frames", True):
+            self.check_extension(pser | self.other_pser, psser | 
self.other_psser)
+            self.check_extension(self.other_pser | pser, self.other_psser | 
psser)
+
+    def test_ror(self):
+        self.check_extension(True | self.pser, True | self.psser)
+        self.check_extension(False | self.pser, False | self.psser)
+
     def test_from_to_pandas(self):
         data = [True, True, False]
         pser = pd.Series(data)
@@ -238,7 +515,6 @@ class BooleanOpsTest(PandasOnSparkTestCase, TestCasesUtils):
 
 
 if __name__ == "__main__":
-    import unittest
     from pyspark.pandas.tests.data_type_ops.test_boolean_ops import *  # noqa: 
F401
 
     try:
diff --git a/python/pyspark/pandas/tests/data_type_ops/test_categorical_ops.py 
b/python/pyspark/pandas/tests/data_type_ops/test_categorical_ops.py
index 85b2d6d..a5ed1bb 100644
--- a/python/pyspark/pandas/tests/data_type_ops/test_categorical_ops.py
+++ b/python/pyspark/pandas/tests/data_type_ops/test_categorical_ops.py
@@ -115,6 +115,24 @@ class CategoricalOpsTest(PandasOnSparkTestCase, 
TestCasesUtils):
         self.assertRaises(TypeError, lambda: "x" ** self.psser)
         self.assertRaises(TypeError, lambda: 1 ** self.psser)
 
+    def test_and(self):
+        self.assertRaises(TypeError, lambda: self.psser & True)
+        self.assertRaises(TypeError, lambda: self.psser & False)
+        self.assertRaises(TypeError, lambda: self.psser & self.psser)
+
+    def test_rand(self):
+        self.assertRaises(TypeError, lambda: True & self.psser)
+        self.assertRaises(TypeError, lambda: False & self.psser)
+
+    def test_or(self):
+        self.assertRaises(TypeError, lambda: self.psser | True)
+        self.assertRaises(TypeError, lambda: self.psser | False)
+        self.assertRaises(TypeError, lambda: self.psser | self.psser)
+
+    def test_ror(self):
+        self.assertRaises(TypeError, lambda: True | self.psser)
+        self.assertRaises(TypeError, lambda: False | self.psser)
+
     def test_from_to_pandas(self):
         data = [1, "x", "y"]
         pser = pd.Series(data, dtype="category")
diff --git a/python/pyspark/pandas/tests/data_type_ops/test_complex_ops.py 
b/python/pyspark/pandas/tests/data_type_ops/test_complex_ops.py
index 9e3dce7..b2902cb 100644
--- a/python/pyspark/pandas/tests/data_type_ops/test_complex_ops.py
+++ b/python/pyspark/pandas/tests/data_type_ops/test_complex_ops.py
@@ -190,6 +190,24 @@ class ComplexOpsTest(PandasOnSparkTestCase, 
TestCasesUtils):
         self.assertRaises(TypeError, lambda: "x" ** self.psser)
         self.assertRaises(TypeError, lambda: 1 ** self.psser)
 
+    def test_and(self):
+        self.assertRaises(TypeError, lambda: self.psser & True)
+        self.assertRaises(TypeError, lambda: self.psser & False)
+        self.assertRaises(TypeError, lambda: self.psser & self.psser)
+
+    def test_rand(self):
+        self.assertRaises(TypeError, lambda: True & self.psser)
+        self.assertRaises(TypeError, lambda: False & self.psser)
+
+    def test_or(self):
+        self.assertRaises(TypeError, lambda: self.psser | True)
+        self.assertRaises(TypeError, lambda: self.psser | False)
+        self.assertRaises(TypeError, lambda: self.psser | self.psser)
+
+    def test_ror(self):
+        self.assertRaises(TypeError, lambda: True | self.psser)
+        self.assertRaises(TypeError, lambda: False | self.psser)
+
     def test_from_to_pandas(self):
         for pser, psser in zip(self.psers, self.pssers):
             self.assert_eq(pser, psser.to_pandas())
diff --git a/python/pyspark/pandas/tests/data_type_ops/test_date_ops.py 
b/python/pyspark/pandas/tests/data_type_ops/test_date_ops.py
index 656b290..3d94253 100644
--- a/python/pyspark/pandas/tests/data_type_ops/test_date_ops.py
+++ b/python/pyspark/pandas/tests/data_type_ops/test_date_ops.py
@@ -147,6 +147,24 @@ class DateOpsTest(PandasOnSparkTestCase, TestCasesUtils):
         self.assertRaises(TypeError, lambda: 1 ** self.psser)
         self.assertRaises(TypeError, lambda: self.some_date ** self.psser)
 
+    def test_and(self):
+        self.assertRaises(TypeError, lambda: self.psser & True)
+        self.assertRaises(TypeError, lambda: self.psser & False)
+        self.assertRaises(TypeError, lambda: self.psser & self.psser)
+
+    def test_rand(self):
+        self.assertRaises(TypeError, lambda: True & self.psser)
+        self.assertRaises(TypeError, lambda: False & self.psser)
+
+    def test_or(self):
+        self.assertRaises(TypeError, lambda: self.psser | True)
+        self.assertRaises(TypeError, lambda: self.psser | False)
+        self.assertRaises(TypeError, lambda: self.psser | self.psser)
+
+    def test_ror(self):
+        self.assertRaises(TypeError, lambda: True | self.psser)
+        self.assertRaises(TypeError, lambda: False | self.psser)
+
     def test_from_to_pandas(self):
         data = [datetime.date(1994, 1, 31), datetime.date(1994, 2, 1), 
datetime.date(1994, 2, 2)]
         pser = pd.Series(data)
diff --git a/python/pyspark/pandas/tests/data_type_ops/test_datetime_ops.py 
b/python/pyspark/pandas/tests/data_type_ops/test_datetime_ops.py
index 9ea8cab..e50e017 100644
--- a/python/pyspark/pandas/tests/data_type_ops/test_datetime_ops.py
+++ b/python/pyspark/pandas/tests/data_type_ops/test_datetime_ops.py
@@ -147,6 +147,24 @@ class DatetimeOpsTest(PandasOnSparkTestCase, TestCasesUtils):
         self.assertRaises(TypeError, lambda: 1 ** self.psser)
         self.assertRaises(TypeError, lambda: self.some_datetime ** self.psser)
 
+    def test_and(self):
+        self.assertRaises(TypeError, lambda: self.psser & True)
+        self.assertRaises(TypeError, lambda: self.psser & False)
+        self.assertRaises(TypeError, lambda: self.psser & self.psser)
+
+    def test_rand(self):
+        self.assertRaises(TypeError, lambda: True & self.psser)
+        self.assertRaises(TypeError, lambda: False & self.psser)
+
+    def test_or(self):
+        self.assertRaises(TypeError, lambda: self.psser | True)
+        self.assertRaises(TypeError, lambda: self.psser | False)
+        self.assertRaises(TypeError, lambda: self.psser | self.psser)
+
+    def test_ror(self):
+        self.assertRaises(TypeError, lambda: True | self.psser)
+        self.assertRaises(TypeError, lambda: False | self.psser)
+
     def test_from_to_pandas(self):
         data = pd.date_range("1994-1-31 10:30:15", periods=3, freq="M")
         pser = pd.Series(data)
diff --git a/python/pyspark/pandas/tests/data_type_ops/test_num_ops.py b/python/pyspark/pandas/tests/data_type_ops/test_num_ops.py
index 91d9a07..8076e74 100644
--- a/python/pyspark/pandas/tests/data_type_ops/test_num_ops.py
+++ b/python/pyspark/pandas/tests/data_type_ops/test_num_ops.py
@@ -254,6 +254,28 @@ class NumOpsTest(PandasOnSparkTestCase, TestCasesUtils):
             self.assertRaises(TypeError, lambda: datetime.date(1994, 1, 1) % psser)
             self.assertRaises(TypeError, lambda: datetime.datetime(1994, 1, 1) % psser)
 
+    def test_and(self):
+        psser = self.numeric_pssers[0]
+        self.assertRaises(TypeError, lambda: psser & True)
+        self.assertRaises(TypeError, lambda: psser & False)
+        self.assertRaises(TypeError, lambda: psser & psser)
+
+    def test_rand(self):
+        psser = self.numeric_pssers[0]
+        self.assertRaises(TypeError, lambda: True & psser)
+        self.assertRaises(TypeError, lambda: False & psser)
+
+    def test_or(self):
+        psser = self.numeric_pssers[0]
+        self.assertRaises(TypeError, lambda: psser | True)
+        self.assertRaises(TypeError, lambda: psser | False)
+        self.assertRaises(TypeError, lambda: psser | psser)
+
+    def test_ror(self):
+        psser = self.numeric_pssers[0]
+        self.assertRaises(TypeError, lambda: True | psser)
+        self.assertRaises(TypeError, lambda: False | psser)
+
     def test_from_to_pandas(self):
         for pser, psser in self.numeric_pser_psser_pairs:
             self.assert_eq(pser, psser.to_pandas())
diff --git a/python/pyspark/pandas/tests/data_type_ops/test_string_ops.py b/python/pyspark/pandas/tests/data_type_ops/test_string_ops.py
index a75d9f0..62f9406d 100644
--- a/python/pyspark/pandas/tests/data_type_ops/test_string_ops.py
+++ b/python/pyspark/pandas/tests/data_type_ops/test_string_ops.py
@@ -129,6 +129,24 @@ class StringOpsTest(PandasOnSparkTestCase, TestCasesUtils):
         self.assertRaises(TypeError, lambda: "x" ** self.psser)
         self.assertRaises(TypeError, lambda: 1 ** self.psser)
 
+    def test_and(self):
+        self.assertRaises(TypeError, lambda: self.psser & True)
+        self.assertRaises(TypeError, lambda: self.psser & False)
+        self.assertRaises(TypeError, lambda: self.psser & self.psser)
+
+    def test_rand(self):
+        self.assertRaises(TypeError, lambda: True & self.psser)
+        self.assertRaises(TypeError, lambda: False & self.psser)
+
+    def test_or(self):
+        self.assertRaises(TypeError, lambda: self.psser | True)
+        self.assertRaises(TypeError, lambda: self.psser | False)
+        self.assertRaises(TypeError, lambda: self.psser | self.psser)
+
+    def test_ror(self):
+        self.assertRaises(TypeError, lambda: True | self.psser)
+        self.assertRaises(TypeError, lambda: False | self.psser)
+
     def test_from_to_pandas(self):
         data = ["x", "y", "z"]
         pser = pd.Series(data)
diff --git a/python/pyspark/pandas/tests/data_type_ops/testing_utils.py b/python/pyspark/pandas/tests/data_type_ops/testing_utils.py
index 13fda52..df62fa2 100644
--- a/python/pyspark/pandas/tests/data_type_ops/testing_utils.py
+++ b/python/pyspark/pandas/tests/data_type_ops/testing_utils.py
@@ -17,11 +17,13 @@
 
 import datetime
 import decimal
+from distutils.version import LooseVersion
 
 import numpy as np
 import pandas as pd
 
 import pyspark.pandas as ps
+from pyspark.pandas.typedef import extension_dtypes
 
 
 class TestCasesUtils(object):
@@ -74,3 +76,10 @@ class TestCasesUtils(object):
     @property
     def pser_psser_pairs(self):
         return zip(self.psers, self.pssers)
+
+    def check_extension(self, psser, pser):
+        if LooseVersion("1.1") <= LooseVersion(pd.__version__) < LooseVersion("1.2.2"):
+            self.assert_eq(psser, pser, check_exact=False)
+            self.assertTrue(isinstance(psser.dtype, extension_dtypes))
+        else:
+            self.assert_eq(psser, pser)

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to