[ 
https://issues.apache.org/jira/browse/SPARK-55933?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eyck Troschke updated SPARK-55933:
----------------------------------
    Attachment: test_merge_optional_nested_struct.py

> PySpark: Incorrect Handling of None for Nested Structs in merge/mergeInto 
> with Mismatched Field Order
> -----------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-55933
>                 URL: https://issues.apache.org/jira/browse/SPARK-55933
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 4.0.2
>            Reporter: Eyck Troschke
>            Priority: Major
>         Attachments: test_merge_optional_nested_struct.py
>
>
> {{DeltaTable.merge}} and PySpark {{mergeInto}} incorrectly handle assignments 
> of {{None}} to nested struct columns when the field order in the incoming 
> DataFrame differs from the field order in the target table schema. Instead of 
> writing {{NULL}} for the entire struct, Delta Lake writes a struct with all 
> fields set to {{NULL}}. This issue does *not* occur when the struct 
> contains non‑null values.
> h2. *Details*
> Merging data into a Delta table fails to correctly handle {{None}} values for 
> nested struct columns when the struct field order in the source DataFrame 
> does not match the field order defined in the Delta table schema.
> Two independent test cases—one using {{DeltaTable.merge}} and one using 
> PySpark’s native {{mergeInto}}—demonstrate the same incorrect behavior:
>  * When the struct contains actual values, the merge correctly maps fields by 
> name, even if the order differs.
>  * When the struct value is {{None}}, the merge incorrectly inserts a 
> struct with all fields set to {{NULL}} instead of inserting a {{NULL}} value 
> for the entire struct column.
> This leads to inconsistent and incorrect data representation for optional 
> nested structs.
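The expected by‑name semantics can be illustrated with a plain‑Python sketch (no Spark involved; {{remap_struct}} is a hypothetical helper that only models the behavior described above, not Delta Lake's implementation):

```python
def remap_struct(value, source_fields, target_fields):
    """Model of by-name struct field mapping: a None struct stays None;
    a non-None struct is reordered to the target field order."""
    if value is None:
        # Expected: NULL for the entire struct, never a struct of all-NULL fields.
        return None
    by_name = dict(zip(source_fields, value))
    return tuple(by_name[f] for f in target_fields)

# Non-null struct: fields are mapped by name despite the order mismatch.
print(remap_struct(("value_a", "value_b"), ["a", "b"], ["b", "a"]))
# -> ('value_b', 'value_a')

# None struct: stays None, matching the expected behavior in this report.
print(remap_struct(None, ["a", "b"], ["b", "a"]))
# -> None
```

The reported bug corresponds to the {{None}} branch not being honored when field orders differ: instead of propagating {{None}}, each field is filled with {{NULL}} individually.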
> h2. *Expected Behavior*
> When merging a row where a nested struct column is {{None}}, the Delta 
> table should store {{NULL}} for the entire struct—regardless of field order 
> differences between the DataFrame schema and the table schema.
> h2. *Actual Behavior*
> When merging a row with {{nested = None}}, the resulting table contains:
> {{Row(nested=Row(b=None, a=None))}}
> instead of:
> {{Row(nested=None)}}
> This occurs only when the DataFrame’s struct field order differs from the 
> table schema. For non‑null struct values, the merge behaves correctly.
> h2. *Reproduction*
>  
> {code:python}
> from collections.abc import Iterator
> 
> import pytest
> from delta.tables import DeltaTable
> from pyspark.sql import Row, SparkSession
> from pyspark.sql.functions import lit
> from pyspark.sql.types import StringType, StructField, StructType
> 
> 
> @pytest.fixture
> def table_name(spark: SparkSession) -> Iterator[str]:
>     table_name = "test_optional_struct"
>     column = StructField(
>         "nested",
>         StructType(
>             [
>                 StructField("b", StringType()),
>                 StructField("a", StringType()),
>             ]
>         ),
>     )
>     DeltaTable.create(spark).tableName(table_name).addColumns([column]).execute()
> 
>     yield table_name
>     spark.sql(f"DROP TABLE {table_name}")
> 
> 
> @pytest.mark.parametrize(
>     "row, expected_row",
>     [
>         (
>             Row(nested=Row(a="value_a", b="value_b")),
>             Row(nested=Row(b="value_b", a="value_a")),
>         ),
>         (Row(nested=None), Row(nested=None)),
>     ],
> )
> def test_delta_table_merge_with_optional_nested_struct(
>     spark: SparkSession, table_name: str, row: Row, expected_row: Row
> ):
>     # different order of fields in struct than in table schema
>     df = spark.createDataFrame([row], "nested STRUCT<a: STRING, b: STRING>")
> 
>     target_table = DeltaTable.forName(spark, table_name)
>     target_table.merge(df, "true").whenNotMatchedInsertAll().execute()
> 
>     print(f"{df.collect()=}")
>     # returns [Row(nested=Row(b=None, a=None))] instead of [Row(nested=None)]:
>     print(f"{target_table.toDF().collect()=}")
> 
>     assert target_table.toDF().collect()[0] == expected_row
> 
> 
> @pytest.mark.parametrize(
>     "row, expected_row",
>     [
>         (
>             Row(nested=Row(a="value_a", b="value_b")),
>             Row(nested=Row(b="value_b", a="value_a")),
>         ),
>         (Row(nested=None), Row(nested=None)),
>     ],
> )
> def test_pyspark_merge_with_optional_nested_struct(
>     spark: SparkSession, table_name: str, row: Row, expected_row: Row
> ):
>     # different order of fields in struct than in table schema
>     df = spark.createDataFrame([row], "nested STRUCT<a: STRING, b: STRING>")
> 
>     df.mergeInto(table_name, lit(True)).whenNotMatched().insertAll().merge()
> 
>     print(f"{df.collect()=}")
>     # returns [Row(nested=Row(b=None, a=None))] instead of [Row(nested=None)]:
>     print(f"{spark.table(table_name).collect()=}")
> 
>     assert spark.table(table_name).collect()[0] == expected_row
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
