Hi Jakob,

I had multiple versions of Spark installed on my machine. The code now works without issues in spark-shell and the IDE. I have verified this with Spark 1.6 and 2.0.

Cheers,
Kabeer.


On Mon, 3 Oct, 2016 at 7:30 PM, Jakob Odersky <ja...@odersky.com> wrote:
Hi Kabeer,

which version of Spark are you using? I can't reproduce the error in
the latest Spark master.

regards,
--Jakob

On Sun, 2 Oct, 2016 at 11:39 PM, Kabeer Ahmed <kab...@linuxmail.org> wrote:
I have had a quick look at the query from Maciej. I see one behaviour when running the piece of code in spark-shell and a different one when running it as a Spark app.

1. While running in the spark-shell, I see the serialization error that Maciej has reported.
2. While running the same code as a Spark app, I see a different behaviour.

I have put the code below. It would be great if someone could explain the difference in behaviour.

Thanks,
Kabeer.

------------------------------------------------
Spark-Shell:
scala> sc.stop

scala> :paste
// Entering paste mode (ctrl-D to finish)

import org.apache.spark._
val sc = new SparkContext(
  new SparkConf()
    .setAppName("bar")
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"))

println(sc.getConf.getOption("spark.serializer"))

val m = Map("a" -> 1, "b" -> 2)
val rdd5 = sc.makeRDD(Seq(m))
println("Map RDD is: ")

def mapFunc(input: Map[String, Int]): Unit = println(input.getOrElse("a", -2))

rdd5.map(mapFunc).collect()

// Exiting paste mode, now interpreting.

Some(org.apache.spark.serializer.KryoSerializer)
Map RDD is:
org.apache.spark.SparkException: Task not serializable
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
  at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
-------------------------------------------------------------------------------------------------


----------------------------------------------------------------------------------------
Scenario 2:

Code:

package experiment

import org.apache.spark._

object Serialization1 extends App {

  val sc = new SparkContext(
    new SparkConf()
      .setAppName("bar")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .setMaster("local[1]"))

  println(sc.getConf.getOption("spark.serializer"))

  val m = Map("a" -> 1, "b" -> 2)
  val rdd5 = sc.makeRDD(Seq(m))
  println("Map RDD is: ")

  def mapFunc(input: Map[String, Int]): Unit = println(input.getOrElse("a", -2))

  rdd5.map(mapFunc).collect()

}

Run command:

spark-submit --class experiment.Serialization1 target/scala-2.10/learningspark_2.10-0.1-SNAPSHOT.jar

---------------------------------------------------------------------------------------------------------------




On Thu, 29 Sep, 2016 at 1:05 AM, Jakob Odersky <ja...@odersky.com> wrote:
I agree with Sean's answer, you can check out the relevant serializer
here https://github.com/twitter/chill/blob/develop/chill-scala/src/main/scala/com/twitter/chill/Traversable.scala
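
For illustration, here is a minimal plain-Scala sketch (no Spark or Kryo involved) of why a generic traversable serializer would drop the default. It assumes, as that chill serializer suggests, that the collection is rebuilt through a builder on deserialization, and a builder produces a plain Map without the withDefaultValue wrapper:

val aMap = Map[String, Long]().withDefaultValue(0L)
println(aMap("a"))            // prints 0, the default applies

// Rebuilding through a builder (roughly what a generic collection
// serializer does on deserialization) yields a plain Map, so the
// default is lost.
val rebuilt = (Map.newBuilder[String, Long] ++= aMap).result()
println(rebuilt.get("a"))     // prints None
// rebuilt("a") would throw java.util.NoSuchElementException: key not found: a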

On Wed, Sep 28, 2016 at 3:11 AM, Sean Owen <so...@cloudera.com> wrote:
 My guess is that Kryo handles Maps generically, or relies on some
 mechanism that does, and as part of that it iterates over all
 key/values; of course there aren't actually any key/values in the
 map, so the default value never gets captured. Java serialization is
 a much more literal (and expensive) field-by-field serialization,
 which works here because there's no special treatment. I think you
 could register a custom serializer that handles this case, or work
 around it in your client code. I know there have been other issues
 with Kryo and Map because, for example, sometimes a Map in an
 application is actually some non-serializable wrapper view.
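
 As a rough sketch of the client-side workaround (the object name, app name, master setting and the 0L fallback below are illustrative, not from the original report): keep the default out of the map that gets shipped and apply the fallback explicitly on the executor side with getOrElse.

 import org.apache.spark.{SparkConf, SparkContext}

 object WithDefaultWorkaround extends App {
   val sc = new SparkContext(
     new SparkConf()
       .setAppName("with-default-workaround")
       .setMaster("local[1]")
       .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"))

   // Ship a plain Map; do not rely on withDefaultValue surviving Kryo.
   val aMap = Map[String, Long]("b" -> 2L)

   // Apply the fallback explicitly where the lookup happens.
   println(sc.parallelize(Seq(aMap)).map(_.getOrElse("a", 0L)).first())   // 0

   sc.stop()
 }

 The other route Sean mentions, registering a custom Kryo serializer (via spark.kryo.registrator) for the withDefaultValue wrapper, would also have to serialize the default function itself, which is less trivial.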

 On Wed, Sep 28, 2016 at 3:18 AM, Maciej Szymkiewicz
 <mszymkiew...@gmail.com> wrote:
 Hi everyone,

I suspect there is no point in submitting a JIRA to fix this (not a Spark issue?) but I would like to know if this problem is documented anywhere.
 Somehow Kryo is losing the default value during serialization:

 scala> import org.apache.spark.{SparkContext, SparkConf}
 import org.apache.spark.{SparkContext, SparkConf}

 scala> val aMap = Map[String, Long]().withDefaultValue(0L)
 aMap: scala.collection.immutable.Map[String,Long] = Map()

 scala> aMap("a")
 res6: Long = 0

 scala> val sc = new SparkContext(new
 SparkConf().setAppName("bar").set("spark.serializer",
 "org.apache.spark.serializer.KryoSerializer"))

 scala> sc.parallelize(Seq(aMap)).map(_("a")).first
16/09/28 09:13:47 ERROR Executor: Exception in task 2.0 in stage 2.0 (TID 7)
 java.util.NoSuchElementException: key not found: a

 while Java serializer works just fine:

 scala> val sc = new SparkContext(new
 SparkConf().setAppName("bar").set("spark.serializer",
 "org.apache.spark.serializer.JavaSerializer"))

 scala> sc.parallelize(Seq(aMap)).map(_("a")).first
 res9: Long = 0

 --
 Best regards,
 Maciej
