Hi,

I'm using PySpark to construct a DataFrame and execute it. This DataFrame uses
views created from other DataFrames in the same session. One of these views,
say view A, is an SQL statement that combines several map functions in one of
its expressions, among them map_concat. Another view, say view B, is an SQL
statement over view A with a GROUP BY on several columns.
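
To make the shape concrete, here is a rough sketch of the two views (the table
and column names are invented for illustration, not my real schema; spark is
the session described further below):

# Hypothetical source data - the tags map already contains the key that
# view A also adds explicitly, so map_concat sees a duplicate.
src = spark.createDataFrame(
    [("prod-eks", {"tag::eks_cluster_name": "prod-eks", "tag::team": "infra"}, 1.0)],
    "cluster_name string, resource_tags map<string,string>, cost double",
)
src.createOrReplaceTempView("src")

# "View A": an SQL statement combining map functions, among them map_concat.
spark.sql("""
    SELECT map_concat(map('tag::eks_cluster_name', cluster_name), resource_tags) AS tags,
           cluster_name,
           cost
    FROM src
""").createOrReplaceTempView("view_a")

# "View B": an SQL statement over view A with a GROUP BY (my real query groups
# by several columns and computes more aggregates).
spark.sql("""
    SELECT cluster_name, first(tags) AS tags, SUM(cost) AS total_cost
    FROM view_a
    GROUP BY cluster_name
""").createOrReplaceTempView("view_b")

spark.sql("SELECT * FROM view_b").show(truncate=False)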

When I execute this DataFrame (show with truncate=False), I get an exception:
MapConcat tried to build a map, encountered a duplicate key, and since the
dedup policy is EXCEPTION, it failed with the following:

java.lang.RuntimeException: Duplicate map key tag::eks_cluster_name was
found, please check the input data. If you want to remove the duplicated
keys, you can set spark.sql.mapKeyDedupPolicy to LAST_WIN so that the key
inserted at last takes precedence.
at org.apache.spark.sql.errors.QueryExecutionErrors$.duplicateMapKeyFoundError(QueryExecutionErrors.scala:1068)
at org.apache.spark.sql.catalyst.util.ArrayBasedMapBuilder.put(ArrayBasedMapBuilder.scala:69)
at org.apache.spark.sql.catalyst.util.ArrayBasedMapBuilder.put(ArrayBasedMapBuilder.scala:84)
at org.apache.spark.sql.catalyst.expressions.MapFromEntries.nullSafeEval(collectionOperations.scala:820)
at org.apache.spark.sql.catalyst.expressions.UnaryExpression.eval(Expression.scala:512)
at org.apache.spark.sql.catalyst.expressions.MapConcat.$anonfun$eval$5(collectionOperations.scala:689)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)


When creating the SparkSession, I configured spark.sql.mapKeyDedupPolicy to
LAST_WIN.
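
Concretely, the session is built roughly like this (the app name is just a
placeholder):

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("map-concat-dedup")  # placeholder
    .config("spark.sql.mapKeyDedupPolicy", "LAST_WIN")
    .getOrCreate()
)

# Driver-side sanity check (spark.conf.get is expected to return LAST_WIN here).
print(spark.conf.get("spark.sql.mapKeyDedupPolicy"))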

The weird part: if I remove the GROUP BY from view B, the problem disappears.
Note that the map construction happens in view A; view B is just an SQL
statement over view A.

I managed to reproduce it with the same code in a unit test - a simple Python
script that creates a SparkSession in client mode, so behind the scenes it
runs a JVM via SparkSubmit, with both the driver and the executor in that JVM.

I'm using Spark v3.3.2, so I cloned the Spark repo, opened it in IntelliJ, and
attached a debugger to the running JVM.

With the GROUP BY present, when I break at the ArrayBasedMapBuilder
construction, I see that the TaskContext configuration (which SQLConf is based
on) contains only one property - meaning the dedup policy falls back to
EXCEPTION, hence my failure.
When I remove the GROUP BY, at the same breakpoint, the TaskContext
configuration contains 35 entries, among them my LAST_WIN setting.
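
For what it's worth, a similar check can be done from the Python side with
something like the sketch below (the UDF name is made up; it assumes that
modified SQL confs are propagated to executors as task-local properties, which
is what the executor-side SQLConf reads):

from pyspark import TaskContext
from pyspark.sql import functions as F

@F.udf("string")
def dedup_policy_in_task(_):
    # Runs inside the task; returns the dedup policy as seen through the
    # task's local properties, or None if the property did not arrive.
    tc = TaskContext.get()
    return tc.getLocalProperty("spark.sql.mapKeyDedupPolicy") if tc else None

# On a trivial query this is expected to show LAST_WIN; the interesting case
# is what a task of the GROUP BY query sees.
spark.range(1).select(dedup_policy_in_task("id")).show(truncate=False)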

So my question:
What can cause the driver to execute a task *without* the configuration that
was set on the SparkSession?

Thanks!

Asaf
