dtenedor commented on code in PR #43356:
URL: https://github.com/apache/spark/pull/43356#discussion_r1366193870


##########
python/pyspark/worker.py:
##########
@@ -841,6 +845,63 @@ def _remove_partition_by_exprs(self, arg: Any) -> Any:
             "the query again."
         )
 
+    # Compares each UDTF output row against the output schema for this particular UDTF call,
+    # raising an error if the two are incompatible.
+    def check_output_row_against_schema(row: Any) -> None:
+        nonlocal return_type
+        for result_column_index in range(len(return_type)):
+
+            def check_for_none_in_non_nullable_column(
+                value: Any, data_type: DataType, nullable: bool
+            ) -> None:
+                if value is None and not nullable:
+                    raise PySparkRuntimeError(
+                        error_class="UDTF_EXEC_ERROR",
+                        message_parameters={
+                            "method_name": "eval' or 'terminate",
+                            "error": f"Column {result_column_index} within a returned row had a "
+                            + "value of None, either directly or within array/struct/map "
+                            + "subfields, but the corresponding column type was declared as "
+                            + "non-nullable; please update the UDTF to return a non-None value at "
+                            + "this location or otherwise declare the column type as nullable.",
+                        },
+                    )
+                elif (
+                    isinstance(data_type, ArrayType)
+                    and isinstance(value, list)
+                    and not data_type.containsNull
+                ):
+                    for sub_value in value:
+                        check_for_none_in_non_nullable_column(
+                            sub_value, data_type.elementType, data_type.containsNull
+                        )
+                elif isinstance(data_type, StructType) and isinstance(value, Row):
+                    for field_name, field_value in value.asDict().items():

Review Comment:
   Good point; I switched this to iterate through the row using column indexes 
instead.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to