[ 
https://issues.apache.org/jira/browse/SPARK-48311?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sumit Singh updated SPARK-48311:
--------------------------------
    Description: 
Steps to Reproduce 

1. Data creation
{code:java}
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, LongType, TimestampType, 
StringType
from datetime import datetime

# Define the schema
schema = StructType([
    StructField("col1", LongType(), nullable=True),
    StructField("col2", TimestampType(), nullable=True),
    StructField("col3", StringType(), nullable=True)
])

# Define the data
data = [
    (1, datetime(2023, 5, 15, 12, 30), "Discount"),
    (2, datetime(2023, 5, 16, 16, 45), "Promotion"),
    (3, datetime(2023, 5, 17, 9, 15), "Coupon")
]

# Create the DataFrame
df = spark.createDataFrame(data, 
schema)df.createOrReplaceTempView("temp_offers")

# Query the temporary table using SQL
# DISTINCT required to reproduce the issue. 
testDf = spark.sql("""
                    SELECT DISTINCT 
                    col1,
                    col2,
                    col3 FROM temp_offers
                    """) {code}
2. UDF registration 
{code:java}
import pyspark.sql.functions as F 
import pyspark.sql.types as T

#Creating udf functions 
def udf1(d):
    return d

def udf2(d):
    if d.isoweekday() in (1, 2, 3, 4):
        return 'WEEKDAY'
    else:
        return 'WEEKEND'

udf1_name = F.udf(udf1, T.TimestampType())
udf2_name = F.udf(udf2, T.StringType()) {code}
3. Adding UDF in grouping and agg
{code:java}
groupBy_cols = ['col1', 'col4', 'col5', 'col3']
temp = testDf \
  .select('*', udf1_name(F.col('col2')).alias('col4')).select('*', 
udf2_name('col4').alias('col5')) 

result = 
(temp.groupBy(*groupBy_cols).agg(F.countDistinct('col5').alias('col6'))){code}
4. Result
{code:java}
result.show(5, False) {code}
*We get below error*
{code:java}
An error was encountered:
An error occurred while calling o1079.showString.
: java.lang.IllegalStateException: Couldn't find pythonUDF0#1108 in 
[col1#978L,groupingPythonUDF#1104,groupingPythonUDF#1105,col3#980,count(pythonUDF0#1108)#1080L]
        at 
org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:80)
        at 
org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:73)
 {code}

  was:
Steps to Reproduce 

1. Data creation
{code:java}
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, LongType, TimestampType, 
StringType
from datetime import datetime

# Define the schema
schema = StructType([
    StructField("col1", LongType(), nullable=True),
    StructField("col2", TimestampType(), nullable=True),
    StructField("col3", StringType(), nullable=True)
])

# Define the data
data = [
    (1, datetime(2023, 5, 15, 12, 30), "Discount"),
    (2, datetime(2023, 5, 16, 16, 45), "Promotion"),
    (3, datetime(2023, 5, 17, 9, 15), "Coupon")
]

# Create the DataFrame
df = spark.createDataFrame(data, 
schema)df.createOrReplaceTempView("temp_offers")

# Query the temporary table using SQL
# DISTINCT required to reproduce the issue. 
testDf = spark.sql("""
                    SELECT DISTINCT 
                    col1,
                    col2,
                    col3 FROM temp_offers
                    """) {code}
2. UDF registration 
{code:java}
import pyspark.sql.functions as F 
import pyspark.sql.types as T

#Creating udf functions 
def udf1(incentive_date):
    return incentive_date

def udf2(d):
    if d.isoweekday() in (1, 2, 3, 4):
        return 'WEEKDAY'
    else:
        return 'WEEKEND'

udf1_name = F.udf(udf1, T.TimestampType())
udf2_name = F.udf(udf2, T.StringType()) {code}
3. Adding UDF in grouping and agg
{code:java}
groupBy_cols = ['col1', 'col4', 'col5', 'col3']
temp = testDf \
  .select('*', udf1_name(F.col('col2')).alias('col4')).select('*', 
udf2_name('col4').alias('col5')) 

result = 
(temp.groupBy(*groupBy_cols).agg(F.countDistinct('col5').alias('col6'))){code}
4. Result
{code:java}
result.show(5, False) {code}
*We get below error*
{code:java}
An error was encountered:
An error occurred while calling o1079.showString.
: java.lang.IllegalStateException: Couldn't find pythonUDF0#1108 in 
[col1#978L,groupingPythonUDF#1104,groupingPythonUDF#1105,col3#980,count(pythonUDF0#1108)#1080L]
        at 
org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:80)
        at 
org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:73)
 {code}


> Nested pythonUDF in groupBy and aggregate result in Binding Exception 
> ----------------------------------------------------------------------
>
>                 Key: SPARK-48311
>                 URL: https://issues.apache.org/jira/browse/SPARK-48311
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 3.3.2
>            Reporter: Sumit Singh
>            Priority: Major
>
> Steps to Reproduce 
> 1. Data creation
> {code:java}
> from pyspark.sql import SparkSession
> from pyspark.sql.types import StructType, StructField, LongType, 
> TimestampType, StringType
> from datetime import datetime
> # Define the schema
> schema = StructType([
>     StructField("col1", LongType(), nullable=True),
>     StructField("col2", TimestampType(), nullable=True),
>     StructField("col3", StringType(), nullable=True)
> ])
> # Define the data
> data = [
>     (1, datetime(2023, 5, 15, 12, 30), "Discount"),
>     (2, datetime(2023, 5, 16, 16, 45), "Promotion"),
>     (3, datetime(2023, 5, 17, 9, 15), "Coupon")
> ]
> # Create the DataFrame
> df = spark.createDataFrame(data, 
> schema)df.createOrReplaceTempView("temp_offers")
> # Query the temporary table using SQL
> # DISTINCT required to reproduce the issue. 
> testDf = spark.sql("""
>                     SELECT DISTINCT 
>                     col1,
>                     col2,
>                     col3 FROM temp_offers
>                     """) {code}
> 2. UDF registration 
> {code:java}
> import pyspark.sql.functions as F 
> import pyspark.sql.types as T
> #Creating udf functions 
> def udf1(d):
>     return d
> def udf2(d):
>     if d.isoweekday() in (1, 2, 3, 4):
>         return 'WEEKDAY'
>     else:
>         return 'WEEKEND'
> udf1_name = F.udf(udf1, T.TimestampType())
> udf2_name = F.udf(udf2, T.StringType()) {code}
> 3. Adding UDF in grouping and agg
> {code:java}
> groupBy_cols = ['col1', 'col4', 'col5', 'col3']
> temp = testDf \
>   .select('*', udf1_name(F.col('col2')).alias('col4')).select('*', 
> udf2_name('col4').alias('col5')) 
> result = 
> (temp.groupBy(*groupBy_cols).agg(F.countDistinct('col5').alias('col6'))){code}
> 4. Result
> {code:java}
> result.show(5, False) {code}
> *We get below error*
> {code:java}
> An error was encountered:
> An error occurred while calling o1079.showString.
> : java.lang.IllegalStateException: Couldn't find pythonUDF0#1108 in 
> [col1#978L,groupingPythonUDF#1104,groupingPythonUDF#1105,col3#980,count(pythonUDF0#1108)#1080L]
>       at 
> org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:80)
>       at 
> org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:73)
>  {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to