This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new e3256b8  [SPARK-36396][PYTHON] Implement DataFrame.cov
e3256b8 is described below

commit e3256b838b5bd5c817bc95ba9d996b878078ad35
Author: dch nguyen <dgd_contribu...@viettel.com.vn>
AuthorDate: Tue Nov 30 15:46:11 2021 +0900

    [SPARK-36396][PYTHON] Implement DataFrame.cov
    
    ### What changes were proposed in this pull request?
    
    Implement DataFrame.cov
    
    ### Why are the changes needed?
    
    Increase pandas API coverage in PySpark
    
    ### Does this PR introduce _any_ user-facing change?
    Yes. Users can now use `DataFrame.cov`:
    
    ``` python
    >>> psdf = ps.DataFrame([(1, 2), (0, 3), (2, 0), (1, 1)],
    ...                   columns=['dogs', 'cats'])
    >>> psdf.cov()
              dogs      cats
    dogs  0.666667 -1.000000
    cats -1.000000  1.666667
    
    >>> pdf = pd.DataFrame(
    ...     {
    ...         "a": [1, np.nan, 3, 4],
    ...         "b": [True, False, False, True],
    ...         "c": [True, True, False, True],
    ...     }
    ... )
    >>> psdf = ps.from_pandas(pdf)
    >>> psdf.cov()
              a         b         c
    a  2.333333 -0.166667 -0.166667
    b -0.166667  0.333333  0.166667
    c -0.166667  0.166667  0.250000
    ```
    
    ### How was this patch tested?
    
    Unit tests.
    
    Closes #34213 from dchvn/SPARK-36396.
    
    Authored-by: dch nguyen <dgd_contribu...@viettel.com.vn>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 .../docs/source/reference/pyspark.pandas/frame.rst |   1 +
 python/pyspark/pandas/frame.py                     | 189 +++++++++++++++++++++
 python/pyspark/pandas/missing/frame.py             |   1 -
 python/pyspark/pandas/tests/test_dataframe.py      |  65 ++++++-
 4 files changed, 254 insertions(+), 2 deletions(-)

diff --git a/python/docs/source/reference/pyspark.pandas/frame.rst b/python/docs/source/reference/pyspark.pandas/frame.rst
index bb84202..04bfe27 100644
--- a/python/docs/source/reference/pyspark.pandas/frame.rst
+++ b/python/docs/source/reference/pyspark.pandas/frame.rst
@@ -148,6 +148,7 @@ Computations / Descriptive Stats
    DataFrame.clip
    DataFrame.corr
    DataFrame.count
+   DataFrame.cov
    DataFrame.describe
    DataFrame.kurt
    DataFrame.kurtosis
diff --git a/python/pyspark/pandas/frame.py b/python/pyspark/pandas/frame.py
index 38ac9af..edfb62e 100644
--- a/python/pyspark/pandas/frame.py
+++ b/python/pyspark/pandas/frame.py
@@ -77,6 +77,7 @@ from pyspark.sql.types import (
     StringType,
     StructField,
     StructType,
+    DecimalType,
 )
 from pyspark.sql.window import Window
 
@@ -8258,6 +8259,194 @@ defaultdict(<class 'list'>, {'col..., 'col...})]
         internal = self._internal.with_new_sdf(sdf, data_fields=data_fields)
         self._update_internal_frame(internal, requires_same_anchor=False)
 
+    # TODO: ddof should be implemented.
+    def cov(self, min_periods: Optional[int] = None) -> "DataFrame":
+        """
+        Compute pairwise covariance of columns, excluding NA/null values.
+
+        Compute the pairwise covariance among the series of a DataFrame.
+        The returned data frame is the `covariance matrix
+        <https://en.wikipedia.org/wiki/Covariance_matrix>`__ of the columns
+        of the DataFrame.
+
+        Both NA and null values are automatically excluded from the
+        calculation. A threshold can be set for the minimum number of
+        observations required for each pair of columns; comparisons with
+        fewer observations than the threshold are returned as ``NaN``.
+
+        This method is generally used for the analysis of time series data to
+        understand the relationship between different measures
+        across time.
+
+        .. versionadded:: 3.3.0
+
+        Parameters
+        ----------
+        min_periods : int, optional
+            Minimum number of observations required per pair of columns
+            to have a valid result.
+
+        Returns
+        -------
+        DataFrame
+            The covariance matrix of the series of the DataFrame.
+
+        See Also
+        --------
+        Series.cov : Compute covariance with another Series.
+
+        Examples
+        --------
+        >>> df = ps.DataFrame([(1, 2), (0, 3), (2, 0), (1, 1)],
+        ...                   columns=['dogs', 'cats'])
+        >>> df.cov()
+                  dogs      cats
+        dogs  0.666667 -1.000000
+        cats -1.000000  1.666667
+
+        >>> np.random.seed(42)
+        >>> df = ps.DataFrame(np.random.randn(1000, 5),
+        ...                   columns=['a', 'b', 'c', 'd', 'e'])
+        >>> df.cov()
+                  a         b         c         d         e
+        a  0.998438 -0.020161  0.059277 -0.008943  0.014144
+        b -0.020161  1.059352 -0.008543 -0.024738  0.009826
+        c  0.059277 -0.008543  1.010670 -0.001486 -0.000271
+        d -0.008943 -0.024738 -0.001486  0.921297 -0.013692
+        e  0.014144  0.009826 -0.000271 -0.013692  0.977795
+
+        **Minimum number of periods**
+
+        This method also supports an optional ``min_periods`` keyword
+        that specifies the required minimum number of non-NA observations for
+        each column pair in order to have a valid result:
+
+        >>> np.random.seed(42)
+        >>> df = pd.DataFrame(np.random.randn(20, 3),
+        ...                   columns=['a', 'b', 'c'])
+        >>> df.loc[df.index[:5], 'a'] = np.nan
+        >>> df.loc[df.index[5:10], 'b'] = np.nan
+        >>> sdf = ps.from_pandas(df)
+        >>> sdf.cov(min_periods=12)
+                  a         b         c
+        a  0.316741       NaN -0.150812
+        b       NaN  1.248003  0.191417
+        c -0.150812  0.191417  0.895202
+        """
+        min_periods = 1 if min_periods is None else min_periods
+
+        # Only compute covariance for boolean and numeric columns, excluding Decimal
+        psdf = self[
+            [
+                col
+                for col in self.columns
+                if isinstance(self[col].spark.data_type, BooleanType)
+                or (
+                    isinstance(self[col].spark.data_type, NumericType)
+                    and not isinstance(self[col].spark.data_type, DecimalType)
+                )
+            ]
+        ]
+
+        num_cols = len(psdf.columns)
+        cov = np.zeros([num_cols, num_cols])
+
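+        # No boolean/numeric columns to compare: return an empty DataFrame,
+        # matching pandas behavior.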
+        if num_cols == 0:
+            return DataFrame()
+
+        if len(psdf) < min_periods:
+            cov.fill(np.nan)
+            return DataFrame(cov, columns=psdf.columns, index=psdf.columns)
+
+        data_cols = psdf._internal.data_spark_column_names
+        cov_scols = []
+        count_not_null_scols = []
+
+        # Count, for each pair of columns, the number of rows where both values are non-null
+        # Example:
+        #    a   b   c
+        # 0  1   1   1
+        # 1  NaN 2   2
+        # 2  3   NaN 3
+        # 3  4   4   4
+        #
+        #    a           b             c
+        # a  count(a, a) count(a, b) count(a, c)
+        # b              count(b, b) count(b, c)
+        # c                          count(c, c)
+        #
+        # count_not_null_scols =
+        # [F.count(a, a), F.count(a, b), F.count(a, c), F.count(b, b), F.count(b, c), F.count(c, c)]
+        for r in range(0, num_cols):
+            for c in range(r, num_cols):
+                count_not_null_scols.append(
+                    F.count(
+                        F.when(F.col(data_cols[r]).isNotNull() & F.col(data_cols[c]).isNotNull(), 1)
+                    )
+                )
+
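+        # Spark treats float NaN as a regular (non-null) value, so NaN is
+        # replaced with None first so that isNotNull() also excludes missing
+        # double values.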
+        count_not_null = (
+            psdf._internal.spark_frame.replace(float("nan"), None)
+            .select(*count_not_null_scols)
+            .head(1)[0]
+        )
+
+        # Compute the covariance for each pair of columns, or None when the
+        # pair has fewer than min_periods non-null observations.
+        # Example (min_periods = 3):
+        #    a   b   c
+        # 0  1   1   1
+        # 1  NaN 2   2
+        # 2  3   NaN 3
+        # 3  4   4   4
+        #
+        #    a         b         c
+        # a  cov(a, a) None      cov(a, c)
+        # b            cov(b, b) cov(b, c)
+        # c                      cov(c, c)
+        #
+        # cov_scols = [F.cov(a, a), None, F.cov(a, c), F.cov(b, b), F.cov(b, c), F.cov(c, c)]
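+        #
+        # After `step += r`, step == r * (r + 1) / 2, so the expression
+        # `r * num_cols + c - step` maps the pair (r, c), with c >= r, to its
+        # index in the flattened upper-triangular lists built above.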
+        step = 0
+        for r in range(0, num_cols):
+            step += r
+            for c in range(r, num_cols):
+                cov_scols.append(
+                    F.covar_samp(
+                        F.col(data_cols[r]).cast("double"), F.col(data_cols[c]).cast("double")
+                    )
+                    if count_not_null[r * num_cols + c - step] >= min_periods
+                    else F.lit(None)
+                )
+
+        pair_cov = psdf._internal.spark_frame.select(*cov_scols).head(1)[0]
+
+        # Convert the flat result Row into a 2D (upper-triangular) array
+        # Example:
+        # pair_cov = [cov(a, a), None, cov(a, c), cov(b, b), cov(b, c), cov(c, c)]
+        #
+        # cov =
+        #
+        #    a         b         c
+        # a  cov(a, a) None      cov(a, c)
+        # b            cov(b, b) cov(b, c)
+        # c                      cov(c, c)
+        step = 0
+        for r in range(0, num_cols):
+            step += r
+            for c in range(r, num_cols):
+                cov[r][c] = pair_cov[r * num_cols + c - step]
+
+        # Mirror the upper triangle into the lower triangle to make cov symmetric
+        # Example:
+        # cov =
+        #    a         b         c
+        # a  cov(a, a) None      cov(a, c)
+        # b  None      cov(b, b) cov(b, c)
+        # c  cov(a, c) cov(b, c) cov(c, c)
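+        # (cov + cov.T doubles the diagonal, so one copy of it,
+        # np.diag(np.diag(cov)), is subtracted back out.)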
+        cov = cov + cov.T - np.diag(np.diag(cov))
+        return DataFrame(cov, columns=psdf.columns, index=psdf.columns)
+
     def sample(
         self,
         n: Optional[int] = None,
diff --git a/python/pyspark/pandas/missing/frame.py b/python/pyspark/pandas/missing/frame.py
index aabc0e0..d822c14 100644
--- a/python/pyspark/pandas/missing/frame.py
+++ b/python/pyspark/pandas/missing/frame.py
@@ -39,7 +39,6 @@ class _MissingPandasLikeDataFrame(object):
     compare = _unsupported_function("compare")
     convert_dtypes = _unsupported_function("convert_dtypes")
     corrwith = _unsupported_function("corrwith")
-    cov = _unsupported_function("cov")
     ewm = _unsupported_function("ewm")
     infer_objects = _unsupported_function("infer_objects")
     interpolate = _unsupported_function("interpolate")
diff --git a/python/pyspark/pandas/tests/test_dataframe.py b/python/pyspark/pandas/tests/test_dataframe.py
index 701052e..ae8fcae 100644
--- a/python/pyspark/pandas/tests/test_dataframe.py
+++ b/python/pyspark/pandas/tests/test_dataframe.py
@@ -14,7 +14,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-
+import decimal
 from datetime import datetime
 from distutils.version import LooseVersion
 import inspect
@@ -6025,6 +6025,69 @@ class DataFrameTest(PandasOnSparkTestCase, SQLTestUtils):
             )
             self.assert_eq(psmidx.dtypes, expected)
 
+    def test_cov(self):
+        # SPARK-36396: Implement DataFrame.cov
+
+        # int
+        pdf = pd.DataFrame([(1, 2), (0, 3), (2, 0), (1, 1)], columns=["a", "b"])
+        psdf = ps.from_pandas(pdf)
+        self.assert_eq(pdf.cov(), psdf.cov(), almost=True)
+        self.assert_eq(pdf.cov(min_periods=4), psdf.cov(min_periods=4), almost=True)
+        self.assert_eq(pdf.cov(min_periods=5), psdf.cov(min_periods=5))
+
+        # bool
+        pdf = pd.DataFrame(
+            {
+                "a": [1, np.nan, 3, 4],
+                "b": [True, False, False, True],
+                "c": [True, True, False, True],
+            }
+        )
+        psdf = ps.from_pandas(pdf)
+        self.assert_eq(pdf.cov(), psdf.cov(), almost=True)
+        self.assert_eq(pdf.cov(min_periods=4), psdf.cov(min_periods=4), almost=True)
+        self.assert_eq(pdf.cov(min_periods=5), psdf.cov(min_periods=5))
+
+        # extension dtype
+        numeric_dtypes = ["Int8", "Int16", "Int32", "Int64", "Float32", 
"Float64", "float"]
+        boolean_dtypes = ["boolean", "bool"]
+
+        sers = [pd.Series([1, 2, 3, None], dtype=dtype) for dtype in 
numeric_dtypes]
+        sers += [pd.Series([True, False, True, None], dtype=dtype) for dtype 
in boolean_dtypes]
+        sers.append(pd.Series([decimal.Decimal(1), decimal.Decimal(2), 
decimal.Decimal(3), None]))
+
+        pdf = pd.concat(sers, axis=1)
+        pdf.columns = [dtype for dtype in numeric_dtypes + boolean_dtypes] + 
["decimal"]
+        psdf = ps.from_pandas(pdf)
+
+        self.assert_eq(pdf.cov(), psdf.cov(), almost=True)
+        self.assert_eq(pdf.cov(min_periods=3), psdf.cov(min_periods=3), almost=True)
+        self.assert_eq(pdf.cov(min_periods=4), psdf.cov(min_periods=4))
+
+        # string column
+        pdf = pd.DataFrame(
+            [(1, 2, "a", 1), (0, 3, "b", 1), (2, 0, "c", 9), (1, 1, "d", 1)],
+            columns=["a", "b", "c", "d"],
+        )
+        psdf = ps.from_pandas(pdf)
+        self.assert_eq(pdf.cov(), psdf.cov(), almost=True)
+        self.assert_eq(pdf.cov(min_periods=4), psdf.cov(min_periods=4), almost=True)
+        self.assert_eq(pdf.cov(min_periods=5), psdf.cov(min_periods=5))
+
+        # nan
+        np.random.seed(42)
+        pdf = pd.DataFrame(np.random.randn(20, 3), columns=["a", "b", "c"])
+        pdf.loc[pdf.index[:5], "a"] = np.nan
+        pdf.loc[pdf.index[5:10], "b"] = np.nan
+        psdf = ps.from_pandas(pdf)
+        self.assert_eq(pdf.cov(min_periods=11), psdf.cov(min_periods=11), almost=True)
+        self.assert_eq(pdf.cov(min_periods=10), psdf.cov(min_periods=10), almost=True)
+
+        # return empty DataFrame
+        pdf = pd.DataFrame([("1", "2"), ("0", "3"), ("2", "0"), ("1", "1")], 
columns=["a", "b"])
+        psdf = ps.from_pandas(pdf)
+        self.assert_eq(pdf.cov(), psdf.cov())
+
 
 if __name__ == "__main__":
     from pyspark.pandas.tests.test_dataframe import *  # noqa: F401
