[
https://issues.apache.org/jira/browse/SPARK-55933?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Eyck Troschke updated SPARK-55933:
----------------------------------
Attachment: test_merge_optional_nested_struct.py
> PySpark: Incorrect Handling of None for Nested Structs in merge/mergeInto
> with Mismatched Field Order
> -----------------------------------------------------------------------------------------------------
>
> Key: SPARK-55933
> URL: https://issues.apache.org/jira/browse/SPARK-55933
> Project: Spark
> Issue Type: Bug
> Components: PySpark
> Affects Versions: 4.0.2
> Reporter: Eyck Troschke
> Priority: Major
> Attachments: test_merge_optional_nested_struct.py
>
>
> {{DeltaTable.merge}} and PySpark {{mergeInto}} incorrectly handle assignments
> of {{None}} to nested struct columns when the field order in the incoming
> DataFrame differs from the field order in the target table schema. Instead of
> writing {{NULL}} for the entire struct, Delta Lake writes a struct with all
> fields set to {{NULL}}. This issue does *not* occur when the struct
> contains non‑null values.
> h2. *Details*
> Merging data into a Delta table fails to correctly handle {{None}} values for
> nested struct columns when the struct field order in the source DataFrame
> does not match the field order defined in the Delta table schema.
> Two independent test cases—one using {{DeltaTable.merge}} and one using
> PySpark’s native {{mergeInto}}—demonstrate the same incorrect behavior:
> * When the struct contains actual values, the merge correctly maps fields by
> name, even if the order differs.
> * When the struct value is {{None}}, the merge incorrectly inserts a
> struct with all fields set to {{NULL}} instead of inserting a {{NULL}} value
> for the entire struct column.
> This leads to inconsistent and incorrect data representation for optional
> nested structs.
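> The suspected mechanism can be illustrated in plain Python (a sketch only: the
> {{reorder_buggy}}/{{reorder_null_safe}} helpers and the dict-based struct model
> are illustrative assumptions, not Spark or Delta Lake internals). A by-name
> field reorder that never checks for a top-level {{NULL}} struct turns {{None}}
> into a struct whose fields are all {{NULL}}:

```python
# Hypothetical sketch of a by-name struct-field reorder, modeling the suspected
# bug. Structs are modeled as dicts; None models a NULL struct value.

TARGET_FIELDS = ["b", "a"]  # field order in the target table schema


def reorder_buggy(value):
    # Maps fields by name but never checks for a NULL struct:
    # None silently becomes a struct with every field set to None.
    return {name: (value or {}).get(name) for name in TARGET_FIELDS}


def reorder_null_safe(value):
    # Preserves a NULL struct as NULL; only non-null structs are reordered.
    if value is None:
        return None
    return {name: value.get(name) for name in TARGET_FIELDS}


print(reorder_buggy({"a": "value_a", "b": "value_b"}))
# {'b': 'value_b', 'a': 'value_a'}  -> by-name mapping works for real values
print(reorder_buggy(None))
# {'b': None, 'a': None}            -> mirrors the observed incorrect behavior
print(reorder_null_safe(None))
# None                              -> the expected behavior
```

> With matching field order the reorder step is presumably skipped entirely,
> which would explain why the bug only appears when the orders differ.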
> h2. *Expected Behavior*
> When merging a row where a nested struct column is {{None}}, the Delta
> table should store {{NULL}} for the entire struct—regardless of field order
> differences between the DataFrame schema and the table schema.
> h2. *Actual Behavior*
> When merging a row with {{nested = None}}, the resulting table contains:
> {{Row(nested=Row(b=None, a=None))}}
> instead of:
> {{Row(nested=None)}}
> This occurs only when the DataFrame’s struct field order differs from the
> table schema. For non‑null struct values, the merge behaves correctly.
> h2. *Reproduction*
>
> {code:python}
> from collections.abc import Iterator
>
> import pytest
> from delta.tables import DeltaTable
> from pyspark.sql import Row, SparkSession
> from pyspark.sql.functions import lit
> from pyspark.sql.types import StringType, StructField, StructType
>
>
> @pytest.fixture
> def table_name(spark: SparkSession) -> Iterator[str]:
>     table_name = "test_optional_struct"
>     column = StructField(
>         "nested",
>         StructType(
>             [
>                 StructField("b", StringType()),
>                 StructField("a", StringType()),
>             ]
>         ),
>     )
>
>     DeltaTable.create(spark).tableName(table_name).addColumns([column]).execute()
>
>     yield table_name
>     spark.sql(f"DROP TABLE {table_name}")
>
>
> @pytest.mark.parametrize(
>     "row, expected_row",
>     [
>         (
>             Row(nested=Row(a="value_a", b="value_b")),
>             Row(nested=Row(b="value_b", a="value_a")),
>         ),
>         (Row(nested=None), Row(nested=None)),
>     ],
> )
> def test_delta_table_merge_with_optional_nested_struct(
>     spark: SparkSession, table_name: str, row: Row, expected_row: Row
> ):
>     # different order of fields in struct than in table schema
>     df = spark.createDataFrame([row], "nested STRUCT<a: STRING, b: STRING>")
>
>     target_table = DeltaTable.forName(spark, table_name)
>     target_table.merge(df, "true").whenNotMatchedInsertAll().execute()
>
>     print(f"{df.collect()=}")
>     # returns [Row(nested=Row(b=None, a=None))] instead of [Row(nested=None)]:
>     print(f"{target_table.toDF().collect()=}")
>
>     assert target_table.toDF().collect()[0] == expected_row
>
>
> @pytest.mark.parametrize(
>     "row, expected_row",
>     [
>         (
>             Row(nested=Row(a="value_a", b="value_b")),
>             Row(nested=Row(b="value_b", a="value_a")),
>         ),
>         (Row(nested=None), Row(nested=None)),
>     ],
> )
> def test_pyspark_merge_with_optional_nested_struct(
>     spark: SparkSession, table_name: str, row: Row, expected_row: Row
> ):
>     # different order of fields in struct than in table schema
>     df = spark.createDataFrame([row], "nested STRUCT<a: STRING, b: STRING>")
>
>     df.mergeInto(table_name, lit(True)).whenNotMatched().insertAll().merge()
>
>     print(f"{df.collect()=}")
>     # returns [Row(nested=Row(b=None, a=None))] instead of [Row(nested=None)]:
>     print(f"{spark.table(table_name).collect()=}")
>
>     assert spark.table(table_name).collect()[0] == expected_row
> {code}
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)