Hi

My setup is standalone local mode, Spark 1.0.0 (release version), Scala 2.10.4.

I wrote a job that receives serialized objects from a Kafka broker. The
objects are serialized with Kryo.
The code:

    // Imports needed by this snippet (imports for our own EventDetails /
    // EventKryoEncoder classes omitted):
    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    val sparkConf = new SparkConf().setMaster("local[4]").setAppName("SparkTest")
      .set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
      .set("spark.kryo.registrator", "com.inneractive.fortknox.kafka.EventDetailRegistrator")

    val ssc = new StreamingContext(sparkConf, Seconds(20))
    ssc.checkpoint("checkpoint")


    val topicMap = topic.split(",").map((_, partitions)).toMap

    // Create a stream using my decoder, EventKryoEncoder
    val events = KafkaUtils.createStream[String, EventDetails, StringDecoder, EventKryoEncoder](
      ssc, kafkaMapParams, topicMap, StorageLevel.MEMORY_AND_DISK_SER).map(_._2)

    val data = events.map(e => (e.getPublisherId, 1L))
    val counter = data.reduceByKey(_ + _)
    counter.print()

    ssc.start()
    ssc.awaitTermination()
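
For reference, the registrator named in the config is a standard
KryoRegistrator. This is a simplified sketch of roughly what our
com.inneractive.fortknox.kafka.EventDetailRegistrator does (the exact class
list in the real registrator is omitted here):

    import com.esotericsoftware.kryo.Kryo
    import org.apache.spark.serializer.KryoRegistrator

    // Registers our event class with Kryo so it can be (de)serialized.
    class EventDetailRegistrator extends KryoRegistrator {
      override def registerClasses(kryo: Kryo): Unit = {
        kryo.register(classOf[EventDetails])
      }
    }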

When I run this I get:

java.lang.IllegalStateException: unread block data
        at java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2421) ~[na:1.7.0_60]
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1382) ~[na:1.7.0_60]
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) ~[na:1.7.0_60]
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) ~[na:1.7.0_60]
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) ~[na:1.7.0_60]
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) ~[na:1.7.0_60]
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) ~[na:1.7.0_60]
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63) ~[spark-core_2.10-1.0.0.jar:1.0.0]
        at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:125) ~[spark-core_2.10-1.0.0.jar:1.0.0]
        at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71) ~[spark-core_2.10-1.0.0.jar:1.0.0]
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) ~[scala-library-2.10.4.jar:na]
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) ~[scala-library-2.10.4.jar:na]
        at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58) ~[spark-core_2.10-1.0.0.jar:1.0.0]
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:96) ~[spark-core_2.10-1.0.0.jar:1.0.0]
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:95) ~[spark-core_2.10-1.0.0.jar:1.0.0]
        at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:582) ~[spark-core_2.10-1.0.0.jar:1.0.0]
        at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:582) ~[spark-core_2.10-1.0.0.jar:1.0.0]
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) ~[spark-core_2.10-1.0.0.jar:1.0.0]
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) ~[spark-core_2.10-1.0.0.jar:1.0.0]
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) ~[spark-core_2.10-1.0.0.jar:1.0.0]
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158) ~[spark-core_2.10-1.0.0.jar:1.0.0]
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) ~[spark-core_2.10-1.0.0.jar:1.0.0]
        at org.apache.spark.scheduler.Task.run(Task.scala:51) ~[spark-core_2.10-1.0.0.jar:1.0.0]
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187) ~[spark-core_2.10-1.0.0.jar:1.0.0]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_60]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_60]
        at java.lang.Thread.run(Thread.java:745) [na:1.7.0_60]

I've checked that my decoder works: I can trace that the deserialization
succeeds, so Spark should be getting ready-to-use objects.
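
For context, the decoder implements kafka.serializer.Decoder[EventDetails].
A simplified sketch of roughly what EventKryoEncoder does (the properties
constructor is the one Kafka instantiates reflectively; details of the real
class omitted):

    import com.esotericsoftware.kryo.Kryo
    import com.esotericsoftware.kryo.io.Input
    import kafka.serializer.Decoder
    import kafka.utils.VerifiableProperties

    // Turns each raw Kafka message back into an EventDetails using Kryo.
    class EventKryoEncoder(props: VerifiableProperties = null)
        extends Decoder[EventDetails] {
      private val kryo = new Kryo()
      kryo.register(classOf[EventDetails])

      override def fromBytes(bytes: Array[Byte]): EventDetails =
        kryo.readObject(new Input(bytes), classOf[EventDetails])
    }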

My setup works if I use JSON instead of Kryo-serialized objects.
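
For comparison, the JSON variant that works looks roughly like this
(parseEventDetails is a placeholder for our actual JSON parsing):

    // Working variant: receive plain strings and parse them into events.
    val jsonEvents = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaMapParams, topicMap, StorageLevel.MEMORY_AND_DISK_SER)
      .map { case (_, json) => parseEventDetails(json) }
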
Thanks for any help; I don't know what to try next.
