Frank Ottey created SPARK-27013:
-----------------------------------

             Summary: Add support for external encoders when resolving 
org.apache.spark.sql.catalyst.encoders.ExpressionEncoder's apply method
                 Key: SPARK-27013
                 URL: https://issues.apache.org/jira/browse/SPARK-27013
             Project: Spark
          Issue Type: Improvement
          Components: Optimizer
    Affects Versions: 2.4.0
            Reporter: Frank Ottey


I recently discovered that, because most of the common implicit encoders 
introduced by 

{noformat}import spark.implicits._{noformat}

reduce to a call to {{ExpressionEncoder}}'s {{apply}} method, it's _very_ 
difficult to generate and/or operate on {{Column}}s whose internal types 
reduce to a Scala type that wraps an external type, even if an implicit 
encoder for that external type is available or could be trivially generated. 
See the example below:

{code:scala}
import com.example.MyBean

import org.apache.spark.sql.{Encoder, Encoders, SparkSession}

object Example {
    implicit def beanEncoder: Encoder[MyBean] = Encoders.bean(classOf[MyBean])

    def main(args: Array[String]): Unit = {

        val path = args(0)

        val spark: SparkSession = ???

        import spark.implicits._

        // THE FOLLOWING DOES NOT WORK!!!
        // An implicit encoder for Seq[_] is found and used...
        // It calls ExpressionEncoder's apply method,
        // which unwraps the inner type com.example.MyBean...
        // ScalaReflection.serializerFor() cannot find an encoder for our type,
        // even though we can trivially create one above!!!!

        // Fails at runtime with UnsupportedOperationException from
        // ScalaReflection.serializerFor()
        val ds = spark.read
                      .format("avro")
                      .option("compression", "snappy")
                      .load(path)
                      .select($"myColumn".as[Seq[MyBean]])
    }
}
{code}

What's particularly frustrating is that this is not a problem if we use a 
user-defined case class instead of the Java bean type: the way the various 
implicit encoders in the related packages are structured seems to allow 
{{ScalaReflection.serializerFor()}} to work on arbitrary {{scala.Product}} 
types... (There's an encoder factory in {{org.apache.spark.sql.Encoders}} 
that looks relevant.)
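
For comparison, here is a minimal sketch of the case-class variant that does 
resolve, under the same setup as above; {{MyRecord}} and its fields are 
hypothetical stand-ins for the bean's contents:

{code:scala}
import org.apache.spark.sql.{Dataset, SparkSession}

// Hypothetical case class standing in for com.example.MyBean's fields,
// defined at top level so TypeTag-based reflection can see it.
case class MyRecord(id: Long, name: String)

object CaseClassExample {
    def main(args: Array[String]): Unit = {
        val path = args(0)
        val spark: SparkSession = ???
        import spark.implicits._

        // The implicit Product-based encoder resolves here, so
        // ScalaReflection.serializerFor() handles Seq[MyRecord] without help.
        val ds: Dataset[Seq[MyRecord]] = spark.read
                                              .format("avro")
                                              .option("compression", "snappy")
                                              .load(path)
                                              .select($"myColumn".as[Seq[MyRecord]])
    }
}
{code}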

I realize that there are workarounds, such as wrapping the types and then using 
a simple {{.map()}}, or falling back to Kryo or Java serialization, but my 
understanding is that those would mean giving up on potential Catalyst 
optimizations...
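
For reference, a rough sketch of that workaround under the same setup; 
{{MyBeanWrapper}}'s fields and the {{toBean}} helper are placeholders, since 
the real shape of {{com.example.MyBean}} isn't shown here:

{code:scala}
import com.example.MyBean
import org.apache.spark.sql.{Encoders, SparkSession}

// Hypothetical case-class mirror of MyBean; the field names are placeholders.
case class MyBeanWrapper(id: Long, name: String)

object WorkaroundExample {
    // Hypothetical conversion back to the external bean type.
    def toBean(w: MyBeanWrapper): MyBean = ???

    def main(args: Array[String]): Unit = {
        val path = args(0)
        val spark: SparkSession = ???
        import spark.implicits._

        // 1. Reading through the case-class wrapper works, because the
        //    implicit encoders can handle a Seq of a Product type.
        val wrapped = spark.read
                           .format("avro")
                           .option("compression", "snappy")
                           .load(path)
                           .select($"myColumn".as[Seq[MyBeanWrapper]])

        // 2. Converting back to MyBean then needs an opaque Kryo (or Java)
        //    encoder for the .map() output, which is where the Catalyst
        //    optimizations are lost.
        val beans = wrapped.map(ws => ws.map(toBean))(Encoders.kryo[Seq[MyBean]])
    }
}
{code}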

It would be really nice if there were a simple way to tell 
{{ScalaReflection.serializerFor()}} to look for and use other, potentially 
user-defined encoders, especially if they could be generated from the encoder 
factory methods supplied by Spark itself...

Alternatively, it would be exceptionally nice if calls to 
{{ExpressionEncoder}}'s {{apply}} method supported expressions whose types 
include {{java.util.List}} or arbitrary Java bean types, in addition to 
{{scala.Product}} types.

See 
[this question|https://stackoverflow.com/questions/36648128/how-to-store-custom-objects-in-dataset]
 on Stack Overflow for further details...



