In this way it works but it's not portable and the idea of having a fat jar is to avoid exactly this. Is there any system to create a self-contained portable fatJar?

On 11.12.2014 13:57, Akhil Das wrote:
Add these jars while creating the Context.

       val sc = new SparkContext(conf)

sc.addJar("/home/akhld/.ivy2/cache/org.apache.spark/spark-streaming-kafka_2.10/jars/*spark-streaming-kafka_2.10-1.1.0.jar*")
sc.addJar("/home/akhld/.ivy2/cache/com.101tec/zkclient/jars/*zkclient-0.3.jar*")
sc.addJar("/home/akhld/.ivy2/cache/com.yammer.metrics/metrics-core/jars/*metrics-core-2.2.0.jar*")
sc.addJar("/home/akhld/.ivy2/cache/org.apache.kafka/kafka_2.10/jars/*kafka_2.10-0.8.0.jar*")
        val ssc = new StreamingContext(sc, Seconds(10))


Thanks
Best Regards

On Thu, Dec 11, 2014 at 6:22 PM, Mario Pastorelli <mario.pastore...@teralytics.ch <mailto:mario.pastore...@teralytics.ch>> wrote:

    Hi,

    I'm trying to use spark-streaming with kafka but I get a strange
    error on class that are missing. I would like to ask if my way to
    build the fat jar is correct or no. My program is

    val kafkaStream = KafkaUtils.createStream(ssc, zookeeperQuorum,
    kafkaGroupId, kafkaTopicsWithThreads)
                                .map(_._2)

    kafkaStream.foreachRDD((rdd,t) => rdd.foreachPartition {
    iter:Iterator[CellWithLAC] =>
      println("time: " ++ t.toString ++ " #received: " ++
    iter.size.toString)
    })

    I use sbt to manage my project and my build.sbt (with assembly
    0.12.0 plugin) is

    name := "spark_example"

    version := "0.0.1"

    scalaVersion := "2.10.4"

    scalacOptions ++= Seq("-deprecation","-feature")

    libraryDependencies ++= Seq(
      "org.apache.spark" % "spark-streaming_2.10" % "1.1.1",
      "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.1.1",
      "joda-time" % "joda-time" % "2.6"
    )

    assemblyMergeStrategy in assembly := {
      case p if p startsWith "com/esotericsoftware/minlog" =>
    MergeStrategy.first
      case p if p startsWith "org/apache/commons/beanutils" =>
    MergeStrategy.first
      case p if p startsWith "org/apache/" => MergeStrategy.last
      case "plugin.properties" => MergeStrategy.discard
      case p if p startsWith "META-INF" => MergeStrategy.discard
      case x =>
        val oldStrategy = (assemblyMergeStrategy in assembly).value
        oldStrategy(x)
    }

    I create the jar with sbt assembly and the run with
    $SPARK_HOME/bin/spark-submit --master spark://master:7077 --class
    Main target/scala-2.10/spark_example-assembly-0.0.1.jar
    localhost:2181 test-consumer-group test1

    where master:7077 is the spark master, localhost:2181 is
    zookeeper, test-consumer-group is kafka groupid and test1 is the
    kafka topic. The program starts and keep running but I get an
    error and nothing is printed. In the log I found the following
    stack trace:

    14/12/11 13:02:08 INFO network.ConnectionManager: Accepted
    connection from [10.0.3.1/10.0.3.1:54325
    <http://10.0.3.1/10.0.3.1:54325>]
    14/12/11 13:02:08 INFO network.SendingConnection: Initiating
    connection to [jpl-devvax/127.0.1.1:38767 <http://127.0.1.1:38767>]
    14/12/11 13:02:08 INFO network.SendingConnection: Connected to
    [jpl-devvax/127.0.1.1:38767 <http://127.0.1.1:38767>], 1 messages
    pending
    14/12/11 13:02:08 INFO storage.BlockManagerInfo: Added
    broadcast_2_piece0 in memory on jpl-devvax:38767 (size: 842.0 B,
    free: 265.4 MB)
    14/12/11 13:02:08 INFO scheduler.ReceiverTracker: Registered
    receiver for stream 0 from akka.tcp://sparkExecutor@jpl-devvax:46602
    14/12/11 13:02:08 ERROR scheduler.ReceiverTracker: Deregistered
    receiver for stream 0: Error starting receiver 0 -
    java.lang.NoClassDefFoundError:
    
kafka/consumer/ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$closeFetchersForQueues$1
        at
    
kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$closeFetchersForQueues(Unknown
    Source)
        at
    
kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcVI$sp(Unknown
    Source)
        at
    scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
        at
    
kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(Unknown
    Source)
        at
    
kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(Unknown
    Source)
        at kafka.consumer.ZookeeperConsumerConnector.consume(Unknown
    Source)
        at
    kafka.consumer.ZookeeperConsumerConnector.createMessageStreams(Unknown
    Source)
        at
    
org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:114)
        at
    
org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)
        at
    
org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)
        at
    
org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:264)
        at
    
org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:257)
        at
    
org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1143)
        at
    
org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1143)
        at
    org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
        at org.apache.spark.scheduler.Task.run(Task.scala:54)
        at
    org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
        at
    
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at
    
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

    I have searched inside the fat jar and I found that that class is
    not in it:

    > jar -tf target/scala-2.10/rtstat_in_spark-assembly-0.0.1.jar  |
    grep
    
"kafka/consumer/ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$kafka$consumer$ZookeeperConsumerConnector"
    >

    The problem is the double dollar before anonfun: if you put only
    one then the class is there:

    > jar -tf target/scala-2.10/rtstat_in_spark-assembly-0.0.1.jar  |
    grep
    
"kafka/consumer/ZookeeperConsumerConnector$ZKRebalancerListener$anonfun$kafka$consumer$ZookeeperConsumerConnector"
    [...]
    kafka/consumer/ZookeeperConsumerConnector.class
    >

    I'm submitting my job to spark-1.1.1 compiled with hadoop2.4
    downloaded from the spark website.

    My question is: how can I solve this problem? I guess the problem
    is my sbt script but I don't understand why.


    Thanks,
    Mario Pastorelli



Reply via email to