I have had a quick look at Maciej's query. I see one behaviour when running the piece of code in spark-shell and a different one when running it as a Spark app.

1. When running in the spark-shell, I see the serialization error that Maciej reported.
2. When running the same code as a Spark app, I see a different behaviour.

I have put the code below. It would be great if someone could explain the difference in behaviour.

Thanks,
Kabeer.

------------------------------------------------
Spark-Shell:
scala> sc.stop

scala> :paste
// Entering paste mode (ctrl-D to finish)

import org.apache.spark._

val sc = new SparkContext(new SparkConf()
  .setAppName("bar")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"))

println(sc.getConf.getOption("spark.serializer"))

val m = Map("a" -> 1, "b" -> 2)
val rdd5 = sc.makeRDD(Seq(m))
println("Map RDD is: ")

def mapFunc(input: Map[String, Int]): Unit = println(input.getOrElse("a", -2))

rdd5.map(mapFunc).collect()

// Exiting paste mode, now interpreting.

Some(org.apache.spark.serializer.KryoSerializer)
Map RDD is:
org.apache.spark.SparkException: Task not serializable
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
  at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
-------------------------------------------------------------------------------------------------
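
One possible explanation for the shell failure, offered as an untested sketch rather than a confirmed diagnosis: in the REPL, def mapFunc becomes a method of the enclosing interpreter line object, so rdd5.map(mapFunc) closes over that object, which also holds the non-serializable SparkContext. A plain function value should avoid that capture:

// Hedged REPL-only sketch: a function literal with no free variables,
// so nothing beyond the function object itself needs to be serialized.
val mapFuncVal: Map[String, Int] => Unit =
  input => println(input.getOrElse("a", -2))

rdd5.map(mapFuncVal).collect()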


----------------------------------------------------------------------------------------
Scenario 2:

Code:

package experiment

import org.apache.spark._

object Serialization1 extends App {

  val sc = new SparkContext(new SparkConf()
    .setAppName("bar")
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .setMaster("local[1]"))

  println(sc.getConf.getOption("spark.serializer"))

  val m = Map("a" -> 1, "b" -> 2)
  val rdd5 = sc.makeRDD(Seq(m))
  println("Map RDD is: ")

  def mapFunc(input: Map[String, Int]): Unit = println(input.getOrElse("a", -2))

  rdd5.map(mapFunc).collect()
}

Run command:

spark-submit --class experiment.Serialization1 target/scala-2.10/learningspark_2.10-0.1-SNAPSHOT.jar

---------------------------------------------------------------------------------------------------------------




On Thu, 29 Sep, 2016 at 1:05 AM, Jakob Odersky <ja...@odersky.com> wrote:
I agree with Sean's answer, you can check out the relevant serializer
here https://github.com/twitter/chill/blob/develop/chill-scala/src/main/scala/com/twitter/chill/Traversable.scala

On Wed, Sep 28, 2016 at 3:11 AM, Sean Owen <so...@cloudera.com> wrote:
My guess is that Kryo specially handles Maps generically or relies on
some mechanism that does, and it happens to iterate over all
key/values as part of that and of course there aren't actually any
key/values in the map. The Java serialization is a much more literal
(expensive) field-by-field serialization which works here because
there's no special treatment. I think you could register a custom
serializer that handles this case. Or work around it in your client
code. I know there have been other issues with Kryo and Map because,
for example, sometimes a Map in an application is actually some
non-serializable wrapper view.
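
To illustrate the client-side workaround, here is a minimal, untested sketch against aMap from Maciej's snippet below; the idea is to stop relying on the default surviving serialization, either by avoiding the default lookup with getOrElse or by re-applying it inside the closure:

// Hedged sketch only; 0L stands in for whatever default the application needs.
val default = 0L
sc.parallelize(Seq(aMap)).map(_.getOrElse("a", default)).first         // no default lookup needed
sc.parallelize(Seq(aMap)).map(_.withDefaultValue(default)("a")).first  // default re-applied on the executor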

On Wed, Sep 28, 2016 at 3:18 AM, Maciej Szymkiewicz
<mszymkiew...@gmail.com> wrote:
Hi everyone,

I suspect there is no point in submitting a JIRA to fix this (not a Spark issue?) but I would like to know if this problem is documented anywhere.
Somehow Kryo is losing the default value during serialization:

scala> import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.{SparkContext, SparkConf}

scala> val aMap = Map[String, Long]().withDefaultValue(0L)
aMap: scala.collection.immutable.Map[String,Long] = Map()

scala> aMap("a")
res6: Long = 0

scala> val sc = new SparkContext(new
SparkConf().setAppName("bar").set("spark.serializer",
"org.apache.spark.serializer.KryoSerializer"))

scala> sc.parallelize(Seq(aMap)).map(_("a")).first
16/09/28 09:13:47 ERROR Executor: Exception in task 2.0 in stage 2.0 (TID 7)
java.util.NoSuchElementException: key not found: a

while Java serializer works just fine:

scala> val sc = new SparkContext(new
SparkConf().setAppName("bar").set("spark.serializer",
"org.apache.spark.serializer.JavaSerializer"))

scala> sc.parallelize(Seq(aMap)).map(_("a")).first
res9: Long = 0

--
Best regards,
Maciej

