I pasted your text to chatgtp and this is what I got back

Your problem arises due to how Apache Spark serializes Python objects to be
used in Spark tasks. When a User-Defined Function (UDF) is defined, Spark
uses Python's `pickle` library to serialize the Python function and any
required objects so they can be shipped to executor processes.

However, certain Python objects, especially ones from libraries like
`numpy` or some parts of `pandas`, don't serialize well with `pickle`. When
you encounter a `PickleException`, it usually means Spark had trouble
serializing the UDF, an object it uses, or an object produced by the UDF.

The reason your plain Python version works is that there's nothing "exotic"
about the built-in Python lists, so it's easy for Spark to serialize and
distribute the UDF. On the other hand, the numpy-based version probably
fails because there's something about `np.pad` (or the numpy array objects
it uses/returns) that can't be properly pickled by Spark.

If you want to use `numpy` in your UDFs, here are a couple of things to
keep in mind:

1. **Converting Data Types**: Before returning from your UDF, convert numpy
arrays back to native Python types using `.tolist()`, which you've already
done in your code.
2. **Avoid Global Imports**: Sometimes, a UDF can be pickled more easily if
the required modules (e.g., `numpy`) are imported inside the UDF, rather
than globally. Though, this can have a performance impact because the
import statement would be executed each time the UDF is called.

To address the issue:

1. **Local Import**: Try importing numpy inside the UDF to see if that
resolves the issue:

    ```python
    @udf("array<array<float>>")
    def pad(arr: List[List[float]], n: int) -> List[List[float]]:
        import numpy as np
        return np.pad(arr, [(n, 0), (0, 0)], "constant",
constant_values=0.0).tolist()
    ```

2. **Using pandas UDFs**: If you're working with dataframes and want to
utilize `numpy` or `pandas` efficiently, consider using pandas UDFs (also
known as Vectorized UDFs) instead. They're designed to work seamlessly with
these libraries and can offer better performance in many cases.

Finally, always be aware of the version mismatches or dependencies.
Sometimes, updating or changing versions can resolve pickling issues.

Then I ask this
write an example so that i can test it in a notebook
and after some testing witch first failed

This code seems to work

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, lit, size

# Initialize a SparkSession
spark = SparkSession.builder \
    .appName("Pad Array UDF Test") \
    .getOrCreate()

# Sample dataframe
data = [
    ([[1.0, 2.0], [3.0, 4.0]],),
    ([[5.0, 6.0]],)
]
df = spark.createDataFrame(data, ["array_col"])

# Compute the size of each array and the number of rows to pad
df = df.withColumn("array_size", size(col("array_col")))

expected_length = 3
df = df.withColumn("num_padding_rows", lit(expected_length) -
col("array_size"))

from typing import List

# Using plain python
@udf("array<array<float>>")
def pad_plain(arr: List[List[float]], n: int) -> List[List[float]]:
    padded_arr = []
    for i in range(n):
        padded_arr.append([0.0] * len(arr[0]))
    padded_arr.extend(arr)
    return padded_arr

# Using numpy with local import
@udf("array<array<float>>")
def pad_numpy(arr: List[List[float]], n: int) -> List[List[float]]:
    import numpy as np
    return np.pad(arr, [(n, 0), (0, 0)], "constant",
constant_values=0.0).tolist()

# Apply the UDFs
df_plain = df.withColumn("padded_plain", pad_plain(col("array_col"),
col("num_padding_rows")))
df_numpy = df.withColumn("padded_numpy", pad_numpy(col("array_col"),
col("num_padding_rows")))

df_plain.select("array_col", "padded_plain").show(truncate=False)
df_numpy.select("array_col", "padded_numpy").show(truncate=False)


+------------------------+------------------------------------+
|array_col               |padded_plain                        |
+------------------------+------------------------------------+
|[[1.0, 2.0], [3.0, 4.0]]|[[0.0, 0.0], [1.0, 2.0], [3.0, 4.0]]|
|[[5.0, 6.0]]            |[[0.0, 0.0], [0.0, 0.0], [5.0, 6.0]]|
+------------------------+------------------------------------+

+------------------------+------------------------------------+
|array_col               |padded_numpy                        |
+------------------------+------------------------------------+
|[[1.0, 2.0], [3.0, 4.0]]|[[0.0, 0.0], [1.0, 2.0], [3.0, 4.0]]|
|[[5.0, 6.0]]            |[[0.0, 0.0], [0.0, 0.0], [5.0, 6.0]]|
+------------------------+------------------------------------+


tor. 10. aug. 2023 kl. 17:26 skrev Sanket Sharma <sanketsha...@gmail.com>:

> Hi,
> I've been trying to debug a Spark UDF for a couple of days now but I can't
> seem to figure out what is going on. The UDF essentially pads a 2D array to
> a certain fixed length. When the code uses NumPy, it fails with a
> PickleException. When I re write using plain python, it works like charm.:
>
> This does not work:
>
>
> @udf("array<array<float>>")
> def pad(arr: List[List[float]], n: int) -> List[List[float]]:
>     return np.pad(arr, [(n, 0), (0, 0)], "constant",
> constant_values=0.0).tolist()
>
> But this works:
> @udf("array<array<float>>")
> def pad(arr, n):
>     padded_arr = []
>     for i in range(n):
>         padded_arr.append([0.0] * len(arr[0]))
>     padded_arr.extend(arr)
>     return padded_arr
>
> The code for calling them remains exactly the same:
> df.withColumn("test", pad(col("array_col"), expected_length -
> actual_length)
>
> What am I missing?
>
> The arrays do not have any NaNs or Nulls.
> Any thoughts or suggestions or tips for troubleshooting would be
> appreciated.
>
> Best regards,
> Sanket
>
>

-- 
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297

Reply via email to