Eyck Troschke created SPARK-55933:
-------------------------------------

             Summary: PySpark: Incorrect Handling of None for Nested Structs in 
merge/mergeInto with Mismatched Field Order
                 Key: SPARK-55933
                 URL: https://issues.apache.org/jira/browse/SPARK-55933
             Project: Spark
          Issue Type: Bug
          Components: PySpark
    Affects Versions: 4.0.2
            Reporter: Eyck Troschke


{{DeltaTable.merge}} and PySpark {{mergeInto}} incorrectly handle assignments 
of {{None}} to nested struct columns when the field order in the incoming 
DataFrame differs from the field order in the target table schema. Instead of 
writing {{NULL}} for the entire struct, Delta Lake writes a struct with all 
fields set to {{NULL}}. This issue does *not* occur when the struct 
contains non‑null values.
h2. *Details*

Merging data into a Delta table fails to correctly handle {{None}} values for 
nested struct columns when the struct field order in the source DataFrame does 
not match the field order defined in the Delta table schema.

Two independent test cases, one using {{DeltaTable.merge}} and one using 
PySpark’s native {{mergeInto}}, demonstrate the same incorrect behavior:
 * When the struct contains actual values, the merge correctly maps fields by 
name, even if the order differs.
 * When the struct value is {{None}}, the merge incorrectly inserts a 
struct with all fields set to {{NULL}} instead of inserting a {{NULL}} value 
for the entire struct column.

This leads to inconsistent and incorrect data representation for optional 
nested structs.
h2. *Expected Behavior*

When merging a row where a nested struct column is {{None}}, the Delta 
table should store {{NULL}} for the entire struct, regardless of field order 
differences between the DataFrame schema and the table schema.
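
The expected semantics can be sketched in plain Python, without Spark. This is only a model of the behavior described above, not Spark's or Delta Lake's implementation; the helper name {{align_struct}} and the use of dicts to stand in for structs are assumptions made for illustration.

```python
from typing import Mapping, Optional, Sequence


def align_struct(
    value: Optional[Mapping[str, object]],
    target_fields: Sequence[str],
) -> Optional[dict]:
    """Reorder a struct's fields by name to match the target schema.

    Hypothetical helper: it models the expected semantics only and is
    not taken from Spark or Delta Lake.
    """
    # A NULL struct must stay NULL; it must not be expanded field by field.
    if value is None:
        return None
    # Non-null structs are mapped by field name, so source order is irrelevant.
    return {name: value.get(name) for name in target_fields}


# Field order of the table schema used in the reproduction (b before a).
target_order = ["b", "a"]

print(align_struct({"a": "value_a", "b": "value_b"}, target_order))
# prints {'b': 'value_b', 'a': 'value_a'}
print(align_struct(None, target_order))
# prints None
```

The reported bug corresponds to the {{None}} check being skipped: the per-field mapping is then applied to a missing struct and yields a struct whose fields are all {{NULL}}.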
h2. *Actual Behavior*

When merging a row with {{nested = None}}, the resulting table contains:

{{Row(nested=Row(b=None, a=None))}}

instead of:

{{Row(nested=None)}}

This occurs only when the DataFrame’s struct field order differs from the table 
schema. For non‑null struct values, the merge behaves correctly.
h2. *Reproduction*

 
{code:python}
from collections.abc import Iterator

import pytest
from delta.tables import DeltaTable
from pyspark.sql import Row, SparkSession
from pyspark.sql.functions import lit
from pyspark.sql.types import StringType, StructField, StructType


@pytest.fixture
def table_name(spark: SparkSession) -> Iterator[str]:
    table_name = "test_optional_struct"
    column = StructField(
        "nested",
        StructType(
            [
                StructField("b", StringType()),
                StructField("a", StringType()),
            ]
        ),
    )
    DeltaTable.create(spark).tableName(table_name).addColumns([column]).execute()
    yield table_name
    spark.sql(f"DROP TABLE {table_name}")


@pytest.mark.parametrize(
    "row, expected_row",
    [
        (
            Row(nested=Row(a="value_a", b="value_b")),
            Row(nested=Row(b="value_b", a="value_a")),
        ),
        (Row(nested=None), Row(nested=None)),
    ],
)
def test_delta_table_merge_with_optional_nested_struct(
    spark: SparkSession, table_name: str, row: Row, expected_row: Row
):
    # different order of fields in struct than in table schema
    df = spark.createDataFrame([row], "nested STRUCT<a: STRING, b: STRING>")
    target_table = DeltaTable.forName(spark, table_name)
    target_table.merge(df, "true").whenNotMatchedInsertAll().execute()
    print(f"{df.collect()=}")
    # returns [Row(nested=Row(b=None, a=None))] instead of [Row(nested=None)]:
    print(f"{target_table.toDF().collect()=}")
    assert target_table.toDF().collect()[0] == expected_row


@pytest.mark.parametrize(
    "row, expected_row",
    [
        (
            Row(nested=Row(a="value_a", b="value_b")),
            Row(nested=Row(b="value_b", a="value_a")),
        ),
        (Row(nested=None), Row(nested=None)),
    ],
)
def test_pyspark_merge_with_optional_nested_struct(
    spark: SparkSession, table_name: str, row: Row, expected_row: Row
):
    # different order of fields in struct than in table schema
    df = spark.createDataFrame([row], "nested STRUCT<a: STRING, b: STRING>")
    df.mergeInto(table_name, lit(True)).whenNotMatched().insertAll().merge()
    print(f"{df.collect()=}")
    # returns [Row(nested=Row(b=None, a=None))] instead of [Row(nested=None)]:
    print(f"{spark.table(table_name).collect()=}")
    assert spark.table(table_name).collect()[0] == expected_row
{code}
 



