[ https://issues.apache.org/jira/browse/SPARK-35745?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Hyukjin Kwon resolved SPARK-35745.
----------------------------------
    Resolution: Not A Problem

> Series to Scalar pandas_udf in GroupedData.agg() breaks the following monotonically_increasing_id()
> ----------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-35745
>                 URL: https://issues.apache.org/jira/browse/SPARK-35745
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 3.1.1, 3.1.2
>         Environment: I was able to reproduce this with both
>             pyspark == '3.1.1', pyarrow == '3.0.0', Python 3.7.10
>             and
>             pyspark == '3.1.2', pyarrow == '4.0.1', Python 3.7.9
>            Reporter: Hadrien Glaude
>            Priority: Major
>
> Hello,
> I encountered an issue when using a Series to Scalar {{pandas_udf}} in {{GroupedData.agg()}} followed by {{monotonically_increasing_id()}}: the resulting ids are duplicated. The partition offset encoded in each id appears to be zero on every partition. The problem is avoided by marking the UDF with {{asNondeterministic()}}.
> Minimal reproducing example:
> {code:python}
> from pyspark.sql import SparkSession
> import pyspark.sql.functions as F
> from pyspark.sql.functions import pandas_udf
> import pandas as pd
> from pyspark.sql.types import IntegerType
>
> spark = SparkSession.builder \
>     .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
>     .config("spark.sql.shuffle.partitions", "8") \
>     .master("local[4]").getOrCreate()
>
> # Series-to-Scalar UDF: reduces a pd.Series to a single int.
> @pandas_udf(IntegerType())
> def sum_pandas(vals: pd.Series) -> int:
>     return int(vals.to_numpy().sum())
>
> # Identical UDF, but marked nondeterministic below; this avoids the issue.
> @pandas_udf(IntegerType())
> def sum_pandas2(vals: pd.Series) -> int:
>     return int(vals.to_numpy().sum())
>
> sum_pandas2 = sum_pandas2.asNondeterministic()
>
> l = [(i % 100, i) for i in range(2000)]
>
> data = spark.createDataFrame(l, schema=["col1", "col2"])
> data.groupby("col1").agg(sum_pandas("col2").alias("sum")) \
>     .withColumn("group_id", F.monotonically_increasing_id()).show()
>
> data = spark.createDataFrame(l, schema=["col1", "col2"])
> data.groupby("col1").agg(sum_pandas2("col2").alias("sum")) \
>     .withColumn("group_id", F.monotonically_increasing_id()).show()
> {code}
> Output (first table: deterministic UDF, group_id restarts at 0 in every partition and collides; second table: nondeterministic UDF, group_id carries the expected per-partition offset):
> {code}
> +----+-----+--------+
> |col1|  sum|group_id|
> +----+-----+--------+
> |   2|19040|       0|
> |  12|19240|       1|
> |  26|19520|       2|
> |  28|19560|       3|
> |  29|19580|       4|
> |  30|19600|       5|
> |  33|19660|       6|
> |  42|19840|       7|
> |  48|19960|       8|
> |  67|20340|       9|
> |  73|20460|      10|
> |  88|20760|      11|
> |  91|20820|      12|
> |  93|20860|      13|
> |   9|19180|       0|
> |  11|19220|       1|
> |  22|19440|       2|
> |  32|19640|       3|
> |  36|19720|       4|
> |  40|19800|       5|
> +----+-----+--------+
> only showing top 20 rows
>
> +----+-----+----------+
> |col1|  sum|  group_id|
> +----+-----+----------+
> |   2|19040|         0|
> |  12|19240|         1|
> |  26|19520|         2|
> |  28|19560|         3|
> |  29|19580|         4|
> |  30|19600|         5|
> |  33|19660|         6|
> |  42|19840|         7|
> |  48|19960|         8|
> |  67|20340|         9|
> |  73|20460|        10|
> |  88|20760|        11|
> |  91|20820|        12|
> |  93|20860|        13|
> |   9|19180|8589934592|
> |  11|19220|8589934593|
> |  22|19440|8589934594|
> |  32|19640|8589934595|
> |  36|19720|8589934596|
> |  40|19800|8589934597|
> +----+-----+----------+
> only showing top 20 rows
> {code}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
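A note on the jump to 8589934592 in the second output: per the {{monotonically_increasing_id()}} documentation, each 64-bit id puts the partition ID in the upper 31 bits and the per-partition record number in the lower 33 bits, so the first row of partition 1 gets 1 << 33 = 8589934592, exactly where the nondeterministic variant jumps. The duplicate ids in the first output mean every partition was assigned partition offset 0. A minimal sketch, not part of the original report (it uses its own small local session, independent of the repro), that makes the encoding visible:

{code:python}
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.master("local[4]").getOrCreate()

# Generate ids across 4 partitions and decode the partition component.
df = (spark.range(0, 8, numPartitions=4)
      .withColumn("id64", F.monotonically_increasing_id())
      # Shifting right by 33 bits recovers the partition ID stored
      # in the upper bits of the id.
      .withColumn("encoded_partition", F.shiftRight("id64", 33))
      .withColumn("actual_partition", F.spark_partition_id()))

df.show()
{code}

On a healthy run, encoded_partition matches actual_partition and the ids within partition p start at p * 2**33; in the buggy first output above, the same shift would yield 0 for every row.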