[
https://issues.apache.org/jira/browse/SPARK-56322?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Takuya Ueshin reassigned SPARK-56322:
-------------------------------------
Assignee: Marcin Wojtyczka
> [CONNECT][PYTHON] Self-joining an observed DataFrame raises TypeError in
> observations property
> ----------------------------------------------------------------------------------------------
>
> Key: SPARK-56322
> URL: https://issues.apache.org/jira/browse/SPARK-56322
> Project: Spark
> Issue Type: Bug
> Components: Connect, PySpark
> Affects Versions: 3.4.0, 3.5.0, 4.0.0
> Reporter: Marcin Wojtyczka
> Assignee: Marcin Wojtyczka
> Priority: Major
> Labels: pull-request-available
>
> When a DataFrame is observed using DataFrame.observe() and subsequently
> filtered into multiple subsets that are then joined back together, the
> observations property on the Connect logical plan raises a TypeError.
>
>
>
> *Root cause:*
> The observations property in several plan nodes (Join, AsOfJoin, LateralJoin,
> SetOperation, CollectMetrics) merges observations from child plan branches
> using dict(**left, **right). When an observed DataFrame is branched (filtered,
> aliased) and then self-joined, both branches carry a reference to the same
> Observation instance under the same name. Python's dict() constructor does not
> allow duplicate keyword arguments and raises:
> {code:java}
> TypeError: dict() got multiple values for keyword argument '<observation_name>'
> {code}
>
> This affects any workflow that observes a DataFrame and then branches and
> merges it — a common pattern in data quality pipelines, ETL workflows with
> valid/invalid splits, and any pipeline that filters an observed DataFrame
> into subsets and joins them. See DQX issue:
> [https://github.com/databrickslabs/dqx/issues/1099]
>
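The failure described under "Root cause" can be reproduced in plain Python, without Spark. The sketch below is only an illustration: `shared`, `left`, and `right` are hypothetical stand-ins for the Observation instance and the two per-branch observation dicts, not names from the Spark code base. It also shows that a dict display tolerates the duplicate key, though the issue does not say whether the actual fix takes this form.

```python
# Stand-in for the single Observation instance that both branches reference.
shared = object()

# Each branch of the self-join reports the same name -> same instance.
left = {"my_observation": shared}
right = {"my_observation": shared}

error_message = None
try:
    # Keyword unpacking rejects a key that appears in both mappings.
    dict(**left, **right)
except TypeError as exc:
    error_message = str(exc)
    print(error_message)

# A dict display accepts duplicates; the later mapping's value wins.
# Both values are the same object here, so nothing is lost.
merged = {**left, **right}
print(merged["my_observation"] is shared)
```

Because both branches point at the same instance, letting the later entry overwrite the earlier one is harmless in this scenario; two different observations sharing a name would need separate handling.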
> *Reproduction:*
> {code:java}
> from pyspark.sql import Observation
> from pyspark.sql.functions import count, lit
>
> # Assumes `spark` is an active Spark Connect session.
> obs = Observation("my_observation")
> df = (
>     spark.range(100)
>     .selectExpr("id", "case when id < 10 then 'A' else 'B' end as group_key")
>     .observe(obs, count(lit(1)).alias("row_count"))
> )
>
> # Filter into two subsets; both carry the same observation
> df1 = df.where("id < 20")
> df2 = df.where("id % 2 == 0")
>
> # Self-join triggers the bug
> joined = df1.alias("a").join(df2.alias("b"), on=["id"], how="inner")
> joined.collect()
>
> # TypeError: dict() got multiple values for keyword argument 'my_observation'
> {code}
>