[ https://issues.apache.org/jira/browse/SPARK-48311?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Sumit Singh updated SPARK-48311:
--------------------------------
Description:

Steps to Reproduce

1. Data creation
{code:python}
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, LongType, TimestampType, StringType
from datetime import datetime

spark = SparkSession.builder.getOrCreate()

# Define the schema
schema = StructType([
    StructField("col1", LongType(), nullable=True),
    StructField("col2", TimestampType(), nullable=True),
    StructField("col3", StringType(), nullable=True)
])

# Define the data
data = [
    (1, datetime(2023, 5, 15, 12, 30), "Discount"),
    (2, datetime(2023, 5, 16, 16, 45), "Promotion"),
    (3, datetime(2023, 5, 17, 9, 15), "Coupon")
]

# Create the DataFrame and register it as a temporary view
df = spark.createDataFrame(data, schema)
df.createOrReplaceTempView("temp_offers")

# Query the temporary view using SQL.
# DISTINCT is required to reproduce the issue.
testDf = spark.sql("""
    SELECT DISTINCT
        col1,
        col2,
        col3
    FROM temp_offers
""")
{code}

2. UDF registration
{code:python}
import pyspark.sql.functions as F
import pyspark.sql.types as T

# Create the UDF functions
def udf1(d):
    return d

def udf2(d):
    if d.isoweekday() in (1, 2, 3, 4):
        return 'WEEKDAY'
    else:
        return 'WEEKEND'

udf1_name = F.udf(udf1, T.TimestampType())
udf2_name = F.udf(udf2, T.StringType())
{code}

3. Adding the UDF columns to the grouping and aggregation
{code:python}
groupBy_cols = ['col1', 'col4', 'col5', 'col3']

temp = testDf \
    .select('*', udf1_name(F.col('col2')).alias('col4')) \
    .select('*', udf2_name('col4').alias('col5'))

result = temp.groupBy(*groupBy_cols).agg(F.countDistinct('col5').alias('col6'))
{code}

4. Result
{code:python}
result.show(5, False)
{code}

*This fails with the error below:*
{code:java}
An error was encountered:
An error occurred while calling o1079.showString.
: java.lang.IllegalStateException: Couldn't find pythonUDF0#1108 in [col1#978L,groupingPythonUDF#1104,groupingPythonUDF#1105,col3#980,count(pythonUDF0#1108)#1080L]
	at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:80)
	at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:73)
{code}


> Nested pythonUDF in groupBy and aggregate result in Binding Exception
> ---------------------------------------------------------------------
>
>                 Key: SPARK-48311
>                 URL: https://issues.apache.org/jira/browse/SPARK-48311
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 3.3.2
>            Reporter: Sumit Singh
>            Priority: Major
>

--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
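
Editor's note: independently of the Spark binding bug, the branch logic of `udf2` from step 2 can be sanity-checked in plain Python, with no SparkSession needed. This is a minimal sketch of the same function outside Spark; note that, as written in the report, Friday (`isoweekday() == 5`) falls into the `'WEEKEND'` branch:

```python
from datetime import datetime

def udf2(d):
    # Monday-Thursday (isoweekday 1-4) map to 'WEEKDAY'; everything else,
    # including Friday (isoweekday 5), falls through to 'WEEKEND'.
    if d.isoweekday() in (1, 2, 3, 4):
        return 'WEEKDAY'
    else:
        return 'WEEKEND'

print(udf2(datetime(2023, 5, 15, 12, 30)))  # Monday   -> WEEKDAY
print(udf2(datetime(2023, 5, 19, 9, 0)))    # Friday   -> WEEKEND
print(udf2(datetime(2023, 5, 20, 9, 0)))    # Saturday -> WEEKEND
```

The bug itself is independent of this logic (any Python UDF whose output feeds another UDF and both appear in `groupBy` reproduces it); the check above only confirms the reproduction data is classified as expected.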