Hi,
I am running into an issue when I read a string of UDF source code coming
from a web service and try to register it as a UDF.

Here is the exception.

java.lang.ClassCastException: cannot assign instance of
scala.collection.immutable.List$SerializationProxy to field
org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type
scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD

at
java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2133)

at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1305)

at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2251)

at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)

at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)

at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)

at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)

at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)

at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)

at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)

at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)

at
scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:498)

at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)

at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2136)

at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)

at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)

at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)

at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)

at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)

at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)

at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)

at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)

at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)

at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)

at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)

at
scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:498)

at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)

at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2136)

at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)

at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)

at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)

at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)

at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)

at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)

at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)

at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)

at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)

at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)

at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)

at
scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:498)

at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)

at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2136)

at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)

at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)

at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)

at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)

at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)

at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)

at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)

at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)

at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)

at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)

at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)

at
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)

at
org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)

at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:80)

at org.apache.spark.scheduler.Task.run(Task.scala:108)

at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)

at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)

at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)

at java.lang.Thread.run(Thread.java:748)

Here is some simple code.

package app

import org.apache.spark.sql.SparkSession
import util.{ScalaSourceCodeCompiler}

/**
  * Small driver program: loads a CSV file into a DataFrame, registers a UDF that is
  * compiled at runtime from a source string, and runs a SQL query that applies it.
  */
object Test {

  /** Application jar to be distributed with the job. */
  private val AppJar: String =
    "/Users/dubin/code/java/BatchPro/BatchProSpark/target/BatchProSpark-1.0-SNAPSHOT.jar"

  /** CSV input (with header row) that backs the `car` view. */
  private val CarsCsv: String =
    "/Users/dubin/code/java/BatchPro/BatchProSpark/src/test/resources/cars.csv"

  /**
    * Runs the demo end to end and prints the query result as JSON lines.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {
    // The jar list must be configured on the builder. `SparkContext.getConf` returns a
    // copy of the configuration, so calling `setJars` on it after the session exists
    // is silently ignored.
    val spark = SparkSession
      .builder
      .appName("Test")
      .master("local")
      .config("spark.jars", AppJar)
      .getOrCreate()

    // Source text of the user-defined function; it is compiled at runtime below.
    val code =
      """
        |(a: String) => { a.toUpperCase }
      """.stripMargin

    // Always stop the session, even if loading, compiling or querying fails.
    val jsonRows =
      try {
        val logDataDF = spark.read.format("csv").option("header", "true").load(CarsCsv)
        logDataDF.show()
        // `registerTempTable` is deprecated since Spark 2.0.
        logDataDF.createOrReplaceTempView("car")

        val userUDF = ScalaSourceCodeCompiler.compileCodeTest(code)
        spark.udf.register("toU", userUDF)

        val res = spark.sql("select toU(t) from car")
        val collected = res.toJSON.collect()

        res.printSchema()
        res.show()
        collected
      } finally {
        spark.stop()
      }

    jsonRows.foreach(println(_))
  }

}

and

/**
  * Turns a source string into a `String => String` function by handing it to `Eval`.
  *
  * NOTE(review): `Eval` is defined outside this snippet; presumably it compiles and
  * evaluates `code` at runtime -- confirm against its implementation.
  *
  * @param code source text expected to evaluate to a `String => String`
  * @return the value produced by `Eval` for `code`
  */
def compileCodeTest(code: String): String => String =
  Eval[String => String](code)

I have already tried several approaches. I suspect the exception is related to
the Eval runtime, but I am not sure. Can anyone help?

Thanks,
Du Bin

Reply via email to