[ 
https://issues.apache.org/jira/browse/SPARK-55933?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eyck Troschke updated SPARK-55933:
----------------------------------
    Description: 
{{DeltaTable.merge}} and PySpark {{mergeInto}} incorrectly handle assignments 
of {{None}} to nested struct columns when the field order in the incoming 
DataFrame differs from the field order in the target table schema. Instead of 
writing {{NULL}} for the entire struct, Delta Lake writes a struct with all 
fields set to {{{}NULL{}}}. This issue does *not* occur when the struct 
contains non‑null values.
h2. *Details*

Merging data into a Delta table fails to correctly handle {{None}} values for 
nested struct columns when the struct field order in the source DataFrame does 
not match the field order defined in the Delta table schema.

Two independent test cases—one using {{DeltaTable.merge}} and one using 
PySpark’s native {{{}mergeInto{}}}—demonstrate the same incorrect behavior:
 * When the struct contains actual values, the merge correctly maps fields by 
name, even if the order differs.
 * When the struct value is {{{}None{}}}, the merge incorrectly inserts a 
struct with all fields set to {{NULL}} instead of inserting a {{NULL}} value 
for the entire struct column.

This leads to inconsistent and incorrect data representation for optional 
nested structs.
h2. *Expected Behavior*

When merging a row where a nested struct column is {{{}None{}}}, the Delta 
table should store {{NULL}} for the entire struct—regardless of field order 
differences between the DataFrame schema and the table schema.
h2. *Actual Behavior*

When merging a row with {{{}nested = None{}}}, the resulting table contains:

{{Row(nested=Row(b=None, a=None))}}

instead of:

{{Row(nested=None)}}

This occurs only when the DataFrame’s struct field order differs from the table 
schema. For non‑null struct values, the merge behaves correctly.
h2. *Reproduction*

 
{code:java}
from collections.abc import Iterator

import pytest
from delta.tables import DeltaTable
from pyspark.sql import Row, SparkSession
from pyspark.sql.functions import lit
from pyspark.sql.types import StringType, StructField, StructType

@pytest.fixture
def table_name(spark: SparkSession) -> Iterator[str]:
    table_name = "test_optional_struct"
    column = StructField(
        "nested",
        StructType(
            [
                StructField("b", StringType()),
                StructField("a", StringType()),
            ]
        ),
    )
    
DeltaTable.create(spark).tableName(table_name).addColumns([column]).execute()   
 
    yield table_name    
    spark.sql(f"DROP TABLE {table_name}")


@pytest.mark.parametrize(
    "row, expected_row",
    [
        (Row(nested=Row(a="value_a", b="value_b")), Row(nested=Row(b="value_b", 
a="value_a"))),
        (Row(nested=None), Row(nested=None)),
    ],
)
def test_delta_table_merge_with_optional_nested_struct(
    spark: SparkSession, table_name: str, row: Row, expected_row: Row
):
    # different order of fields in struct than in table schema
    df = spark.createDataFrame([row], "nested STRUCT<a: STRING, b: STRING>")    
    target_table = DeltaTable.forName(spark, table_name)    
    target_table.merge(df, "true").whenNotMatchedInsertAll().execute()    
    
    print(f"{df.collect()=}")
    # returns [Row(nested=Row(b=None, a=None))] instead of [Row(nested=None)]:
    print(f"{target_table.toDF().collect()=}")    
    
    assert target_table.toDF().collect()[0] == expected_row


@pytest.mark.parametrize(
    "row, expected_row",
    [
        (Row(nested=Row(a="value_a", b="value_b")), Row(nested=Row(b="value_b", 
a="value_a"))),
        (Row(nested=None), Row(nested=None)),
    ],
)
def test_pyspark_merge_with_optional_nested_struct(
    spark: SparkSession, table_name: str, row: Row, expected_row: Row
):
    df = spark.createDataFrame([row], "nested STRUCT<a: STRING, b: STRING>")    
    df.mergeInto(table_name, lit(True)).whenNotMatched().insertAll().merge()    

    print(f"{df.collect()=}")
    # returns [Row(nested=Row(b=None, a=None))] instead of [Row(nested=None)]:
    print(f"{spark.table(table_name).collect()=}")    
    
    assert spark.table(table_name).collect()[0] == expected_row


{code}
 

  was:
{{DeltaTable.merge}} and PySpark {{mergeInto}} incorrectly handle assignments 
of {{None}} to nested struct columns when the field order in the incoming 
DataFrame differs from the field order in the target table schema. Instead of 
writing {{NULL}} for the entire struct, Delta Lake writes a struct with all 
fields set to {{{}NULL{}}}. This issue does *not* occur when the struct 
contains non‑null values.
h2. *Details*

Merging data into a Delta table fails to correctly handle {{None}} values for 
nested struct columns when the struct field order in the source DataFrame does 
not match the field order defined in the Delta table schema.

Two independent test cases—one using {{DeltaTable.merge}} and one using 
PySpark’s native {{{}mergeInto{}}}—demonstrate the same incorrect behavior:
 * When the struct contains actual values, the merge correctly maps fields by 
name, even if the order differs.
 * When the struct value is {{{}None{}}}, the merge incorrectly inserts a 
struct with all fields set to {{NULL}} instead of inserting a {{NULL}} value 
for the entire struct column.

This leads to inconsistent and incorrect data representation for optional 
nested structs.
h2. *Expected Behavior*

When merging a row where a nested struct column is {{{}None{}}}, the Delta 
table should store {{NULL}} for the entire struct—regardless of field order 
differences between the DataFrame schema and the table schema.
h2. *Actual Behavior*

When merging a row with {{{}nested = None{}}}, the resulting table contains:

{{Row(nested=Row(b=None, a=None))}}

instead of:

{{Row(nested=None)}}

This occurs only when the DataFrame’s struct field order differs from the table 
schema. For non‑null struct values, the merge behaves correctly.
h2. *Reproduction*

 
{code:java}
from collections.abc import Iteratorimport pytest
from delta.tables import DeltaTable
from pyspark.sql import Row, SparkSession
from pyspark.sql.functions import lit
from pyspark.sql.types import StringType, StructField, StructType
@pytest.fixture
def table_name(spark: SparkSession) -> Iterator[str]:
    table_name = "test_optional_struct"
    column = StructField(
        "nested",
        StructType(
            [
                StructField("b", StringType()),
                StructField("a", StringType()),
            ]
        ),
    )
    
DeltaTable.create(spark).tableName(table_name).addColumns([column]).execute()   
 yield table_name    spark.sql(f"DROP TABLE {table_name}")
@pytest.mark.parametrize(
    "row, expected_row",
    [
        (Row(nested=Row(a="value_a", b="value_b")), Row(nested=Row(b="value_b", 
a="value_a"))),
        (Row(nested=None), Row(nested=None)),
    ],
)
def test_delta_table_merge_with_optional_nested_struct(
    spark: SparkSession, table_name: str, row: Row, expected_row: Row
):
    # different order of fields in struct than in table schema
    df = spark.createDataFrame([row], "nested STRUCT<a: STRING, b: STRING>")    
target_table = DeltaTable.forName(spark, table_name)    target_table.merge(df, 
"true").whenNotMatchedInsertAll().execute()    print(f"{df.collect()=}")
    # returns [Row(nested=Row(b=None, a=None))] instead of [Row(nested=None)]:
    print(f"{target_table.toDF().collect()=}")    assert 
target_table.toDF().collect()[0] == expected_row
@pytest.mark.parametrize(
    "row, expected_row",
    [
        (Row(nested=Row(a="value_a", b="value_b")), Row(nested=Row(b="value_b", 
a="value_a"))),
        (Row(nested=None), Row(nested=None)),
    ],
)
def test_pyspark_merge_with_optional_nested_struct(
    spark: SparkSession, table_name: str, row: Row, expected_row: Row
):
    df = spark.createDataFrame([row], "nested STRUCT<a: STRING, b: STRING>")    
df.mergeInto(table_name, lit(True)).whenNotMatched().insertAll().merge()    
print(f"{df.collect()=}")
    # returns [Row(nested=Row(b=None, a=None))] instead of [Row(nested=None)]:
    print(f"{spark.table(table_name).collect()=}")    assert 
spark.table(table_name).collect()[0] == expected_row


{code}
 


> PySpark: Incorrect Handling of None for Nested Structs in merge/mergeInto 
> with Mismatched Field Order
> -----------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-55933
>                 URL: https://issues.apache.org/jira/browse/SPARK-55933
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 4.0.2
>            Reporter: Eyck Troschke
>            Priority: Major
>
> {{DeltaTable.merge}} and PySpark {{mergeInto}} incorrectly handle assignments 
> of {{None}} to nested struct columns when the field order in the incoming 
> DataFrame differs from the field order in the target table schema. Instead of 
> writing {{NULL}} for the entire struct, Delta Lake writes a struct with all 
> fields set to {{{}NULL{}}}. This issue does *not* occur when the struct 
> contains non‑null values.
> h2. *Details*
> Merging data into a Delta table fails to correctly handle {{None}} values for 
> nested struct columns when the struct field order in the source DataFrame 
> does not match the field order defined in the Delta table schema.
> Two independent test cases—one using {{DeltaTable.merge}} and one using 
> PySpark’s native {{{}mergeInto{}}}—demonstrate the same incorrect behavior:
>  * When the struct contains actual values, the merge correctly maps fields by 
> name, even if the order differs.
>  * When the struct value is {{{}None{}}}, the merge incorrectly inserts a 
> struct with all fields set to {{NULL}} instead of inserting a {{NULL}} value 
> for the entire struct column.
> This leads to inconsistent and incorrect data representation for optional 
> nested structs.
> h2. *Expected Behavior*
> When merging a row where a nested struct column is {{{}None{}}}, the Delta 
> table should store {{NULL}} for the entire struct—regardless of field order 
> differences between the DataFrame schema and the table schema.
> h2. *Actual Behavior*
> When merging a row with {{{}nested = None{}}}, the resulting table contains:
> {{Row(nested=Row(b=None, a=None))}}
> instead of:
> {{Row(nested=None)}}
> This occurs only when the DataFrame’s struct field order differs from the 
> table schema. For non‑null struct values, the merge behaves correctly.
> h2. *Reproduction*
>  
> {code:java}
> from collections.abc import Iteratorimport pytest
> from delta.tables import DeltaTable
> from pyspark.sql import Row, SparkSession
> from pyspark.sql.functions import lit
> from pyspark.sql.types import StringType, StructField, StructType
> @pytest.fixture
> def table_name(spark: SparkSession) -> Iterator[str]:
>     table_name = "test_optional_struct"
>     column = StructField(
>         "nested",
>         StructType(
>             [
>                 StructField("b", StringType()),
>                 StructField("a", StringType()),
>             ]
>         ),
>     )
>     
> DeltaTable.create(spark).tableName(table_name).addColumns([column]).execute() 
>    
>     yield table_name    
>     spark.sql(f"DROP TABLE {table_name}")
> @pytest.mark.parametrize(
>     "row, expected_row",
>     [
>         (Row(nested=Row(a="value_a", b="value_b")), 
> Row(nested=Row(b="value_b", a="value_a"))),
>         (Row(nested=None), Row(nested=None)),
>     ],
> )
> def test_delta_table_merge_with_optional_nested_struct(
>     spark: SparkSession, table_name: str, row: Row, expected_row: Row
> ):
>     # different order of fields in struct than in table schema
>     df = spark.createDataFrame([row], "nested STRUCT<a: STRING, b: STRING>")  
>   
>     target_table = DeltaTable.forName(spark, table_name)    
>     target_table.merge(df, "true").whenNotMatchedInsertAll().execute()    
>     
>     print(f"{df.collect()=}")
>     # returns [Row(nested=Row(b=None, a=None))] instead of [Row(nested=None)]:
>     print(f"{target_table.toDF().collect()=}")    
>     
>     assert target_table.toDF().collect()[0] == expected_row
> @pytest.mark.parametrize(
>     "row, expected_row",
>     [
>         (Row(nested=Row(a="value_a", b="value_b")), 
> Row(nested=Row(b="value_b", a="value_a"))),
>         (Row(nested=None), Row(nested=None)),
>     ],
> )
> def test_pyspark_merge_with_optional_nested_struct(
>     spark: SparkSession, table_name: str, row: Row, expected_row: Row
> ):
>     df = spark.createDataFrame([row], "nested STRUCT<a: STRING, b: STRING>")  
>   
>     df.mergeInto(table_name, lit(True)).whenNotMatched().insertAll().merge()  
>   
>     print(f"{df.collect()=}")
>     # returns [Row(nested=Row(b=None, a=None))] instead of [Row(nested=None)]:
>     print(f"{spark.table(table_name).collect()=}")    
>     
>     assert spark.table(table_name).collect()[0] == expected_row
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to