This is an automated email from the ASF dual-hosted git repository.

ueshin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new d1b24d8  [SPARK-35338][PYTHON] Separate arithmetic operations into 
data type based structures
d1b24d8 is described below

commit d1b24d8aba8317c62542a81ca55c12700a07cb80
Author: Xinrong Meng <xinrong.m...@databricks.com>
AuthorDate: Wed May 19 15:05:32 2021 -0700

    [SPARK-35338][PYTHON] Separate arithmetic operations into data type based 
structures
    
    ### What changes were proposed in this pull request?
    
    The PR is proposed for **pandas APIs on Spark**, in order to separate 
arithmetic operations shown as below into data-type-based structures.
    `__add__, __sub__, __mul__, __truediv__, __floordiv__, __pow__, __mod__,
    __radd__, __rsub__, __rmul__, __rtruediv__, __rfloordiv__, 
__rpow__,__rmod__`
    
    DataTypeOps and subclasses are introduced.
    
    The existing behaviors of each arithmetic operation should be preserved.
    
    ### Why are the changes needed?
    
    Currently, the same arithmetic operation of all data types is defined in 
one function, so it’s difficult to extend the behavior change based on the data 
types.
    
    Introducing DataTypeOps would be the foundation for [pandas APIs on Spark: 
Separate basic operations into data type based 
structures.](https://docs.google.com/document/d/12MS6xK0hETYmrcl5b9pX5lgV4FmGVfpmcSKq--_oQlc/edit?usp=sharing).
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Tests are introduced under pyspark.pandas.tests.data_type_ops. One test 
file per DataTypeOps class.
    
    Closes #32469 from xinrong-databricks/datatypeop_arith.
    
    Authored-by: Xinrong Meng <xinrong.m...@databricks.com>
    Signed-off-by: Takuya UESHIN <ues...@databricks.com>
---
 dev/sparktestsupport/modules.py                    |   6 +
 python/pyspark/pandas/base.py                      | 268 ++-------------
 python/pyspark/pandas/data_type_ops/__init__.py    |  16 +
 python/pyspark/pandas/data_type_ops/base.py        | 120 +++++++
 python/pyspark/pandas/data_type_ops/boolean_ops.py |  28 ++
 .../pandas/data_type_ops/categorical_ops.py        |  28 ++
 python/pyspark/pandas/data_type_ops/date_ops.py    |  71 ++++
 .../pyspark/pandas/data_type_ops/datetime_ops.py   |  72 ++++
 python/pyspark/pandas/data_type_ops/num_ops.py     | 378 +++++++++++++++++++++
 python/pyspark/pandas/data_type_ops/string_ops.py  | 104 ++++++
 .../pyspark/pandas/tests/data_type_ops/__init__.py |  16 +
 .../pandas/tests/data_type_ops/test_boolean_ops.py | 150 ++++++++
 .../tests/data_type_ops/test_categorical_ops.py    | 128 +++++++
 .../pandas/tests/data_type_ops/test_date_ops.py    | 158 +++++++++
 .../tests/data_type_ops/test_datetime_ops.py       | 160 +++++++++
 .../pandas/tests/data_type_ops/test_num_ops.py     | 195 +++++++++++
 .../pandas/tests/data_type_ops/test_string_ops.py  | 140 ++++++++
 .../pandas/tests/data_type_ops/testing_utils.py    |  75 ++++
 .../pyspark/pandas/tests/indexes/test_datetime.py  |  10 +-
 python/pyspark/pandas/tests/test_dataframe.py      |   4 +-
 .../pyspark/pandas/tests/test_series_datetime.py   |  10 +-
 python/pyspark/testing/pandasutils.py              |   2 +-
 python/setup.py                                    |   1 +
 23 files changed, 1881 insertions(+), 259 deletions(-)

diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index ab65ccd..c35618e 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -611,6 +611,12 @@ pyspark_pandas = Module(
         "pyspark.pandas.spark.utils",
         "pyspark.pandas.typedef.typehints",
         # unittests
+        "pyspark.pandas.tests.data_type_ops.test_boolean_ops",
+        "pyspark.pandas.tests.data_type_ops.test_categorical_ops",
+        "pyspark.pandas.tests.data_type_ops.test_date_ops",
+        "pyspark.pandas.tests.data_type_ops.test_datetime_ops",
+        "pyspark.pandas.tests.data_type_ops.test_num_ops",
+        "pyspark.pandas.tests.data_type_ops.test_string_ops",
         "pyspark.pandas.tests.indexes.test_base",
         "pyspark.pandas.tests.indexes.test_category",
         "pyspark.pandas.tests.indexes.test_datetime",
diff --git a/python/pyspark/pandas/base.py b/python/pyspark/pandas/base.py
index 1082052..6cecb73 100644
--- a/python/pyspark/pandas/base.py
+++ b/python/pyspark/pandas/base.py
@@ -19,7 +19,6 @@
 Base and utility classes for pandas-on-Spark objects.
 """
 from abc import ABCMeta, abstractmethod
-import datetime
 from functools import wraps, partial
 from itertools import chain
 from typing import Any, Callable, Optional, Tuple, Union, cast, TYPE_CHECKING
@@ -35,7 +34,6 @@ from pyspark.sql.types import (
     DateType,
     DoubleType,
     FloatType,
-    IntegralType,
     LongType,
     NumericType,
     StringType,
@@ -50,11 +48,9 @@ from pyspark.pandas.internal import (
     NATURAL_ORDER_COLUMN_NAME,
     SPARK_DEFAULT_INDEX_NAME,
 )
-from pyspark.pandas.spark import functions as SF
 from pyspark.pandas.spark.accessors import SparkIndexOpsMethods
 from pyspark.pandas.typedef import (
     Dtype,
-    as_spark_type,
     extension_dtypes,
     pandas_on_spark_type,
     spark_type_to_pandas_dtype,
@@ -322,100 +318,23 @@ class IndexOpsMixin(object, metaclass=ABCMeta):
 
     spark_column.__doc__ = SparkIndexOpsMethods.column.__doc__
 
+    @property
+    def _dtype_op(self):
+        from pyspark.pandas.data_type_ops.base import DataTypeOps
+
+        return DataTypeOps(self.dtype, self.spark.data_type)
+
     # arithmetic operators
     __neg__ = column_op(Column.__neg__)
 
     def __add__(self, other) -> Union["Series", "Index"]:
-        if not isinstance(self.spark.data_type, StringType) and (
-            (isinstance(other, IndexOpsMixin) and 
isinstance(other.spark.data_type, StringType))
-            or isinstance(other, str)
-        ):
-            raise TypeError("string addition can only be applied to string 
series or literals.")
-
-        if isinstance(self.spark.data_type, TimestampType):
-            raise TypeError("addition can not be applied to date times.")
-
-        if isinstance(self.spark.data_type, StringType):
-            # Concatenate string columns
-            if isinstance(other, IndexOpsMixin) and 
isinstance(other.spark.data_type, StringType):
-                return column_op(F.concat)(self, other)
-            # Handle df['col'] + 'literal'
-            elif isinstance(other, str):
-                return column_op(F.concat)(self, F.lit(other))
-            else:
-                raise TypeError("string addition can only be applied to string 
series or literals.")
-        else:
-            return column_op(Column.__add__)(self, other)
+        return self._dtype_op.__add__(self, other)
 
     def __sub__(self, other) -> Union["Series", "Index"]:
-        if (
-            isinstance(self.spark.data_type, StringType)
-            or (isinstance(other, IndexOpsMixin) and 
isinstance(other.spark.data_type, StringType))
-            or isinstance(other, str)
-        ):
-            raise TypeError("substraction can not be applied to string series 
or literals.")
-
-        if isinstance(self.spark.data_type, TimestampType):
-            # Note that timestamp subtraction casts arguments to integer. This 
is to mimic pandas's
-            # behaviors. pandas returns 'timedelta64[ns]' from 
'datetime64[ns]'s subtraction.
-            msg = (
-                "Note that there is a behavior difference of timestamp 
subtraction. "
-                "The timestamp subtraction returns an integer in seconds, "
-                "whereas pandas returns 'timedelta64[ns]'."
-            )
-            if isinstance(other, IndexOpsMixin) and isinstance(
-                other.spark.data_type, TimestampType
-            ):
-                warnings.warn(msg, UserWarning)
-                return self.astype("long") - other.astype("long")
-            elif isinstance(other, datetime.datetime):
-                warnings.warn(msg, UserWarning)
-                return self.astype("long") - 
F.lit(other).cast(as_spark_type("long"))
-            else:
-                raise TypeError("datetime subtraction can only be applied to 
datetime series.")
-        elif isinstance(self.spark.data_type, DateType):
-            # Note that date subtraction casts arguments to integer. This is 
to mimic pandas's
-            # behaviors. pandas returns 'timedelta64[ns]' in days from date's 
subtraction.
-            msg = (
-                "Note that there is a behavior difference of date subtraction. 
"
-                "The date subtraction returns an integer in days, "
-                "whereas pandas returns 'timedelta64[ns]'."
-            )
-            if isinstance(other, IndexOpsMixin) and 
isinstance(other.spark.data_type, DateType):
-                warnings.warn(msg, UserWarning)
-                return column_op(F.datediff)(self, other).astype("long")
-            elif isinstance(other, datetime.date) and not isinstance(other, 
datetime.datetime):
-                warnings.warn(msg, UserWarning)
-                return column_op(F.datediff)(self, F.lit(other)).astype("long")
-            else:
-                raise TypeError("date subtraction can only be applied to date 
series.")
-        return column_op(Column.__sub__)(self, other)
+        return self._dtype_op.__sub__(self, other)
 
     def __mul__(self, other) -> Union["Series", "Index"]:
-        if isinstance(other, str):
-            raise TypeError("multiplication can not be applied to a string 
literal.")
-
-        if isinstance(self.spark.data_type, TimestampType):
-            raise TypeError("multiplication can not be applied to date times.")
-
-        if (
-            isinstance(self.spark.data_type, IntegralType)
-            and isinstance(other, IndexOpsMixin)
-            and isinstance(other.spark.data_type, StringType)
-        ):
-            return column_op(SF.repeat)(other, self)
-
-        if isinstance(self.spark.data_type, StringType):
-            if (
-                isinstance(other, IndexOpsMixin) and 
isinstance(other.spark.data_type, IntegralType)
-            ) or isinstance(other, int):
-                return column_op(SF.repeat)(self, other)
-            else:
-                raise TypeError(
-                    "a string series can only be multiplied to an int series 
or literal"
-                )
-
-        return column_op(Column.__mul__)(self, other)
+        return self._dtype_op.__mul__(self, other)
 
     def __truediv__(self, other) -> Union["Series", "Index"]:
         """
@@ -434,122 +353,22 @@ class IndexOpsMixin(object, metaclass=ABCMeta):
         |          -10          |   null  | -np.inf |
         +-----------------------|---------|---------+
         """
-
-        if (
-            isinstance(self.spark.data_type, StringType)
-            or (isinstance(other, IndexOpsMixin) and 
isinstance(other.spark.data_type, StringType))
-            or isinstance(other, str)
-        ):
-            raise TypeError("division can not be applied on string series or 
literals.")
-
-        if isinstance(self.spark.data_type, TimestampType):
-            raise TypeError("division can not be applied to date times.")
-
-        def truediv(left, right):
-            return F.when(F.lit(right != 0) | F.lit(right).isNull(), 
left.__div__(right)).otherwise(
-                F.when(F.lit(left == np.inf) | F.lit(left == -np.inf), 
left).otherwise(
-                    F.lit(np.inf).__div__(left)
-                )
-            )
-
-        return numpy_column_op(truediv)(self, other)
+        return self._dtype_op.__truediv__(self, other)
 
     def __mod__(self, other) -> Union["Series", "Index"]:
-        if (
-            isinstance(self.spark.data_type, StringType)
-            or (isinstance(other, IndexOpsMixin) and 
isinstance(other.spark.data_type, StringType))
-            or isinstance(other, str)
-        ):
-            raise TypeError("modulo can not be applied on string series or 
literals.")
-
-        if isinstance(self.spark.data_type, TimestampType):
-            raise TypeError("modulo can not be applied to date times.")
-
-        def mod(left, right):
-            return ((left % right) + right) % right
-
-        return column_op(mod)(self, other)
+        return self._dtype_op.__mod__(self, other)
 
     def __radd__(self, other) -> Union["Series", "Index"]:
-        # Handle 'literal' + df['col']
-        if not isinstance(self.spark.data_type, StringType) and 
isinstance(other, str):
-            raise TypeError("string addition can only be applied to string 
series or literals.")
-
-        if isinstance(self.spark.data_type, TimestampType):
-            raise TypeError("addition can not be applied to date times.")
-
-        if isinstance(self.spark.data_type, StringType):
-            if isinstance(other, str):
-                return self._with_new_scol(
-                    F.concat(F.lit(other), self.spark.column)
-                )  # TODO: dtype?
-            else:
-                raise TypeError("string addition can only be applied to string 
series or literals.")
-        else:
-            return column_op(Column.__radd__)(self, other)
+        return self._dtype_op.__radd__(self, other)
 
     def __rsub__(self, other) -> Union["Series", "Index"]:
-        if isinstance(self.spark.data_type, StringType) or isinstance(other, 
str):
-            raise TypeError("substraction can not be applied to string series 
or literals.")
-
-        if isinstance(self.spark.data_type, TimestampType):
-            # Note that timestamp subtraction casts arguments to integer. This 
is to mimic pandas's
-            # behaviors. pandas returns 'timedelta64[ns]' from 
'datetime64[ns]'s subtraction.
-            msg = (
-                "Note that there is a behavior difference of timestamp 
subtraction. "
-                "The timestamp subtraction returns an integer in seconds, "
-                "whereas pandas returns 'timedelta64[ns]'."
-            )
-            if isinstance(other, datetime.datetime):
-                warnings.warn(msg, UserWarning)
-                return -(self.astype("long") - 
F.lit(other).cast(as_spark_type("long")))
-            else:
-                raise TypeError("datetime subtraction can only be applied to 
datetime series.")
-        elif isinstance(self.spark.data_type, DateType):
-            # Note that date subtraction casts arguments to integer. This is 
to mimic pandas's
-            # behaviors. pandas returns 'timedelta64[ns]' in days from date's 
subtraction.
-            msg = (
-                "Note that there is a behavior difference of date subtraction. 
"
-                "The date subtraction returns an integer in days, "
-                "whereas pandas returns 'timedelta64[ns]'."
-            )
-            if isinstance(other, datetime.date) and not isinstance(other, 
datetime.datetime):
-                warnings.warn(msg, UserWarning)
-                return -column_op(F.datediff)(self, 
F.lit(other)).astype("long")
-            else:
-                raise TypeError("date subtraction can only be applied to date 
series.")
-        return column_op(Column.__rsub__)(self, other)
+        return self._dtype_op.__rsub__(self, other)
 
     def __rmul__(self, other) -> Union["Series", "Index"]:
-        if isinstance(other, str):
-            raise TypeError("multiplication can not be applied to a string 
literal.")
-
-        if isinstance(self.spark.data_type, TimestampType):
-            raise TypeError("multiplication can not be applied to date times.")
-
-        if isinstance(self.spark.data_type, StringType):
-            if isinstance(other, int):
-                return column_op(SF.repeat)(self, other)
-            else:
-                raise TypeError(
-                    "a string series can only be multiplied to an int series 
or literal"
-                )
-
-        return column_op(Column.__rmul__)(self, other)
+        return self._dtype_op.__rmul__(self, other)
 
     def __rtruediv__(self, other) -> Union["Series", "Index"]:
-        if isinstance(self.spark.data_type, StringType) or isinstance(other, 
str):
-            raise TypeError("division can not be applied on string series or 
literals.")
-
-        if isinstance(self.spark.data_type, TimestampType):
-            raise TypeError("division can not be applied to date times.")
-
-        def rtruediv(left, right):
-            return F.when(left == 0, F.lit(np.inf).__div__(right)).otherwise(
-                F.lit(right).__truediv__(left)
-            )
-
-        return numpy_column_op(rtruediv)(self, other)
+        return self._dtype_op.__rtruediv__(self, other)
 
     def __floordiv__(self, other) -> Union["Series", "Index"]:
         """
@@ -568,66 +387,19 @@ class IndexOpsMixin(object, metaclass=ABCMeta):
         |          -10          |   null  | -np.inf |
         +-----------------------|---------|---------+
         """
-        if (
-            isinstance(self.spark.data_type, StringType)
-            or (isinstance(other, IndexOpsMixin) and 
isinstance(other.spark.data_type, StringType))
-            or isinstance(other, str)
-        ):
-            raise TypeError("division can not be applied on string series or 
literals.")
-
-        if isinstance(self.spark.data_type, TimestampType):
-            raise TypeError("division can not be applied to date times.")
-
-        def floordiv(left, right):
-            return F.when(F.lit(right is np.nan), np.nan).otherwise(
-                F.when(
-                    F.lit(right != 0) | F.lit(right).isNull(), 
F.floor(left.__div__(right))
-                ).otherwise(
-                    F.when(F.lit(left == np.inf) | F.lit(left == -np.inf), 
left).otherwise(
-                        F.lit(np.inf).__div__(left)
-                    )
-                )
-            )
-
-        return numpy_column_op(floordiv)(self, other)
+        return self._dtype_op.__floordiv__(self, other)
 
     def __rfloordiv__(self, other) -> Union["Series", "Index"]:
-        if isinstance(self.spark.data_type, StringType) or isinstance(other, 
str):
-            raise TypeError("division can not be applied on string series or 
literals.")
-
-        if isinstance(self.spark.data_type, TimestampType):
-            raise TypeError("division can not be applied to date times.")
-
-        def rfloordiv(left, right):
-            return F.when(F.lit(left == 0), 
F.lit(np.inf).__div__(right)).otherwise(
-                F.when(F.lit(left) == np.nan, 
np.nan).otherwise(F.floor(F.lit(right).__div__(left)))
-            )
-
-        return numpy_column_op(rfloordiv)(self, other)
+        return self._dtype_op.__rfloordiv__(self, other)
 
     def __rmod__(self, other) -> Union["Series", "Index"]:
-        if isinstance(self.spark.data_type, StringType) or isinstance(other, 
str):
-            raise TypeError("modulo can not be applied on string series or 
literals.")
-
-        if isinstance(self.spark.data_type, TimestampType):
-            raise TypeError("modulo can not be applied to date times.")
-
-        def rmod(left, right):
-            return ((right % left) + left) % left
-
-        return column_op(rmod)(self, other)
+        return self._dtype_op.__rmod__(self, other)
 
     def __pow__(self, other) -> Union["Series", "Index"]:
-        def pow_func(left, right):
-            return F.when(left == 1, left).otherwise(Column.__pow__(left, 
right))
-
-        return column_op(pow_func)(self, other)
+        return self._dtype_op.__pow__(self, other)
 
     def __rpow__(self, other) -> Union["Series", "Index"]:
-        def rpow_func(left, right):
-            return F.when(F.lit(right == 1), 
right).otherwise(Column.__rpow__(left, right))
-
-        return column_op(rpow_func)(self, other)
+        return self._dtype_op.__rpow__(self, other)
 
     __abs__ = column_op(F.abs)
 
diff --git a/python/pyspark/pandas/data_type_ops/__init__.py 
b/python/pyspark/pandas/data_type_ops/__init__.py
new file mode 100644
index 0000000..cce3aca
--- /dev/null
+++ b/python/pyspark/pandas/data_type_ops/__init__.py
@@ -0,0 +1,16 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
diff --git a/python/pyspark/pandas/data_type_ops/base.py 
b/python/pyspark/pandas/data_type_ops/base.py
new file mode 100644
index 0000000..4f92a2e
--- /dev/null
+++ b/python/pyspark/pandas/data_type_ops/base.py
@@ -0,0 +1,120 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from abc import ABCMeta, abstractmethod
+from typing import TYPE_CHECKING, Union
+
+from pandas.api.types import CategoricalDtype
+
+from pyspark.sql.types import (
+    BooleanType,
+    DataType,
+    DateType,
+    FractionalType,
+    IntegralType,
+    StringType,
+    TimestampType,
+)
+
+from pyspark.pandas.typedef import Dtype
+
+if TYPE_CHECKING:
+    from pyspark.pandas.indexes import Index  # noqa: F401 (SPARK-34943)
+    from pyspark.pandas.series import Series  # noqa: F401 (SPARK-34943)
+
+
+class DataTypeOps(object, metaclass=ABCMeta):
+    """The base class for binary operations of pandas-on-Spark objects (of 
different data types)."""
+
+    def __new__(cls, dtype: Dtype, spark_type: DataType):
+        from pyspark.pandas.data_type_ops.boolean_ops import BooleanOps
+        from pyspark.pandas.data_type_ops.categorical_ops import CategoricalOps
+        from pyspark.pandas.data_type_ops.date_ops import DateOps
+        from pyspark.pandas.data_type_ops.datetime_ops import DatetimeOps
+        from pyspark.pandas.data_type_ops.num_ops import (
+            IntegralOps,
+            FractionalOps,
+        )
+        from pyspark.pandas.data_type_ops.string_ops import StringOps
+
+        if isinstance(dtype, CategoricalDtype):
+            return object.__new__(CategoricalOps)
+        elif isinstance(spark_type, FractionalType):
+            return object.__new__(FractionalOps)
+        elif isinstance(spark_type, IntegralType):
+            return object.__new__(IntegralOps)
+        elif isinstance(spark_type, StringType):
+            return object.__new__(StringOps)
+        elif isinstance(spark_type, BooleanType):
+            return object.__new__(BooleanOps)
+        elif isinstance(spark_type, TimestampType):
+            return object.__new__(DatetimeOps)
+        elif isinstance(spark_type, DateType):
+            return object.__new__(DateOps)
+        else:
+            raise TypeError("Type %s was not understood." % dtype)
+
+    def __init__(self, dtype: Dtype, spark_type: DataType):
+        self.dtype = dtype
+        self.spark_type = spark_type
+
+    @property
+    @abstractmethod
+    def pretty_name(self) -> str:
+        raise NotImplementedError()
+
+    def __add__(self, left, right) -> Union["Series", "Index"]:
+        raise TypeError("Addition can not be applied to %s." % 
self.pretty_name)
+
+    def __sub__(self, left, right) -> Union["Series", "Index"]:
+        raise TypeError("Subtraction can not be applied to %s." % 
self.pretty_name)
+
+    def __mul__(self, left, right) -> Union["Series", "Index"]:
+        raise TypeError("Multiplication can not be applied to %s." % 
self.pretty_name)
+
+    def __truediv__(self, left, right) -> Union["Series", "Index"]:
+        raise TypeError("True division can not be applied to %s." % 
self.pretty_name)
+
+    def __floordiv__(self, left, right) -> Union["Series", "Index"]:
+        raise TypeError("Floor division can not be applied to %s." % 
self.pretty_name)
+
+    def __mod__(self, left, right) -> Union["Series", "Index"]:
+        raise TypeError("Modulo can not be applied to %s." % self.pretty_name)
+
+    def __pow__(self, left, right) -> Union["Series", "Index"]:
+        raise TypeError("Exponentiation can not be applied to %s." % 
self.pretty_name)
+
+    def __radd__(self, left, right) -> Union["Series", "Index"]:
+        raise TypeError("Addition can not be applied to %s." % 
self.pretty_name)
+
+    def __rsub__(self, left, right) -> Union["Series", "Index"]:
+        raise TypeError("Subtraction can not be applied to %s." % 
self.pretty_name)
+
+    def __rmul__(self, left, right) -> Union["Series", "Index"]:
+        raise TypeError("Multiplication can not be applied to %s." % 
self.pretty_name)
+
+    def __rtruediv__(self, left, right) -> Union["Series", "Index"]:
+        raise TypeError("True division can not be applied to %s." % 
self.pretty_name)
+
+    def __rfloordiv__(self, left, right) -> Union["Series", "Index"]:
+        raise TypeError("Floor division can not be applied to %s." % 
self.pretty_name)
+
+    def __rmod__(self, left, right) -> Union["Series", "Index"]:
+        raise TypeError("Modulo can not be applied to %s." % self.pretty_name)
+
+    def __rpow__(self, left, right) -> Union["Series", "Index"]:
+        raise TypeError("Exponentiation can not be applied to %s." % 
self.pretty_name)
diff --git a/python/pyspark/pandas/data_type_ops/boolean_ops.py 
b/python/pyspark/pandas/data_type_ops/boolean_ops.py
new file mode 100644
index 0000000..4a72123
--- /dev/null
+++ b/python/pyspark/pandas/data_type_ops/boolean_ops.py
@@ -0,0 +1,28 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from pyspark.pandas.data_type_ops.base import DataTypeOps
+
+
+class BooleanOps(DataTypeOps):
+    """
+    The class for binary operations of pandas-on-Spark objects with spark 
type: BooleanType.
+    """
+
+    @property
+    def pretty_name(self) -> str:
+        return 'booleans'
diff --git a/python/pyspark/pandas/data_type_ops/categorical_ops.py 
b/python/pyspark/pandas/data_type_ops/categorical_ops.py
new file mode 100644
index 0000000..9c57868
--- /dev/null
+++ b/python/pyspark/pandas/data_type_ops/categorical_ops.py
@@ -0,0 +1,28 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from pyspark.pandas.data_type_ops.base import DataTypeOps
+
+
+class CategoricalOps(DataTypeOps):
+    """
+    The class for binary operations of pandas-on-Spark objects with 
categorical types.
+    """
+
+    @property
+    def pretty_name(self) -> str:
+        return 'categoricals'
diff --git a/python/pyspark/pandas/data_type_ops/date_ops.py 
b/python/pyspark/pandas/data_type_ops/date_ops.py
new file mode 100644
index 0000000..501280c
--- /dev/null
+++ b/python/pyspark/pandas/data_type_ops/date_ops.py
@@ -0,0 +1,71 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import datetime
+import warnings
+from typing import TYPE_CHECKING, Union
+
+from pyspark.sql import functions as F
+from pyspark.sql.types import DateType
+
+from pyspark.pandas.base import column_op, IndexOpsMixin
+from pyspark.pandas.data_type_ops.base import DataTypeOps
+
+if TYPE_CHECKING:
+    from pyspark.pandas.indexes import Index  # noqa: F401 (SPARK-34943)
+    from pyspark.pandas.series import Series  # noqa: F401 (SPARK-34943)
+
+
+class DateOps(DataTypeOps):
+    """
+    The class for binary operations of pandas-on-Spark objects with spark 
type: DateType.
+    """
+
+    @property
+    def pretty_name(self) -> str:
+        return 'dates'
+
+    def __sub__(self, left, right) -> Union["Series", "Index"]:
+        # Note that date subtraction casts arguments to integer. This is to 
mimic pandas's
+        # behaviors. pandas returns 'timedelta64[ns]' in days from date's 
subtraction.
+        msg = (
+            "Note that there is a behavior difference of date subtraction. "
+            "The date subtraction returns an integer in days, "
+            "whereas pandas returns 'timedelta64[ns]'."
+        )
+        if isinstance(right, IndexOpsMixin) and 
isinstance(right.spark.data_type, DateType):
+            warnings.warn(msg, UserWarning)
+            return column_op(F.datediff)(left, right).astype("long")
+        elif isinstance(right, datetime.date) and not isinstance(right, 
datetime.datetime):
+            warnings.warn(msg, UserWarning)
+            return column_op(F.datediff)(left, F.lit(right)).astype("long")
+        else:
+            raise TypeError("date subtraction can only be applied to date 
series.")
+
+    def __rsub__(self, left, right) -> Union["Series", "Index"]:
+        # Note that date subtraction casts arguments to integer. This is to 
mimic pandas's
+        # behaviors. pandas returns 'timedelta64[ns]' in days from date's 
subtraction.
+        msg = (
+            "Note that there is a behavior difference of date subtraction. "
+            "The date subtraction returns an integer in days, "
+            "whereas pandas returns 'timedelta64[ns]'."
+        )
+        if isinstance(right, datetime.date) and not isinstance(right, 
datetime.datetime):
+            warnings.warn(msg, UserWarning)
+            return -column_op(F.datediff)(left, F.lit(right)).astype("long")
+        else:
+            raise TypeError("date subtraction can only be applied to date 
series.")
diff --git a/python/pyspark/pandas/data_type_ops/datetime_ops.py 
b/python/pyspark/pandas/data_type_ops/datetime_ops.py
new file mode 100644
index 0000000..0a57d3f
--- /dev/null
+++ b/python/pyspark/pandas/data_type_ops/datetime_ops.py
@@ -0,0 +1,72 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import datetime
+import warnings
+from typing import TYPE_CHECKING, Union
+
+from pyspark.sql import functions as F
+from pyspark.sql.types import TimestampType
+
+from pyspark.pandas.base import IndexOpsMixin
+from pyspark.pandas.data_type_ops.base import DataTypeOps
+from pyspark.pandas.typedef import as_spark_type
+
+if TYPE_CHECKING:
+    from pyspark.pandas.indexes import Index  # noqa: F401 (SPARK-34943)
+    from pyspark.pandas.series import Series  # noqa: F401 (SPARK-34943)
+
+
+class DatetimeOps(DataTypeOps):
+    """
+    The class for binary operations of pandas-on-Spark objects with spark 
type: TimestampType.
+    """
+
+    @property
+    def pretty_name(self) -> str:
+        return 'datetimes'
+
+    def __sub__(self, left, right) -> Union["Series", "Index"]:
+        # Note that timestamp subtraction casts arguments to integer. This is 
to mimic pandas's
+        # behaviors. pandas returns 'timedelta64[ns]' from 'datetime64[ns]'s 
subtraction.
+        msg = (
+            "Note that there is a behavior difference of timestamp 
subtraction. "
+            "The timestamp subtraction returns an integer in seconds, "
+            "whereas pandas returns 'timedelta64[ns]'."
+        )
+        if isinstance(right, IndexOpsMixin) and 
isinstance(right.spark.data_type, TimestampType):
+            warnings.warn(msg, UserWarning)
+            return left.astype("long") - right.astype("long")
+        elif isinstance(right, datetime.datetime):
+            warnings.warn(msg, UserWarning)
+            return left.astype("long") - 
F.lit(right).cast(as_spark_type("long"))
+        else:
+            raise TypeError("datetime subtraction can only be applied to 
datetime series.")
+
+    def __rsub__(self, left, right) -> Union["Series", "Index"]:
+        # Note that timestamp subtraction casts arguments to integer. This is 
to mimic pandas's
+        # behaviors. pandas returns 'timedelta64[ns]' from 'datetime64[ns]'s 
subtraction.
+        msg = (
+            "Note that there is a behavior difference of timestamp 
subtraction. "
+            "The timestamp subtraction returns an integer in seconds, "
+            "whereas pandas returns 'timedelta64[ns]'."
+        )
+        if isinstance(right, datetime.datetime):
+            warnings.warn(msg, UserWarning)
+            return -(left.astype("long") - 
F.lit(right).cast(as_spark_type("long")))
+        else:
+            raise TypeError("datetime subtraction can only be applied to 
datetime series.")
diff --git a/python/pyspark/pandas/data_type_ops/num_ops.py 
b/python/pyspark/pandas/data_type_ops/num_ops.py
new file mode 100644
index 0000000..e6b6d96
--- /dev/null
+++ b/python/pyspark/pandas/data_type_ops/num_ops.py
@@ -0,0 +1,378 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import numbers
+from typing import TYPE_CHECKING, Union
+
+import numpy as np
+from pandas.api.types import CategoricalDtype
+
+from pyspark.sql import Column, functions as F
+from pyspark.sql.types import (
+    NumericType,
+    StringType,
+    TimestampType,
+)
+
+from pyspark.pandas.base import column_op, IndexOpsMixin, numpy_column_op
+from pyspark.pandas.data_type_ops.base import DataTypeOps
+from pyspark.pandas.spark import functions as SF
+
+if TYPE_CHECKING:
+    from pyspark.pandas.indexes import Index  # noqa: F401 (SPARK-34943)
+    from pyspark.pandas.series import Series  # noqa: F401 (SPARK-34943)
+
+
+class NumericOps(DataTypeOps):
+    """
+    The class for binary operations of numeric pandas-on-Spark objects.
+    """
+
+    @property
+    def pretty_name(self) -> str:
+        return 'numerics'
+
+    def __add__(self, left, right) -> Union["Series", "Index"]:
+        if (
+            isinstance(right, IndexOpsMixin) and 
isinstance(right.spark.data_type, StringType)
+        ) or isinstance(right, str):
+            raise TypeError("string addition can only be applied to string 
series or literals.")
+
+        if (
+            isinstance(right, IndexOpsMixin)
+            and (
+                isinstance(right.dtype, CategoricalDtype)
+                or (not isinstance(right.spark.data_type, NumericType))
+            )
+        ) and not isinstance(right, numbers.Number):
+            raise TypeError("addition can not be applied to given types.")
+
+        return column_op(Column.__add__)(left, right)
+
+    def __sub__(self, left, right) -> Union["Series", "Index"]:
+        if (
+            isinstance(right, IndexOpsMixin) and 
isinstance(right.spark.data_type, StringType)
+        ) or isinstance(right, str):
+            raise TypeError("subtraction can not be applied to string series 
or literals.")
+
+        if (
+            isinstance(right, IndexOpsMixin)
+            and (
+                isinstance(right.dtype, CategoricalDtype)
+                or (not isinstance(right.spark.data_type, NumericType))
+            )
+        ) and not isinstance(right, numbers.Number):
+            raise TypeError("subtraction can not be applied to given types.")
+
+        return column_op(Column.__sub__)(left, right)
+
+    def __mod__(self, left, right) -> Union["Series", "Index"]:
+        if (
+            isinstance(right, IndexOpsMixin) and 
isinstance(right.spark.data_type, StringType)
+        ) or isinstance(right, str):
+            raise TypeError("modulo can not be applied on string series or 
literals.")
+
+        if (
+            isinstance(right, IndexOpsMixin)
+            and (
+                isinstance(right.dtype, CategoricalDtype)
+                or (not isinstance(right.spark.data_type, NumericType))
+            )
+        ) and not isinstance(right, numbers.Number):
+            raise TypeError("modulo can not be applied to given types.")
+
+        def mod(left, right):
+            return ((left % right) + right) % right
+
+        return column_op(mod)(left, right)
+
+    def __pow__(self, left, right) -> Union["Series", "Index"]:
+        if (
+            isinstance(right, IndexOpsMixin) and 
isinstance(right.spark.data_type, StringType)
+        ) or isinstance(right, str):
+            raise TypeError("exponentiation can not be applied on string 
series or literals.")
+
+        if (
+            isinstance(right, IndexOpsMixin)
+            and (
+                isinstance(right.dtype, CategoricalDtype)
+                or (not isinstance(right.spark.data_type, NumericType))
+            )
+        ) and not isinstance(right, numbers.Number):
+            raise TypeError("exponentiation can not be applied to given 
types.")
+
+        def pow_func(left, right):
+            return F.when(left == 1, left).otherwise(Column.__pow__(left, 
right))
+
+        return column_op(pow_func)(left, right)
+
+    def __radd__(self, left, right) -> Union["Series", "Index"]:
+        if isinstance(right, str):
+            raise TypeError("string addition can only be applied to string 
series or literals.")
+        if not isinstance(right, numbers.Number):
+            raise TypeError("addition can not be applied to given types.")
+
+        return column_op(Column.__radd__)(left, right)
+
+    def __rsub__(self, left, right) -> Union["Series", "Index"]:
+        if isinstance(right, str):
+            raise TypeError("subtraction can not be applied to string series 
or literals.")
+        if not isinstance(right, numbers.Number):
+            raise TypeError("subtraction can not be applied to given types.")
+        return column_op(Column.__rsub__)(left, right)
+
+    def __rmul__(self, left, right) -> Union["Series", "Index"]:
+        if isinstance(right, str):
+            raise TypeError("multiplication can not be applied to a string 
literal.")
+        if not isinstance(right, numbers.Number):
+            raise TypeError("multiplication can not be applied to given 
types.")
+        return column_op(Column.__rmul__)(left, right)
+
+    def __rpow__(self, left, right) -> Union["Series", "Index"]:
+        if isinstance(right, str):
+            raise TypeError("exponentiation can not be applied on string 
series or literals.")
+        if not isinstance(right, numbers.Number):
+            raise TypeError("exponentiation can not be applied to given 
types.")
+
+        def rpow_func(left, right):
+            return F.when(F.lit(right == 1), 
right).otherwise(Column.__rpow__(left, right))
+
+        return column_op(rpow_func)(left, right)
+
+    def __rmod__(self, left, right) -> Union["Series", "Index"]:
+        if isinstance(right, str):
+            raise TypeError("modulo can not be applied on string series or 
literals.")
+        if not isinstance(right, numbers.Number):
+            raise TypeError("modulo can not be applied to given types.")
+
+        def rmod(left, right):
+            return ((right % left) + left) % left
+
+        return column_op(rmod)(left, right)
+
+
+class IntegralOps(NumericOps):
+    """
+    The class for binary operations of pandas-on-Spark objects with spark 
types:
+    LongType, IntegerType, ByteType and ShortType.
+    """
+
+    @property
+    def pretty_name(self) -> str:
+        return 'integrals'
+
+    def __mul__(self, left, right) -> Union["Series", "Index"]:
+        if isinstance(right, str):
+            raise TypeError("multiplication can not be applied to a string 
literal.")
+
+        if isinstance(right, IndexOpsMixin) and 
isinstance(right.spark.data_type, TimestampType):
+            raise TypeError("multiplication can not be applied to date times.")
+
+        if isinstance(right, IndexOpsMixin) and 
isinstance(right.spark.data_type, StringType):
+            return column_op(SF.repeat)(right, left)
+
+        if (
+            isinstance(right, IndexOpsMixin)
+            and (
+                isinstance(right.dtype, CategoricalDtype)
+                or not isinstance(right.spark.data_type, NumericType)
+            )
+        ) and not isinstance(right, numbers.Number):
+            raise TypeError("multiplication can not be applied to given 
types.")
+
+        return column_op(Column.__mul__)(left, right)
+
+    def __truediv__(self, left, right) -> Union["Series", "Index"]:
+        if (
+            isinstance(right, IndexOpsMixin) and 
isinstance(right.spark.data_type, StringType)
+        ) or isinstance(right, str):
+            raise TypeError("division can not be applied on string series or 
literals.")
+
+        if (
+            isinstance(right, IndexOpsMixin)
+            and (
+                isinstance(right.dtype, CategoricalDtype)
+                or (not isinstance(right.spark.data_type, NumericType))
+            )
+        ) and not isinstance(right, numbers.Number):
+            raise TypeError("division can not be applied to given types.")
+
+        def truediv(left, right):
+            return F.when(F.lit(right != 0) | F.lit(right).isNull(), 
left.__div__(right)).otherwise(
+                F.lit(np.inf).__div__(left)
+            )
+
+        return numpy_column_op(truediv)(left, right)
+
+    def __floordiv__(self, left, right) -> Union["Series", "Index"]:
+        if (
+            isinstance(right, IndexOpsMixin) and 
isinstance(right.spark.data_type, StringType)
+        ) or isinstance(right, str):
+            raise TypeError("division can not be applied on string series or 
literals.")
+
+        if (
+            isinstance(right, IndexOpsMixin)
+            and (
+                isinstance(right.dtype, CategoricalDtype)
+                or (not isinstance(right.spark.data_type, NumericType))
+            )
+        ) and not isinstance(right, numbers.Number):
+            raise TypeError("division can not be applied to given types.")
+
+        def floordiv(left, right):
+            return F.when(F.lit(right is np.nan), np.nan).otherwise(
+                F.when(
+                    F.lit(right != 0) | F.lit(right).isNull(), 
F.floor(left.__div__(right))
+                ).otherwise(
+                    F.lit(np.inf).__div__(left)
+                )
+            )
+
+        return numpy_column_op(floordiv)(left, right)
+
+    def __rtruediv__(self, left, right) -> Union["Series", "Index"]:
+        if isinstance(right, str):
+            raise TypeError("division can not be applied on string series or 
literals.")
+        if not isinstance(right, numbers.Number):
+            raise TypeError("division can not be applied to given types.")
+
+        def rtruediv(left, right):
+            return F.when(left == 0, F.lit(np.inf).__div__(right)).otherwise(
+                F.lit(right).__truediv__(left)
+            )
+
+        return numpy_column_op(rtruediv)(left, right)
+
+    def __rfloordiv__(self, left, right) -> Union["Series", "Index"]:
+        if isinstance(right, str):
+            raise TypeError("division can not be applied on string series or 
literals.")
+        if not isinstance(right, numbers.Number):
+            raise TypeError("division can not be applied to given types.")
+
+        def rfloordiv(left, right):
+            return F.when(F.lit(left == 0), 
F.lit(np.inf).__div__(right)).otherwise(
+                F.floor(F.lit(right).__div__(left))
+            )
+
+        return numpy_column_op(rfloordiv)(left, right)
+
+
+class FractionalOps(NumericOps):
+    """
+    The class for binary operations of pandas-on-Spark objects with spark 
types:
+    FloatType, DoubleType and DecimalType.
+    """
+
+    @property
+    def pretty_name(self) -> str:
+        return 'fractions'
+
+    def __mul__(self, left, right) -> Union["Series", "Index"]:
+        if isinstance(right, str):
+            raise TypeError("multiplication can not be applied to a string 
literal.")
+
+        if isinstance(right, IndexOpsMixin) and 
isinstance(right.spark.data_type, TimestampType):
+            raise TypeError("multiplication can not be applied to date times.")
+
+        if (
+            isinstance(right, IndexOpsMixin)
+            and (
+                isinstance(right.dtype, CategoricalDtype)
+                or not isinstance(right.spark.data_type, NumericType)
+            )
+        ) and not isinstance(right, numbers.Number):
+            raise TypeError("multiplication can not be applied to given 
types.")
+
+        return column_op(Column.__mul__)(left, right)
+
+    def __truediv__(self, left, right) -> Union["Series", "Index"]:
+        if (
+            isinstance(right, IndexOpsMixin) and 
isinstance(right.spark.data_type, StringType)
+        ) or isinstance(right, str):
+            raise TypeError("division can not be applied on string series or 
literals.")
+
+        if (
+            isinstance(right, IndexOpsMixin)
+            and (
+                isinstance(right.dtype, CategoricalDtype)
+                or (not isinstance(right.spark.data_type, NumericType))
+            )
+        ) and not isinstance(right, numbers.Number):
+            raise TypeError("division can not be applied to given types.")
+
+        def truediv(left, right):
+            return F.when(F.lit(right != 0) | F.lit(right).isNull(), 
left.__div__(right)).otherwise(
+                F.when(F.lit(left == np.inf) | F.lit(left == -np.inf), 
left).otherwise(
+                    F.lit(np.inf).__div__(left)
+                )
+            )
+
+        return numpy_column_op(truediv)(left, right)
+
+    def __floordiv__(self, left, right) -> Union["Series", "Index"]:
+        if (
+            isinstance(right, IndexOpsMixin) and 
isinstance(right.spark.data_type, StringType)
+        ) or isinstance(right, str):
+            raise TypeError("division can not be applied on string series or 
literals.")
+
+        if (
+            isinstance(right, IndexOpsMixin)
+            and (
+                isinstance(right.dtype, CategoricalDtype)
+                or (not isinstance(right.spark.data_type, NumericType))
+            )
+        ) and not isinstance(right, numbers.Number):
+            raise TypeError("division can not be applied to given types.")
+
+        def floordiv(left, right):
+            return F.when(F.lit(right is np.nan), np.nan).otherwise(
+                F.when(
+                    F.lit(right != 0) | F.lit(right).isNull(), 
F.floor(left.__div__(right))
+                ).otherwise(
+                    F.when(F.lit(left == np.inf) | F.lit(left == -np.inf), 
left).otherwise(
+                        F.lit(np.inf).__div__(left)
+                    )
+                )
+            )
+
+        return numpy_column_op(floordiv)(left, right)
+
+    def __rtruediv__(self, left, right) -> Union["Series", "Index"]:
+        if isinstance(right, str):
+            raise TypeError("division can not be applied on string series or 
literals.")
+        if not isinstance(right, numbers.Number):
+            raise TypeError("division can not be applied to given types.")
+
+        def rtruediv(left, right):
+            return F.when(left == 0, F.lit(np.inf).__div__(right)).otherwise(
+                F.lit(right).__truediv__(left)
+            )
+
+        return numpy_column_op(rtruediv)(left, right)
+
+    def __rfloordiv__(self, left, right) -> Union["Series", "Index"]:
+        if isinstance(right, str):
+            raise TypeError("division can not be applied on string series or 
literals.")
+        if not isinstance(right, numbers.Number):
+            raise TypeError("division can not be applied to given types.")
+
+        def rfloordiv(left, right):
+            return F.when(F.lit(left == 0), 
F.lit(np.inf).__div__(right)).otherwise(
+                F.when(F.lit(left) == np.nan, 
np.nan).otherwise(F.floor(F.lit(right).__div__(left)))
+            )
+
+        return numpy_column_op(rfloordiv)(left, right)
diff --git a/python/pyspark/pandas/data_type_ops/string_ops.py 
b/python/pyspark/pandas/data_type_ops/string_ops.py
new file mode 100644
index 0000000..f97e504
--- /dev/null
+++ b/python/pyspark/pandas/data_type_ops/string_ops.py
@@ -0,0 +1,104 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from typing import TYPE_CHECKING, Union
+
+from pandas.api.types import CategoricalDtype
+
+from pyspark.sql import functions as F
+from pyspark.sql.types import IntegralType, StringType
+
+from pyspark.pandas.base import column_op, IndexOpsMixin
+from pyspark.pandas.data_type_ops.base import DataTypeOps
+from pyspark.pandas.spark import functions as SF
+
+if TYPE_CHECKING:
+    from pyspark.pandas.indexes import Index  # noqa: F401 (SPARK-34943)
+    from pyspark.pandas.series import Series  # noqa: F401 (SPARK-34943)
+
+
+class StringOps(DataTypeOps):
+    """
+    The class for binary operations of pandas-on-Spark objects with spark 
type: StringType.
+    """
+
+    @property
+    def pretty_name(self) -> str:
+        return 'strings'
+
+    def __add__(self, left, right) -> Union["Series", "Index"]:
+        if isinstance(right, IndexOpsMixin) and 
isinstance(right.spark.data_type, StringType):
+            return column_op(F.concat)(left, right)
+        elif isinstance(right, str):
+            return column_op(F.concat)(left, F.lit(right))
+        else:
+            raise TypeError("string addition can only be applied to string 
series or literals.")
+
+    def __sub__(self, left, right):
+        raise TypeError("subtraction can not be applied to string series or 
literals.")
+
+    def __mul__(self, left, right) -> Union["Series", "Index"]:
+        if isinstance(right, str):
+            raise TypeError("multiplication can not be applied to a string 
literal.")
+
+        if (
+            isinstance(right, IndexOpsMixin)
+            and isinstance(right.spark.data_type, IntegralType)
+            and not isinstance(right.dtype, CategoricalDtype)
+        ) or isinstance(right, int):
+            return column_op(SF.repeat)(left, right)
+        else:
+            raise TypeError("a string series can only be multiplied to an int 
series or literal")
+
+    def __truediv__(self, left, right):
+        raise TypeError("division can not be applied on string series or 
literals.")
+
+    def __floordiv__(self, left, right):
+        raise TypeError("division can not be applied on string series or 
literals.")
+
+    def __mod__(self, left, right):
+        raise TypeError("modulo can not be applied on string series or 
literals.")
+
+    def __pow__(self, left, right):
+        raise TypeError("exponentiation can not be applied on string series or 
literals.")
+
+    def __radd__(self, left, right) -> Union["Series", "Index"]:
+        if isinstance(right, str):
+            return left._with_new_scol(F.concat(F.lit(right), 
left.spark.column))  # TODO: dtype?
+        else:
+            raise TypeError("string addition can only be applied to string 
series or literals.")
+
+    def __rsub__(self, left, right):
+        raise TypeError("subtraction can not be applied to string series or 
literals.")
+
+    def __rmul__(self, left, right) -> Union["Series", "Index"]:
+        if isinstance(right, int):
+            return column_op(SF.repeat)(left, right)
+        else:
+            raise TypeError("a string series can only be multiplied to an int 
series or literal")
+
+    def __rtruediv__(self, left, right):
+        raise TypeError("division can not be applied on string series or 
literals.")
+
+    def __rfloordiv__(self, left, right):
+        raise TypeError("division can not be applied on string series or 
literals.")
+
+    def __rpow__(self, left, right):
+        raise TypeError("exponentiation can not be applied on string series or 
literals.")
+
+    def __rmod__(self, left, right):
+        raise TypeError("modulo can not be applied on string series or 
literals.")
diff --git a/python/pyspark/pandas/tests/data_type_ops/__init__.py 
b/python/pyspark/pandas/tests/data_type_ops/__init__.py
new file mode 100644
index 0000000..cce3aca
--- /dev/null
+++ b/python/pyspark/pandas/tests/data_type_ops/__init__.py
@@ -0,0 +1,16 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
diff --git a/python/pyspark/pandas/tests/data_type_ops/test_boolean_ops.py 
b/python/pyspark/pandas/tests/data_type_ops/test_boolean_ops.py
new file mode 100644
index 0000000..8689ecb
--- /dev/null
+++ b/python/pyspark/pandas/tests/data_type_ops/test_boolean_ops.py
@@ -0,0 +1,150 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import datetime
+import pandas as pd
+
+from pyspark import pandas as ps
+from pyspark.pandas.config import option_context
+from pyspark.pandas.tests.data_type_ops.testing_utils import TestCasesUtils
+from pyspark.testing.pandasutils import PandasOnSparkTestCase
+
+
+class BooleanOpsTest(PandasOnSparkTestCase, TestCasesUtils):
+    @property
+    def pser(self):
+        return pd.Series([True, True, False])
+
+    @property
+    def kser(self):
+        return ps.from_pandas(self.pser)
+
+    def test_add(self):
+        self.assertRaises(TypeError, lambda: self.kser + 1)
+        self.assertRaises(TypeError, lambda: self.kser + 0.1)
+
+        with option_context("compute.ops_on_diff_frames", True):
+            for kser in self.ksers:
+                self.assertRaises(TypeError, lambda: self.kser + kser)
+
+    def test_sub(self):
+        self.assertRaises(TypeError, lambda: self.kser - 1)
+        self.assertRaises(TypeError, lambda: self.kser - 0.1)
+
+        with option_context("compute.ops_on_diff_frames", True):
+            for kser in self.ksers:
+                self.assertRaises(TypeError, lambda: self.kser - kser)
+
+    def test_mul(self):
+        self.assertRaises(TypeError, lambda: self.kser * 1)
+        self.assertRaises(TypeError, lambda: self.kser * 0.1)
+
+        with option_context("compute.ops_on_diff_frames", True):
+            for kser in self.ksers:
+                self.assertRaises(TypeError, lambda: self.kser * kser)
+
+    def test_truediv(self):
+        self.assertRaises(TypeError, lambda: self.kser / 1)
+        self.assertRaises(TypeError, lambda: self.kser / 0.1)
+
+        with option_context("compute.ops_on_diff_frames", True):
+            for kser in self.ksers:
+                self.assertRaises(TypeError, lambda: self.kser / kser)
+
+    def test_floordiv(self):
+        self.assertRaises(TypeError, lambda: self.kser // 1)
+        self.assertRaises(TypeError, lambda: self.kser // 0.1)
+
+        with option_context("compute.ops_on_diff_frames", True):
+            for kser in self.ksers:
+                self.assertRaises(TypeError, lambda: self.kser // kser)
+
+    def test_mod(self):
+        self.assertRaises(TypeError, lambda: self.kser % 1)
+        self.assertRaises(TypeError, lambda: self.kser % 0.1)
+
+        with option_context("compute.ops_on_diff_frames", True):
+            for kser in self.ksers:
+                self.assertRaises(TypeError, lambda: self.kser % kser)
+
+    def test_pow(self):
+        self.assertRaises(TypeError, lambda: self.kser ** 1)
+        self.assertRaises(TypeError, lambda: self.kser ** 0.1)
+
+        with option_context("compute.ops_on_diff_frames", True):
+            for kser in self.ksers:
+                self.assertRaises(TypeError, lambda: self.kser ** kser)
+
+    def test_radd(self):
+        self.assertRaises(TypeError, lambda: 1 + self.kser)
+        self.assertRaises(TypeError, lambda: 0.1 + self.kser)
+        self.assertRaises(TypeError, lambda: "x" + self.kser)
+        self.assertRaises(TypeError, lambda: datetime.date(1994, 1, 1) + 
self.kser)
+        self.assertRaises(TypeError, lambda: datetime.datetime(1994, 1, 1) + 
self.kser)
+
+    def test_rsub(self):
+        self.assertRaises(TypeError, lambda: 1 - self.kser)
+        self.assertRaises(TypeError, lambda: 0.1 - self.kser)
+        self.assertRaises(TypeError, lambda: "x" - self.kser)
+        self.assertRaises(TypeError, lambda: datetime.date(1994, 1, 1) - 
self.kser)
+        self.assertRaises(TypeError, lambda: datetime.datetime(1994, 1, 1) - 
self.kser)
+
+    def test_rmul(self):
+        self.assertRaises(TypeError, lambda: 1 * self.kser)
+        self.assertRaises(TypeError, lambda: 0.1 * self.kser)
+        self.assertRaises(TypeError, lambda: "x" * self.kser)
+        self.assertRaises(TypeError, lambda: datetime.date(1994, 1, 1) * 
self.kser)
+        self.assertRaises(TypeError, lambda: datetime.datetime(1994, 1, 1) * 
self.kser)
+
+    def test_rtruediv(self):
+        self.assertRaises(TypeError, lambda: 1 / self.kser)
+        self.assertRaises(TypeError, lambda: 0.1 / self.kser)
+        self.assertRaises(TypeError, lambda: "x" / self.kser)
+        self.assertRaises(TypeError, lambda: datetime.date(1994, 1, 1) / 
self.kser)
+        self.assertRaises(TypeError, lambda: datetime.datetime(1994, 1, 1) / 
self.kser)
+
+    def test_rfloordiv(self):
+        self.assertRaises(TypeError, lambda: 1 // self.kser)
+        self.assertRaises(TypeError, lambda: 0.1 // self.kser)
+        self.assertRaises(TypeError, lambda: "x" + self.kser)
+        self.assertRaises(TypeError, lambda: datetime.date(1994, 1, 1) // 
self.kser)
+        self.assertRaises(TypeError, lambda: datetime.datetime(1994, 1, 1) // 
self.kser)
+
+    def test_rpow(self):
+        self.assertRaises(TypeError, lambda: 1 ** self.kser)
+        self.assertRaises(TypeError, lambda: 0.1 ** self.kser)
+        self.assertRaises(TypeError, lambda: "x" ** self.kser)
+        self.assertRaises(TypeError, lambda: datetime.date(1994, 1, 1) ** 
self.kser)
+        self.assertRaises(TypeError, lambda: datetime.datetime(1994, 1, 1) ** 
self.kser)
+
+    def test_rmod(self):
+        self.assertRaises(TypeError, lambda: 1 % self.kser)
+        self.assertRaises(TypeError, lambda: 0.1 % self.kser)
+        self.assertRaises(TypeError, lambda: datetime.date(1994, 1, 1) % 
self.kser)
+        self.assertRaises(TypeError, lambda: datetime.datetime(1994, 1, 1) % 
self.kser)
+
+
+if __name__ == "__main__":
+    import unittest
+    from pyspark.pandas.tests.data_type_ops.test_boolean_ops import *  # noqa: 
F401
+
+    try:
+        import xmlrunner  # type: ignore[import]
+        testRunner = xmlrunner.XMLTestRunner(output='target/test-reports', 
verbosity=2)
+    except ImportError:
+        testRunner = None
+    unittest.main(testRunner=testRunner, verbosity=2)
diff --git a/python/pyspark/pandas/tests/data_type_ops/test_categorical_ops.py 
b/python/pyspark/pandas/tests/data_type_ops/test_categorical_ops.py
new file mode 100644
index 0000000..ea61c97
--- /dev/null
+++ b/python/pyspark/pandas/tests/data_type_ops/test_categorical_ops.py
@@ -0,0 +1,128 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import pandas as pd
+
+from pyspark import pandas as ps
+from pyspark.pandas.config import option_context
+from pyspark.pandas.tests.data_type_ops.testing_utils import TestCasesUtils
+from pyspark.testing.pandasutils import PandasOnSparkTestCase
+
+
+class CategoricalOpsTest(PandasOnSparkTestCase, TestCasesUtils):
+    @property
+    def pser(self):
+        return pd.Series([1, "x", "y"], dtype="category")
+
+    @property
+    def kser(self):
+        return ps.from_pandas(self.pser)
+
+    def test_add(self):
+        self.assertRaises(TypeError, lambda: self.kser + "x")
+        self.assertRaises(TypeError, lambda: self.kser + 1)
+
+        with option_context("compute.ops_on_diff_frames", True):
+            for kser in self.ksers:
+                self.assertRaises(TypeError, lambda: self.kser + kser)
+
+    def test_sub(self):
+        self.assertRaises(TypeError, lambda: self.kser - "x")
+        self.assertRaises(TypeError, lambda: self.kser - 1)
+
+        with option_context("compute.ops_on_diff_frames", True):
+            for kser in self.ksers:
+                self.assertRaises(TypeError, lambda: self.kser - kser)
+
+    def test_mul(self):
+        self.assertRaises(TypeError, lambda: self.kser * "x")
+        self.assertRaises(TypeError, lambda: self.kser * 1)
+
+        with option_context("compute.ops_on_diff_frames", True):
+            for kser in self.ksers:
+                self.assertRaises(TypeError, lambda: self.kser * kser)
+
+    def test_truediv(self):
+        self.assertRaises(TypeError, lambda: self.kser / "x")
+        self.assertRaises(TypeError, lambda: self.kser / 1)
+
+        with option_context("compute.ops_on_diff_frames", True):
+            for kser in self.ksers:
+                self.assertRaises(TypeError, lambda: self.kser / kser)
+
+    def test_floordiv(self):
+        self.assertRaises(TypeError, lambda: self.kser // "x")
+        self.assertRaises(TypeError, lambda: self.kser // 1)
+
+        with option_context("compute.ops_on_diff_frames", True):
+            for kser in self.ksers:
+                self.assertRaises(TypeError, lambda: self.kser // kser)
+
+    def test_mod(self):
+        self.assertRaises(TypeError, lambda: self.kser % "x")
+        self.assertRaises(TypeError, lambda: self.kser % 1)
+
+        with option_context("compute.ops_on_diff_frames", True):
+            for kser in self.ksers:
+                self.assertRaises(TypeError, lambda: self.kser % kser)
+
+    def test_pow(self):
+        self.assertRaises(TypeError, lambda: self.kser ** "x")
+        self.assertRaises(TypeError, lambda: self.kser ** 1)
+
+        with option_context("compute.ops_on_diff_frames", True):
+            for kser in self.ksers:
+                self.assertRaises(TypeError, lambda: self.kser ** kser)
+
+    def test_radd(self):
+        self.assertRaises(TypeError, lambda: "x" + self.kser)
+        self.assertRaises(TypeError, lambda: 1 + self.kser)
+
+    def test_rsub(self):
+        self.assertRaises(TypeError, lambda: "x" - self.kser)
+        self.assertRaises(TypeError, lambda: 1 - self.kser)
+
+    def test_rmul(self):
+        self.assertRaises(TypeError, lambda: "x" * self.kser)
+        self.assertRaises(TypeError, lambda: 2 * self.kser)
+
+    def test_rtruediv(self):
+        self.assertRaises(TypeError, lambda: "x" / self.kser)
+        self.assertRaises(TypeError, lambda: 1 / self.kser)
+
+    def test_rfloordiv(self):
+        self.assertRaises(TypeError, lambda: "x" // self.kser)
+        self.assertRaises(TypeError, lambda: 1 // self.kser)
+
+    def test_rmod(self):
+        self.assertRaises(TypeError, lambda: 1 % self.kser)
+
+    def test_rpow(self):
+        self.assertRaises(TypeError, lambda: "x" ** self.kser)
+        self.assertRaises(TypeError, lambda: 1 ** self.kser)
+
+
+if __name__ == "__main__":
+    import unittest
+    from pyspark.pandas.tests.data_type_ops.test_categorical_ops import *  # 
noqa: F401
+
+    try:
+        import xmlrunner  # type: ignore[import]
+        testRunner = xmlrunner.XMLTestRunner(output='target/test-reports', 
verbosity=2)
+    except ImportError:
+        testRunner = None
+    unittest.main(testRunner=testRunner, verbosity=2)
diff --git a/python/pyspark/pandas/tests/data_type_ops/test_date_ops.py 
b/python/pyspark/pandas/tests/data_type_ops/test_date_ops.py
new file mode 100644
index 0000000..8674355
--- /dev/null
+++ b/python/pyspark/pandas/tests/data_type_ops/test_date_ops.py
@@ -0,0 +1,158 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import datetime
+
+import pandas as pd
+
+from pyspark.sql.types import DateType
+
+from pyspark import pandas as ps
+from pyspark.pandas.config import option_context
+from pyspark.pandas.tests.data_type_ops.testing_utils import TestCasesUtils
+from pyspark.testing.pandasutils import PandasOnSparkTestCase
+
+
+class DateOpsTest(PandasOnSparkTestCase, TestCasesUtils):
+    @property
+    def pser(self):
+        return pd.Series(
+            [datetime.date(1994, 1, 31), datetime.date(1994, 2, 1), 
datetime.date(1994, 2, 2)]
+        )
+
+    @property
+    def kser(self):
+        return ps.from_pandas(self.pser)
+
+    @property
+    def some_date(self):
+        return datetime.date(1994, 1, 1)
+
+    def test_add(self):
+        self.assertRaises(TypeError, lambda: self.kser + "x")
+        self.assertRaises(TypeError, lambda: self.kser + 1)
+        self.assertRaises(TypeError, lambda: self.kser + self.some_date)
+
+        with option_context("compute.ops_on_diff_frames", True):
+            for kser in self.ksers:
+                self.assertRaises(TypeError, lambda: self.kser + kser)
+
+    def test_sub(self):
+        self.assertRaises(TypeError, lambda: self.kser - "x")
+        self.assertRaises(TypeError, lambda: self.kser - 1)
+        self.assert_eq(
+            (self.pser - self.some_date).dt.days, self.kser - self.some_date,
+        )
+        with option_context("compute.ops_on_diff_frames", True):
+            for pser, kser in self.pser_kser_pairs:
+                if isinstance(kser.spark.data_type, DateType):
+                    self.assert_eq((self.pser - pser).dt.days, (self.kser - 
kser).sort_index())
+                else:
+                    self.assertRaises(TypeError, lambda: self.kser - kser)
+
+    def test_mul(self):
+        self.assertRaises(TypeError, lambda: self.kser * "x")
+        self.assertRaises(TypeError, lambda: self.kser * 1)
+        self.assertRaises(TypeError, lambda: self.kser * self.some_date)
+
+        with option_context("compute.ops_on_diff_frames", True):
+            for kser in self.ksers:
+                self.assertRaises(TypeError, lambda: self.kser * kser)
+
+    def test_truediv(self):
+        self.assertRaises(TypeError, lambda: self.kser / "x")
+        self.assertRaises(TypeError, lambda: self.kser / 1)
+        self.assertRaises(TypeError, lambda: self.kser / self.some_date)
+
+        with option_context("compute.ops_on_diff_frames", True):
+            for kser in self.ksers:
+                self.assertRaises(TypeError, lambda: self.kser / kser)
+
+    def test_floordiv(self):
+        self.assertRaises(TypeError, lambda: self.kser // "x")
+        self.assertRaises(TypeError, lambda: self.kser // 1)
+        self.assertRaises(TypeError, lambda: self.kser // self.some_date)
+
+        with option_context("compute.ops_on_diff_frames", True):
+            for kser in self.ksers:
+                self.assertRaises(TypeError, lambda: self.kser // kser)
+
+    def test_mod(self):
+        self.assertRaises(TypeError, lambda: self.kser % "x")
+        self.assertRaises(TypeError, lambda: self.kser % 1)
+        self.assertRaises(TypeError, lambda: self.kser % self.some_date)
+
+        with option_context("compute.ops_on_diff_frames", True):
+            for kser in self.ksers:
+                self.assertRaises(TypeError, lambda: self.kser % kser)
+
+    def test_pow(self):
+        self.assertRaises(TypeError, lambda: self.kser ** "x")
+        self.assertRaises(TypeError, lambda: self.kser ** 1)
+        self.assertRaises(TypeError, lambda: self.kser ** self.some_date)
+
+        with option_context("compute.ops_on_diff_frames", True):
+            for kser in self.ksers:
+                self.assertRaises(TypeError, lambda: self.kser ** kser)
+
+    def test_radd(self):
+        self.assertRaises(TypeError, lambda: "x" + self.kser)
+        self.assertRaises(TypeError, lambda: 1 + self.kser)
+        self.assertRaises(TypeError, lambda: self.some_date + self.kser)
+
+    def test_rsub(self):
+        self.assertRaises(TypeError, lambda: "x" - self.kser)
+        self.assertRaises(TypeError, lambda: 1 - self.kser)
+        self.assert_eq(
+            (self.some_date - self.pser).dt.days, self.some_date - self.kser,
+        )
+
+    def test_rmul(self):
+        self.assertRaises(TypeError, lambda: "x" * self.kser)
+        self.assertRaises(TypeError, lambda: 1 * self.kser)
+        self.assertRaises(TypeError, lambda: self.some_date * self.kser)
+
+    def test_rtruediv(self):
+        self.assertRaises(TypeError, lambda: "x" / self.kser)
+        self.assertRaises(TypeError, lambda: 1 / self.kser)
+        self.assertRaises(TypeError, lambda: self.some_date / self.kser)
+
+    def test_rfloordiv(self):
+        self.assertRaises(TypeError, lambda: "x" // self.kser)
+        self.assertRaises(TypeError, lambda: 1 // self.kser)
+        self.assertRaises(TypeError, lambda: self.some_date // self.kser)
+
+    def test_rmod(self):
+        self.assertRaises(TypeError, lambda: 1 % self.kser)
+        self.assertRaises(TypeError, lambda: self.some_date % self.kser)
+
+    def test_rpow(self):
+        self.assertRaises(TypeError, lambda: "x" ** self.kser)
+        self.assertRaises(TypeError, lambda: 1 ** self.kser)
+        self.assertRaises(TypeError, lambda: self.some_date ** self.kser)
+
+
+if __name__ == "__main__":
+    import unittest
+    from pyspark.pandas.tests.data_type_ops.test_date_ops import *  # noqa: 
F401
+
+    try:
+        import xmlrunner  # type: ignore[import]
+        testRunner = xmlrunner.XMLTestRunner(output='target/test-reports', 
verbosity=2)
+    except ImportError:
+        testRunner = None
+    unittest.main(testRunner=testRunner, verbosity=2)
diff --git a/python/pyspark/pandas/tests/data_type_ops/test_datetime_ops.py 
b/python/pyspark/pandas/tests/data_type_ops/test_datetime_ops.py
new file mode 100644
index 0000000..c8076e4
--- /dev/null
+++ b/python/pyspark/pandas/tests/data_type_ops/test_datetime_ops.py
@@ -0,0 +1,160 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import datetime
+
+import numpy as np
+import pandas as pd
+
+from pyspark import pandas as ps
+from pyspark.pandas.config import option_context
+from pyspark.pandas.tests.data_type_ops.testing_utils import TestCasesUtils
+from pyspark.testing.pandasutils import PandasOnSparkTestCase
+
+
+class DatetimeOpsTest(PandasOnSparkTestCase, TestCasesUtils):
+    @property
+    def pser(self):
+        return pd.Series(pd.date_range("1994-1-31 10:30:15", periods=3, 
freq="M"))
+
+    @property
+    def kser(self):
+        return ps.from_pandas(self.pser)
+
+    @property
+    def some_datetime(self):
+        return datetime.datetime(1994, 1, 31, 10, 30, 00)
+
+    def test_add(self):
+        self.assertRaises(TypeError, lambda: self.kser + "x")
+        self.assertRaises(TypeError, lambda: self.kser + 1)
+        self.assertRaises(TypeError, lambda: self.kser + self.some_datetime)
+
+        with option_context("compute.ops_on_diff_frames", True):
+            for kser in self.ksers:
+                self.assertRaises(TypeError, lambda: self.kser + kser)
+
+    def test_sub(self):
+        self.assertRaises(TypeError, lambda: self.kser - "x")
+        self.assertRaises(TypeError, lambda: self.kser - 1)
+        self.assert_eq(
+            (self.pser - self.some_datetime).dt.total_seconds().astype("int"),
+            self.kser - self.some_datetime,
+        )
+        with option_context("compute.ops_on_diff_frames", True):
+            for pser, kser in self.pser_kser_pairs:
+                if pser.dtype == np.dtype("<M8[ns]"):
+                    self.assert_eq(
+                        (self.pser - pser).dt.total_seconds().astype("int"),
+                        (self.kser - kser).sort_index(),
+                    )
+                else:
+                    self.assertRaises(TypeError, lambda: self.kser - kser)
+
+    def test_mul(self):
+        self.assertRaises(TypeError, lambda: self.kser * "x")
+        self.assertRaises(TypeError, lambda: self.kser * 1)
+        self.assertRaises(TypeError, lambda: self.kser * self.some_datetime)
+
+        with option_context("compute.ops_on_diff_frames", True):
+            for kser in self.ksers:
+                self.assertRaises(TypeError, lambda: self.kser * kser)
+
+    def test_truediv(self):
+        self.assertRaises(TypeError, lambda: self.kser / "x")
+        self.assertRaises(TypeError, lambda: self.kser / 1)
+        self.assertRaises(TypeError, lambda: self.kser / self.some_datetime)
+
+        with option_context("compute.ops_on_diff_frames", True):
+            for kser in self.ksers:
+                self.assertRaises(TypeError, lambda: self.kser / kser)
+
+    def test_floordiv(self):
+        self.assertRaises(TypeError, lambda: self.kser // "x")
+        self.assertRaises(TypeError, lambda: self.kser // 1)
+        self.assertRaises(TypeError, lambda: self.kser // self.some_datetime)
+
+        with option_context("compute.ops_on_diff_frames", True):
+            for kser in self.ksers:
+                self.assertRaises(TypeError, lambda: self.kser // kser)
+
+    def test_mod(self):
+        self.assertRaises(TypeError, lambda: self.kser % "x")
+        self.assertRaises(TypeError, lambda: self.kser % 1)
+        self.assertRaises(TypeError, lambda: self.kser % self.some_datetime)
+
+        with option_context("compute.ops_on_diff_frames", True):
+            for kser in self.ksers:
+                self.assertRaises(TypeError, lambda: self.kser % kser)
+
+    def test_pow(self):
+        self.assertRaises(TypeError, lambda: self.kser ** "x")
+        self.assertRaises(TypeError, lambda: self.kser ** 1)
+        self.assertRaises(TypeError, lambda: self.kser ** self.some_datetime)
+
+        with option_context("compute.ops_on_diff_frames", True):
+            for kser in self.ksers:
+                self.assertRaises(TypeError, lambda: self.kser ** kser)
+
+    def test_radd(self):
+        self.assertRaises(TypeError, lambda: "x" + self.kser)
+        self.assertRaises(TypeError, lambda: 1 + self.kser)
+        self.assertRaises(TypeError, lambda: self.some_datetime + self.kser)
+
+    def test_rsub(self):
+        self.assertRaises(TypeError, lambda: "x" - self.kser)
+        self.assertRaises(TypeError, lambda: 1 - self.kser)
+        self.assert_eq(
+            (self.some_datetime - self.pser).dt.total_seconds().astype("int"),
+            self.some_datetime - self.kser,
+        )
+
+    def test_rmul(self):
+        self.assertRaises(TypeError, lambda: "x" * self.kser)
+        self.assertRaises(TypeError, lambda: 1 * self.kser)
+        self.assertRaises(TypeError, lambda: self.some_datetime * self.kser)
+
+    def test_rtruediv(self):
+        self.assertRaises(TypeError, lambda: "x" / self.kser)
+        self.assertRaises(TypeError, lambda: 1 / self.kser)
+        self.assertRaises(TypeError, lambda: self.some_datetime / self.kser)
+
+    def test_rfloordiv(self):
+        self.assertRaises(TypeError, lambda: "x" // self.kser)
+        self.assertRaises(TypeError, lambda: 1 // self.kser)
+        self.assertRaises(TypeError, lambda: self.some_datetime // self.kser)
+
+    def test_rmod(self):
+        self.assertRaises(TypeError, lambda: 1 % self.kser)
+        self.assertRaises(TypeError, lambda: self.some_datetime % self.kser)
+
+    def test_rpow(self):
+        self.assertRaises(TypeError, lambda: "x" ** self.kser)
+        self.assertRaises(TypeError, lambda: 1 ** self.kser)
+        self.assertRaises(TypeError, lambda: self.some_datetime ** self.kser)
+
+
+if __name__ == "__main__":
+    import unittest
+    from pyspark.pandas.tests.data_type_ops.test_datetime_ops import *  # 
noqa: F401
+
+    try:
+        import xmlrunner  # type: ignore[import]
+        testRunner = xmlrunner.XMLTestRunner(output='target/test-reports', 
verbosity=2)
+    except ImportError:
+        testRunner = None
+    unittest.main(testRunner=testRunner, verbosity=2)
diff --git a/python/pyspark/pandas/tests/data_type_ops/test_num_ops.py 
b/python/pyspark/pandas/tests/data_type_ops/test_num_ops.py
new file mode 100644
index 0000000..fdc5e3b
--- /dev/null
+++ b/python/pyspark/pandas/tests/data_type_ops/test_num_ops.py
@@ -0,0 +1,195 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import datetime
+import numpy as np
+
+from pyspark.pandas.config import option_context
+from pyspark.pandas.tests.data_type_ops.testing_utils import TestCasesUtils
+from pyspark.testing.pandasutils import PandasOnSparkTestCase
+
+
+class NumOpsTest(PandasOnSparkTestCase, TestCasesUtils):
+    """Unit tests for arithmetic operations of numeric data types.
+
+    A few test cases are disabled because pandas-on-Spark returns float64 
whereas pandas
+    returns float32.
+    The underlying reason is the respective Spark operations return DoubleType 
always.
+    """
+    def test_add(self):
+        for pser, kser in self.numeric_pser_kser_pairs:
+            self.assert_eq(pser + pser, kser + kser)
+            self.assert_eq(pser + 1, kser + 1)
+            # self.assert_eq(pser + 0.1, kser + 0.1)
+
+        with option_context("compute.ops_on_diff_frames", True):
+            for pser, kser in self.numeric_pser_kser_pairs:
+                self.assertRaises(TypeError, lambda: kser + 
self.non_numeric_ksers["string"])
+                self.assertRaises(TypeError, lambda: kser + 
self.non_numeric_ksers["datetime"])
+                self.assertRaises(TypeError, lambda: kser + 
self.non_numeric_ksers["date"])
+                self.assertRaises(TypeError, lambda: kser + 
self.non_numeric_ksers["categorical"])
+                self.assertRaises(TypeError, lambda: kser + 
self.non_numeric_ksers["bool"])
+
+    def test_sub(self):
+        for pser, kser in self.numeric_pser_kser_pairs:
+            self.assert_eq(pser - pser, kser - kser)
+            self.assert_eq(pser - 1, kser - 1)
+            # self.assert_eq(pser - 0.1, kser - 0.1)
+
+        with option_context("compute.ops_on_diff_frames", True):
+            for pser, kser in self.numeric_pser_kser_pairs:
+                self.assertRaises(TypeError, lambda: kser - 
self.non_numeric_ksers["string"])
+                self.assertRaises(TypeError, lambda: kser - 
self.non_numeric_ksers["datetime"])
+                self.assertRaises(TypeError, lambda: kser - 
self.non_numeric_ksers["date"])
+                self.assertRaises(TypeError, lambda: kser - 
self.non_numeric_ksers["categorical"])
+                self.assertRaises(TypeError, lambda: kser - 
self.non_numeric_ksers["bool"])
+
+    def test_mul(self):
+        for pser, kser in self.numeric_pser_kser_pairs:
+            self.assert_eq(pser * pser, kser * kser)
+
+        with option_context("compute.ops_on_diff_frames", True):
+            for pser, kser in self.numeric_pser_kser_pairs:
+                if kser.dtype in [int, np.int32]:
+                    self.assert_eq(
+                        (kser * self.non_numeric_ksers["string"]).sort_index(),
+                        pser * self.non_numeric_psers["string"],
+                    )
+                else:
+                    self.assertRaises(TypeError, lambda: kser * 
self.non_numeric_ksers["string"])
+                self.assertRaises(TypeError, lambda: kser * 
self.non_numeric_ksers["datetime"])
+                self.assertRaises(TypeError, lambda: kser * 
self.non_numeric_ksers["date"])
+                self.assertRaises(TypeError, lambda: kser * 
self.non_numeric_ksers["categorical"])
+                self.assertRaises(TypeError, lambda: kser * 
self.non_numeric_ksers["bool"])
+
+    def test_truediv(self):
+        for pser, kser in self.numeric_pser_kser_pairs:
+            if kser.dtype in [float, int, np.int32]:
+                self.assert_eq(pser / pser, kser / kser)
+
+        with option_context("compute.ops_on_diff_frames", True):
+            for pser, kser in self.numeric_pser_kser_pairs:
+                self.assertRaises(TypeError, lambda: kser / 
self.non_numeric_ksers["string"])
+                self.assertRaises(TypeError, lambda: kser / 
self.non_numeric_ksers["datetime"])
+                self.assertRaises(TypeError, lambda: kser / 
self.non_numeric_ksers["date"])
+                self.assertRaises(TypeError, lambda: kser / 
self.non_numeric_ksers["categorical"])
+                self.assertRaises(TypeError, lambda: kser / 
self.non_numeric_ksers["bool"])
+
+    def test_floordiv(self):
+        for pser, kser in self.numeric_pser_kser_pairs:
+            if kser.dtype == float:
+                self.assert_eq(pser // pser, kser // kser)
+
+        with option_context("compute.ops_on_diff_frames", True):
+            for pser, kser in self.numeric_pser_kser_pairs:
+                self.assertRaises(TypeError, lambda: kser // 
self.non_numeric_ksers["string"])
+                self.assertRaises(TypeError, lambda: kser // 
self.non_numeric_ksers["datetime"])
+                self.assertRaises(TypeError, lambda: kser // 
self.non_numeric_ksers["date"])
+                self.assertRaises(TypeError, lambda: kser // 
self.non_numeric_ksers["categorical"])
+                self.assertRaises(TypeError, lambda: kser // 
self.non_numeric_ksers["bool"])
+
+    def test_mod(self):
+        for pser, kser in self.numeric_pser_kser_pairs:
+            self.assert_eq(pser % pser, kser % kser)
+
+        with option_context("compute.ops_on_diff_frames", True):
+            for pser, kser in self.numeric_pser_kser_pairs:
+                self.assertRaises(TypeError, lambda: kser % 
self.non_numeric_ksers["string"])
+                self.assertRaises(TypeError, lambda: kser % 
self.non_numeric_ksers["datetime"])
+                self.assertRaises(TypeError, lambda: kser % 
self.non_numeric_ksers["date"])
+                self.assertRaises(TypeError, lambda: kser % 
self.non_numeric_ksers["categorical"])
+                self.assertRaises(TypeError, lambda: kser % 
self.non_numeric_ksers["bool"])
+
+    def test_pow(self):
+        for pser, kser in self.numeric_pser_kser_pairs:
+            if kser.dtype == float:
+                self.assert_eq(pser ** pser, kser ** kser)
+
+        with option_context("compute.ops_on_diff_frames", True):
+            for pser, kser in self.numeric_pser_kser_pairs:
+                self.assertRaises(TypeError, lambda: kser ** 
self.non_numeric_ksers["string"])
+                self.assertRaises(TypeError, lambda: kser ** 
self.non_numeric_ksers["datetime"])
+                self.assertRaises(TypeError, lambda: kser ** 
self.non_numeric_ksers["date"])
+                self.assertRaises(TypeError, lambda: kser ** 
self.non_numeric_ksers["categorical"])
+                self.assertRaises(TypeError, lambda: kser ** 
self.non_numeric_ksers["bool"])
+
+    def test_radd(self):
+        for pser, kser in self.numeric_pser_kser_pairs:
+            self.assert_eq(1 + pser, 1 + kser)
+            # self.assert_eq(0.1 + pser, 0.1 + kser)
+            self.assertRaises(TypeError, lambda: "x" + kser)
+            self.assertRaises(TypeError, lambda: datetime.date(1994, 1, 1) + 
kser)
+            self.assertRaises(TypeError, lambda: datetime.datetime(1994, 1, 1) 
+ kser)
+
+    def test_rsub(self):
+        for pser, kser in self.numeric_pser_kser_pairs:
+            self.assert_eq(1 - pser, 1 - kser)
+            # self.assert_eq(0.1 - pser, 0.1 - kser)
+            self.assertRaises(TypeError, lambda: "x" - kser)
+            self.assertRaises(TypeError, lambda: datetime.date(1994, 1, 1) - 
kser)
+            self.assertRaises(TypeError, lambda: datetime.datetime(1994, 1, 1) 
- kser)
+
+    def test_rmul(self):
+        for pser, kser in self.numeric_pser_kser_pairs:
+            self.assert_eq(1 * pser, 1 * kser)
+            # self.assert_eq(0.1 * pser, 0.1 * kser)
+            self.assertRaises(TypeError, lambda: "x" * kser)
+            self.assertRaises(TypeError, lambda: datetime.date(1994, 1, 1) * 
kser)
+            self.assertRaises(TypeError, lambda: datetime.datetime(1994, 1, 1) 
* kser)
+
+    def test_rtruediv(self):
+        for pser, kser in self.numeric_pser_kser_pairs:
+            # self.assert_eq(5 / pser, 5 / kser)
+            # self.assert_eq(0.1 / pser, 0.1 / kser)
+            self.assertRaises(TypeError, lambda: "x" + kser)
+            self.assertRaises(TypeError, lambda: datetime.date(1994, 1, 1) / 
kser)
+            self.assertRaises(TypeError, lambda: datetime.datetime(1994, 1, 1) 
/ kser)
+
+    def test_rfloordiv(self):
+        for pser, kser in self.numeric_pser_kser_pairs:
+            # self.assert_eq(5 // pser, 5 // kser)
+            # self.assert_eq(0.1 // pser, 0.1 // kser)
+            self.assertRaises(TypeError, lambda: "x" // kser)
+            self.assertRaises(TypeError, lambda: datetime.date(1994, 1, 1) // 
kser)
+            self.assertRaises(TypeError, lambda: datetime.datetime(1994, 1, 1) 
// kser)
+
+    def test_rpow(self):
+        for pser, kser in self.numeric_pser_kser_pairs:
+            # self.assert_eq(1 ** pser, 1 ** kser)
+            # self.assert_eq(0.1 ** pser, 0.1 ** kser)
+            self.assertRaises(TypeError, lambda: "x" ** kser)
+            self.assertRaises(TypeError, lambda: datetime.date(1994, 1, 1) ** 
kser)
+            self.assertRaises(TypeError, lambda: datetime.datetime(1994, 1, 1) 
** kser)
+
+    def test_rmod(self):
+        for pser, kser in self.numeric_pser_kser_pairs:
+            self.assert_eq(1 % pser, 1 % kser)
+            # self.assert_eq(0.1 % pser, 0.1 % kser)
+            self.assertRaises(TypeError, lambda: datetime.date(1994, 1, 1) % 
kser)
+            self.assertRaises(TypeError, lambda: datetime.datetime(1994, 1, 1) 
% kser)
+
+
+if __name__ == "__main__":
+    import unittest
+    from pyspark.pandas.tests.data_type_ops.test_string_ops import *  # noqa: 
F401
+
+    try:
+        import xmlrunner  # type: ignore[import]
+        testRunner = xmlrunner.XMLTestRunner(output='target/test-reports', 
verbosity=2)
+    except ImportError:
+        testRunner = None
+    unittest.main(testRunner=testRunner, verbosity=2)
diff --git a/python/pyspark/pandas/tests/data_type_ops/test_string_ops.py 
b/python/pyspark/pandas/tests/data_type_ops/test_string_ops.py
new file mode 100644
index 0000000..6bb196b
--- /dev/null
+++ b/python/pyspark/pandas/tests/data_type_ops/test_string_ops.py
@@ -0,0 +1,140 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import numpy as np
+import pandas as pd
+
+from pyspark import pandas as ps
+from pyspark.pandas.config import option_context
+from pyspark.pandas.tests.data_type_ops.testing_utils import TestCasesUtils
+from pyspark.testing.pandasutils import PandasOnSparkTestCase
+
+
+class StringOpsTest(PandasOnSparkTestCase, TestCasesUtils):
+    @property
+    def pser(self):
+        return pd.Series(["x", "y", "z"])
+
+    @property
+    def kser(self):
+        return ps.from_pandas(self.pser)
+
+    def test_add(self):
+        self.assert_eq(self.pser + "x", self.kser + "x")
+        self.assertRaises(TypeError, lambda: self.kser + 1)
+
+        with option_context("compute.ops_on_diff_frames", True):
+            self.assert_eq(
+                self.pser + self.non_numeric_psers["string"],
+                (self.kser + self.non_numeric_ksers["string"]).sort_index(),
+            )
+            self.assertRaises(TypeError, lambda: self.kser + 
self.non_numeric_ksers["datetime"])
+            self.assertRaises(TypeError, lambda: self.kser + 
self.non_numeric_ksers["date"])
+            self.assertRaises(TypeError, lambda: self.kser + 
self.non_numeric_ksers["categorical"])
+            self.assertRaises(TypeError, lambda: self.kser + 
self.non_numeric_ksers["bool"])
+            for kser in self.numeric_ksers:
+                self.assertRaises(TypeError, lambda: self.kser + kser)
+
+    def test_sub(self):
+        self.assertRaises(TypeError, lambda: self.kser - "x")
+        self.assertRaises(TypeError, lambda: self.kser - 1)
+
+        with option_context("compute.ops_on_diff_frames", True):
+            for kser in self.ksers:
+                self.assertRaises(TypeError, lambda: self.kser - kser)
+
+    def test_mul(self):
+        self.assertRaises(TypeError, lambda: self.kser * "x")
+        self.assert_eq(self.pser * 1, self.kser * 1)
+
+        with option_context("compute.ops_on_diff_frames", True):
+            for pser, kser in self.pser_kser_pairs:
+                if kser.dtype in [np.int64, np.int32]:
+                    self.assert_eq(self.pser * pser, (self.kser * 
kser).sort_index())
+                else:
+                    self.assertRaises(TypeError, lambda: self.kser * kser)
+
+    def test_truediv(self):
+        self.assertRaises(TypeError, lambda: self.kser / "x")
+        self.assertRaises(TypeError, lambda: self.kser / 1)
+
+        with option_context("compute.ops_on_diff_frames", True):
+            for kser in self.ksers:
+                self.assertRaises(TypeError, lambda: self.kser / kser)
+
+    def test_floordiv(self):
+        self.assertRaises(TypeError, lambda: self.kser // "x")
+        self.assertRaises(TypeError, lambda: self.kser // 1)
+
+        with option_context("compute.ops_on_diff_frames", True):
+            for kser in self.ksers:
+                self.assertRaises(TypeError, lambda: self.kser // kser)
+
+    def test_mod(self):
+        self.assertRaises(TypeError, lambda: self.kser % "x")
+        self.assertRaises(TypeError, lambda: self.kser % 1)
+
+        with option_context("compute.ops_on_diff_frames", True):
+            for kser in self.ksers:
+                self.assertRaises(TypeError, lambda: self.kser % kser)
+
+    def test_pow(self):
+        self.assertRaises(TypeError, lambda: self.kser ** "x")
+        self.assertRaises(TypeError, lambda: self.kser ** 1)
+
+        with option_context("compute.ops_on_diff_frames", True):
+            for kser in self.ksers:
+                self.assertRaises(TypeError, lambda: self.kser ** kser)
+
+    def test_radd(self):
+        self.assert_eq("x" + self.pser, "x" + self.kser)
+        self.assertRaises(TypeError, lambda: 1 + self.kser)
+
+    def test_rsub(self):
+        self.assertRaises(TypeError, lambda: "x" - self.kser)
+        self.assertRaises(TypeError, lambda: 1 - self.kser)
+
+    def test_rmul(self):
+        self.assertRaises(TypeError, lambda: "x" * self.kser)
+        self.assert_eq(1 * self.pser, 1 * self.kser)
+
+    def test_rtruediv(self):
+        self.assertRaises(TypeError, lambda: "x" / self.kser)
+        self.assertRaises(TypeError, lambda: 1 / self.kser)
+
+    def test_rfloordiv(self):
+        self.assertRaises(TypeError, lambda: "x" // self.kser)
+        self.assertRaises(TypeError, lambda: 1 // self.kser)
+
+    def test_rmod(self):
+        self.assertRaises(TypeError, lambda: 1 % self.kser)
+
+    def test_rpow(self):
+        self.assertRaises(TypeError, lambda: "x" ** self.kser)
+        self.assertRaises(TypeError, lambda: 1 ** self.kser)
+
+
+if __name__ == "__main__":
+    import unittest
+    from pyspark.pandas.tests.data_type_ops.test_num_ops import *  # noqa: F401
+
+    try:
+        import xmlrunner  # type: ignore[import]
+        testRunner = xmlrunner.XMLTestRunner(output='target/test-reports', 
verbosity=2)
+    except ImportError:
+        testRunner = None
+    unittest.main(testRunner=testRunner, verbosity=2)
diff --git a/python/pyspark/pandas/tests/data_type_ops/testing_utils.py 
b/python/pyspark/pandas/tests/data_type_ops/testing_utils.py
new file mode 100644
index 0000000..2f0bbc8
--- /dev/null
+++ b/python/pyspark/pandas/tests/data_type_ops/testing_utils.py
@@ -0,0 +1,75 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import datetime
+import decimal
+
+import numpy as np
+import pandas as pd
+
+import pyspark.pandas as ps
+
+
+class TestCasesUtils(object):
+    """A utility holding common test cases for arithmetic operations of 
different data types."""
+    @property
+    def numeric_psers(self):
+        dtypes = [np.float32, float, int, np.int32]
+        sers = [pd.Series([1, 2, 3], dtype=dtype) for dtype in dtypes]
+        sers.append(pd.Series([decimal.Decimal(1), decimal.Decimal(2), 
decimal.Decimal(3)]))
+        return sers
+
+    @property
+    def numeric_ksers(self):
+        return [ps.from_pandas(pser) for pser in self.numeric_psers]
+
+    @property
+    def numeric_pser_kser_pairs(self):
+        return zip(self.numeric_psers, self.numeric_ksers)
+
+    @property
+    def non_numeric_psers(self):
+        psers = {
+            "string": pd.Series(["x", "y", "z"]),
+            "datetime": pd.to_datetime(pd.Series([1, 2, 3])),
+            "bool": pd.Series([True, True, False]),
+            "date": pd.Series(
+                [datetime.date(1994, 1, 1), datetime.date(1994, 1, 2), 
datetime.date(1994, 1, 3)]
+            ),
+            "categorical": pd.Series(["a", "b", "a"], dtype="category"),
+        }
+        return psers
+
+    @property
+    def non_numeric_ksers(self):
+        ksers = {}
+
+        for k, v in self.non_numeric_psers.items():
+            ksers[k] = ps.from_pandas(v)
+        return ksers
+
+    @property
+    def ksers(self):
+        return self.numeric_ksers + list(self.non_numeric_ksers.values())
+
+    @property
+    def psers(self):
+        return self.numeric_psers + list(self.non_numeric_psers.values())
+
+    @property
+    def pser_kser_pairs(self):
+        return zip(self.psers, self.ksers)
diff --git a/python/pyspark/pandas/tests/indexes/test_datetime.py 
b/python/pyspark/pandas/tests/indexes/test_datetime.py
index af511ed..a3dc537 100644
--- a/python/pyspark/pandas/tests/indexes/test_datetime.py
+++ b/python/pyspark/pandas/tests/indexes/test_datetime.py
@@ -192,21 +192,23 @@ class DatetimeIndexTest(PandasOnSparkTestCase, TestUtils):
         for kidx, pidx in self.idx_pairs:
             py_datetime = pidx.to_pydatetime()
             for other in [1, 0.1, kidx, 
kidx.to_series().reset_index(drop=True), py_datetime]:
-                expected_err_msg = "addition can not be applied to date times."
+                expected_err_msg = "Addition can not be applied to datetimes."
                 self.assertRaisesRegex(TypeError, expected_err_msg, lambda: 
kidx + other)
                 self.assertRaisesRegex(TypeError, expected_err_msg, lambda: 
other + kidx)
 
-                expected_err_msg = "multiplication can not be applied to date 
times."
+                expected_err_msg = "Multiplication can not be applied to 
datetimes."
                 self.assertRaisesRegex(TypeError, expected_err_msg, lambda: 
kidx * other)
                 self.assertRaisesRegex(TypeError, expected_err_msg, lambda: 
other * kidx)
 
-                expected_err_msg = "division can not be applied to date times."
+                expected_err_msg = "True division can not be applied to 
datetimes."
                 self.assertRaisesRegex(TypeError, expected_err_msg, lambda: 
kidx / other)
                 self.assertRaisesRegex(TypeError, expected_err_msg, lambda: 
other / kidx)
+
+                expected_err_msg = "Floor division can not be applied to 
datetimes."
                 self.assertRaisesRegex(TypeError, expected_err_msg, lambda: 
kidx // other)
                 self.assertRaisesRegex(TypeError, expected_err_msg, lambda: 
other // kidx)
 
-                expected_err_msg = "modulo can not be applied to date times."
+                expected_err_msg = "Modulo can not be applied to datetimes."
                 self.assertRaisesRegex(TypeError, expected_err_msg, lambda: 
kidx % other)
                 self.assertRaisesRegex(TypeError, expected_err_msg, lambda: 
other % kidx)
 
diff --git a/python/pyspark/pandas/tests/test_dataframe.py 
b/python/pyspark/pandas/tests/test_dataframe.py
index 7577f01..38ebf87 100644
--- a/python/pyspark/pandas/tests/test_dataframe.py
+++ b/python/pyspark/pandas/tests/test_dataframe.py
@@ -2355,7 +2355,7 @@ class DataFrameTest(PandasOnSparkTestCase, SQLTestUtils):
 
         # Negative
         kdf = ps.DataFrame({"a": ["x"], "b": [1]})
-        ks_err_msg = "substraction can not be applied to string series or 
literals"
+        ks_err_msg = "subtraction can not be applied to string series or 
literals"
 
         self.assertRaisesRegex(TypeError, ks_err_msg, lambda: kdf["a"] - 
kdf["b"])
         self.assertRaisesRegex(TypeError, ks_err_msg, lambda: kdf["b"] - 
kdf["a"])
@@ -2430,12 +2430,12 @@ class DataFrameTest(PandasOnSparkTestCase, 
SQLTestUtils):
         self.assertRaisesRegex(TypeError, ks_err_msg, lambda: kdf["b"] * 
"literal")
         self.assertRaisesRegex(TypeError, ks_err_msg, lambda: "literal" * 
kdf["b"])
         self.assertRaisesRegex(TypeError, ks_err_msg, lambda: kdf["a"] * 
"literal")
-        self.assertRaisesRegex(TypeError, ks_err_msg, lambda: "literal" * 
kdf["a"])
 
         ks_err_msg = "a string series can only be multiplied to an int series 
or literal"
         self.assertRaisesRegex(TypeError, ks_err_msg, lambda: kdf["a"] * 
kdf["a"])
         self.assertRaisesRegex(TypeError, ks_err_msg, lambda: kdf["a"] * 0.1)
         self.assertRaisesRegex(TypeError, ks_err_msg, lambda: 0.1 * kdf["a"])
+        self.assertRaisesRegex(TypeError, ks_err_msg, lambda: "literal" * 
kdf["a"])
 
     def test_sample(self):
         pdf = pd.DataFrame({"A": [0, 2, 4]})
diff --git a/python/pyspark/pandas/tests/test_series_datetime.py 
b/python/pyspark/pandas/tests/test_series_datetime.py
index deb4497..6cdebe9 100644
--- a/python/pyspark/pandas/tests/test_series_datetime.py
+++ b/python/pyspark/pandas/tests/test_series_datetime.py
@@ -84,21 +84,23 @@ class SeriesDateTimeTest(PandasOnSparkTestCase, 
SQLTestUtils):
         datetime_index = ps.Index(self.pd_start_date)
 
         for other in [1, 0.1, kser, datetime_index, py_datetime]:
-            expected_err_msg = "addition can not be applied to date times."
+            expected_err_msg = "Addition can not be applied to datetimes."
             self.assertRaisesRegex(TypeError, expected_err_msg, lambda: kser + 
other)
             self.assertRaisesRegex(TypeError, expected_err_msg, lambda: other 
+ kser)
 
-            expected_err_msg = "multiplication can not be applied to date 
times."
+            expected_err_msg = "Multiplication can not be applied to 
datetimes."
             self.assertRaisesRegex(TypeError, expected_err_msg, lambda: kser * 
other)
             self.assertRaisesRegex(TypeError, expected_err_msg, lambda: other 
* kser)
 
-            expected_err_msg = "division can not be applied to date times."
+            expected_err_msg = "True division can not be applied to datetimes."
             self.assertRaisesRegex(TypeError, expected_err_msg, lambda: kser / 
other)
             self.assertRaisesRegex(TypeError, expected_err_msg, lambda: other 
/ kser)
+
+            expected_err_msg = "Floor division can not be applied to 
datetimes."
             self.assertRaisesRegex(TypeError, expected_err_msg, lambda: kser 
// other)
             self.assertRaisesRegex(TypeError, expected_err_msg, lambda: other 
// kser)
 
-            expected_err_msg = "modulo can not be applied to date times."
+            expected_err_msg = "Modulo can not be applied to datetimes."
             self.assertRaisesRegex(TypeError, expected_err_msg, lambda: kser % 
other)
             self.assertRaisesRegex(TypeError, expected_err_msg, lambda: other 
% kser)
 
diff --git a/python/pyspark/testing/pandasutils.py 
b/python/pyspark/testing/pandasutils.py
index 4a5bfe8..a506a67 100644
--- a/python/pyspark/testing/pandasutils.py
+++ b/python/pyspark/testing/pandasutils.py
@@ -70,7 +70,7 @@ class PandasOnSparkTestCase(unittest.TestCase, SQLTestUtils):
     def tearDownClass(cls):
         # We don't stop Spark session to reuse across all tests.
         # The Spark session will be started and stopped at PyTest session 
level.
-        # Please see databricks/koalas/conftest.py.
+        # Please see pyspark/pandas/conftest.py.
         pass
 
     def assertPandasEqual(self, left, right, check_exact=True):
diff --git a/python/setup.py b/python/setup.py
index 5c4a1ae..fe4e6d6 100755
--- a/python/setup.py
+++ b/python/setup.py
@@ -221,6 +221,7 @@ try:
                   'pyspark.sbin',
                   'pyspark.jars',
                   'pyspark.pandas',
+                  'pyspark.pandas.data_type_ops',
                   'pyspark.pandas.indexes',
                   'pyspark.pandas.missing',
                   'pyspark.pandas.plot',

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to