[ 
https://issues.apache.org/jira/browse/SPARK-56322?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Takuya Ueshin reassigned SPARK-56322:
-------------------------------------

    Assignee: Marcin Wojtyczka

> [CONNECT][PYTHON] Self-joining an observed DataFrame raises TypeError in 
> observations property
> ----------------------------------------------------------------------------------------------
>
>                 Key: SPARK-56322
>                 URL: https://issues.apache.org/jira/browse/SPARK-56322
>             Project: Spark
>          Issue Type: Bug
>          Components: Connect, PySpark
>    Affects Versions: 3.4.0, 3.5.0, 4.0.0
>            Reporter: Marcin Wojtyczka
>            Assignee: Marcin Wojtyczka
>            Priority: Major
>              Labels: pull-request-available
>
> When a DataFrame is observed using DataFrame.observe() and subsequently 
> filtered into multiple subsets that are then joined back together, the 
> observations property on the Connect logical plan raises a TypeError.
>
> *Root cause:*
> The observations property in several plan nodes (Join, AsOfJoin, LateralJoin,
> SetOperation, CollectMetrics) merges observations from child plan branches
> using dict(**left, **right). When an observed DataFrame is branched
> (filtered, aliased) and then self-joined, both branches carry a reference to
> the same Observation instance under the same name. Python's dict()
> constructor does not allow duplicate keyword arguments and raises:
> {code:java}
> TypeError: dict() got multiple values for keyword argument '<observation_name>'
> {code}
>  
> This affects any workflow that observes a DataFrame and then branches and
> merges it, a common pattern in data quality pipelines, ETL workflows with
> valid/invalid splits, and any pipeline that filters an observed DataFrame
> into subsets and joins them. See DQX issue:
> [https://github.com/databrickslabs/dqx/issues/1099]
>
> *Reproduction:*
> {code:java}
> from pyspark.sql import Observation
> from pyspark.sql.functions import count, lit
>
> obs = Observation("my_observation")
> df = (
>     spark.range(100)
>     .selectExpr("id", "case when id < 10 then 'A' else 'B' end as group_key")
>     .observe(obs, count(lit(1)).alias("row_count"))
> )
>
> # Filter into two subsets; both carry the same observation
> df1 = df.where("id < 20")
> df2 = df.where("id % 2 == 0")
>
> # Self-join triggers the bug
> joined = df1.alias("a").join(df2.alias("b"), on=["id"], how="inner")
> joined.collect()
> # TypeError: dict() got multiple values for keyword argument 'my_observation'
> {code}
>  
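
The root cause described above can be reproduced without Spark. The following is a minimal sketch in plain Python, assuming the per-branch observation maps behave like ordinary dicts keyed by observation name; the names `left`, `right`, `merged`, and the placeholder value are hypothetical stand-ins, and the tolerant merge shown is one possible approach, not necessarily the change made in the linked pull request.

```python
# Plain-dict stand-ins for the observation maps of the two join branches.
# The string value is a hypothetical placeholder for an Observation instance.
shared_observation = "placeholder-for-Observation"
left = {"my_observation": shared_observation}
right = {"my_observation": shared_observation}

# Keyword unpacking into dict() rejects a key present in both mappings.
error_message = None
try:
    dict(**left, **right)
except TypeError as exc:
    error_message = str(exc)

# Unpacking inside a dict display tolerates duplicate keys; the value from
# the later mapping wins, which is harmless here because both branches
# refer to the same object.
merged = {**left, **right}
```

Because both branches reference the same instance under the same name, keeping either value yields the same result, so a last-one-wins merge loses no information in this scenario.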



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
