Hi,

I'm trying to run a Kafka stream and am getting a strange exception. The
stream is created by the following code:


    val lines = KafkaUtils.createStream[String, VtrRecord,
      StringDecoder, VtrRecordDeserializer](ssc, kafkaParams.toMap,
      topicpMap, StorageLevel.MEMORY_AND_DISK_SER_2)
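
For reference, the surrounding setup looks roughly like this (a sketch;
the ZooKeeper address, group id, and topic name are placeholders, not my
real settings):

    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils
    import kafka.serializer.StringDecoder

    val conf = new SparkConf().setAppName("vtr-stream")
    val ssc = new StreamingContext(conf, Seconds(10))
    // Placeholder consumer settings pointing at ZooKeeper.
    val kafkaParams = Map(
      "zookeeper.connect" -> "zk-host:2181",
      "group.id" -> "vtr-consumer")
    // One receiver thread for the (placeholder) topic.
    val topicpMap = Map("vtr-topic" -> 1)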


'VtrRecord' is generated by protobuf in the same package, and
'VtrRecordDeserializer' is a Decoder that transforms an Array[Byte] into
a 'VtrRecord', as follows:


    import kafka.utils.VerifiableProperties
    import com.aries.hawkeyes.VtrRecordProtos.VtrRecord

    class VtrRecordDeserializer(props: VerifiableProperties = null)
        extends kafka.serializer.Decoder[VtrRecord] {
      // Decode the raw Kafka message bytes into a protobuf VtrRecord.
      override def fromBytes(bytes: Array[Byte]): VtrRecord = {
        VtrRecord.parseFrom(bytes)
      }
    }
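
For completeness, a minimal round-trip check of the decoder would look
like this (a sketch using a default-built record; my real data is
populated):

    // Serialize a record and decode it back through the Kafka Decoder.
    val original = VtrRecord.newBuilder().build()
    val decoder = new VtrRecordDeserializer()
    val decoded = decoder.fromBytes(original.toByteArray)
    // Lite-runtime messages may not override equals, so compare bytes.
    assert(java.util.Arrays.equals(decoded.toByteArray, original.toByteArray))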


When the assembly jar (built by maven-shade-plugin) is submitted to
the Spark cluster, I get the following ClassNotFoundException:


java.lang.RuntimeException: Unable to find proto buffer class
        at com.google.protobuf.GeneratedMessageLite$SerializedForm.readResolve(GeneratedMessageLite.java:775)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:616)
        at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1075)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1779)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1346)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1963)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1887)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1770)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1346)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:368)
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:40)
        at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:104)
        at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
        at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:57)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:95)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:94)
        at org.apache.spark.rdd.RDD$$anonfun$3.apply(RDD.scala:471)
        at org.apache.spark.rdd.RDD$$anonfun$3.apply(RDD.scala:471)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:34)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:161)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102)
        at org.apache.spark.scheduler.Task.run(Task.scala:53)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
        at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
        at java.lang.Thread.run(Thread.java:679)
Caused by: java.lang.ClassNotFoundException: com.aries.hawkeyes.VtrRecordProtos$VtrRecord
        at java.net.URLClassLoader$1.run(URLClassLoader.java:217)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:205)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:321)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:294)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:266)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:186)
        at com.google.protobuf.GeneratedMessageLite$SerializedForm.readResolve(GeneratedMessageLite.java:768)
        ... 34 more


I have checked the assembly jar on the workers with `jar -tf`, and
'com.aries.hawkeyes.VtrRecordProtos$VtrRecord' is definitely there.
Also, to test whether the executor can load this class, I have tried
'System.out.println(Class.forName("com.aries.hawkeyes.VtrRecordProtos$VtrRecord"))'
in my application and
'Thread.currentThread.getContextClassLoader.loadClass("com.aries.hawkeyes.VtrRecordProtos$VtrRecord")'
in org.apache.spark.executor.Executor.run(); both work fine without
any exception. Could this be because Kafka parses the record on another
thread whose ClassLoader has not loaded the task jar? Is there some way
to make it work?
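
To narrow that down, one check I can add is logging the context
ClassLoader from inside a task (a sketch; 'lines' is the DStream
created above):

    // Print the ClassLoader used by the executor's task threads, to
    // compare with the loader in effect where deserialization fails.
    lines.foreachRDD { rdd =>
      rdd.foreachPartition { _ =>
        println(Thread.currentThread.getContextClassLoader)
      }
    }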


BTW, the Spark cluster is built on Spark 0.9.0 and runs in standalone mode.


Thanks.


Aries.K
