Sumit Singh created SPARK-48311:
-----------------------------------

             Summary: Nested pythonUDF in groupBy and aggregate result in Binding Exception
                 Key: SPARK-48311
                 URL: https://issues.apache.org/jira/browse/SPARK-48311
             Project: Spark
          Issue Type: Bug
          Components: PySpark
    Affects Versions: 3.3.2
            Reporter: Sumit Singh
Steps to Reproduce

# Data creation
{code:java}
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, LongType, TimestampType, StringType
from datetime import datetime

spark = SparkSession.builder.getOrCreate()

# Define the schema
schema = StructType([
    StructField("col1", LongType(), nullable=True),
    StructField("col2", TimestampType(), nullable=True),
    StructField("col3", StringType(), nullable=True)
])

# Define the data
data = [
    (1, datetime(2023, 5, 15, 12, 30), "Discount"),
    (2, datetime(2023, 5, 16, 16, 45), "Promotion"),
    (3, datetime(2023, 5, 17, 9, 15), "Coupon")
]

# Create the DataFrame and register it as a temporary view
df = spark.createDataFrame(data, schema)
df.createOrReplaceTempView("temp_offers")

# Query the temporary view using SQL.
# DISTINCT is required to reproduce the issue.
testDf = spark.sql("""
SELECT DISTINCT col1, col2, col3
FROM temp_offers
""")
{code}
# UDF registration
{code:java}
import pyspark.sql.functions as F
import pyspark.sql.types as T

# Create the UDF functions
def udf1(incentive_date):
    return incentive_date

def udf2(d):
    if d.isoweekday() in (1, 2, 3, 4):
        return 'WEEKDAY'
    else:
        return 'WEEKEND'

udf1_name = F.udf(udf1, T.TimestampType())
udf2_name = F.udf(udf2, T.StringType())
{code}
# Adding the UDFs in grouping and aggregation
{code:java}
groupBy_cols = ['col1', 'col4', 'col5', 'col3']

temp = testDf \
    .select('*', udf1_name(F.col('col2')).alias('col4')) \
    .select('*', udf2_name('col4').alias('col5'))

result = temp.groupBy(*groupBy_cols).agg(F.countDistinct('col5').alias('col6'))
{code}
# Result
{code:java}
result.show(5, False)
{code}
*We get the below error:*
{code:java}
An error was encountered:
An error occurred while calling o1079.showString.
: java.lang.IllegalStateException: Couldn't find pythonUDF0#1108 in [col1#978L,groupingPythonUDF#1104,groupingPythonUDF#1105,col3#980,count(pythonUDF0#1108)#1080L]
	at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:80)
	at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:73)
{code}
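For reference, the classification logic inside udf2 can be sanity-checked in plain Python without Spark; this is only the UDF body from the repro, not a fix for the binding exception. Note that with this definition Friday (isoweekday 5) is classified as WEEKEND:

```python
from datetime import datetime

def udf2(d):
    # ISO weekday: Monday=1 ... Sunday=7; only Mon-Thu count as WEEKDAY here
    if d.isoweekday() in (1, 2, 3, 4):
        return 'WEEKDAY'
    else:
        return 'WEEKEND'

print(udf2(datetime(2023, 5, 15, 12, 30)))  # Monday   -> WEEKDAY
print(udf2(datetime(2023, 5, 19, 12, 30)))  # Friday   -> WEEKEND
print(udf2(datetime(2023, 5, 20, 12, 30)))  # Saturday -> WEEKEND
```

So the repro data (May 15-17, 2023, all Mon-Wed) produces a single distinct value in col5 per group.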