I'm using PySpark (version 3.2) and I've encountered the following exception 
while trying to slice an array column in a DataFrame:
"org.apache.spark.SparkRuntimeException: Unexpected value for length in 
function slice: length must be greater than or equal to 0"
even though the length I pass should always be greater than or equal to 1.

Here's the full exception I'm receiving:

```
Caused by: org.apache.spark.SparkRuntimeException: Unexpected value for length in function slice: length must be greater than or equal to 0.
    at org.apache.spark.sql.errors.QueryExecutionErrors$.unexpectedValueForLengthInFunctionError(QueryExecutionErrors.scala:1602)
    at org.apache.spark.sql.errors.QueryExecutionErrors.unexpectedValueForLengthInFunctionError(QueryExecutionErrors.scala)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate.Slice_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate.subExpr_1$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate.eval(Unknown Source)
    at org.apache.spark.sql.execution.FilterExec.$anonfun$doExecute$3(basicPhysicalOperators.scala:276)
    at org.apache.spark.sql.execution.FilterExec.$anonfun$doExecute$3$adapted(basicPhysicalOperators.scala:275)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:515)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:888)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:888)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
    at org.apache.spark.scheduler.Task.run(Task.scala:139)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    ... 1 more
```

The DataFrame I'm working with has the following data:

```
{
    'id': '2',
    'arr2': [1, 1, 1, 2],
    'arr1': [0.0, 1.0, 1.0, 1.0]
}
```

And the schema of the DataFrame is as follows:

```
root
|-- id: string (nullable = true)
|-- arr2: array (nullable = true)
|    |-- element: long (containsNull = true)
|-- arr1: array (nullable = true)
|    |-- element: double (containsNull = true)
```

The code is:

```
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
data = [
    {
        'id': '2',
        'arr2': [1, 1, 1, 2],
        'arr1': [0.0, 1.0, 1.0, 1.0]
    }
]
df = spark.createDataFrame(data)

# For this row size(arr2) = 4, so end_index = 4 - 6 = -2
df = df.withColumn('end_index', F.size('arr2') - F.lit(6))
df = df.filter(F.col('end_index') > 0)  # Note HERE: rows with a non-positive length are filtered out
df = df.withColumn("trimmed_arr2", F.slice(F.col('arr2'), start=F.lit(1), length=F.col('end_index')))
# Average of the trimmed array via the aggregate higher-order function
df = df.withColumn("avg_trimmed", F.expr('aggregate(trimmed_arr2, 0L, (acc, x) -> acc + x, acc -> acc / end_index)'))
df = df.filter(F.col('avg_trimmed') > 30)
df = df.withColumn('repeated_counts', F.size(F.array_distinct('trimmed_arr2')))
df = df.withColumn('ratio', F.col('repeated_counts') / F.size('trimmed_arr2'))
df = df.filter(F.col('ratio') > 0.6)
df.show(truncate=False, vertical=True)
```

What's strange is that if I write the DataFrame to disk and read it back just 
before the line `df = df.filter(F.col('ratio') > 0.6)`, the code executes 
perfectly without any exceptions.


**Note: the code works if I create the SparkSession with the following configuration instead:**

```
spark = SparkSession.builder \
    .master("local") \
    .appName("test-app") \
    .config("spark.driver.bindAddress", "127.0.0.1") \
    .config("spark.sql.optimizer.excludedRules",
            "org.apache.spark.sql.catalyst.optimizer.PushDownPredicates") \
    .getOrCreate()
```
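Since excluding PushDownPredicates changes the behaviour, I suspect the `ratio > 0.6` filter is being pushed down below the slice. This is how I've been comparing the plans (a sketch only; I'm assuming `spark.sql.optimizer.excludedRules` can also be toggled at runtime via `spark.conf.set`):

```
# Full plan: shows where the optimizer places the ratio filter
# relative to the slice(...) projection.
df.explain(True)

# Exclude the rule on the same session and compare the optimized/physical plan.
spark.conf.set(
    "spark.sql.optimizer.excludedRules",
    "org.apache.spark.sql.catalyst.optimizer.PushDownPredicates",
)
df.explain(True)
```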

I would greatly appreciate any insights on why this SparkRuntimeException might 
be occurring. Any idea what else I can check?

Thank you in advance for your help!

Daniel Bariudin
Software Developer  | Cyber R&D | Cognyte
Mobile: +972-050-9011701
www.cognyte.com
