Sumit Singh created SPARK-48311:
-----------------------------------

             Summary: Nested pythonUDF in groupBy and aggregate result in Binding Exception
                 Key: SPARK-48311
                 URL: https://issues.apache.org/jira/browse/SPARK-48311
             Project: Spark
          Issue Type: Bug
          Components: PySpark
    Affects Versions: 3.3.2
            Reporter: Sumit Singh


Steps to Reproduce 
 # Data creation
{code:java}
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, LongType, TimestampType, StringType
from datetime import datetime

# Define the schema
schema = StructType([
    StructField("col1", LongType(), nullable=True),
    StructField("col2", TimestampType(), nullable=True),
    StructField("col3", StringType(), nullable=True)
])

# Define the data
data = [
    (1, datetime(2023, 5, 15, 12, 30), "Discount"),
    (2, datetime(2023, 5, 16, 16, 45), "Promotion"),
    (3, datetime(2023, 5, 17, 9, 15), "Coupon")
]

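# Create the DataFrame and register it as a temporary view
# (in a shell or notebook, `spark` is usually already defined)
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(data, schema)
df.createOrReplaceTempView("temp_offers")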

# Query the temporary table using SQL
# DISTINCT required to reproduce the issue. 
testDf = spark.sql("""
                    SELECT DISTINCT 
                    col1,
                    col2,
                    col3 FROM temp_offers
                    """) {code}

 # UDF registration 

{code:java}
import pyspark.sql.functions as F 
import pyspark.sql.types as T

# Plain Python functions to be wrapped as UDFs
def udf1(incentive_date):
    return incentive_date

def udf2(d):
    if d.isoweekday() in (1, 2, 3, 4):
        return 'WEEKDAY'
    else:
        return 'WEEKEND'

udf1_name = F.udf(udf1, T.TimestampType())
udf2_name = F.udf(udf2, T.StringType()) {code}

 # Adding UDF in grouping and agg

{code:java}
groupBy_cols = ['col1', 'col4', 'col5', 'col3']
temp = (
    testDf
    .select('*', udf1_name(F.col('col2')).alias('col4'))
    .select('*', udf2_name('col4').alias('col5'))
)

result = temp.groupBy(*groupBy_cols).agg(F.countDistinct('col5').alias('col6')) {code}

 # Result

{code:java}
result.show(5, False) {code}
*We get the error below*

{code:java}
An error was encountered:
An error occurred while calling o1079.showString.
: java.lang.IllegalStateException: Couldn't find pythonUDF0#1108 in [col1#978L,groupingPythonUDF#1104,groupingPythonUDF#1105,col3#980,count(pythonUDF0#1108)#1080L]
        at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:80)
        at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:73)
 {code}
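
For reference, the rows one would expect if the query ran successfully can be worked out without Spark. The sketch below is not part of the original report: it reuses the plain Python bodies of udf1 and udf2 and the three sample rows from the steps above, and emulates DISTINCT, the two select steps, and the groupBy/countDistinct with ordinary dictionaries and sets. The names `rows`, `groups`, and `expected` are illustrative only, and the emulation assumes the intended semantics of the query rather than anything about Spark's planner.

```python
from collections import defaultdict
from datetime import datetime


def udf1(incentive_date):
    # Identity function, as in the report
    return incentive_date


def udf2(d):
    # Monday through Thursday count as WEEKDAY, as in the report
    if d.isoweekday() in (1, 2, 3, 4):
        return 'WEEKDAY'
    else:
        return 'WEEKEND'


# Same sample rows as the repro: (col1, col2, col3)
rows = [
    (1, datetime(2023, 5, 15, 12, 30), "Discount"),
    (2, datetime(2023, 5, 16, 16, 45), "Promotion"),
    (3, datetime(2023, 5, 17, 9, 15), "Coupon"),
]

# Emulate SELECT DISTINCT, the two nested UDF columns, and the aggregation
groups = defaultdict(set)
for col1, col2, col3 in set(rows):
    col4 = udf1(col2)            # first UDF
    col5 = udf2(col4)            # second UDF applied to the first UDF's output
    groups[(col1, col4, col5, col3)].add(col5)  # distinct col5 values per group

# One row per group: (col1, col4, col5, col3, col6)
expected = sorted((*key, len(values)) for key, values in groups.items())
for row in expected:
    print(row)
```

Because col5 is itself one of the grouping columns, each group can contain only one distinct col5 value, so col6 should be 1 for every group under these assumptions.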


