[ https://issues.apache.org/jira/browse/SPARK-5953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14334469#comment-14334469 ]
Saisai Shao commented on SPARK-5953:
------------------------------------

I think the decoder is instantiated on the executor side, so your class needs to be available on the executors. One way to do that is to add the jar that contains the custom decoder class using SparkContext#addJar.

> NoSuchMethodException with a Kafka input stream and custom decoder in Scala
> ---------------------------------------------------------------------------
>
>                 Key: SPARK-5953
>                 URL: https://issues.apache.org/jira/browse/SPARK-5953
>             Project: Spark
>          Issue Type: Bug
>          Components: Input/Output, Spark Core
>    Affects Versions: 1.2.0, 1.2.1
>         Environment: Xubuntu 14.04, Kafka 0.8.2, Scala 2.10.4, Scala 2.11.5
>            Reporter: Aleksandar Stojadinovic
>
> When using a Kafka input stream, and setting a custom Kafka Decoder, Spark
> throws an exception upon starting:
> {noformat}
> ERROR ReceiverTracker: Deregistered receiver for stream 0: Error starting receiver 0 - java.lang.NoSuchMethodException: UserLocationEventDecoder.<init>(kafka.utils.VerifiableProperties)
>       at java.lang.Class.getConstructor0(Class.java:2971)
>       at java.lang.Class.getConstructor(Class.java:1812)
>       at org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:106)
>       at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)
>       at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)
>       at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:277)
>       at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:269)
>       at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1314)
>       at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1314)
>       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>       at org.apache.spark.scheduler.Task.run(Task.scala:56)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> 15/02/23 21:37:31 ERROR ReceiverSupervisorImpl: Stopped executor with error: java.lang.NoSuchMethodException: UserLocationEventDecoder.<init>(kafka.utils.VerifiableProperties)
> 15/02/23 21:37:31 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
> java.lang.NoSuchMethodException: UserLocationEventDecoder.<init>(kafka.utils.VerifiableProperties)
>       at java.lang.Class.getConstructor0(Class.java:2971)
>       at java.lang.Class.getConstructor(Class.java:1812)
>       at org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:106)
>       at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)
>       at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)
>       at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:277)
>       at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:269)
>       at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1314)
>       at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1314)
>       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>       at org.apache.spark.scheduler.Task.run(Task.scala:56)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> {noformat}
> The stream is initialized with:
> {code:title=Main.scala|borderStyle=solid}
>     val locationsAndKeys = KafkaUtils.createStream[String, Array[Byte], kafka.serializer.StringDecoder, UserLocationEventDecoder] (ssc, kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK);
> {code}
> The decoder:
> {code:title=UserLocationEventDecoder.scala|borderStyle=solid}
> import kafka.serializer.Decoder
> class UserLocationEventDecoder extends Decoder[UserLocationEvent] {
>   val kryo = new Kryo()
>   override def fromBytes(bytes: Array[Byte]): UserLocationEvent = {
>     val input: Input = new Input(new ByteArrayInputStream(bytes))
>     val userLocationEvent: UserLocationEvent = kryo.readClassAndObject(input).asInstanceOf[UserLocationEvent]
>     input.close()
>     return userLocationEvent
>   }
> }
> {code}
> build.sbt:
> {code:borderStyle=solid}
> scalaVersion := "2.10.4"
> libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.2.1"
> libraryDependencies += "com.spatial4j" % "spatial4j" % "0.4.1"
> libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.2.1"
> libraryDependencies += "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.2.1"
> libraryDependencies += "com.twitter" % "chill_2.10" % "0.5.2"
> {code}
> The input stream (and my code overall) works fine if initialized with the
> kafka.serializer.DefaultDecoder, and content is manually deserialized.
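
A note on the exception in the quoted trace: it comes from Class.getConstructor, which the trace shows being called from KafkaReceiver.onStart with a kafka.utils.VerifiableProperties argument type. That lookup fails when the class has no constructor with exactly that parameter, independent of how the jar is shipped. Below is a minimal, self-contained sketch of that behaviour. It does not use Kafka or Spark: Props, NoArgDecoder, PropsDecoder and ConstructorLookup are hypothetical stand-ins introduced only for illustration, with Props playing the role of VerifiableProperties.

```scala
import scala.util.Try

// Hypothetical stand-in for kafka.utils.VerifiableProperties (assumption, not the real class).
final class Props

// Shaped like the decoder in the report: only a no-argument constructor.
class NoArgDecoder {
  def fromBytes(bytes: Array[Byte]): String = new String(bytes, "UTF-8")
}

// Same decoder, but with a constructor that accepts the properties object.
// The default value keeps `new PropsDecoder()` working from Scala code.
class PropsDecoder(props: Props = null) {
  def fromBytes(bytes: Array[Byte]): String = new String(bytes, "UTF-8")
}

object ConstructorLookup {
  // True when `cls` exposes a public constructor taking exactly one Props argument,
  // which is the kind of reflective lookup that appears in the stack trace.
  def hasPropsConstructor(cls: Class[_]): Boolean =
    Try(cls.getConstructor(classOf[Props])).isSuccess

  def main(args: Array[String]): Unit = {
    // No (Props) constructor: getConstructor throws NoSuchMethodException.
    println(hasPropsConstructor(classOf[NoArgDecoder]))
    // A (Props) constructor exists: the lookup succeeds.
    println(hasPropsConstructor(classOf[PropsDecoder]))

    // Instantiating reflectively, the way a framework would.
    val decoder = classOf[PropsDecoder]
      .getConstructor(classOf[Props])
      .newInstance(new Props)
    println(decoder.fromBytes("ok".getBytes("UTF-8")))
  }
}
```

For the decoder in the report, the analogous (untested) change would be to declare it as `class UserLocationEventDecoder(props: VerifiableProperties = null) extends Decoder[UserLocationEvent]`, which is how Kafka 0.8's built-in kafka.serializer.StringDecoder and DefaultDecoder are declared. Shipping the jar with SparkContext#addJar may still be needed so that the executors can load the class at all.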