Re: SparkContext Threading
Hi Will,

That doesn't seem to be the case, and it was part of the source of my confusion. The code currently in the run method of the Runnable works perfectly fine with the lambda expressions when it is invoked from the main method. It also works when invoked from within a separate method on the Transforms object. It was only when putting that same code into another thread that the serialization exception occurred.

Examples throughout the Spark docs also use lambda expressions a lot - surely those examples would not work if this were always an issue with lambdas?

On Sat, Jun 6, 2015, 12:21 AM Will Briggs wrbri...@gmail.com wrote:

Hi Lee, it's actually not related to threading at all - you would still have the same problem even if you were using a single thread. See this section (https://spark.apache.org/docs/latest/programming-guide.html#passing-functions-to-spark) of the Spark docs.

On June 5, 2015, at 5:12 PM, Lee McFadden splee...@gmail.com wrote:

On Fri, Jun 5, 2015 at 2:05 PM Will Briggs wrbri...@gmail.com wrote:

Your lambda expressions on the RDDs in the SecondRollup class are closing around the context, and Spark has special logic to ensure that all variables in a closure used on an RDD are Serializable - I hate linking to Quora, but there's a good explanation here: http://www.quora.com/What-does-Closure-cleaner-func-mean-in-Spark

Ah, I see! So if I broke out the lambda expressions into a method on an object it would prevent this issue. Essentially, don't use lambda expressions when using threads.

Thanks again, I appreciate the help.
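To see the rule Will is pointing at without any Spark in the picture, here is a minimal sketch (all names are made up, none of this is the poster's code): the lambda body is identical in both places, but only the one that reads a field of an enclosing instance drags that instance - and everything it holds - into what gets serialized.

```
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

class FakeSparkContext // stand-in for SparkContext: not Serializable

// Same lambda body as in main(), but defined inside an instance method and
// reading a constructor field, so the compiler captures `this` to evaluate it.
class InsideARunnable(val ctx: FakeSparkContext, prefix: String)
    extends Runnable with Serializable {
  def run(): Unit = {
    val f: Int => String = x => prefix + x // captures `this` in order to read `prefix`
    CaptureDemo.serialize(f)               // throws NotSerializableException: FakeSparkContext
  }
}

object CaptureDemo {
  def serialize(obj: AnyRef): Unit =
    new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj)

  def main(args: Array[String]): Unit = {
    val prefix = "n="
    val g: Int => String = x => prefix + x // captures only the local `prefix`: serializes fine
    serialize(g)
    new InsideARunnable(new FakeSparkContext, "n=").run()
  }
}
```

So the lambdas themselves are not the problem; what they refer to determines whether the enclosing instance (and, here, its SparkContext field) gets pulled into the closure.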
SparkContext Threading
Hi all,

I'm having some issues finding any kind of best practices when attempting to create Spark applications which launch jobs from a thread pool. Initially I had issues passing the SparkContext to other threads as it is not serializable. Eventually I found that adding the @transient annotation prevents a NotSerializableException.

```
class SecondRollup(@transient sc: SparkContext, connector: CassandraConnector, scanPoint: DateTime) extends Runnable with Serializable {
  ...
}
```

However, now I am running into a different exception:

```
15/06/05 11:35:32 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.NoSuchMethodError: org.apache.spark.executor.TaskMetrics.inputMetrics_$eq(Lscala/Option;)V
  at com.datastax.spark.connector.metrics.InputMetricsUpdater$.apply(InputMetricsUpdater.scala:61)
  at com.datastax.spark.connector.rdd.CassandraTableScanRDD.compute(CassandraTableScanRDD.scala:196)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
  at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
  at org.apache.spark.scheduler.Task.run(Task.scala:64)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)
```

The documentation (https://spark.apache.org/docs/latest/job-scheduling.html) explicitly states that jobs can be submitted by multiple threads, but I seem to be doing *something* incorrectly and haven't found any docs to point me in the right direction.

Does anyone have any advice on how to get jobs submitted by multiple threads? The jobs are fairly simple and work when I run them serially, so I'm not exactly sure what I'm doing wrong.

Thanks,
Lee
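For reference, here is a rough, hypothetical sketch (made-up job, not the code above) of the multi-threaded submission pattern the rest of this thread converges on: the SparkContext is shared by reference between driver threads, and the job logic lives in a standalone object so the functions shipped to executors never close over a Runnable instance.

```
import java.util.concurrent.{Executors, TimeUnit}
import org.apache.spark.{SparkConf, SparkContext}

// Job logic lives in a standalone object, so the functions passed to the RDD
// close over nothing but their own parameters.
object Jobs {
  def sumOfSquares(sc: SparkContext, n: Int): Long =
    sc.parallelize(1L to n.toLong).map(x => x * x).reduce(_ + _)
}

object MultiThreadedSubmit {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("MultiThreadedSubmit"))
    val pool = Executors.newFixedThreadPool(4)

    for (n <- Seq(1000, 2000, 3000, 4000)) {
      pool.execute(new Runnable {
        // The Runnable itself never leaves the driver, so it needs neither
        // Serializable nor @transient tricks.
        def run(): Unit = println(s"sum of squares up to $n: ${Jobs.sumOfSquares(sc, n)}")
      })
    }

    pool.shutdown()                                        // stop accepting new jobs
    pool.awaitTermination(Long.MaxValue, TimeUnit.SECONDS) // wait for every submitted job
    sc.stop()                                              // only then tear down the context
  }
}
```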
Re: SparkContext Threading
You can see an example of the constructor for the class which executes a job in my opening post. I'm attempting to instantiate and run the class using the code below:

```
val conf = new SparkConf()
  .setAppName(appNameBase.format("Test"))

val connector = CassandraConnector(conf)

val sc = new SparkContext(conf)

// Set up the threadpool for running Jobs.
val pool = Executors.newFixedThreadPool(10)

pool.execute(new SecondRollup(sc, connector, start))
```

There is some surrounding code that then waits for all the jobs entered into the thread pool to complete, although it's not really required at the moment as I am only submitting one job until I get this issue straightened out :)

Thanks,
Lee

On Fri, Jun 5, 2015 at 11:50 AM Marcelo Vanzin van...@cloudera.com wrote:

On Fri, Jun 5, 2015 at 11:48 AM, Lee McFadden splee...@gmail.com wrote:

Initially I had issues passing the SparkContext to other threads as it is not serializable. Eventually I found that adding the @transient annotation prevents a NotSerializableException.

This is really puzzling. How are you passing the context around that you need to do serialization? Threads all run in the same process, so serialization should not be needed at all.

-- Marcelo
Re: SparkContext Threading
On Fri, Jun 5, 2015 at 12:30 PM Marcelo Vanzin van...@cloudera.com wrote:

Ignoring the serialization thing (seems like a red herring):

People seem surprised that I'm getting the serialization exception at all - I'm not convinced it's a red herring per se, but on to the blocking issue...

You might be using this Cassandra library with an incompatible version of Spark; the `TaskMetrics` class has changed in the past, and the method it's looking for does not exist at least in 1.4.

You are correct, I was being a bonehead. We recently downgraded to Spark 1.2.1 and I was running the compiled jar using Spark 1.3.1 on my local machine. Running the job with threading on my 1.2.1 cluster worked. Thank you for finding the obvious mistake :)

Regarding serialization, I'm still confused as to why I was getting a serialization error in the first place, as I'm executing these Runnable classes from a Java thread pool. I'm fairly new to the Scala/JVM world and there doesn't seem to be any Spark documentation to explain *why* I need to declare the sc variable as @transient (or even that I should). I was under the impression that objects only need to be serializable when they are sent over the network, and that doesn't seem to be occurring as far as I can tell.

Apologies if this is simple stuff, but I don't like fixing things without knowing the full reason why the changes I made fixed them :)

Thanks again for your time!
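On the @transient question, a tiny Spark-free sketch (made-up names) of what the annotation does under plain Java serialization: the marked field is simply skipped when the object is written out, and comes back as null after deserialization, which is why it silences the NotSerializableException without making the field's class serializable.

```
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

class NotSerializableThing // stand-in for SparkContext

// @transient marks the underlying JVM field as transient, so Java
// serialization skips it entirely.
class Holder(@transient val thing: NotSerializableThing, val label: String) extends Serializable

object TransientDemo {
  def main(args: Array[String]): Unit = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    out.writeObject(new Holder(new NotSerializableThing, "ok")) // succeeds only because `thing` is @transient
    println("serialized fine; `thing` would come back as null on the other side")
  }
}
```

Why serialization is attempted at all, even without a network hop, is covered in Will's explanation of closures elsewhere in this thread: Spark's closure cleaner serializes the functions passed to RDD operations on the driver to check them, and those functions can capture the enclosing Runnable.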
Re: SparkContext Threading
On Fri, Jun 5, 2015 at 1:00 PM Igor Berman igor.ber...@gmail.com wrote:

Lee, what cluster do you use? Standalone, yarn-cluster, yarn-client, mesos?

Spark standalone, v1.2.1.
Re: SparkContext Threading
On Fri, Jun 5, 2015 at 12:58 PM Marcelo Vanzin van...@cloudera.com wrote:

You didn't show the error so the only thing we can do is speculate. You're probably sending the object that's holding the SparkContext reference over the network at some point (e.g. it's used by a task run in an executor), and that's why you were getting that exception.

Apologies - the full error is as follows. All I did here was remove the @transient annotation from the sc variable in my class constructor. In addition, the full code for the classes and launching process is included below.

Error traceback:

```
Exception in thread "pool-5-thread-1" java.lang.Error: org.apache.spark.SparkException: Task not serializable
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1148)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: Task not serializable
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
  at org.apache.spark.SparkContext.clean(SparkContext.scala:1478)
  at org.apache.spark.rdd.RDD.map(RDD.scala:288)
  at io.icebrg.analytics.spark.SecondRollup.run(ConnTransforms.scala:33)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  ... 2 more
Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext
  at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
  at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
  at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
  at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
  at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
  at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
  at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
  at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
  at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
  at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
  at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
  at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
  ... 7 more
```

Code:

```
class SecondRollup(sc: SparkContext, connector: CassandraConnector, scanPoint: DateTime) extends Runnable with Serializable {

  def run {
    val conn = sc.cassandraTable("alpha_test", "sensor_readings")
      .select("data")
      .where("timep = ?", scanPoint)
      .where("sensorid IN ?", System.sensors)
      .map(r => Json.parse(r.getString("data")))
      .cache()

    conn.flatMap(AffectedIp.fromJson)
      .map(a => (AffectedIp.key(a), a))
      .reduceByKey(AffectedIp.reduce)
      .map(_._2)
      .map(a => AffectedIp.reduceWithCurrent(connector, a))
      .saveToCassandra("alpha_test", "affected_hosts")

    conn.flatMap(ServiceSummary.fromnJson)
      .map(s => (ServiceSummary.key(s), s))
      .reduceByKey(ServiceSummary.reduce)
      .map(_._2)
      .saveToCassandra("alpha_test", "service_summary_rollup")
  }
}

object Transforms {
  private val appNameBase = "Transforms%s"
  private val dtFormatter = DateTimeFormat.forPattern("MMddHH")

  def main(args: Array[String]) {
    if (args.size < 2) {
      println("""Usage: ConnTransforms start end
        start  DateTime to start processing at. Format: MMddHH
        end    DateTime to end processing at. Format: MMddHH""")
      sys.exit(1)
    }

    // withZoneRetainFields gives us a UTC time as specified on the command line.
    val start = dtFormatter.parseDateTime(args(0)).withZoneRetainFields(DateTimeZone.UTC)
    val end = dtFormatter.parseDateTime(args(1)).withZoneRetainFields(DateTimeZone.UTC)

    println("Processing rollups from %s to %s".format(start, end))

    // Create the spark context.
    val conf = new SparkConf()
      .setAppName(appNameBase.format("Test"))

    val connector = CassandraConnector(conf)

    val sc = new SparkContext(conf)

    // Set up the threadpool for running Jobs.
    val pool = Executors.newFixedThreadPool(10)

    pool.execute(new SecondRollup(sc, connector, start))

    //for (dt <- new TimeRanger(start, end)) {
    //  // Always run the second rollups.
    //  pool.execute(new SecondRollup(sc, connector, dt))
    //  if (Granularity.Minute.isGranularity(dt)) pool.execute(new MinuteRollup(sc, connector, dt))
    //}

    // stop the pool from accepting new tasks
    pool.shutdown()

    // We've submitted all the tasks. while
```
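One workaround pattern from the Spark programming guide that would likely avoid the capture in the code above (a trimmed sketch only, reusing the names from that code; only the first pipeline is shown): copy the constructor fields the lambdas need into local vals inside run(), so the closures capture those locals instead of the SecondRollup instance that holds the SparkContext.

```
class SecondRollup(sc: SparkContext, connector: CassandraConnector, scanPoint: DateTime)
    extends Runnable with Serializable {

  def run(): Unit = {
    // Local copies: the closures below capture these vals, not `this`
    // (and therefore not `sc`).
    val localConnector = connector
    val localScanPoint = scanPoint

    val conn = sc.cassandraTable("alpha_test", "sensor_readings")
      .select("data")
      .where("timep = ?", localScanPoint)
      .map(r => Json.parse(r.getString("data")))
      .cache()

    conn.flatMap(AffectedIp.fromJson)
      .map(a => (AffectedIp.key(a), a))
      .reduceByKey(AffectedIp.reduce)
      .map(_._2)
      .map(a => AffectedIp.reduceWithCurrent(localConnector, a))
      .saveToCassandra("alpha_test", "affected_hosts")
  }
}
```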
Re: SparkContext Threading
On Fri, Jun 5, 2015 at 2:05 PM Will Briggs wrbri...@gmail.com wrote:

Your lambda expressions on the RDDs in the SecondRollup class are closing around the context, and Spark has special logic to ensure that all variables in a closure used on an RDD are Serializable - I hate linking to Quora, but there's a good explanation here: http://www.quora.com/What-does-Closure-cleaner-func-mean-in-Spark

Ah, I see! So if I broke out the lambda expressions into a method on an object it would prevent this issue. Essentially, don't use lambda expressions when using threads.

Thanks again, I appreciate the help.
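To make "a method on an object" concrete, one possible shape for it here (a sketch only; Json, AffectedIp and CassandraRow are the types already used in the code earlier in the thread, with their existing imports assumed):

```
// Functions defined on a standalone object have no enclosing Runnable
// instance to capture, so only their own parameters are involved when Spark
// serializes the task closure.
object RowFunctions {
  def parseData(r: CassandraRow) = Json.parse(r.getString("data"))
  def keyByIp(a: AffectedIp)     = (AffectedIp.key(a), a)
}

// ...and inside run(), instead of inline lambdas:
//   .map(RowFunctions.parseData _)
//   .flatMap(AffectedIp.fromJson)
//   .map(RowFunctions.keyByIp _)
```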
Re: Kafka stream fails: java.lang.NoClassDefFound com/yammer/metrics/core/Gauge
Thanks for explaining, Sean and Cody - this makes sense now. I'd like to help improve this documentation so other Python users don't run into the same thing, so I'll look into that today.

On Tue, May 12, 2015 at 9:44 AM Cody Koeninger c...@koeninger.org wrote:

One of the packages just contains the streaming-kafka code. The other contains that code, plus everything it depends on. That's what "assembly" typically means in JVM land. Java/Scala users are accustomed to using their own build tool to include necessary dependencies. JVM dependency management is (thankfully) different from Python dependency management. As far as I can tell, there is no core issue, upstream or otherwise.

On Tue, May 12, 2015 at 11:39 AM, Lee McFadden splee...@gmail.com wrote:

Thanks again for all the help, folks. I can confirm that simply switching to `--packages org.apache.spark:spark-streaming-kafka-assembly_2.10:1.3.1` makes everything work as intended.

I'm not sure what the difference is between the two packages honestly, or why one should be used over the other, but the documentation is currently not intuitive in this matter. If you follow the instructions, initially it will seem broken. Is there any reason why the docs for Python users (or, in fact, all users - Java/Scala users will run into this too, except they are armed with the ability to build their own jar with the dependencies included) should not be changed to use the assembly package by default?

Additionally, after a few Google searches yesterday combined with your help, I'm wondering if the core issue is upstream in Kafka's dependency chain?

On Tue, May 12, 2015 at 8:53 AM Ted Yu yuzhih...@gmail.com wrote:

bq. it is already in the assembly

Yes. Verified:

$ jar tvf ~/Downloads/spark-streaming-kafka-assembly_2.10-1.3.1.jar | grep yammer | grep Gauge
  1329 Sat Apr 11 04:25:50 PDT 2015 com/yammer/metrics/core/Gauge.class

On Tue, May 12, 2015 at 8:05 AM, Sean Owen so...@cloudera.com wrote:

It doesn't depend directly on yammer metrics; Kafka does. It wouldn't be correct to declare that it does; it is already in the assembly anyway.

On Tue, May 12, 2015 at 3:50 PM, Ted Yu yuzhih...@gmail.com wrote:

Currently external/kafka/pom.xml doesn't cite yammer metrics as a dependency.

$ ls -l ~/.m2/repository/com/yammer/metrics/metrics-core/2.2.0/metrics-core-2.2.0.jar
-rw-r--r--  1 tyu  staff  82123 Dec 17  2013 /Users/tyu/.m2/repository/com/yammer/metrics/metrics-core/2.2.0/metrics-core-2.2.0.jar

Including the metrics-core jar would not increase the size of the final release artifact much. My two cents.
Re: Kafka stream fails: java.lang.NoClassDefFound com/yammer/metrics/core/Gauge
Thanks again for all the help, folks. I can confirm that simply switching to `--packages org.apache.spark:spark-streaming-kafka-assembly_2.10:1.3.1` makes everything work as intended.

I'm not sure what the difference is between the two packages honestly, or why one should be used over the other, but the documentation is currently not intuitive in this matter. If you follow the instructions, initially it will seem broken. Is there any reason why the docs for Python users (or, in fact, all users - Java/Scala users will run into this too, except they are armed with the ability to build their own jar with the dependencies included) should not be changed to use the assembly package by default?

Additionally, after a few Google searches yesterday combined with your help, I'm wondering if the core issue is upstream in Kafka's dependency chain?

On Tue, May 12, 2015 at 8:53 AM Ted Yu yuzhih...@gmail.com wrote:

bq. it is already in the assembly

Yes. Verified:

$ jar tvf ~/Downloads/spark-streaming-kafka-assembly_2.10-1.3.1.jar | grep yammer | grep Gauge
  1329 Sat Apr 11 04:25:50 PDT 2015 com/yammer/metrics/core/Gauge.class

On Tue, May 12, 2015 at 8:05 AM, Sean Owen so...@cloudera.com wrote:

It doesn't depend directly on yammer metrics; Kafka does. It wouldn't be correct to declare that it does; it is already in the assembly anyway.

On Tue, May 12, 2015 at 3:50 PM, Ted Yu yuzhih...@gmail.com wrote:

Currently external/kafka/pom.xml doesn't cite yammer metrics as a dependency.

$ ls -l ~/.m2/repository/com/yammer/metrics/metrics-core/2.2.0/metrics-core-2.2.0.jar
-rw-r--r--  1 tyu  staff  82123 Dec 17  2013 /Users/tyu/.m2/repository/com/yammer/metrics/metrics-core/2.2.0/metrics-core-2.2.0.jar

Including the metrics-core jar would not increase the size of the final release artifact much. My two cents.
Re: Kafka stream fails: java.lang.NoClassDefFound com/yammer/metrics/core/Gauge
Thanks Ted,

The issue is that I'm using packages (see spark-submit definition) and I do not know how to add com.yammer.metrics:metrics-core to my classpath so Spark can see it.

Should metrics-core not be part of the org.apache.spark:spark-streaming-kafka_2.10:1.3.1 package so it can work correctly? If not, any clues as to how I can add metrics-core to my project (bearing in mind that I'm using Python, not a JVM language) would be much appreciated.

Thanks, and apologies for my newbness with Java/Scala.

On Mon, May 11, 2015 at 1:42 PM Ted Yu yuzhih...@gmail.com wrote:

com.yammer.metrics.core.Gauge is in metrics-core jar e.g., in master branch:

[INFO] |  \- org.apache.kafka:kafka_2.10:jar:0.8.1.1:compile
[INFO] |     +- com.yammer.metrics:metrics-core:jar:2.2.0:compile

Please make sure metrics-core jar is on the classpath.

On Mon, May 11, 2015 at 1:32 PM, Lee McFadden splee...@gmail.com wrote:

Hi,

We've been having some issues getting spark streaming running correctly using a Kafka stream, and we've been going around in circles trying to resolve this dependency. Details of our environment and the error below, if anyone can help resolve this it would be much appreciated.

Submit command line:

/home/ubuntu/spark/spark-1.3.1/bin/spark-submit \
  --packages TargetHolding/pyspark-cassandra:0.1.4,org.apache.spark:spark-streaming-kafka_2.10:1.3.1 \
  --conf spark.cassandra.connection.host=10.10.103.172,10.10.102.160,10.10.101.79 \
  --master spark://127.0.0.1:7077 \
  affected_hosts.py

When we run the streaming job everything starts just fine, then we see the following in the logs:

15/05/11 19:50:46 WARN TaskSetManager: Lost task 0.0 in stage 2.0 (TID 70, ip-10-10-102-53.us-west-2.compute.internal): java.lang.NoClassDefFoundError: com/yammer/metrics/core/Gauge
  at kafka.consumer.ZookeeperConsumerConnector.createFetcher(ZookeeperConsumerConnector.scala:151)
  at kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:115)
  at kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:128)
  at kafka.consumer.Consumer$.create(ConsumerConnector.scala:89)
  at org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:100)
  at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)
  at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)
  at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:298)
  at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:290)
  at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498)
  at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
  at org.apache.spark.scheduler.Task.run(Task.scala:64)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: com.yammer.metrics.core.Gauge
  at java.net.URLClassLoader$1.run(URLClassLoader.java:372)
  at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
  at java.security.AccessController.doPrivileged(Native Method)
  at java.net.URLClassLoader.findClass(URLClassLoader.java:360)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
  ... 17 more
Kafka stream fails: java.lang.NoClassDefFound com/yammer/metrics/core/Gauge
Hi,

We've been having some issues getting spark streaming running correctly using a Kafka stream, and we've been going around in circles trying to resolve this dependency. Details of our environment and the error below, if anyone can help resolve this it would be much appreciated.

Submit command line:

/home/ubuntu/spark/spark-1.3.1/bin/spark-submit \
  --packages TargetHolding/pyspark-cassandra:0.1.4,org.apache.spark:spark-streaming-kafka_2.10:1.3.1 \
  --conf spark.cassandra.connection.host=10.10.103.172,10.10.102.160,10.10.101.79 \
  --master spark://127.0.0.1:7077 \
  affected_hosts.py

When we run the streaming job everything starts just fine, then we see the following in the logs:

15/05/11 19:50:46 WARN TaskSetManager: Lost task 0.0 in stage 2.0 (TID 70, ip-10-10-102-53.us-west-2.compute.internal): java.lang.NoClassDefFoundError: com/yammer/metrics/core/Gauge
  at kafka.consumer.ZookeeperConsumerConnector.createFetcher(ZookeeperConsumerConnector.scala:151)
  at kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:115)
  at kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:128)
  at kafka.consumer.Consumer$.create(ConsumerConnector.scala:89)
  at org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:100)
  at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)
  at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)
  at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:298)
  at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:290)
  at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498)
  at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
  at org.apache.spark.scheduler.Task.run(Task.scala:64)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: com.yammer.metrics.core.Gauge
  at java.net.URLClassLoader$1.run(URLClassLoader.java:372)
  at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
  at java.security.AccessController.doPrivileged(Native Method)
  at java.net.URLClassLoader.findClass(URLClassLoader.java:360)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
  ... 17 more
Re: Kafka stream fails: java.lang.NoClassDefFound com/yammer/metrics/core/Gauge
Ted, many thanks. I'm not used to Java dependencies so this was a real head-scratcher for me.

Downloading the two metrics packages from the Maven repository (metrics-core, metrics-annotation) and supplying them on the spark-submit command line worked. My final spark-submit for a python project using Kafka as an input source:

/home/ubuntu/spark/spark-1.3.1/bin/spark-submit \
  --packages TargetHolding/pyspark-cassandra:0.1.4,org.apache.spark:spark-streaming-kafka_2.10:1.3.1 \
  --jars /home/ubuntu/jars/metrics-core-2.2.0.jar,/home/ubuntu/jars/metrics-annotation-2.2.0.jar \
  --conf spark.cassandra.connection.host=10.10.103.172,10.10.102.160,10.10.101.79 \
  --master spark://127.0.0.1:7077 \
  affected_hosts.py

Now we're seeing data from the stream. Thanks again!

On Mon, May 11, 2015 at 2:43 PM Sean Owen so...@cloudera.com wrote:

Ah yes, the Kafka + streaming code isn't in the assembly, is it? You'd have to provide it and all its dependencies with your app. You could also build this into your own app jar. Tools like Maven will add in the transitive dependencies.

On Mon, May 11, 2015 at 10:04 PM, Lee McFadden splee...@gmail.com wrote:

Thanks Ted,

The issue is that I'm using packages (see spark-submit definition) and I do not know how to add com.yammer.metrics:metrics-core to my classpath so Spark can see it.

Should metrics-core not be part of the org.apache.spark:spark-streaming-kafka_2.10:1.3.1 package so it can work correctly? If not, any clues as to how I can add metrics-core to my project (bearing in mind that I'm using Python, not a JVM language) would be much appreciated.

Thanks, and apologies for my newbness with Java/Scala.

On Mon, May 11, 2015 at 1:42 PM Ted Yu yuzhih...@gmail.com wrote:

com.yammer.metrics.core.Gauge is in metrics-core jar e.g., in master branch:

[INFO] |  \- org.apache.kafka:kafka_2.10:jar:0.8.1.1:compile
[INFO] |     +- com.yammer.metrics:metrics-core:jar:2.2.0:compile

Please make sure metrics-core jar is on the classpath.

On Mon, May 11, 2015 at 1:32 PM, Lee McFadden splee...@gmail.com wrote:

Hi,

We've been having some issues getting spark streaming running correctly using a Kafka stream, and we've been going around in circles trying to resolve this dependency. Details of our environment and the error below, if anyone can help resolve this it would be much appreciated.
Submit command line:

/home/ubuntu/spark/spark-1.3.1/bin/spark-submit \
  --packages TargetHolding/pyspark-cassandra:0.1.4,org.apache.spark:spark-streaming-kafka_2.10:1.3.1 \
  --conf spark.cassandra.connection.host=10.10.103.172,10.10.102.160,10.10.101.79 \
  --master spark://127.0.0.1:7077 \
  affected_hosts.py

When we run the streaming job everything starts just fine, then we see the following in the logs:

15/05/11 19:50:46 WARN TaskSetManager: Lost task 0.0 in stage 2.0 (TID 70, ip-10-10-102-53.us-west-2.compute.internal): java.lang.NoClassDefFoundError: com/yammer/metrics/core/Gauge
  at kafka.consumer.ZookeeperConsumerConnector.createFetcher(ZookeeperConsumerConnector.scala:151)
  at kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:115)
  at kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:128)
  at kafka.consumer.Consumer$.create(ConsumerConnector.scala:89)
  at org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:100)
  at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)
  at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)
  at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:298)
  at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:290)
  at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498)
  at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
  at org.apache.spark.scheduler.Task.run(Task.scala:64)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: com.yammer.metrics.core.Gauge
  at java.net.URLClassLoader$1.run(URLClassLoader.java:372)
  at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
  at java.security.AccessController.doPrivileged(Native Method
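For JVM users, Sean's "build this into your own app jar" route would look roughly like the sbt sketch below (hypothetical project name; artifact names and versions taken from this thread). An assembly or shade step then folds the Kafka integration and its transitive dependencies - including metrics-core - into the application jar, so nothing extra is needed on the spark-submit command line.

```
// build.sbt (sketch)
name := "affected-hosts"

scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"            % "1.3.1" % "provided",
  "org.apache.spark" %% "spark-streaming"       % "1.3.1" % "provided",
  // Not marked "provided": this one (and its transitive dependencies, e.g.
  // kafka and com.yammer.metrics:metrics-core) must ship inside the app jar.
  "org.apache.spark" %% "spark-streaming-kafka" % "1.3.1"
)
```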
Re: Kafka stream fails: java.lang.NoClassDefFound com/yammer/metrics/core/Gauge
I opened a ticket on this (without posting here first - bad etiquette, apologies) which was closed as 'fixed'. https://issues.apache.org/jira/browse/SPARK-7538

I don't believe that having my script running means this is fixed; I think it is still an issue. I downloaded the spark source, ran `mvn -DskipTests clean package`, then simply launched my python script (which shouldn't be introducing additional *java* dependencies itself?).

Doesn't this mean these dependencies are missing from the spark build, since I didn't modify any files within the distribution and my application itself can't be introducing java dependency clashes?

On Mon, May 11, 2015, 4:34 PM Lee McFadden splee...@gmail.com wrote:

Ted, many thanks. I'm not used to Java dependencies so this was a real head-scratcher for me.

Downloading the two metrics packages from the Maven repository (metrics-core, metrics-annotation) and supplying them on the spark-submit command line worked. My final spark-submit for a python project using Kafka as an input source:

/home/ubuntu/spark/spark-1.3.1/bin/spark-submit \
  --packages TargetHolding/pyspark-cassandra:0.1.4,org.apache.spark:spark-streaming-kafka_2.10:1.3.1 \
  --jars /home/ubuntu/jars/metrics-core-2.2.0.jar,/home/ubuntu/jars/metrics-annotation-2.2.0.jar \
  --conf spark.cassandra.connection.host=10.10.103.172,10.10.102.160,10.10.101.79 \
  --master spark://127.0.0.1:7077 \
  affected_hosts.py

Now we're seeing data from the stream. Thanks again!

On Mon, May 11, 2015 at 2:43 PM Sean Owen so...@cloudera.com wrote:

Ah yes, the Kafka + streaming code isn't in the assembly, is it? You'd have to provide it and all its dependencies with your app. You could also build this into your own app jar. Tools like Maven will add in the transitive dependencies.

On Mon, May 11, 2015 at 10:04 PM, Lee McFadden splee...@gmail.com wrote:

Thanks Ted,

The issue is that I'm using packages (see spark-submit definition) and I do not know how to add com.yammer.metrics:metrics-core to my classpath so Spark can see it.

Should metrics-core not be part of the org.apache.spark:spark-streaming-kafka_2.10:1.3.1 package so it can work correctly? If not, any clues as to how I can add metrics-core to my project (bearing in mind that I'm using Python, not a JVM language) would be much appreciated.

Thanks, and apologies for my newbness with Java/Scala.

On Mon, May 11, 2015 at 1:42 PM Ted Yu yuzhih...@gmail.com wrote:

com.yammer.metrics.core.Gauge is in metrics-core jar e.g., in master branch:

[INFO] |  \- org.apache.kafka:kafka_2.10:jar:0.8.1.1:compile
[INFO] |     +- com.yammer.metrics:metrics-core:jar:2.2.0:compile

Please make sure metrics-core jar is on the classpath.

On Mon, May 11, 2015 at 1:32 PM, Lee McFadden splee...@gmail.com wrote:

Hi,

We've been having some issues getting spark streaming running correctly using a Kafka stream, and we've been going around in circles trying to resolve this dependency. Details of our environment and the error below, if anyone can help resolve this it would be much appreciated.
Submit command line:

/home/ubuntu/spark/spark-1.3.1/bin/spark-submit \
  --packages TargetHolding/pyspark-cassandra:0.1.4,org.apache.spark:spark-streaming-kafka_2.10:1.3.1 \
  --conf spark.cassandra.connection.host=10.10.103.172,10.10.102.160,10.10.101.79 \
  --master spark://127.0.0.1:7077 \
  affected_hosts.py

When we run the streaming job everything starts just fine, then we see the following in the logs:

15/05/11 19:50:46 WARN TaskSetManager: Lost task 0.0 in stage 2.0 (TID 70, ip-10-10-102-53.us-west-2.compute.internal): java.lang.NoClassDefFoundError: com/yammer/metrics/core/Gauge
  at kafka.consumer.ZookeeperConsumerConnector.createFetcher(ZookeeperConsumerConnector.scala:151)
  at kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:115)
  at kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:128)
  at kafka.consumer.Consumer$.create(ConsumerConnector.scala:89)
  at org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:100)
  at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)
  at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)
  at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:298)
  at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:290)
  at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498)
  at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498