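Add these jars to the SparkContext before you create the StreamingContext, so
that they are shipped to the executors. The paths and versions below are from
my ivy cache; adjust them to match your own build.

        import org.apache.spark.SparkContext
        import org.apache.spark.streaming.{Seconds, StreamingContext}

        // conf is your existing SparkConf
        val sc = new SparkContext(conf)

        // Kafka integration and its runtime dependencies
        sc.addJar("/home/akhld/.ivy2/cache/org.apache.spark/spark-streaming-kafka_2.10/jars/spark-streaming-kafka_2.10-1.1.0.jar")
        sc.addJar("/home/akhld/.ivy2/cache/com.101tec/zkclient/jars/zkclient-0.3.jar")
        sc.addJar("/home/akhld/.ivy2/cache/com.yammer.metrics/metrics-core/jars/metrics-core-2.2.0.jar")
        sc.addJar("/home/akhld/.ivy2/cache/org.apache.kafka/kafka_2.10/jars/kafka_2.10-0.8.0.jar")

        val ssc = new StreamingContext(sc, Seconds(10))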
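
One note on the jar check in your message: inside double quotes the shell
expands $ZKRebalancerListener, $anonfun and so on as (empty) variables, and $$
as its own process id, before grep ever sees the pattern. So those two greps
probably did not search for the class name you intended, and the class may
well be inside the assembly. Here is a small sketch of the difference. It uses
a made-up two-line listing in place of the real jar -tf output, and the names
listing, double and single are only for illustration:

```shell
# Hypothetical stand-in for the output of `jar -tf <assembly jar>`.
listing='kafka/consumer/ZookeeperConsumerConnector.class
kafka/consumer/ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.class'

# Make sure the variable is unset, as it would be in a normal session.
unset ZKRebalancerListener

# Double quotes: $ZKRebalancerListener expands to nothing and $$ to the
# shell's PID, so grep searches for "ZookeeperConsumerConnector<PID>anonfun".
double=$(printf '%s\n' "$listing" | grep -c "ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun" || true)

# Single quotes keep every $ literal; -F makes grep treat the pattern as a
# fixed string, so $ is not read as a regex end-of-line anchor either.
single=$(printf '%s\n' "$listing" | grep -cF 'ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun' || true)

echo "double-quoted matches: $double"
echo "single-quoted matches: $single"
```

Against the real jar, the same idea would be to pipe jar -tf on your assembly
into grep -F with the class name in single quotes. This only tells you whether
the class file is packaged; it does not by itself explain the
NoClassDefFoundError.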


Thanks
Best Regards

On Thu, Dec 11, 2014 at 6:22 PM, Mario Pastorelli <
mario.pastore...@teralytics.ch> wrote:

>  Hi,
>
> I'm trying to use spark-streaming with Kafka, but I get a strange error
> about a missing class. I would like to ask whether the way I build the fat
> jar is correct. My program is
>
> val kafkaStream = KafkaUtils.createStream(ssc, zookeeperQuorum,
> kafkaGroupId, kafkaTopicsWithThreads)
>                             .map(_._2)
>
> kafkaStream.foreachRDD((rdd, t) => rdd.foreachPartition {
>   iter: Iterator[CellWithLAC] =>
>     println("time: " ++ t.toString ++ " #received: " ++ iter.size.toString)
> })
>
> I use sbt to manage my project and my build.sbt (with assembly 0.12.0
> plugin) is
>
> name := "spark_example"
>
> version := "0.0.1"
>
> scalaVersion := "2.10.4"
>
> scalacOptions ++= Seq("-deprecation","-feature")
>
> libraryDependencies ++= Seq(
>   "org.apache.spark" % "spark-streaming_2.10" % "1.1.1",
>   "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.1.1",
>   "joda-time" % "joda-time" % "2.6"
> )
>
> assemblyMergeStrategy in assembly := {
>   case p if p startsWith "com/esotericsoftware/minlog" => MergeStrategy.first
>   case p if p startsWith "org/apache/commons/beanutils" => MergeStrategy.first
>   case p if p startsWith "org/apache/" => MergeStrategy.last
>   case "plugin.properties" => MergeStrategy.discard
>   case p if p startsWith "META-INF" => MergeStrategy.discard
>   case x =>
>     val oldStrategy = (assemblyMergeStrategy in assembly).value
>     oldStrategy(x)
> }
>
> I create the jar with sbt assembly and then run it with
> $SPARK_HOME/bin/spark-submit --master spark://master:7077 --class Main
> target/scala-2.10/spark_example-assembly-0.0.1.jar localhost:2181
> test-consumer-group test1
>
> where master:7077 is the Spark master, localhost:2181 is ZooKeeper,
> test-consumer-group is the Kafka group id and test1 is the Kafka topic. The
> program starts and keeps running, but I get an error and nothing is printed.
> In the log I found the following stack trace:
>
> 14/12/11 13:02:08 INFO network.ConnectionManager: Accepted connection from [10.0.3.1/10.0.3.1:54325]
> 14/12/11 13:02:08 INFO network.SendingConnection: Initiating connection to [jpl-devvax/127.0.1.1:38767]
> 14/12/11 13:02:08 INFO network.SendingConnection: Connected to [jpl-devvax/127.0.1.1:38767], 1 messages pending
> 14/12/11 13:02:08 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on jpl-devvax:38767 (size: 842.0 B, free: 265.4 MB)
> 14/12/11 13:02:08 INFO scheduler.ReceiverTracker: Registered receiver for stream 0 from akka.tcp://sparkExecutor@jpl-devvax:46602
> 14/12/11 13:02:08 ERROR scheduler.ReceiverTracker: Deregistered receiver for stream 0: Error starting receiver 0 - java.lang.NoClassDefFoundError: kafka/consumer/ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$closeFetchersForQueues$1
>     at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$closeFetchersForQueues(Unknown Source)
>     at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcVI$sp(Unknown Source)
>     at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
>     at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(Unknown Source)
>     at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(Unknown Source)
>     at kafka.consumer.ZookeeperConsumerConnector.consume(Unknown Source)
>     at kafka.consumer.ZookeeperConsumerConnector.createMessageStreams(Unknown Source)
>     at org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:114)
>     at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)
>     at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)
>     at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:264)
>     at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:257)
>     at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1143)
>     at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1143)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
>     at org.apache.spark.scheduler.Task.run(Task.scala:54)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>     at java.lang.Thread.run(Thread.java:745)
>
> I have searched inside the fat jar and I found that that class is not in
> it:
>
> > jar -tf target/scala-2.10/rtstat_in_spark-assembly-0.0.1.jar | grep "kafka/consumer/ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$kafka$consumer$ZookeeperConsumerConnector"
> >
>
> The problem is the double dollar before anonfun: if you put only one then
> the class is there:
>
> > jar -tf target/scala-2.10/rtstat_in_spark-assembly-0.0.1.jar | grep "kafka/consumer/ZookeeperConsumerConnector$ZKRebalancerListener$anonfun$kafka$consumer$ZookeeperConsumerConnector"
> [...]
> kafka/consumer/ZookeeperConsumerConnector.class
> >
>
> I'm submitting my job to spark-1.1.1 compiled with hadoop2.4 downloaded
> from the spark website.
>
> My question is: how can I solve this problem? I guess the problem is my
> sbt script but I don't understand why.
>
>
> Thanks,
> Mario Pastorelli
>
>
