I pasted your text into ChatGPT and this is what I got back:

Your problem arises from how Apache Spark serializes Python objects for use in Spark tasks. When a user-defined function (UDF) is defined, Spark uses Python's `pickle` library to serialize the Python function and any required objects so they can be shipped to executor processes.
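As a quick aside on what that serialization step means in practice, here is a minimal sketch outside Spark (the `make_pad` helper is purely illustrative; `cloudpickle` is the library PySpark bundles a vendored copy of for exactly this job):

```python
import pickle
import cloudpickle

def make_pad(n):
    # Nested function whose closure captures n: it cannot be looked up
    # by module name on another machine, so it must be shipped by value.
    def pad(arr):
        return [[0.0] * len(arr[0])] * n + arr
    return pad

f = make_pad(2)

try:
    pickle.dumps(f)  # stdlib pickle serializes functions by reference only
except Exception as e:
    print("pickle failed:", e)  # Can't pickle local object 'make_pad.<locals>.pad'

g = cloudpickle.loads(cloudpickle.dumps(f))  # serialized by value instead
print(g([[1.0, 2.0]]))  # [[0.0, 0.0], [0.0, 0.0], [1.0, 2.0]]
```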
However, certain Python objects, especially ones from libraries like `numpy` or some parts of `pandas`, don't serialize well with `pickle`. When you encounter a `PickleException`, it usually means Spark had trouble serializing the UDF, an object it uses, or an object produced by the UDF. The reason your plain Python version works is that there's nothing "exotic" about built-in Python lists, so it's easy for Spark to serialize and distribute the UDF. The numpy-based version, on the other hand, probably fails because there's something about `np.pad` (or the numpy array objects it uses and returns) that can't be properly pickled by Spark.

If you want to use `numpy` in your UDFs, here are a couple of things to keep in mind:

1. **Converting data types**: Before returning from your UDF, convert numpy arrays back to native Python types using `.tolist()`, which you've already done in your code.
2. **Avoiding global imports**: Sometimes a UDF can be pickled more easily if the required modules (e.g., `numpy`) are imported inside the UDF rather than globally. This can have a performance impact, though, because the import statement is executed each time the UDF is called.

To address the issue:

1. **Local import**: Try importing numpy inside the UDF to see if that resolves the issue:

   ```python
   @udf("array<array<float>>")
   def pad(arr: List[List[float]], n: int) -> List[List[float]]:
       import numpy as np
       return np.pad(arr, [(n, 0), (0, 0)], "constant", constant_values=0.0).tolist()
   ```

2. **Using pandas UDFs**: If you're working with dataframes and want to use `numpy` or `pandas` efficiently, consider pandas UDFs (also known as Vectorized UDFs) instead; see the sketch below. They're designed to work seamlessly with these libraries and can offer better performance in many cases.

Finally, always be aware of version mismatches in your dependencies; sometimes updating or changing versions can resolve pickling issues.
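To make the pandas UDF suggestion concrete, here is a minimal self-contained sketch (assuming Spark 3.x with pyarrow installed; the `pad_pandas` name and the toy dataframe are illustrative, not from ChatGPT's reply):

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, col, lit, size
import pandas as pd
import numpy as np

spark = SparkSession.builder.getOrCreate()
df = (spark.createDataFrame([([[1.0, 2.0], [3.0, 4.0]],), ([[5.0, 6.0]],)], ["array_col"])
      .withColumn("num_padding_rows", lit(3) - size(col("array_col"))))

# A vectorized UDF receives whole pandas Series per batch instead of single rows.
@pandas_udf("array<array<float>>")
def pad_pandas(arr: pd.Series, n: pd.Series) -> pd.Series:
    def pad_one(a, k):
        # Arrow hands nested arrays over as numpy object arrays; rebuild a
        # plain 2-D float array before padding, then return native lists.
        m = np.array([list(row) for row in a], dtype=float)
        return np.pad(m, [(int(k), 0), (0, 0)], "constant", constant_values=0.0).tolist()
    return pd.Series([pad_one(a, k) for a, k in zip(arr, n)])

df.withColumn("padded", pad_pandas(col("array_col"), col("num_padding_rows"))).show(truncate=False)
```

Note that `n` also arrives as a whole Series here, so the per-row pad width is read via `zip` rather than treated as a scalar.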
Then I asked it to write an example so that I could test it in a notebook, and after some testing, which failed at first, this code seems to work:

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, lit, size

# Initialize a SparkSession
spark = SparkSession.builder \
    .appName("Pad Array UDF Test") \
    .getOrCreate()

# Sample dataframe
data = [
    ([[1.0, 2.0], [3.0, 4.0]],),
    ([[5.0, 6.0]],)
]
df = spark.createDataFrame(data, ["array_col"])

# Compute the size of each array and the number of rows to pad
df = df.withColumn("array_size", size(col("array_col")))
expected_length = 3
df = df.withColumn("num_padding_rows", lit(expected_length) - col("array_size"))

from typing import List

# Using plain Python
@udf("array<array<float>>")
def pad_plain(arr: List[List[float]], n: int) -> List[List[float]]:
    padded_arr = []
    for i in range(n):
        padded_arr.append([0.0] * len(arr[0]))
    padded_arr.extend(arr)
    return padded_arr

# Using numpy with a local import
@udf("array<array<float>>")
def pad_numpy(arr: List[List[float]], n: int) -> List[List[float]]:
    import numpy as np
    return np.pad(arr, [(n, 0), (0, 0)], "constant", constant_values=0.0).tolist()

# Apply the UDFs
df_plain = df.withColumn("padded_plain", pad_plain(col("array_col"), col("num_padding_rows")))
df_numpy = df.withColumn("padded_numpy", pad_numpy(col("array_col"), col("num_padding_rows")))

df_plain.select("array_col", "padded_plain").show(truncate=False)
df_numpy.select("array_col", "padded_numpy").show(truncate=False)
```

```
+------------------------+------------------------------------+
|array_col               |padded_plain                        |
+------------------------+------------------------------------+
|[[1.0, 2.0], [3.0, 4.0]]|[[0.0, 0.0], [1.0, 2.0], [3.0, 4.0]]|
|[[5.0, 6.0]]            |[[0.0, 0.0], [0.0, 0.0], [5.0, 6.0]]|
+------------------------+------------------------------------+

+------------------------+------------------------------------+
|array_col               |padded_numpy                        |
+------------------------+------------------------------------+
|[[1.0, 2.0], [3.0, 4.0]]|[[0.0, 0.0], [1.0, 2.0], [3.0, 4.0]]|
|[[5.0, 6.0]]            |[[0.0, 0.0], [0.0, 0.0], [5.0, 6.0]]|
+------------------------+------------------------------------+
```

On Thu, 10 Aug 2023 at 17:26, Sanket Sharma <sanketsha...@gmail.com> wrote:

> Hi,
>
> I've been trying to debug a Spark UDF for a couple of days now but I can't
> seem to figure out what is going on. The UDF essentially pads a 2D array to
> a certain fixed length. When the code uses NumPy, it fails with a
> PickleException. When I rewrite it using plain Python, it works like a charm:
>
> This does not work:
>
> @udf("array<array<float>>")
> def pad(arr: List[List[float]], n: int) -> List[List[float]]:
>     return np.pad(arr, [(n, 0), (0, 0)], "constant",
>                   constant_values=0.0).tolist()
>
> But this works:
>
> @udf("array<array<float>>")
> def pad(arr, n):
>     padded_arr = []
>     for i in range(n):
>         padded_arr.append([0.0] * len(arr[0]))
>     padded_arr.extend(arr)
>     return padded_arr
>
> The code for calling them remains exactly the same:
>
> df.withColumn("test", pad(col("array_col"), expected_length - actual_length))
>
> What am I missing?
>
> The arrays do not have any NaNs or Nulls.
> Any thoughts, suggestions, or tips for troubleshooting would be
> appreciated.
>
> Best regards,
> Sanket

-- 
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge
+47 480 94 297