Hi, I would like to use Kryo serialization in a project, if possible, and as a
first step I'm trying to run FlumeEventCount with Kryo. If I comment out the
setAll call, it runs correctly, but if I use the Kryo parameters it returns
several errors:

15/02/11 11:42:16 ERROR SparkDeploySchedulerBackend: Asked to remove
non-existent executor 1
15/02/11 11:42:16 ERROR JobScheduler: Error running job streaming job
1423651330000 ms.0
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage
2.0 (TID 8, localhost): ExecutorLostFailure (executor 1 lost)


This is my code.

/**
 * Variant of the FlumeEventCount example configured to use a Kryo-based serializer.
 *
 * Connects to the master at spark://localhost:7077, creates a polling Flume stream
 * against localhost:11000 and prints the number of events received in each
 * 2-second batch.
 */
object flumeKryo {
  def main(args: Array[String]): Unit = {
    // Serializer and tuning settings passed to SparkConf.setAll below.
    // NOTE(review): "spark.storage.memoryFraction" -> "1" looks unusually high — confirm intended.
    val kryoSettings = Map(
      "spark.serializer" -> "com.gmunoz.flumekryo.WrapperSerializer",
      "spark.kryo.registrator" -> "com.gmunoz.flumekryo.MyRegistrator",
      "spark.task.maxFailures" -> "1",
      "spark.rdd.compress" -> "true",
      "spark.storage.memoryFraction" -> "1",
      "spark.core.connection.ack.wait.timeout" -> "600",
      "spark.akka.frameSize" -> "50"
    )

    val sparkConf = new SparkConf(true)
      .setMaster("spark://localhost:7077")
      .setAppName("TestKryo")
      .setAll(kryoSettings)

    val sc = new SparkContext(sparkConf)
    // NOTE(review): presumably this jar contains WrapperSerializer and MyRegistrator; it is
    // added only after the context has been created — TODO confirm executors can load those
    // classes at the point where they instantiate the configured serializer.
    sc.addJar("/home/gmunoz/workspace/flumekryo/target/flumekryo-1.0-SNAPSHOT.jar")

    val ssc = new StreamingContext(sc, Seconds(2))

    // Create a flume stream
    val stream = FlumeUtils.createPollingStream(
      ssc,
      "localhost",
      11000,
      StorageLevel.MEMORY_ONLY_SER_2
    )

    // Print out the count of events received from this server in each batch
    stream.count().map(cnt => "Received " + cnt + " flume events.").print()

    ssc.start()
    ssc.awaitTermination()
  }
}

/**
 * Kryo registrator that registers [[SparkFlumeEvent]] with the supplied Kryo instance.
 *
 * Writes a marker line to standard error every time it is invoked, which makes it
 * visible in the logs whether the registrator was actually called.
 */
class MyRegistrator extends KryoRegistrator {
  /**
   * Registers the classes this application serializes with Kryo.
   *
   * @param kryo the Kryo instance to register classes on
   */
  override def registerClasses(kryo: Kryo): Unit = {
    Console.err.println("################# MyRegistrator called")
    kryo.register(classOf[SparkFlumeEvent])
  }
}

/**
 * Thin wrapper around [[KryoSerializer]] that prints a marker line to standard output
 * whenever a new Kryo instance is requested, then delegates to the parent implementation.
 *
 * @param conf the Spark configuration forwarded to [[KryoSerializer]]
 */
class WrapperSerializer(conf: SparkConf) extends KryoSerializer(conf) {
  /** Prints a trace line and returns the Kryo instance built by the superclass. */
  override def newKryo(): Kryo = {
    println("## Called newKryo!")
    super.newKryo()
  }
}


What am I doing wrong?

Thanks!

Reply via email to