[ 
https://issues.apache.org/jira/browse/SPARK-35745?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon resolved SPARK-35745.
----------------------------------
    Resolution: Not A Problem

> Series to Scalar pandas_udf in GroupedData.agg() breaks the following 
> monotonically_increasing_id()
> --------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-35745
>                 URL: https://issues.apache.org/jira/browse/SPARK-35745
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 3.1.1, 3.1.2
>         Environment: I was able to reproduce this with both
> pyspark == '3.1.1'
> pyarrow == '3.0.0'
> Python 3.7.10
> and
> pyspark == '3.1.2'
> pyarrow == '4.0.1'
> Python 3.7.9
>            Reporter: Hadrien Glaude
>            Priority: Major
>
> Hello,
> I ran into an issue when using a Series to Scalar {{pandas_udf}} in 
> {{GroupedData.agg()}} followed by {{monotonically_increasing_id()}}: the 
> generated ids are duplicated. The partition offset in the id appears to be 
> zero on all partitions. The problem goes away when the UDF is marked with 
> {{asNondeterministic()}}.
> Minimal reproducing example:
> {code:python}
> from pyspark.sql import SparkSession
> import pyspark.sql.functions as F
> from pyspark.sql.functions import pandas_udf
> import pandas as pd
> from pyspark.sql.types import IntegerType
>
> spark = SparkSession.builder \
>     .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
>     .config("spark.sql.shuffle.partitions", "8") \
>     .master("local[4]").getOrCreate()
>
> # Series to Scalar pandas UDF (deterministic by default)
> @pandas_udf(IntegerType())
> def sum_pandas(vals: pd.Series) -> int:
>     return int(vals.to_numpy().sum())
>
> # Identical UDF, but explicitly marked non-deterministic
> @pandas_udf(IntegerType())
> def sum_pandas2(vals: pd.Series) -> int:
>     return int(vals.to_numpy().sum())
> sum_pandas2 = sum_pandas2.asNondeterministic()
>
> l = [(i % 100, i) for i in range(2000)]
>
> # Deterministic UDF: duplicated group_id values
> data = spark.createDataFrame(l, schema=["col1", "col2"])
> data.groupby("col1").agg(sum_pandas("col2").alias("sum")) \
>     .withColumn("group_id", F.monotonically_increasing_id()).show()
>
> # Non-deterministic UDF: group_id values carry the partition offset
> data = spark.createDataFrame(l, schema=["col1", "col2"])
> data.groupby("col1").agg(sum_pandas2("col2").alias("sum")) \
>     .withColumn("group_id", F.monotonically_increasing_id()).show()
> {code}
> Output:
> {code}
> +----+-----+--------+
> |col1|  sum|group_id|
> +----+-----+--------+
> |   2|19040|       0|
> |  12|19240|       1|
> |  26|19520|       2|
> |  28|19560|       3|
> |  29|19580|       4|
> |  30|19600|       5|
> |  33|19660|       6|
> |  42|19840|       7|
> |  48|19960|       8|
> |  67|20340|       9|
> |  73|20460|      10|
> |  88|20760|      11|
> |  91|20820|      12|
> |  93|20860|      13|
> |   9|19180|       0|
> |  11|19220|       1|
> |  22|19440|       2|
> |  32|19640|       3|
> |  36|19720|       4|
> |  40|19800|       5|
> +----+-----+--------+
> only showing top 20 rows
> +----+-----+----------+
> |col1|  sum|  group_id|
> +----+-----+----------+
> |   2|19040|         0|
> |  12|19240|         1|
> |  26|19520|         2|
> |  28|19560|         3|
> |  29|19580|         4|
> |  30|19600|         5|
> |  33|19660|         6|
> |  42|19840|         7|
> |  48|19960|         8|
> |  67|20340|         9|
> |  73|20460|        10|
> |  88|20760|        11|
> |  91|20820|        12|
> |  93|20860|        13|
> |   9|19180|8589934592|
> |  11|19220|8589934593|
> |  22|19440|8589934594|
> |  32|19640|8589934595|
> |  36|19720|8589934596|
> |  40|19800|8589934597|
> +----+-----+----------+
> only showing top 20 rows
> {code}
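> For reference, the documentation of {{monotonically_increasing_id()}} says the
> generated id puts the partition ID in the upper 31 bits and the record number
> within each partition in the lower 33 bits. A small illustrative helper (the
> name {{decode_monotonic_id}} is mine, not a Spark API) makes the output above
> easier to read: in the first run every partition's ids start from offset 0,
> while in the second run the ids starting at 8589934592 decode to partition 1.
> {code:python}
> def decode_monotonic_id(gid: int):
>     """Split a monotonically_increasing_id value into (partition_id, record_number)."""
>     return gid >> 33, gid & ((1 << 33) - 1)
>
> print(decode_monotonic_id(5))           # (0, 5)  -> partition 0, record 5
> print(decode_monotonic_id(8589934592))  # (1, 0)  -> partition 1, record 0
> {code}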



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
