This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new e3256b8 [SPARK-36396][PYTHON] Implement DataFrame.cov e3256b8 is described below commit e3256b838b5bd5c817bc95ba9d996b878078ad35 Author: dch nguyen <dgd_contribu...@viettel.com.vn> AuthorDate: Tue Nov 30 15:46:11 2021 +0900 [SPARK-36396][PYTHON] Implement DataFrame.cov ### What changes were proposed in this pull request? Implement `DataFrame.cov`. ### Why are the changes needed? To increase pandas API coverage in PySpark. ### Does this PR introduce _any_ user-facing change? Yes. Users can now call `DataFrame.cov`, for example: ``` python >>> psdf = ps.DataFrame([(1, 2), (0, 3), (2, 0), (1, 1)], ... columns=['dogs', 'cats']) >>> psdf.cov() dogs cats dogs 0.666667 -1.000000 cats -1.000000 1.666667 >>> pdf = pd.DataFrame( ... { ... "a": [1, np.nan, 3, 4], ... "b": [True, False, False, True], ... "c": [True, True, False, True], ... } ... ) >>> psdf = ps.from_pandas(pdf) >>> psdf.cov() a b c a 2.333333 -0.166667 -0.166667 b -0.166667 0.333333 0.166667 c -0.166667 0.166667 0.250000 ``` ### How was this patch tested? Unit tests. Closes #34213 from dchvn/SPARK-36396.
Authored-by: dch nguyen <dgd_contribu...@viettel.com.vn> Signed-off-by: Hyukjin Kwon <gurwls...@apache.org> --- .../docs/source/reference/pyspark.pandas/frame.rst | 1 + python/pyspark/pandas/frame.py | 189 +++++++++++++++++++++ python/pyspark/pandas/missing/frame.py | 1 - python/pyspark/pandas/tests/test_dataframe.py | 65 ++++++- 4 files changed, 254 insertions(+), 2 deletions(-) diff --git a/python/docs/source/reference/pyspark.pandas/frame.rst b/python/docs/source/reference/pyspark.pandas/frame.rst index bb84202..04bfe27 100644 --- a/python/docs/source/reference/pyspark.pandas/frame.rst +++ b/python/docs/source/reference/pyspark.pandas/frame.rst @@ -148,6 +148,7 @@ Computations / Descriptive Stats DataFrame.clip DataFrame.corr DataFrame.count + DataFrame.cov DataFrame.describe DataFrame.kurt DataFrame.kurtosis diff --git a/python/pyspark/pandas/frame.py b/python/pyspark/pandas/frame.py index 38ac9af..edfb62e 100644 --- a/python/pyspark/pandas/frame.py +++ b/python/pyspark/pandas/frame.py @@ -77,6 +77,7 @@ from pyspark.sql.types import ( StringType, StructField, StructType, + DecimalType, ) from pyspark.sql.window import Window @@ -8258,6 +8259,194 @@ defaultdict(<class 'list'>, {'col..., 'col...})] internal = self._internal.with_new_sdf(sdf, data_fields=data_fields) self._update_internal_frame(internal, requires_same_anchor=False) + # TODO: implement ``ddof``; results are always normalized by N - 1 (covar_samp). + def cov(self, min_periods: Optional[int] = None) -> "DataFrame": + """ + Compute pairwise covariance of columns, excluding NA/null values. + + Compute the pairwise sample covariance (normalized by N - 1) of the DataFrame's series. + The returned data frame is the `covariance matrix + <https://en.wikipedia.org/wiki/Covariance_matrix>`__ of the columns + of the DataFrame; only boolean and non-decimal numeric columns are included. + + Both NA and null values are automatically excluded from the + calculation; each pair of columns uses only the rows where both are non-null. + A threshold can be set for the minimum number of + observations for each value created.
Comparisons with observations + below this threshold will be returned as ``NaN``. + + This method is generally used for the analysis of time series data to + understand the relationship between different measures + across time. + + .. versionadded:: 3.3.0 + + Parameters + ---------- + min_periods : int, optional + Minimum number of observations required per pair of columns + to have a valid result. + + Returns + ------- + DataFrame + The covariance matrix of the series of the DataFrame. + + See Also + -------- + Series.cov : Compute covariance with another Series. + + Examples + -------- + >>> df = ps.DataFrame([(1, 2), (0, 3), (2, 0), (1, 1)], + ... columns=['dogs', 'cats']) + >>> df.cov() + dogs cats + dogs 0.666667 -1.000000 + cats -1.000000 1.666667 + + >>> np.random.seed(42) + >>> df = ps.DataFrame(np.random.randn(1000, 5), + ... columns=['a', 'b', 'c', 'd', 'e']) + >>> df.cov() + a b c d e + a 0.998438 -0.020161 0.059277 -0.008943 0.014144 + b -0.020161 1.059352 -0.008543 -0.024738 0.009826 + c 0.059277 -0.008543 1.010670 -0.001486 -0.000271 + d -0.008943 -0.024738 -0.001486 0.921297 -0.013692 + e 0.014144 0.009826 -0.000271 -0.013692 0.977795 + + **Minimum number of periods** + + This method also supports an optional ``min_periods`` keyword + that specifies the required minimum number of non-NA observations for + each column pair in order to have a valid result: + + >>> np.random.seed(42) + >>> df = pd.DataFrame(np.random.randn(20, 3), + ...
columns=['a', 'b', 'c']) + >>> df.loc[df.index[:5], 'a'] = np.nan + >>> df.loc[df.index[5:10], 'b'] = np.nan + >>> sdf = ps.from_pandas(df) + >>> sdf.cov(min_periods=12) + a b c + a 0.316741 NaN -0.150812 + b NaN 1.248003 0.191417 + c -0.150812 0.191417 0.895202 + """ + min_periods = 1 if min_periods is None else min_periods + + # Only boolean and numeric columns are used; DecimalType columns are excluded + psdf = self[ + [ + col + for col in self.columns + if isinstance(self[col].spark.data_type, BooleanType) + or ( + isinstance(self[col].spark.data_type, NumericType) + and not isinstance(self[col].spark.data_type, DecimalType) + ) + ] + ] + + num_cols = len(psdf.columns) + cov = np.zeros([num_cols, num_cols]) + + if num_cols == 0: + return DataFrame() + + if len(psdf) < min_periods: + cov.fill(np.nan) + return DataFrame(cov, columns=psdf.columns, index=psdf.columns) + + data_cols = psdf._internal.data_spark_column_names + cov_scols = [] + count_not_null_scols = [] + + # For each column pair (upper triangle), count the rows where both values are non-null + # Example: + # a b c + # 0 1 1 1 + # 1 NaN 2 2 + # 2 3 NaN 3 + # 3 4 4 4 + # + # a b c + # a count(a, a) count(a, b) count(a, c) + # b count(b, b) count(b, c) + # c count(c, c) + # + # count_not_null_scols = + # [F.count(a, a), F.count(a, b), F.count(a, c), F.count(b, b), F.count(b, c), F.count(c, c)] + for r in range(0, num_cols): + for c in range(r, num_cols): + count_not_null_scols.append( + F.count( + F.when(F.col(data_cols[r]).isNotNull() & F.col(data_cols[c]).isNotNull(), 1) + ) + ) + + count_not_null = ( + psdf._internal.spark_frame.replace(float("nan"), None) + .select(*count_not_null_scols) + .head(1)[0] + ) + + # Sample covariance per column pair, None when below min_periods. NOTE(review): unlike the counts above, this frame has no NaN->null replace; confirm NaN handling + # Example: + # with min_periods = 3 + # a b c + # 0 1 1 1 + # 1 NaN 2 2 + # 2 3 NaN 3 + # 3 4 4 4 + # + # a b c + # a cov(a, a) None cov(a, c) + # b cov(b, b) cov(b, c) + # c cov(c, c) + # + # cov_scols = [F.cov(a, a), None, F.cov(a, c), F.cov(b, b), F.cov(b, c), F.cov(c, c)] + step = 0 + for r in
range(0, num_cols): + step += r + for c in range(r, num_cols): + cov_scols.append( + F.covar_samp( + F.col(data_cols[r]).cast("double"), F.col(data_cols[c]).cast("double") + ) + if count_not_null[r * num_cols + c - step] >= min_periods + else F.lit(None) + ) + + pair_cov = psdf._internal.spark_frame.select(*cov_scols).head(1)[0] + + # Scatter the flat upper-triangle row into the 2D array (None is stored as NaN) + # Example: + # pair_cov = [cov(a, a), None, cov(a, c), cov(b, b), cov(b, c), cov(c, c)] + # + # cov = + # + # a b c + # a cov(a, a) None cov(a, c) + # b cov(b, b) cov(b, c) + # c cov(c, c) + step = 0 + for r in range(0, num_cols): + step += r + for c in range(r, num_cols): + cov[r][c] = pair_cov[r * num_cols + c - step] + + # Mirror the upper triangle into the lower one; subtract the diagonal once so it is not doubled + # Example: + # cov = + # a b c + # a cov(a, a) None cov(a, c) + # b None cov(b, b) cov(b, c) + # c cov(a, c) cov(b, c) cov(c, c) + cov = cov + cov.T - np.diag(np.diag(cov)) + return DataFrame(cov, columns=psdf.columns, index=psdf.columns) + def sample( self, n: Optional[int] = None, diff --git a/python/pyspark/pandas/missing/frame.py b/python/pyspark/pandas/missing/frame.py index aabc0e0..d822c14 100644 --- a/python/pyspark/pandas/missing/frame.py +++ b/python/pyspark/pandas/missing/frame.py @@ -39,7 +39,6 @@ class _MissingPandasLikeDataFrame(object): compare = _unsupported_function("compare") convert_dtypes = _unsupported_function("convert_dtypes") corrwith = _unsupported_function("corrwith") - cov = _unsupported_function("cov") ewm = _unsupported_function("ewm") infer_objects = _unsupported_function("infer_objects") interpolate = _unsupported_function("interpolate") diff --git a/python/pyspark/pandas/tests/test_dataframe.py b/python/pyspark/pandas/tests/test_dataframe.py index 701052e..ae8fcae 100644 --- a/python/pyspark/pandas/tests/test_dataframe.py +++ b/python/pyspark/pandas/tests/test_dataframe.py @@ -14,7 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License.
# - +import decimal from datetime import datetime from distutils.version import LooseVersion import inspect @@ -6025,6 +6025,69 @@ class DataFrameTest(PandasOnSparkTestCase, SQLTestUtils): ) self.assert_eq(psmidx.dtypes, expected) + def test_cov(self): + # SPARK-36396: Implement DataFrame.cov + + # int columns; min_periods=5 exceeds the 4 rows, so every value is NaN + pdf = pd.DataFrame([(1, 2), (0, 3), (2, 0), (1, 1)], columns=["a", "b"]) + psdf = ps.from_pandas(pdf) + self.assert_eq(pdf.cov(), psdf.cov(), almost=True) + self.assert_eq(pdf.cov(min_periods=4), psdf.cov(min_periods=4), almost=True) + self.assert_eq(pdf.cov(min_periods=5), psdf.cov(min_periods=5)) + + # bool columns next to a float column that contains NaN + pdf = pd.DataFrame( + { + "a": [1, np.nan, 3, 4], + "b": [True, False, False, True], + "c": [True, True, False, True], + } + ) + psdf = ps.from_pandas(pdf) + self.assert_eq(pdf.cov(), psdf.cov(), almost=True) + self.assert_eq(pdf.cov(min_periods=4), psdf.cov(min_periods=4), almost=True) + self.assert_eq(pdf.cov(min_periods=5), psdf.cov(min_periods=5)) + + # nullable extension dtypes; the object-dtype decimal column is expected to be excluded + numeric_dtypes = ["Int8", "Int16", "Int32", "Int64", "Float32", "Float64", "float"] + boolean_dtypes = ["boolean", "bool"] + + sers = [pd.Series([1, 2, 3, None], dtype=dtype) for dtype in numeric_dtypes] + sers += [pd.Series([True, False, True, None], dtype=dtype) for dtype in boolean_dtypes] + sers.append(pd.Series([decimal.Decimal(1), decimal.Decimal(2), decimal.Decimal(3), None])) + + pdf = pd.concat(sers, axis=1) + pdf.columns = [dtype for dtype in numeric_dtypes + boolean_dtypes] + ["decimal"] + psdf = ps.from_pandas(pdf) + + self.assert_eq(pdf.cov(), psdf.cov(), almost=True) + self.assert_eq(pdf.cov(min_periods=3), psdf.cov(min_periods=3), almost=True) + self.assert_eq(pdf.cov(min_periods=4), psdf.cov(min_periods=4)) + + # a string column is excluded from the result + pdf = pd.DataFrame( + [(1, 2, "a", 1), (0, 3, "b", 1), (2, 0, "c", 9), (1, 1, "d", 1)], + columns=["a", "b", "c", "d"], + ) + psdf = ps.from_pandas(pdf) + self.assert_eq(pdf.cov(), psdf.cov(), almost=True) + self.assert_eq(pdf.cov(min_periods=4),
psdf.cov(min_periods=4), almost=True) + self.assert_eq(pdf.cov(min_periods=5), psdf.cov(min_periods=5)) + + # NaN gaps: a and b share only 10 non-null rows, so min_periods 11 vs 10 flips their cell + np.random.seed(42) + pdf = pd.DataFrame(np.random.randn(20, 3), columns=["a", "b", "c"]) + pdf.loc[pdf.index[:5], "a"] = np.nan + pdf.loc[pdf.index[5:10], "b"] = np.nan + psdf = ps.from_pandas(pdf) + self.assert_eq(pdf.cov(min_periods=11), psdf.cov(min_periods=11), almost=True) + self.assert_eq(pdf.cov(min_periods=10), psdf.cov(min_periods=10), almost=True) + + # only string columns: an empty DataFrame is returned + pdf = pd.DataFrame([("1", "2"), ("0", "3"), ("2", "0"), ("1", "1")], columns=["a", "b"]) + psdf = ps.from_pandas(pdf) + self.assert_eq(pdf.cov(), psdf.cov()) + if __name__ == "__main__": from pyspark.pandas.tests.test_dataframe import * # noqa: F401 --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org