I'm using PySpark (version 3.2) and I've encountered the following exception while trying to slice an array column in a DataFrame: "org.apache.spark.SparkRuntimeException: Unexpected value for length in function slice: length must be greater than or equal to 0", even though the column I pass as the length is filtered to be greater than 0.
Here's the full exception I'm receiving:

```
Caused by: org.apache.spark.SparkRuntimeException: Unexpected value for length in function slice: length must be greater than or equal to 0.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.unexpectedValueForLengthInFunctionError(QueryExecutionErrors.scala:1602)
	at org.apache.spark.sql.errors.QueryExecutionErrors.unexpectedValueForLengthInFunctionError(QueryExecutionErrors.scala)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate.Slice_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate.subExpr_1$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate.eval(Unknown Source)
	at org.apache.spark.sql.execution.FilterExec.$anonfun$doExecute$3(basicPhysicalOperators.scala:276)
	at org.apache.spark.sql.execution.FilterExec.$anonfun$doExecute$3$adapted(basicPhysicalOperators.scala:275)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:515)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:888)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:888)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	... 1 more
```

The DataFrame I'm working with has the following data:

```
'id': '2', 'arr2': [1, 1, 1, 2], 'arr1': [0.0, 1.0, 1.0, 1.0]
```

And the schema of the DataFrame is as follows:

```
root
 |-- id: string (nullable = true)
 |-- arr2: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- arr1: array (nullable = true)
 |    |-- element: double (containsNull = true)
```

The code is:

```
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

data = [
    {
        'id': '2',
        'arr2': [1, 1, 1, 2],
        'arr1': [0.0, 1.0, 1.0, 1.0]
    }
]
df = spark.createDataFrame(data)

df = df.withColumn('end_index', F.size('arr2') - F.lit(6))
df = df.filter(F.col('end_index') > 0)  # Note HERE: this should guarantee a positive slice length
df = df.withColumn("trimmed_arr2", F.slice(F.col('arr2'), start=F.lit(1), length=F.col('end_index')))
df = df.withColumn("avg_trimmed", F.expr('aggregate(trimmed_arr2, 0L, (acc,x) -> acc+x, acc -> acc / end_index)'))
df = df.filter(F.col('avg_trimmed') > 30)
df = df.withColumn('repeated_counts', F.size(F.array_distinct('trimmed_arr2')))
df = df.withColumn('ratio', F.col('repeated_counts') / F.size('trimmed_arr2'))
df = df.filter(F.col('ratio') > 0.6)
df.show(truncate=False, vertical=True)
```

What's strange is that if I write the DataFrame to disk and read it back just before the line `df = df.filter(F.col('ratio') > 0.6)`, the code executes without any exception.

**Note: the code also works if I build the SparkSession with the following configuration instead, excluding the PushDownPredicates optimizer rule:**

```
spark = SparkSession.builder \
    .master("local") \
    .appName("test-app") \
    .config("spark.driver.bindAddress", "127.0.0.1") \
    .config('spark.sql.optimizer.excludedRules', "org.apache.spark.sql.catalyst.optimizer.PushDownPredicates") \
    .getOrCreate()
```

I would greatly appreciate any insights on why this SparkRuntimeException might be occurring. Any idea what else I can check?

Thank you in advance for your help!
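In case it helps to confirm what the optimizer is doing, is dumping the plans with `explain` the right way to check whether the `avg_trimmed`/`ratio` filters end up below the `end_index > 0` filter? Comparing the output with and without the excluded rule is my assumption of how to verify the reordering:

```
# Show the parsed, analyzed, and optimized logical plans plus the physical plan,
# to see where the later filters land relative to the end_index > 0 guard.
df.explain(extended=True)
```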
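Also, as an alternative to excluding the optimizer rule, I'm considering clamping the slice length so it can never go negative. This is just a sketch building on the `df` from the code above; I haven't verified that it actually avoids the exception:

```
import pyspark.sql.functions as F

# Clamp the length to a minimum of 0 so slice never receives a negative value,
# regardless of how the optimizer reorders the filters around it.
df = df.withColumn(
    "trimmed_arr2",
    F.slice(F.col("arr2"), start=F.lit(1), length=F.greatest(F.col("end_index"), F.lit(0)))
)
```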