Yicong-Huang commented on code in PR #55530:
URL: https://github.com/apache/spark/pull/55530#discussion_r3270693003


##########
python/pyspark/sql/conversion.py:
##########
@@ -160,37 +176,68 @@ def enforce_schema(
         if batch.schema.equals(arrow_schema, check_metadata=False):
             return batch
 
-        # Check if columns are in the same order (by name) as the target schema.
-        # If so, use index-based access (faster than name lookup).
-        batch_names = [batch.schema.field(i).name for i in range(batch.num_columns)]
         target_names = [field.name for field in arrow_schema]
-        use_index = batch_names == target_names
 
-        coerced_arrays = []
-        for i, field in enumerate(arrow_schema):
-            try:
-                arr = batch.column(i) if use_index else batch.column(field.name)
-            except KeyError:
-                raise PySparkTypeError(
-                    f"Result column '{field.name}' does not exist in the output. "
-                    f"Expected schema: {arrow_schema}, got: {batch.schema}."
+        # Step 1: pick source columns from batch to align with target schema
+        if reorder_by_name:
+            batch_names = [batch.schema.field(i).name for i in range(batch.num_columns)]
+            missing = sorted(set(target_names) - set(batch_names))
+            extra = sorted(set(batch_names) - set(target_names))
+            if missing or extra:
+                raise PySparkRuntimeError(
+                    errorClass="RESULT_COLUMN_NAMES_MISMATCH",
+                    messageParameters={
+                        "missing": f" Missing: {', '.join(missing)}." if missing else "",
+                        "extra": f" Unexpected: {', '.join(extra)}." if extra else "",
+                    },
                 )
-            if arr.type != field.type:
-                if not arrow_cast:
-                    raise PySparkTypeError(
-                        f"Result type of column '{field.name}' does not match "
-                        f"the expected type. Expected: {field.type}, got: {arr.type}."
-                    )
+            source_columns = [batch.column(name) for name in target_names]
+            output_names = target_names
+        else:
+            # Positional: require exact column-count match, then take columns by
+            # index, preserving the batch's original column names.
+            if batch.num_columns != len(arrow_schema):

Review Comment:
   This has been fixed in #55978



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to