[GitHub] [spark] dgd-contributor commented on a change in pull request #33858: [SPARK-36402][PYTHON] Implement Series.combine

2021-09-18 Thread GitBox


dgd-contributor commented on a change in pull request #33858:
URL: https://github.com/apache/spark/pull/33858#discussion_r711607257



##########
File path: python/pyspark/pandas/series.py
##########
@@ -4463,6 +4466,161 @@ def replace(
 
         return self._with_new_scol(current)  # TODO: dtype?
 
+    def combine(
+        self,
+        other: "Series",
+        func: Callable,
+        fill_value: Optional[Any] = None,
+    ) -> "Series":
+        """
+        Combine the Series with a Series or scalar according to `func`.
+
+        Combine the Series and `other` using `func` to perform elementwise
+        selection for combined Series.
+        `fill_value` is assumed when value is missing at some index
+        from one of the two objects being combined.
+
+        .. versionadded:: 3.3.0
+
+        .. note:: this API executes the function once to infer the type which is
+            potentially expensive, for instance, when the dataset is created after
+            aggregations or sorting.
+
+            To avoid this, specify return type in ``func``, for instance, as below:
+
+            >>> def foo(x, y) -> np.int32:
+            ...     return x * y
+
+            pandas-on-Spark uses return type hint and does not try to infer the type.
+
+        Parameters
+        ----------
+        other : Series or scalar
+            The value(s) to be combined with the `Series`.
+        func : function
+            Function that takes two scalars as inputs and returns an element.
+            Note that type hint for return type is required.
+        fill_value : scalar, optional
+            The value to assume when an index is missing from
+            one Series or the other. The default specifies to use the
+            appropriate NaN value for the underlying dtype of the Series.
+
+        Returns
+        -------
+        Series
+            The result of combining the Series with the other object.
+
+        See Also
+        --------
+        Series.combine_first : Combine Series values, choosing the calling
+            Series' values first.
+
+        Examples
+        --------
+        Consider 2 Datasets ``s1`` and ``s2`` containing
+        highest clocked speeds of different birds.
+
+        >>> from pyspark.pandas.config import set_option, reset_option
+        >>> set_option("compute.ops_on_diff_frames", True)
+        >>> s1 = ps.Series({'falcon': 330.0, 'eagle': 160.0})
+        >>> s1
+        falcon    330.0
+        eagle     160.0
+        dtype: float64
+        >>> s2 = ps.Series({'falcon': 345.0, 'eagle': 200.0, 'duck': 30.0})
+        >>> s2
+        falcon    345.0
+        eagle     200.0
+        duck       30.0
+        dtype: float64
+
+        Now, to combine the two datasets and view the highest speeds
+        of the birds across the two datasets
+
+        >>> s1.combine(s2, max)
+        duck        NaN
+        eagle     200.0
+        falcon    345.0
+        dtype: float64
+
+        In the previous example, the resulting value for duck is missing,
+        because the maximum of a NaN and a float is a NaN.
+        So, in the example, we set ``fill_value=0``,
+        so the maximum value returned will be the value from some dataset.
+
+        >>> s1.combine(s2, max, fill_value=0)
+        duck       30.0
+        eagle     200.0
+        falcon    345.0
+        dtype: float64
+        >>> reset_option("compute.ops_on_diff_frames")
+        """
+        if not isinstance(other, Series) and not np.isscalar(other):
+            raise TypeError("unsupported type: %s" % type(other))
+
+        assert callable(func), "argument func must be a callable function."
+
+        if np.isscalar(other):
+            tmp_other_col = verify_temp_column_name(self._internal.spark_frame, "__tmp_other_col__")
+            combined = self.to_frame()
+            combined[tmp_other_col] = other
+            combined = DataFrame(combined._internal.resolved_copy)
+        elif same_anchor(self, other):
+            combined = self._psdf[self._column_label, other._column_label]
+        elif fill_value is None:
+            combined = combine_frames(self.to_frame(), other.to_frame())
+        else:
+            combined = self._combine_frame_with_fill_value(other, fill_value=fill_value)
+
+        try:
+            sig_return = infer_return_type(func)
+            if isinstance(sig_return, UnknownType):
+                raise TypeError()
+            return_type = sig_return.spark_type
+        except TypeError:
+            limit = ps.get_option("compute.shortcut_limit")
+            pdf = combined.head(limit + 1)._to_internal_pandas()
+            combined_pser = pdf.iloc[:, 0].combine(pdf.iloc[:, 1], func, fill_value=fill_value)
+            return_type = as_spark_type(combined_pser.dtype)
+
+        @pandas_udf(returnType=return_type)  # type: ignore
+        def wrapped_func(x: pd.Series, y: pd.Series) -> pd.Series:
+            return x.combine(y, func)
+

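A minimal usage sketch of combine as it stands in the diff above, assuming the patch is applied; selecting both columns from a single DataFrame keeps them same-anchor, so `compute.ops_on_diff_frames` is not needed:

``` python
import pyspark.pandas as ps

psdf = ps.DataFrame({"s1": [1, 2, 3], "s2": [4, 5, 6]})

# With a return type hint, pandas-on-Spark uses it directly.
def true_div(x: int, y: int) -> float:
    return x / y

print(psdf["s1"].combine(psdf["s2"], true_div))

# Without the hint, combine falls back to evaluating the head of the
# combined frame (up to compute.shortcut_limit rows) once to infer the dtype.
print(psdf["s1"].combine(psdf["s2"], lambda x, y: x / y))
```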
[GitHub] [spark] dgd-contributor commented on a change in pull request #33858: [SPARK-36402][PYTHON] Implement Series.combine

2021-09-17 Thread GitBox


dgd-contributor commented on a change in pull request #33858:
URL: https://github.com/apache/spark/pull/33858#discussion_r711414907



##########
File path: python/pyspark/pandas/tests/test_ops_on_diff_frames.py
##########
@@ -1839,6 +1839,40 @@ def _test_cov(self, pser1, pser2):
         pscov = psser1.cov(psser2, min_periods=3)
         self.assert_eq(pcov, pscov, almost=True)
 
+    def test_combine(self):
+        def run_test_combine(pser1, pser2):
+            psser1 = ps.from_pandas(pser1)
+            psser2 = ps.from_pandas(pser2)
+
+            self.assert_eq(pser1.combine(pser2, max), psser1.combine(psser2, max))
+            self.assert_eq(
+                pser1.combine(pser2, max, fill_value=1),
+                psser1.combine(psser2, max, fill_value=1),
+            )
+            self.assert_eq(pser2.combine(300, max), psser2.combine(300, max))
+            self.assert_eq(
+                pser2.combine(300, max, fill_value=100),
+                psser2.combine(300, max, fill_value=100),
+            )
+
+        pser1 = pd.Series({"falcon": 330.0, "eagle": 160.0}, name="s1")
+        pser2 = pd.Series({"falcon": 345.0, "eagle": 200.0, "duck": 30.0}, name="s2")
+        run_test_combine(pser1, pser2)
+
+        pser1 = pd.Series({"falcon": 330.0, "eagle": 160.0}, name="same-name")
+        pser2 = pd.Series({"falcon": 345.0, "eagle": 200.0, "duck": 30.0}, name="same-name")
+        run_test_combine(pser1, pser2)
+

Review comment:
   I added that test and fixed an issue with multi-index. Please take a look when you have time.
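For readers following along, a sketch of the kind of MultiIndex case such a test would exercise; the added test itself is not quoted in this excerpt, so the data below is illustrative:

``` python
import pandas as pd
import pyspark.pandas as ps
from pyspark.pandas.config import set_option, reset_option

# combine must preserve both index levels rather than flattening them.
midx = pd.MultiIndex.from_tuples([("x", "falcon"), ("x", "eagle"), ("y", "duck")])
pser1 = pd.Series([330.0, 160.0, 30.0], index=midx)
pser2 = pd.Series([345.0, 200.0, 25.0], index=midx)

set_option("compute.ops_on_diff_frames", True)
psser1, psser2 = ps.from_pandas(pser1), ps.from_pandas(pser2)
# The pandas-on-Spark result should match the pandas result exactly.
print(psser1.combine(psser2, max))
print(pser1.combine(pser2, max))
reset_option("compute.ops_on_diff_frames")
```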




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] dgd-contributor commented on a change in pull request #33858: [SPARK-36402][PYTHON] Implement Series.combine

2021-09-17 Thread GitBox


dgd-contributor commented on a change in pull request #33858:
URL: https://github.com/apache/spark/pull/33858#discussion_r711414902



##########
File path: python/pyspark/pandas/tests/test_series.py
##########
@@ -2982,6 +2982,59 @@ def _test_cov(self, pdf):
         pscov = psdf["s1"].cov(psdf["s2"], min_periods=4)
         self.assert_eq(pcov, pscov, almost=True)
 
+    def test_combine(self):
+        pdf = pd.DataFrame(
+            {"s1": [330.0, 160.0, np.nan], "s2": [345.0, 0.0, 30.0], "s3": [345.0, 0.0, 30.0]}
+        )
+        psdf = ps.from_pandas(pdf)
+
+        self.assert_eq(
+            pdf["s1"].combine(pdf["s2"], max),
+            psdf["s1"].combine(psdf["s2"], max),
+        )
+        self.assert_eq(
+            pdf["s1"].combine(pdf["s2"], max, fill_value=100),
+            psdf["s1"].combine(psdf["s2"], max, fill_value=100),
+        )
+        self.assert_eq(
+            pdf["s1"].combine(100, max),
+            psdf["s1"].combine(100, max),
+        )
+        self.assert_eq(
+            pdf["s1"].combine(100, max, fill_value=100),
+            psdf["s1"].combine(100, max, fill_value=100),
+        )
+
+        pdf = pd.DataFrame({"s1": ["a a", "b", ""], "s2": [345.0, np.nan, 30.0]})
+        psdf = ps.from_pandas(pdf)
+
+        def concat_strings(s1: str, s2: str) -> str:
+            return s1 + ": " + str(s2)
+
+        self.assert_eq(
+            pdf["s1"].combine(pdf["s2"], concat_strings),
+            psdf["s1"].combine(psdf["s2"], concat_strings),
+        )
+        self.assert_eq(
+            pdf["s1"].combine(pdf["s2"], concat_strings, fill_value=10.0),
+            psdf["s1"].combine(psdf["s2"], concat_strings, fill_value=10.0),
+        )
+        self.assert_eq(
+            pdf["s1"].combine(100, concat_strings),
+            psdf["s1"].combine(100, concat_strings),
+        )
+
+        pdf = pd.DataFrame({"s1": [1, 2, 3], "s2": [4, 5, 6]})
+        psdf = ps.from_pandas(pdf)
+
+        def true_div(s1: int, s2: int) -> float:
+            return s1 / s2
+
+        self.assert_eq(
+            pdf["s1"].combine(pdf["s2"], true_div),
+            psdf["s1"].combine(psdf["s2"], true_div),
+        )

Review comment:
   updated




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] dgd-contributor commented on a change in pull request #33858: [SPARK-36402][PYTHON] Implement Series.combine

2021-09-17 Thread GitBox


dgd-contributor commented on a change in pull request #33858:
URL: https://github.com/apache/spark/pull/33858#discussion_r711154826



##########
File path: python/pyspark/pandas/series.py
##########
@@ -4463,6 +4466,158 @@ def replace(
 
         return self._with_new_scol(current)  # TODO: dtype?
 
+    def combine(
+        self,
+        other: "Series",
+        func: Callable,
+        fill_value: Optional[Any] = None,
+    ) -> "Series":
+        """
+        Combine the Series with a Series or scalar according to `func`.
+
+        Combine the Series and `other` using `func` to perform elementwise
+        selection for combined Series.
+        `fill_value` is assumed when value is missing at some index
+        from one of the two objects being combined.
+
+        .. versionadded:: 3.3.0
+
+        .. note:: this API executes the function once to infer the type which is
+            potentially expensive, for instance, when the dataset is created after
+            aggregations or sorting.
+
+            To avoid this, specify return type in ``func``, for instance, as below:
+
+            >>> def foo(x, y) -> np.int32:
+            ...     return x * y
+
+            pandas-on-Spark uses return type hint and does not try to infer the type.
+
+        Parameters
+        ----------
+        other : Series or scalar
+            The value(s) to be combined with the `Series`.
+        func : function
+            Function that takes two scalars as inputs and returns an element.
+            Note that type hint for return type is required.
+        fill_value : scalar, optional
+            The value to assume when an index is missing from
+            one Series or the other. The default specifies to use the
+            appropriate NaN value for the underlying dtype of the Series.
+
+        Returns
+        -------
+        Series
+            The result of combining the Series with the other object.
+
+        See Also
+        --------
+        Series.combine_first : Combine Series values, choosing the calling
+            Series' values first.
+
+        Examples
+        --------
+        Consider 2 Datasets ``s1`` and ``s2`` containing
+        highest clocked speeds of different birds.
+
+        >>> from pyspark.pandas.config import set_option, reset_option
+        >>> set_option("compute.ops_on_diff_frames", True)
+        >>> s1 = ps.Series({'falcon': 330.0, 'eagle': 160.0})
+        >>> s1
+        falcon    330.0
+        eagle     160.0
+        dtype: float64
+        >>> s2 = ps.Series({'falcon': 345.0, 'eagle': 200.0, 'duck': 30.0})
+        >>> s2
+        falcon    345.0
+        eagle     200.0
+        duck       30.0
+        dtype: float64
+
+        Now, to combine the two datasets and view the highest speeds
+        of the birds across the two datasets
+
+        >>> s1.combine(s2, max)
+        duck        NaN
+        eagle     200.0
+        falcon    345.0
+        dtype: float64
+
+        In the previous example, the resulting value for duck is missing,
+        because the maximum of a NaN and a float is a NaN.
+        So, in the example, we set ``fill_value=0``,
+        so the maximum value returned will be the value from some dataset.
+
+        >>> s1.combine(s2, max, fill_value=0)
+        duck       30.0
+        eagle     200.0
+        falcon    345.0
+        dtype: float64
+        >>> reset_option("compute.ops_on_diff_frames")
+        """
+        if not isinstance(other, Series) and not np.isscalar(other):
+            raise TypeError("unsupported type: %s" % type(other))
+
+        assert callable(func), "argument func must be a callable function."
+
+        if np.isscalar(other):
+            tmp_other_col = verify_temp_column_name(self._internal.spark_frame, "__tmp_other_col__")
+            combined = self.to_frame()
+            combined[tmp_other_col] = other
+            combined = DataFrame(combined._internal.resolved_copy)
+        elif same_anchor(self, other):
+            combined = self._psdf[self._column_label, other._column_label]
+        elif fill_value is None:
+            combined = combine_frames(self.to_frame(), other.to_frame())
+        else:
+            combined = self._combine_frame_with_fill_value(other, fill_value=fill_value)
+
+        try:
+            sig_return = infer_return_type(func)
+            if isinstance(sig_return, UnknownType):
+                raise TypeError()
+            return_type = sig_return.spark_type
+        except TypeError:
+            limit = ps.get_option("compute.shortcut_limit")
+            pdf = combined.head(limit + 1)._to_internal_pandas()
+            combined_pser = pdf.iloc[:, 0].combine(pdf.iloc[:, 1], func, fill_value=fill_value)
+            return_type = as_spark_type(combined_pser.dtype)
+
+        @pandas_udf(returnType=return_type)  # type: ignore
+        def wrapped_func(x: pd.Series, y: pd.Series) -> pd.Series:
+            return x.combine(y, func)
+

[GitHub] [spark] dgd-contributor commented on a change in pull request #33858: [SPARK-36402][PYTHON] Implement Series.combine

2021-09-08 Thread GitBox


dgd-contributor commented on a change in pull request #33858:
URL: https://github.com/apache/spark/pull/33858#discussion_r704274595



##########
File path: python/pyspark/pandas/series.py
##########
@@ -4475,6 +4477,146 @@ def replace(
 
         return self._with_new_scol(current)  # TODO: dtype?
 
+    def combine(
+        self,
+        other: "Series",
+        func: Callable,
+        fill_value: Optional[Any] = None,
+        return_type: Union[Union[AtomicType, str], ArrayType] = "string",
+    ) -> "Series":
+        """
+        Combine the Series with a Series or scalar according to `func`.
+        Combine the Series and `other` using `func` to perform elementwise
+        selection for combined Series.
+        `fill_value` is assumed when value is missing at some index
+        from one of the two objects being combined.
+
+        .. versionadded:: 3.3.0
+
+        Parameters
+        ----------
+        other : Series or scalar
+            The value(s) to be combined with the `Series`.
+        func : function
+            Function that takes two scalars as inputs and returns an element.
+        fill_value : scalar, optional
+            The value to assume when an index is missing from
+            one Series or the other. The default specifies to use the
+            appropriate NaN value for the underlying dtype of the Series.
+        return_type : :class:`pyspark.sql.types.DataType` or str
+            the return type of the output Series. The value can be either a
+            :class:`pyspark.sql.types.DataType` object or a DDL-formatted type string.
+
+        Returns
+        -------
+        Series
+            The result of combining the Series with the other object.
+
+        See Also
+        --------
+        Series.combine_first : Combine Series values, choosing the calling
+            Series' values first.
+
+        Examples
+        --------
+        Consider 2 Datasets ``s1`` and ``s2`` containing
+        highest clocked speeds of different birds.
+
+        >>> from pyspark.pandas.config import set_option, reset_option
+        >>> set_option("compute.ops_on_diff_frames", True)
+        >>> s1 = ps.Series({'falcon': 330.0, 'eagle': 160.0})
+        >>> s1
+        falcon    330.0
+        eagle     160.0
+        dtype: float64
+        >>> s2 = ps.Series({'falcon': 345.0, 'eagle': 200.0, 'duck': 30.0})
+        >>> s2
+        falcon    345.0
+        eagle     200.0
+        duck       30.0
+        dtype: float64
+
+        Now, to combine the two datasets and view the highest speeds
+        of the birds across the two datasets
+
+        >>> s1.combine(s2, max, return_type="double")
+        duck        NaN
+        eagle     200.0
+        falcon    345.0
+        dtype: float64
+
+        In the previous example, the resulting value for duck is missing,
+        because the maximum of a NaN and a float is a NaN.
+        So, in the example, we set ``fill_value=0``,
+        so the maximum value returned will be the value from some dataset.
+
+        >>> s1.combine(s2, max, fill_value=0, return_type="double")
+        duck       30.0
+        eagle     200.0
+        falcon    345.0
+        dtype: float64
+        >>> reset_option("compute.ops_on_diff_frames")
+        """
+        if not isinstance(other, Series) and not np.isscalar(other):
+            raise TypeError("unsupported type: %s" % type(other))
+
+        tmp_other_col = verify_temp_column_name(self._internal.spark_frame, "__tmp_other_col__")
+        if np.isscalar(other):
+            combined = DataFrame(
+                InternalFrame(
+                    spark_frame=self._internal.spark_frame.select(
+                        *self._internal.spark_columns
+                    ).withColumn(tmp_other_col, SF.lit(other)),
+                    index_spark_columns=self._internal.index_spark_columns,
+                )
+            )
+        elif same_anchor(self, other):
+            combined = self._psdf[self._column_label, other._column_label]
+        elif fill_value is None:
+            combined = combine_frames(self.to_frame(), other.to_frame())
+        else:
+            combined = self._combine_frame_with_fill_value(other, fill_value=fill_value)
+
+        @pandas_udf(returnType=return_type)  # type: ignore
+        def wrapped_func(x: pd.Series, y: pd.Series) -> pd.Series:
+            return x.combine(y, func)
+
+        internal = InternalFrame(
+            spark_frame=combined._internal.spark_frame.select(
+                *combined._internal.index_spark_columns,
+                wrapped_func(*combined._internal.data_spark_columns),
+                NATURAL_ORDER_COLUMN_NAME
+            ),
+            index_spark_columns=combined._internal.index_spark_columns,
+            column_labels=self._internal.column_labels if np.isscalar(other) else [None],
+            column_label_names=[None],
+        )

Review comment:
   many thanks, updated. Please take another look when you have time.

[GitHub] [spark] dgd-contributor commented on a change in pull request #33858: [SPARK-36402][PYTHON] Implement Series.combine

2021-09-07 Thread GitBox


dgd-contributor commented on a change in pull request #33858:
URL: https://github.com/apache/spark/pull/33858#discussion_r703648372



##########
File path: python/pyspark/pandas/series.py
##########
@@ -4475,6 +4477,146 @@ def replace(
 
         return self._with_new_scol(current)  # TODO: dtype?
 
+    def combine(
+        self,
+        other: "Series",
+        func: Callable,
+        fill_value: Optional[Any] = None,
+        return_type: Union[Union[AtomicType, str], ArrayType] = "string",
+    ) -> "Series":
+        """
+        Combine the Series with a Series or scalar according to `func`.
+        Combine the Series and `other` using `func` to perform elementwise
+        selection for combined Series.
+        `fill_value` is assumed when value is missing at some index
+        from one of the two objects being combined.
+
+        .. versionadded:: 3.3.0
+
+        Parameters
+        ----------
+        other : Series or scalar
+            The value(s) to be combined with the `Series`.
+        func : function
+            Function that takes two scalars as inputs and returns an element.
+        fill_value : scalar, optional
+            The value to assume when an index is missing from
+            one Series or the other. The default specifies to use the
+            appropriate NaN value for the underlying dtype of the Series.
+        return_type : :class:`pyspark.sql.types.DataType` or str
+            the return type of the output Series. The value can be either a
+            :class:`pyspark.sql.types.DataType` object or a DDL-formatted type string.
+
+        Returns
+        -------
+        Series
+            The result of combining the Series with the other object.
+
+        See Also
+        --------
+        Series.combine_first : Combine Series values, choosing the calling
+            Series' values first.
+
+        Examples
+        --------
+        Consider 2 Datasets ``s1`` and ``s2`` containing
+        highest clocked speeds of different birds.
+
+        >>> from pyspark.pandas.config import set_option, reset_option
+        >>> set_option("compute.ops_on_diff_frames", True)
+        >>> s1 = ps.Series({'falcon': 330.0, 'eagle': 160.0})
+        >>> s1
+        falcon    330.0
+        eagle     160.0
+        dtype: float64
+        >>> s2 = ps.Series({'falcon': 345.0, 'eagle': 200.0, 'duck': 30.0})
+        >>> s2
+        falcon    345.0
+        eagle     200.0
+        duck       30.0
+        dtype: float64
+
+        Now, to combine the two datasets and view the highest speeds
+        of the birds across the two datasets
+
+        >>> s1.combine(s2, max, return_type="double")
+        duck        NaN
+        eagle     200.0
+        falcon    345.0
+        dtype: float64
+
+        In the previous example, the resulting value for duck is missing,
+        because the maximum of a NaN and a float is a NaN.
+        So, in the example, we set ``fill_value=0``,
+        so the maximum value returned will be the value from some dataset.
+
+        >>> s1.combine(s2, max, fill_value=0, return_type="double")
+        duck       30.0
+        eagle     200.0
+        falcon    345.0
+        dtype: float64
+        >>> reset_option("compute.ops_on_diff_frames")
+        """
+        if not isinstance(other, Series) and not np.isscalar(other):
+            raise TypeError("unsupported type: %s" % type(other))
+
+        tmp_other_col = verify_temp_column_name(self._internal.spark_frame, "__tmp_other_col__")
+        if np.isscalar(other):
+            combined = DataFrame(
+                InternalFrame(
+                    spark_frame=self._internal.spark_frame.select(
+                        *self._internal.spark_columns
+                    ).withColumn(tmp_other_col, SF.lit(other)),
+                    index_spark_columns=self._internal.index_spark_columns,
+                )
+            )
+        elif same_anchor(self, other):
+            combined = self._psdf[self._column_label, other._column_label]
+        elif fill_value is None:
+            combined = combine_frames(self.to_frame(), other.to_frame())
+        else:
+            combined = self._combine_frame_with_fill_value(other, fill_value=fill_value)
+
+        @pandas_udf(returnType=return_type)  # type: ignore
+        def wrapped_func(x: pd.Series, y: pd.Series) -> pd.Series:
+            return x.combine(y, func)
+
+        internal = InternalFrame(
+            spark_frame=combined._internal.spark_frame.select(
+                *combined._internal.index_spark_columns,
+                wrapped_func(*combined._internal.data_spark_columns),
+                NATURAL_ORDER_COLUMN_NAME
+            ),
+            index_spark_columns=combined._internal.index_spark_columns,
+            column_labels=self._internal.column_labels if np.isscalar(other) else [None],
+            column_label_names=[None],
+        )

Review comment:
   After I try to fix the failed test, I think we sho

[GitHub] [spark] dgd-contributor commented on a change in pull request #33858: [SPARK-36402][PYTHON] Implement Series.combine

2021-09-07 Thread GitBox


dgd-contributor commented on a change in pull request #33858:
URL: https://github.com/apache/spark/pull/33858#discussion_r703242599



##########
File path: python/pyspark/pandas/series.py
##########
@@ -4475,6 +4477,146 @@ def replace(
 
         return self._with_new_scol(current)  # TODO: dtype?
 
+    def combine(
+        self,
+        other: "Series",
+        func: Callable,
+        fill_value: Optional[Any] = None,
+        return_type: Union[Union[AtomicType, str], ArrayType] = "string",
+    ) -> "Series":
+        """
+        Combine the Series with a Series or scalar according to `func`.
+        Combine the Series and `other` using `func` to perform elementwise
+        selection for combined Series.
+        `fill_value` is assumed when value is missing at some index
+        from one of the two objects being combined.
+
+        .. versionadded:: 3.3.0
+
+        Parameters
+        ----------
+        other : Series or scalar
+            The value(s) to be combined with the `Series`.
+        func : function
+            Function that takes two scalars as inputs and returns an element.
+        fill_value : scalar, optional
+            The value to assume when an index is missing from
+            one Series or the other. The default specifies to use the
+            appropriate NaN value for the underlying dtype of the Series.
+        return_type : :class:`pyspark.sql.types.DataType` or str
+            the return type of the output Series. The value can be either a
+            :class:`pyspark.sql.types.DataType` object or a DDL-formatted type string.

Review comment:
   thank you for your comments, I have learned a lot. 
   
   If I have
   
   ``` python
   >>> pdf = pd.DataFrame({"s1": [1, 2, 3], "s2": [4, 5, 6]})
   >>> psdf = ps.from_pandas(pdf)
   
   >>> def true_div(s1: int, s2: int) -> float:
   ...     return s1 / s2
   
   >>> pdf["s1"].combine(pdf["s2"], true_div)
   0    0.25
   1    0.40
   2    0.50
   dtype: float64
   >>> psdf["s1"].combine(psdf["s2"], true_div)
   0    0
   1    0
   2    0
   dtype: int64
   ```
   I think the output's dtype should not be the self's dtype. I am stuck on this. 
   
   
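The revision that eventually landed in this PR (see the r711607257 diff above) addresses exactly this: it trusts a return type hint on `func` when one is present and samples the data otherwise. A standalone sketch of that hint-first logic, simplified and not using the PR's actual helpers:

``` python
import typing

import numpy as np
import pandas as pd

def resolve_return_dtype(func, sample: pd.DataFrame):
    hints = typing.get_type_hints(func)
    if "return" in hints:
        # Trust the annotation: true_div's "-> float" maps to float64 here.
        return np.dtype(hints["return"])
    # Otherwise execute func once on a small sample and observe the dtype.
    combined = sample.iloc[:, 0].combine(sample.iloc[:, 1], func)
    return combined.dtype

def true_div(s1: int, s2: int) -> float:
    return s1 / s2

pdf = pd.DataFrame({"s1": [1, 2, 3], "s2": [4, 5, 6]})
print(resolve_return_dtype(true_div, pdf))  # float64, not self's int64
```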




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] dgd-contributor commented on a change in pull request #33858: [SPARK-36402][PYTHON] Implement Series.combine

2021-09-07 Thread GitBox


dgd-contributor commented on a change in pull request #33858:
URL: https://github.com/apache/spark/pull/33858#discussion_r703404972



##########
File path: python/pyspark/pandas/series.py
##########
@@ -4475,6 +4477,146 @@ def replace(
 
         return self._with_new_scol(current)  # TODO: dtype?
 
+    def combine(
+        self,
+        other: "Series",
+        func: Callable,
+        fill_value: Optional[Any] = None,
+        return_type: Union[Union[AtomicType, str], ArrayType] = "string",
+    ) -> "Series":
+        """
+        Combine the Series with a Series or scalar according to `func`.
+        Combine the Series and `other` using `func` to perform elementwise
+        selection for combined Series.
+        `fill_value` is assumed when value is missing at some index
+        from one of the two objects being combined.
+
+        .. versionadded:: 3.3.0
+
+        Parameters
+        ----------
+        other : Series or scalar
+            The value(s) to be combined with the `Series`.
+        func : function
+            Function that takes two scalars as inputs and returns an element.
+        fill_value : scalar, optional
+            The value to assume when an index is missing from
+            one Series or the other. The default specifies to use the
+            appropriate NaN value for the underlying dtype of the Series.
+        return_type : :class:`pyspark.sql.types.DataType` or str
+            the return type of the output Series. The value can be either a
+            :class:`pyspark.sql.types.DataType` object or a DDL-formatted type string.
+
+        Returns
+        -------
+        Series
+            The result of combining the Series with the other object.
+
+        See Also
+        --------
+        Series.combine_first : Combine Series values, choosing the calling
+            Series' values first.
+
+        Examples
+        --------
+        Consider 2 Datasets ``s1`` and ``s2`` containing
+        highest clocked speeds of different birds.
+
+        >>> from pyspark.pandas.config import set_option, reset_option
+        >>> set_option("compute.ops_on_diff_frames", True)
+        >>> s1 = ps.Series({'falcon': 330.0, 'eagle': 160.0})
+        >>> s1
+        falcon    330.0
+        eagle     160.0
+        dtype: float64
+        >>> s2 = ps.Series({'falcon': 345.0, 'eagle': 200.0, 'duck': 30.0})
+        >>> s2
+        falcon    345.0
+        eagle     200.0
+        duck       30.0
+        dtype: float64
+
+        Now, to combine the two datasets and view the highest speeds
+        of the birds across the two datasets
+
+        >>> s1.combine(s2, max, return_type="double")
+        duck        NaN
+        eagle     200.0
+        falcon    345.0
+        dtype: float64
+
+        In the previous example, the resulting value for duck is missing,
+        because the maximum of a NaN and a float is a NaN.
+        So, in the example, we set ``fill_value=0``,
+        so the maximum value returned will be the value from some dataset.
+
+        >>> s1.combine(s2, max, fill_value=0, return_type="double")
+        duck       30.0
+        eagle     200.0
+        falcon    345.0
+        dtype: float64
+        >>> reset_option("compute.ops_on_diff_frames")
+        """
+        if not isinstance(other, Series) and not np.isscalar(other):
+            raise TypeError("unsupported type: %s" % type(other))
+
+        tmp_other_col = verify_temp_column_name(self._internal.spark_frame, "__tmp_other_col__")
+        if np.isscalar(other):
+            combined = DataFrame(
+                InternalFrame(
+                    spark_frame=self._internal.spark_frame.select(
+                        *self._internal.spark_columns
+                    ).withColumn(tmp_other_col, SF.lit(other)),
+                    index_spark_columns=self._internal.index_spark_columns,
+                )
+            )
+        elif same_anchor(self, other):
+            combined = self._psdf[self._column_label, other._column_label]
+        elif fill_value is None:
+            combined = combine_frames(self.to_frame(), other.to_frame())
+        else:
+            combined = self._combine_frame_with_fill_value(other, fill_value=fill_value)
+
+        @pandas_udf(returnType=return_type)  # type: ignore
+        def wrapped_func(x: pd.Series, y: pd.Series) -> pd.Series:
+            return x.combine(y, func)
+
+        internal = InternalFrame(
+            spark_frame=combined._internal.spark_frame.select(
+                *combined._internal.index_spark_columns,
+                wrapped_func(*combined._internal.data_spark_columns),
+                NATURAL_ORDER_COLUMN_NAME
+            ),
+            index_spark_columns=combined._internal.index_spark_columns,
+            column_labels=self._internal.column_labels if np.isscalar(other) else [None],
+            column_label_names=[None],
+        )

Review comment:
   updated

##########
File path: python/pyspark/pand

[GitHub] [spark] dgd-contributor commented on a change in pull request #33858: [SPARK-36402][PYTHON] Implement Series.combine

2021-09-07 Thread GitBox


dgd-contributor commented on a change in pull request #33858:
URL: https://github.com/apache/spark/pull/33858#discussion_r703344106



##########
File path: python/pyspark/pandas/series.py
##########
@@ -4475,6 +4477,146 @@ def replace(
 
         return self._with_new_scol(current)  # TODO: dtype?
 
+    def combine(
+        self,
+        other: "Series",
+        func: Callable,
+        fill_value: Optional[Any] = None,
+        return_type: Union[Union[AtomicType, str], ArrayType] = "string",
+    ) -> "Series":
+        """
+        Combine the Series with a Series or scalar according to `func`.
+        Combine the Series and `other` using `func` to perform elementwise
+        selection for combined Series.
+        `fill_value` is assumed when value is missing at some index
+        from one of the two objects being combined.
+
+        .. versionadded:: 3.3.0
+
+        Parameters
+        ----------
+        other : Series or scalar
+            The value(s) to be combined with the `Series`.
+        func : function
+            Function that takes two scalars as inputs and returns an element.
+        fill_value : scalar, optional
+            The value to assume when an index is missing from
+            one Series or the other. The default specifies to use the
+            appropriate NaN value for the underlying dtype of the Series.
+        return_type : :class:`pyspark.sql.types.DataType` or str
+            the return type of the output Series. The value can be either a
+            :class:`pyspark.sql.types.DataType` object or a DDL-formatted type string.
+
+        Returns
+        -------
+        Series
+            The result of combining the Series with the other object.
+
+        See Also
+        --------
+        Series.combine_first : Combine Series values, choosing the calling
+            Series' values first.
+
+        Examples
+        --------
+        Consider 2 Datasets ``s1`` and ``s2`` containing
+        highest clocked speeds of different birds.
+
+        >>> from pyspark.pandas.config import set_option, reset_option
+        >>> set_option("compute.ops_on_diff_frames", True)
+        >>> s1 = ps.Series({'falcon': 330.0, 'eagle': 160.0})
+        >>> s1
+        falcon    330.0
+        eagle     160.0
+        dtype: float64
+        >>> s2 = ps.Series({'falcon': 345.0, 'eagle': 200.0, 'duck': 30.0})
+        >>> s2
+        falcon    345.0
+        eagle     200.0
+        duck       30.0
+        dtype: float64
+
+        Now, to combine the two datasets and view the highest speeds
+        of the birds across the two datasets
+
+        >>> s1.combine(s2, max, return_type="double")
+        duck        NaN
+        eagle     200.0
+        falcon    345.0
+        dtype: float64
+
+        In the previous example, the resulting value for duck is missing,
+        because the maximum of a NaN and a float is a NaN.
+        So, in the example, we set ``fill_value=0``,
+        so the maximum value returned will be the value from some dataset.
+
+        >>> s1.combine(s2, max, fill_value=0, return_type="double")
+        duck       30.0
+        eagle     200.0
+        falcon    345.0
+        dtype: float64
+        >>> reset_option("compute.ops_on_diff_frames")
+        """
+        if not isinstance(other, Series) and not np.isscalar(other):
+            raise TypeError("unsupported type: %s" % type(other))
+
+        tmp_other_col = verify_temp_column_name(self._internal.spark_frame, "__tmp_other_col__")
+        if np.isscalar(other):
+            combined = DataFrame(
+                InternalFrame(
+                    spark_frame=self._internal.spark_frame.select(
+                        *self._internal.spark_columns
+                    ).withColumn(tmp_other_col, SF.lit(other)),
+                    index_spark_columns=self._internal.index_spark_columns,
+                )
+            )
+        elif same_anchor(self, other):
+            combined = self._psdf[self._column_label, other._column_label]

Review comment:
   I think 
   ``` python
   combined = self._psdf[self._column_label, other._column_label]
   ```
   does not trigger any computation, and it reduces duplicated code a bit.
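A small sketch of that point, with illustrative data; selecting columns from one pandas-on-Spark DataFrame only builds a new lazy projection over the same anchored Spark frame:

``` python
import pyspark.pandas as ps

psdf = ps.DataFrame({"s1": [330.0, 160.0], "s2": [345.0, 200.0]})

pair = psdf[["s1", "s2"]]  # no Spark job yet, just a new plan over psdf

print(pair.head(2))  # computation is only triggered by an action like this
```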




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] dgd-contributor commented on a change in pull request #33858: [SPARK-36402][PYTHON] Implement Series.combine

2021-09-02 Thread GitBox


dgd-contributor commented on a change in pull request #33858:
URL: https://github.com/apache/spark/pull/33858#discussion_r701178957



##########
File path: python/pyspark/pandas/series.py
##########
@@ -4425,6 +4427,127 @@ def replace(
 
         return self._with_new_scol(current)  # TODO: dtype?
 
+    def combine(
+        self,
+        other: "Series",
+        func: Callable,
+        fill_value: Optional[Any] = None,
+        return_type: Optional[Union[str, DataType]] = "string",
+    ) -> "Series":
+        """
+        Combine the Series with a Series or scalar according to `func`.
+        Combine the Series and `other` using `func` to perform elementwise
+        selection for combined Series.
+        `fill_value` is assumed when value is missing at some index
+        from one of the two objects being combined.
+
+        Parameters
+        ----------
+        other : Series or scalar
+            The value(s) to be combined with the `Series`.
+        func : function
+            Function that takes two scalars as inputs and returns an element.
+        fill_value : scalar, optional
+            The value to assume when an index is missing from
+            one Series or the other. The default specifies to use the
+            appropriate NaN value for the underlying dtype of the Series.
+        return_type : :class:`pyspark.sql.types.DataType` or str
+            the return type of the output Series. The value can be either a
+            :class:`pyspark.sql.types.DataType` object or a DDL-formatted type string.
+
+        Returns
+        -------
+        Series
+            The result of combining the Series with the other object.
+
+        See Also
+        --------
+        Series.combine_first : Combine Series values, choosing the calling
+            Series' values first.
+
+        Examples
+        --------
+        Consider 2 Datasets ``s1`` and ``s2`` containing
+        highest clocked speeds of different birds.
+
+        >>> from pyspark.pandas.config import set_option, reset_option
+        >>> set_option("compute.ops_on_diff_frames", True)
+        >>> s1 = ps.Series({'falcon': 330.0, 'eagle': 160.0})
+        >>> s1
+        falcon    330.0
+        eagle     160.0
+        dtype: float64
+        >>> s2 = ps.Series({'falcon': 345.0, 'eagle': 200.0, 'duck': 30.0})
+        >>> s2
+        falcon    345.0
+        eagle     200.0
+        duck       30.0
+        dtype: float64
+
+        Now, to combine the two datasets and view the highest speeds
+        of the birds across the two datasets
+
+        >>> s1.combine(s2, max, return_type="double")
+        duck        NaN
+        eagle     200.0
+        falcon    345.0
+        dtype: float64
+
+        In the previous example, the resulting value for duck is missing,
+        because the maximum of a NaN and a float is a NaN.
+        So, in the example, we set ``fill_value=0``,
+        so the maximum value returned will be the value from some dataset.
+
+        >>> s1.combine(s2, max, fill_value=0, return_type="double")
+        duck       30.0
+        eagle     200.0
+        falcon    345.0
+        dtype: float64
+        >>> reset_option("compute.ops_on_diff_frames")
+        """
+        if not isinstance(other, Series) and not np.isscalar(other):
+            raise TypeError("unsupported type: %s" % type(other))
+
+        if np.isscalar(other):
+            combined = DataFrame(
+                InternalFrame(
+                    spark_frame=self._internal.spark_frame.withColumn("__other", SF.lit(other)),
+                    index_spark_columns=self._internal.index_spark_columns,
+                )
+            )
+        else:
+            combined = (
+                self._psdf[self._column_label, other._column_label]
+                if same_anchor(self, other)
+                else combine_frames(self.to_frame(), other.to_frame())
+            )
+
+        sdf = combined._internal.spark_frame
+        if fill_value is not None:
+            sdf = sdf.fillna(fill_value)
+
+        @udf

Review comment:
   thank you for the suggestions, updated




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] dgd-contributor commented on a change in pull request #33858: [SPARK-36402][PYTHON] Implement Series.combine

2021-09-01 Thread GitBox


dgd-contributor commented on a change in pull request #33858:
URL: https://github.com/apache/spark/pull/33858#discussion_r700322747



##########
File path: python/pyspark/pandas/series.py
##########
@@ -4425,6 +4427,127 @@ def replace(
 
         return self._with_new_scol(current)  # TODO: dtype?
 
+    def combine(
+        self,
+        other: "Series",
+        func: Callable,
+        fill_value: Optional[Any] = None,
+        return_type: Optional[Union[str, DataType]] = "string",
+    ) -> "Series":
+        """
+        Combine the Series with a Series or scalar according to `func`.
+        Combine the Series and `other` using `func` to perform elementwise
+        selection for combined Series.
+        `fill_value` is assumed when value is missing at some index
+        from one of the two objects being combined.
+
+        Parameters
+        ----------
+        other : Series or scalar
+            The value(s) to be combined with the `Series`.
+        func : function
+            Function that takes two scalars as inputs and returns an element.
+        fill_value : scalar, optional
+            The value to assume when an index is missing from
+            one Series or the other. The default specifies to use the
+            appropriate NaN value for the underlying dtype of the Series.
+        return_type : :class:`pyspark.sql.types.DataType` or str
+            the return type of the output Series. The value can be either a
+            :class:`pyspark.sql.types.DataType` object or a DDL-formatted type string.
+
+        Returns
+        -------
+        Series
+            The result of combining the Series with the other object.
+
+        See Also
+        --------
+        Series.combine_first : Combine Series values, choosing the calling
+            Series' values first.
+
+        Examples
+        --------
+        Consider 2 Datasets ``s1`` and ``s2`` containing
+        highest clocked speeds of different birds.
+
+        >>> from pyspark.pandas.config import set_option, reset_option
+        >>> set_option("compute.ops_on_diff_frames", True)
+        >>> s1 = ps.Series({'falcon': 330.0, 'eagle': 160.0})
+        >>> s1
+        falcon    330.0
+        eagle     160.0
+        dtype: float64
+        >>> s2 = ps.Series({'falcon': 345.0, 'eagle': 200.0, 'duck': 30.0})
+        >>> s2
+        falcon    345.0
+        eagle     200.0
+        duck       30.0
+        dtype: float64
+
+        Now, to combine the two datasets and view the highest speeds
+        of the birds across the two datasets
+
+        >>> s1.combine(s2, max, return_type="double")
+        duck        NaN
+        eagle     200.0
+        falcon    345.0
+        dtype: float64
+
+        In the previous example, the resulting value for duck is missing,
+        because the maximum of a NaN and a float is a NaN.
+        So, in the example, we set ``fill_value=0``,
+        so the maximum value returned will be the value from some dataset.
+
+        >>> s1.combine(s2, max, fill_value=0, return_type="double")
+        duck       30.0
+        eagle     200.0
+        falcon    345.0
+        dtype: float64
+        >>> reset_option("compute.ops_on_diff_frames")
+        """
+        if not isinstance(other, Series) and not np.isscalar(other):
+            raise TypeError("unsupported type: %s" % type(other))
+
+        if np.isscalar(other):
+            combined = DataFrame(
+                InternalFrame(
+                    spark_frame=self._internal.spark_frame.withColumn("__other", SF.lit(other)),
+                    index_spark_columns=self._internal.index_spark_columns,
+                )
+            )
+        else:
+            combined = (
+                self._psdf[self._column_label, other._column_label]
+                if same_anchor(self, other)
+                else combine_frames(self.to_frame(), other.to_frame())
+            )
+
+        sdf = combined._internal.spark_frame
+        if fill_value is not None:
+            sdf = sdf.fillna(fill_value)
+
+        @udf

Review comment:
   As I know, the func argument in pandas Series.combine takes two scalars as inputs and returns an element, while a pandas user-defined function must take and return pd.Series or pd.DataFrame. So I do not know how to wrap the func argument of pandas Series.combine with pandas_udf.
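For reference, later revisions of this PR resolve that tension by letting the pandas UDF operate on batch `pd.Series` and delegating the elementwise application of the scalar-level `func` to pandas' own `Series.combine`; a sketch of that shape:

``` python
import pandas as pd
from pyspark.sql.functions import pandas_udf

def scalar_max(a, b):  # scalar -> scalar, like the func argument discussed here
    return max(a, b)

# The UDF itself is Series -> Series, satisfying the pandas_udf contract,
# while pd.Series.combine applies scalar_max pairwise inside each batch.
@pandas_udf("double")
def wrapped(x: pd.Series, y: pd.Series) -> pd.Series:
    return x.combine(y, scalar_max)
```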




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] dgd-contributor commented on a change in pull request #33858: [SPARK-36402][PYTHON] Implement Series.combine

2021-09-01 Thread GitBox


dgd-contributor commented on a change in pull request #33858:
URL: https://github.com/apache/spark/pull/33858#discussion_r700311964



##########
File path: python/pyspark/pandas/series.py
##########
@@ -4425,6 +4427,127 @@ def replace(
 
         return self._with_new_scol(current)  # TODO: dtype?
 
+    def combine(
+        self,
+        other: "Series",
+        func: Callable,
+        fill_value: Optional[Any] = None,
+        return_type: Optional[Union[str, DataType]] = "string",
+    ) -> "Series":
+        """
+        Combine the Series with a Series or scalar according to `func`.
+        Combine the Series and `other` using `func` to perform elementwise
+        selection for combined Series.
+        `fill_value` is assumed when value is missing at some index
+        from one of the two objects being combined.
+
+        Parameters
+        ----------
+        other : Series or scalar
+            The value(s) to be combined with the `Series`.
+        func : function
+            Function that takes two scalars as inputs and returns an element.
+        fill_value : scalar, optional
+            The value to assume when an index is missing from
+            one Series or the other. The default specifies to use the
+            appropriate NaN value for the underlying dtype of the Series.
+        return_type : :class:`pyspark.sql.types.DataType` or str
+            the return type of the output Series. The value can be either a
+            :class:`pyspark.sql.types.DataType` object or a DDL-formatted type string.
+
+        Returns
+        -------
+        Series
+            The result of combining the Series with the other object.
+
+        See Also
+        --------
+        Series.combine_first : Combine Series values, choosing the calling
+            Series' values first.
+
+        Examples
+        --------
+        Consider 2 Datasets ``s1`` and ``s2`` containing
+        highest clocked speeds of different birds.
+
+        >>> from pyspark.pandas.config import set_option, reset_option
+        >>> set_option("compute.ops_on_diff_frames", True)
+        >>> s1 = ps.Series({'falcon': 330.0, 'eagle': 160.0})
+        >>> s1
+        falcon    330.0
+        eagle     160.0
+        dtype: float64
+        >>> s2 = ps.Series({'falcon': 345.0, 'eagle': 200.0, 'duck': 30.0})
+        >>> s2
+        falcon    345.0
+        eagle     200.0
+        duck       30.0
+        dtype: float64
+
+        Now, to combine the two datasets and view the highest speeds
+        of the birds across the two datasets
+
+        >>> s1.combine(s2, max, return_type="double")
+        duck        NaN
+        eagle     200.0
+        falcon    345.0
+        dtype: float64
+
+        In the previous example, the resulting value for duck is missing,
+        because the maximum of a NaN and a float is a NaN.
+        So, in the example, we set ``fill_value=0``,
+        so the maximum value returned will be the value from some dataset.
+
+        >>> s1.combine(s2, max, fill_value=0, return_type="double")
+        duck       30.0
+        eagle     200.0
+        falcon    345.0
+        dtype: float64
+        >>> reset_option("compute.ops_on_diff_frames")
+        """
+        if not isinstance(other, Series) and not np.isscalar(other):
+            raise TypeError("unsupported type: %s" % type(other))
+
+        if np.isscalar(other):
+            combined = DataFrame(
+                InternalFrame(
+                    spark_frame=self._internal.spark_frame.withColumn("__other", SF.lit(other)),

Review comment:
   thank you, updated
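The update replaces the hard-coded "__other" column with a name vetted by pandas-on-Spark's verify_temp_column_name utility; an illustrative re-implementation of the guard it provides, not the utility's actual code:

``` python
def verify_temp_column_name_sketch(columns, temp_name: str) -> str:
    # Refuse a temporary name that collides with an existing user column.
    assert temp_name not in columns, (
        "The given column name `%s` already exists in the input data." % temp_name
    )
    return temp_name

print(verify_temp_column_name_sketch(["s1", "s2"], "__tmp_other_col__"))
```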




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org