This is an automated email from the ASF dual-hosted git repository. ueshin pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.2 by this push: new 017bce7 [SPARK-36722][PYTHON] Fix Series.update with another in same frame 017bce7 is described below commit 017bce7b118cd643e652d5a7914294e281b05e6e Author: dgd-contributor <dgd_contribu...@viettel.com.vn> AuthorDate: Wed Sep 15 11:08:01 2021 -0700 [SPARK-36722][PYTHON] Fix Series.update with another in same frame ### What changes were proposed in this pull request? Fix `Series.update` when `other` is a Series from the same DataFrame, and also add a test for updating a Series with one from a different DataFrame. ### Why are the changes needed? `Series.update` raises an `AssertionError` when `other` belongs to the same DataFrame, whereas pandas supports this case. Pandas behavior: ``` python >>> pdf = pd.DataFrame( ... {"a": [None, 2, 3, 4, 5, 6, 7, 8, None], "b": [None, 5, None, 3, 2, 1, None, 0, 0]}, ... ) >>> pdf a b 0 NaN NaN 1 2.0 5.0 2 3.0 NaN 3 4.0 3.0 4 5.0 2.0 5 6.0 1.0 6 7.0 NaN 7 8.0 0.0 8 NaN 0.0 >>> pdf.a.update(pdf.b) >>> pdf a b 0 NaN NaN 1 5.0 5.0 2 3.0 NaN 3 3.0 3.0 4 2.0 2.0 5 1.0 1.0 6 7.0 NaN 7 0.0 0.0 8 0.0 0.0 ``` ### Does this PR introduce _any_ user-facing change? Before ```python >>> psdf = ps.DataFrame( ... {"a": [None, 2, 3, 4, 5, 6, 7, 8, None], "b": [None, 5, None, 3, 2, 1, None, 0, 0]}, ... ) >>> psdf.a.update(psdf.b) Traceback (most recent call last): File "<stdin>", line 1, in <module> File "/Users/dgd/spark/python/pyspark/pandas/series.py", line 4551, in update combined = combine_frames(self._psdf, other._psdf, how="leftouter") File "/Users/dgd/spark/python/pyspark/pandas/utils.py", line 141, in combine_frames assert not same_anchor( AssertionError: We don't need to combine. `this` and `that` are same. >>> ``` After ```python >>> psdf = ps.DataFrame( ... {"a": [None, 2, 3, 4, 5, 6, 7, 8, None], "b": [None, 5, None, 3, 2, 1, None, 0, 0]}, ... ) >>> psdf.a.update(psdf.b) >>> psdf a b 0 NaN NaN 1 5.0 5.0 2 3.0 NaN 3 3.0 3.0 4 2.0 2.0 5 1.0 1.0 6 7.0 NaN 7 0.0 0.0 8 0.0 0.0 >>> ``` ### How was this patch tested? Unit tests. Closes #33968 from dgd-contributor/SPARK-36722_fix_update_same_anchor. 
Authored-by: dgd-contributor <dgd_contribu...@viettel.com.vn> Signed-off-by: Takuya UESHIN <ues...@databricks.com> (cherry picked from commit c15072cc7397cb59496b7da1153d663d8201865c) Signed-off-by: Takuya UESHIN <ues...@databricks.com> --- python/pyspark/pandas/series.py | 35 ++++++++++++++-------- .../pandas/tests/test_ops_on_diff_frames.py | 9 ++++++ python/pyspark/pandas/tests/test_series.py | 32 ++++++++++++++++++++ 3 files changed, 64 insertions(+), 12 deletions(-) diff --git a/python/pyspark/pandas/series.py b/python/pyspark/pandas/series.py index 568754c..0eebcc9 100644 --- a/python/pyspark/pandas/series.py +++ b/python/pyspark/pandas/series.py @@ -4498,22 +4498,33 @@ class Series(Frame, IndexOpsMixin, Generic[T]): if not isinstance(other, Series): raise TypeError("'other' must be a Series") - combined = combine_frames(self._psdf, other._psdf, how="leftouter") + if same_anchor(self, other): + scol = ( + F.when(other.spark.column.isNotNull(), other.spark.column) + .otherwise(self.spark.column) + .alias(self._psdf._internal.spark_column_name_for(self._column_label)) + ) + internal = self._psdf._internal.with_new_spark_column( + self._column_label, scol # TODO: dtype? 
+ ) + self._psdf._update_internal_frame(internal) + else: + combined = combine_frames(self._psdf, other._psdf, how="leftouter") - this_scol = combined["this"]._internal.spark_column_for(self._column_label) - that_scol = combined["that"]._internal.spark_column_for(other._column_label) + this_scol = combined["this"]._internal.spark_column_for(self._column_label) + that_scol = combined["that"]._internal.spark_column_for(other._column_label) - scol = ( - F.when(that_scol.isNotNull(), that_scol) - .otherwise(this_scol) - .alias(self._psdf._internal.spark_column_name_for(self._column_label)) - ) + scol = ( + F.when(that_scol.isNotNull(), that_scol) + .otherwise(this_scol) + .alias(self._psdf._internal.spark_column_name_for(self._column_label)) + ) - internal = combined["this"]._internal.with_new_spark_column( - self._column_label, scol # TODO: dtype? - ) + internal = combined["this"]._internal.with_new_spark_column( + self._column_label, scol # TODO: dtype? + ) - self._psdf._update_internal_frame(internal.resolved_copy, requires_same_anchor=False) + self._psdf._update_internal_frame(internal.resolved_copy, requires_same_anchor=False) def where(self, cond: "Series", other: Any = np.nan) -> "Series": """ diff --git a/python/pyspark/pandas/tests/test_ops_on_diff_frames.py b/python/pyspark/pandas/tests/test_ops_on_diff_frames.py index 12e87b2..11132ad 100644 --- a/python/pyspark/pandas/tests/test_ops_on_diff_frames.py +++ b/python/pyspark/pandas/tests/test_ops_on_diff_frames.py @@ -1353,6 +1353,15 @@ class OpsOnDiffFramesEnabledTest(PandasOnSparkTestCase, SQLTestUtils): self.assert_eq(psser.sort_index(), pser.sort_index()) self.assert_eq(psdf.sort_index(), pdf.sort_index()) + pser1 = pd.Series([None, 2, 3, 4, 5, 6, 7, 8, None]) + pser2 = pd.Series([None, 5, None, 3, 2, 1, None, 0, 0]) + psser1 = ps.from_pandas(pser1) + psser2 = ps.from_pandas(pser2) + + pser1.update(pser2) + psser1.update(psser2) + self.assert_eq(psser1, pser1) + def test_where(self): pdf1 = 
pd.DataFrame({"A": [0, 1, 2, 3, 4], "B": [100, 200, 300, 400, 500]}) pdf2 = pd.DataFrame({"A": [0, -1, -2, -3, -4], "B": [-100, -200, -300, -400, -500]}) diff --git a/python/pyspark/pandas/tests/test_series.py b/python/pyspark/pandas/tests/test_series.py index 58c87ed..de5ba60 100644 --- a/python/pyspark/pandas/tests/test_series.py +++ b/python/pyspark/pandas/tests/test_series.py @@ -1711,6 +1711,38 @@ class SeriesTest(PandasOnSparkTestCase, SQLTestUtils): with self.assertRaisesRegex(TypeError, msg): psser.update(10) + def _get_data(): + pdf = pd.DataFrame( + { + "a": [None, 2, 3, 4, 5, 6, 7, 8, None], + "b": [None, 5, None, 3, 2, 1, None, 0, 0], + "c": [1, 5, 1, 3, 2, 1, 1, 0, 0], + }, + ) + psdf = ps.from_pandas(pdf) + return pdf, psdf + + pdf, psdf = _get_data() + + psdf.a.update(psdf.a) + pdf.a.update(pdf.a) + self.assert_eq(psdf, pdf) + + pdf, psdf = _get_data() + + psdf.a.update(psdf.b) + pdf.a.update(pdf.b) + self.assert_eq(psdf, pdf) + + pdf, psdf = _get_data() + pser = pdf.a + psser = psdf.a + + pser.update(pdf.b) + psser.update(psdf.b) + self.assert_eq(psser, pser) + self.assert_eq(psdf, pdf) + def test_where(self): pser1 = pd.Series([0, 1, 2, 3, 4]) psser1 = ps.from_pandas(pser1) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org