Kris Mok created SPARK-40380:
--------------------------------

             Summary: Constant-folding of InvokeLike should not result in non-serializable result
                 Key: SPARK-40380
                 URL: https://issues.apache.org/jira/browse/SPARK-40380
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 3.3.0
            Reporter: Kris Mok


SPARK-37907 added constant-folding support to the {{InvokeLike}} family of expressions. Unfortunately, it introduced a regression for cases where a constant-folded {{InvokeLike}} expression returns a non-serializable result. {{ExpressionEncoder}}s are one area where this problem can surface, e.g. when using sparksql-scalapb on Spark 3.3.0+.

Below is a minimal repro to demonstrate this issue:
{code:scala}
import org.apache.spark.sql.Column
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.expressions.objects.{Invoke, StaticInvoke}
import org.apache.spark.sql.types.{LongType, ObjectType}

// A class that is not java.io.Serializable, plus a serializable wrapper that can produce it.
class NotSerializableBoxedLong(longVal: Long) { def add(other: Long): Long = longVal + other }
case class SerializableBoxedLong(longVal: Long) { def toNotSerializable(): NotSerializableBoxedLong = new NotSerializableBoxedLong(longVal) }

// Invoke chain: serializable literal -> non-serializable intermediate -> add the "id" column.
val litExpr = Literal.fromObject(SerializableBoxedLong(42L), ObjectType(classOf[SerializableBoxedLong]))
val toNotSerializableExpr = Invoke(litExpr, "toNotSerializable", ObjectType(classOf[NotSerializableBoxedLong]))
val addExpr = Invoke(toNotSerializableExpr, "add", LongType, Seq(UnresolvedAttribute.quotedString("id")))

val df = spark.range(2).select(new Column(addExpr))
df.collect()
{code}
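
As a hedged illustration (not part of the original repro), the inner {{Invoke}} above ({{toNotSerializableExpr}}) becomes foldable on 3.3.0+ because its target is a {{Literal}} and it takes no arguments, so constant folding can effectively replace it with a {{Literal}} that wraps the non-serializable object:
{code:scala}
// Assumed behavior after SPARK-37907: an InvokeLike whose children are all
// foldable reports foldable = true, so ConstantFolding may evaluate it eagerly.
toNotSerializableExpr.foldable   // expected: true on Spark 3.3.0+

// Conceptually what the folded expression looks like: a Literal wrapping a
// NotSerializableBoxedLong instance, which later has to be Java-serialized
// along with the generated code's references and fails.
val foldedLiteral = Literal.fromObject(toNotSerializableExpr.eval(), toNotSerializableExpr.dataType)
{code}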

Before SPARK-37907, this example ran fine and returned {{[[42], [43]]}}. After SPARK-37907, it fails with:
{code:none}
...
Caused by: java.io.NotSerializableException: NotSerializableBoxedLong
Serialization stack:
        - object not serializable (class: NotSerializableBoxedLong, value: NotSerializableBoxedLong@71231636)
        - element of array (index: 1)
        - array (class [Ljava.lang.Object;, size 2)
        - element of array (index: 1)
        - array (class [Ljava.lang.Object;, size 3)
        - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
        - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class org.apache.spark.sql.execution.WholeStageCodegenExec, functionalInterfaceMethod=scala/Function2.apply:(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic org/apache/spark/sql/execution/WholeStageCodegenExec.$anonfun$doExecute$4$adapted:(Lorg/apache/spark/sql/catalyst/expressions/codegen/CodeAndComment;[Ljava/lang/Object;Lorg/apache/spark/sql/execution/metric/SQLMetric;Ljava/lang/Object;Lscala/collection/Iterator;)Lscala/collection/Iterator;, instantiatedMethodType=(Ljava/lang/Object;Lscala/collection/Iterator;)Lscala/collection/Iterator;, numCaptured=3])
        - writeReplace data (class: java.lang.invoke.SerializedLambda)
        - object (class org.apache.spark.sql.execution.WholeStageCodegenExec$$Lambda$3123/1641694389, org.apache.spark.sql.execution.WholeStageCodegenExec$$Lambda$3123/1641694389@185db22c)
  at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
  at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:49)
  at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:115)
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:441)
{code}
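
The serialization stack above shows the {{NotSerializableBoxedLong}} instance being captured through the {{WholeStageCodegenExec}} lambda's {{capturedArgs}}, i.e. via the generated code's object references. A hedged way to see the folded literal without triggering the failure is to inspect the optimized plan of the repro's {{df}} (sketch; this only runs the optimizer, not the job):
{code:scala}
// In the optimized plan, the inner Invoke should already be replaced by a
// literal holding the NotSerializableBoxedLong instance.
df.queryExecution.optimizedPlan.expressions.foreach(e => println(e.treeString))
{code}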


