Re: SparkContext Threading

2015-06-06 Thread Lee McFadden
Hi Will,

That doesn't seem to be the case, and that was part of what confused me. The
code currently in the run method of the Runnable works perfectly fine with the
lambda expressions when it is invoked from the main method. The lambdas also
work when they are invoked from within a separate method on the Transforms
object.

It was only when putting that same code into another thread that the
serialization exception occurred.

Examples throughout the Spark docs also make heavy use of lambda expressions -
surely those examples would not work either if lambdas were always an issue?

On Sat, Jun 6, 2015, 12:21 AM Will Briggs wrbri...@gmail.com wrote:

 Hi Lee, it's actually not related to threading at all - you would still
 have the same problem even if you were using a single thread. See this
 section (
 https://spark.apache.org/docs/latest/programming-guide.html#passing-functions-to-spark)
 of the Spark docs.


 On June 5, 2015, at 5:12 PM, Lee McFadden splee...@gmail.com wrote:


 On Fri, Jun 5, 2015 at 2:05 PM Will Briggs wrbri...@gmail.com wrote:

 Your lambda expressions on the RDDs in the SecondRollup class are closing
 around the context, and Spark has special logic to ensure that all
 variables in a closure used on an RDD are Serializable - I hate linking to
 Quora, but there's a good explanation here:
 http://www.quora.com/What-does-Closure-cleaner-func-mean-in-Spark


 Ah, I see!  So if I broke out the lambda expressions into a method on an
 object it would prevent this issue.  Essentially, don't use lambda
 expressions when using threads.

 Thanks again, I appreciate the help.


SparkContext Threading

2015-06-05 Thread Lee McFadden
Hi all,

I'm having trouble finding any best practices for creating Spark applications
which launch jobs from a thread pool.

Initially I had issues passing the SparkContext to other threads as it is
not serializable.  Eventually I found that adding the @transient annotation
prevents a NotSerializableException.

```
class SecondRollup(@transient sc: SparkContext, connector: CassandraConnector,
                   scanPoint: DateTime) extends Runnable with Serializable {
...
}
```

However, now I am running into a different exception:

```
15/06/05 11:35:32 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0,
localhost): java.lang.NoSuchMethodError:
org.apache.spark.executor.TaskMetrics.inputMetrics_$eq(Lscala/Option;)V
at
com.datastax.spark.connector.metrics.InputMetricsUpdater$.apply(InputMetricsUpdater.scala:61)
at
com.datastax.spark.connector.rdd.CassandraTableScanRDD.compute(CassandraTableScanRDD.scala:196)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
```

The documentation (https://spark.apache.org/docs/latest/job-scheduling.html)
explicitly states that jobs can be submitted by multiple threads but I seem
to be doing *something* incorrectly and haven't found any docs to point me
in the right direction.

Does anyone have any advice on how to get jobs submitted by multiple
threads?  The jobs are fairly simple and work when I run them serially, so
I'm not exactly sure what I'm doing wrong.

Thanks,

Lee


Re: SparkContext Threading

2015-06-05 Thread Lee McFadden
You can see an example of the constructor for the class which executes a
job in my opening post.

I'm attempting to instantiate and run the class using the code below:

```
val conf = new SparkConf()
  .setAppName(appNameBase.format("Test"))

val connector = CassandraConnector(conf)

val sc = new SparkContext(conf)

// Set up the threadpool for running Jobs.
val pool = Executors.newFixedThreadPool(10)

pool.execute(new SecondRollup(sc, connector, start))
```

There is some surrounding code that then waits for all the jobs entered
into the thread pool to complete, although it's not really required at the
moment as I am only submitting one job until I get this issue straightened
out :)
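
For completeness, the waiting portion looks roughly like this (a sketch that
continues the snippet above; the timeout value is arbitrary):

```
import java.util.concurrent.TimeUnit

// Continuing the snippet above: stop accepting new work, then block until
// everything already submitted has finished.
pool.shutdown()
pool.awaitTermination(1, TimeUnit.HOURS)
```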

Thanks,

Lee

On Fri, Jun 5, 2015 at 11:50 AM Marcelo Vanzin van...@cloudera.com wrote:

 On Fri, Jun 5, 2015 at 11:48 AM, Lee McFadden splee...@gmail.com wrote:

 Initially I had issues passing the SparkContext to other threads as it is
 not serializable.  Eventually I found that adding the @transient annotation
 prevents a NotSerializableException.


 This is really puzzling. How are you passing the context around in a way
 that requires serialization?

 Threads all run in the same process, so serialization should not be needed
 at all.

 --
 Marcelo



Re: SparkContext Threading

2015-06-05 Thread Lee McFadden
On Fri, Jun 5, 2015 at 12:30 PM Marcelo Vanzin van...@cloudera.com wrote:

 Ignoring the serialization thing (seems like a red herring):


People seem surprised that I'm getting the Serialization exception at all -
I'm not convinced it's a red herring per se, but on to the blocking issue...

 You might be using this Cassandra library with an incompatible version of
 Spark; the `TaskMetrics` class has changed in the past, and the method it's
 looking for does not exist at least in 1.4.


You are correct, I was being a bone head.  We recently downgraded to Spark
1.2.1 and I was running the compiled jar using Spark 1.3.1 on my local
machine.  Running the job with threading on my 1.2.1 cluster worked.  Thank
you for finding the obvious mistake :)
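
For anyone else hitting the same NoSuchMethodError: the fix was simply making
sure the application is compiled against the same Spark line it runs on.  In
sbt that looks roughly like the following (a sketch; the exact connector
version shown is an assumption and needs to match your Spark line):

```
// build.sbt - illustrative only
libraryDependencies ++= Seq(
  // Compile against the same Spark version the cluster runs (1.2.1 here) and
  // mark it "provided" so the cluster's own jars are used at runtime.
  "org.apache.spark" %% "spark-core" % "1.2.1" % "provided",
  // The Cassandra connector release line must also match the Spark line.
  "com.datastax.spark" %% "spark-cassandra-connector" % "1.2.1"
)
```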

Regarding serialization, I'm still confused as to why I was getting a
serialization error in the first place, as I'm executing these Runnable
classes from a Java thread pool.  I'm fairly new to the Scala/JVM world and
there doesn't seem to be any Spark documentation explaining *why* I need to
declare the sc variable as @transient (or even that I should).

I was under the impression that objects only need to be serializable when
they are sent over the network, and that doesn't seem to be occurring as
far as I can tell.
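
My best guess at the moment (please correct me if this is off) is that the
lambdas reference constructor parameters such as `connector`, which in Scala
are fields on `this`, so the closure captures the whole SecondRollup instance,
including its SparkContext field, which is not serializable.  A small sketch
with made-up names:

```
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// Illustrative only; `Tagger` and `prefix` are made-up names.
class Tagger(sc: SparkContext, prefix: String) extends Runnable {
  def run(): Unit = {
    val rdd: RDD[Int] = sc.parallelize(1 to 10)

    // Problematic: `prefix` is really `this.prefix`, so the closure captures
    // `this` and with it the SparkContext field.
    // rdd.map(i => prefix + i).collect()

    // Safe: copy the field into a local val; only the String is captured.
    val localPrefix = prefix
    rdd.map(i => localPrefix + i).collect()
  }
}
```

If that's right, @transient works because the captured instance still gets
serialized, but the context field is simply skipped.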

Apologies if this is simple stuff, but I don't like fixing things without
knowing the full reason why the changes I made fixed things :)

Thanks again for your time!


Re: SparkContext Threading

2015-06-05 Thread Lee McFadden
On Fri, Jun 5, 2015 at 1:00 PM Igor Berman igor.ber...@gmail.com wrote:

 Lee, what cluster do you use? standalone, yarn-cluster, yarn-client, mesos?


Spark standalone, v1.2.1.


Re: SparkContext Threading

2015-06-05 Thread Lee McFadden
On Fri, Jun 5, 2015 at 12:58 PM Marcelo Vanzin van...@cloudera.com wrote:

 You didn't show the error so the only thing we can do is speculate. You're
 probably sending the object that's holding the SparkContext reference over
 the network at some point (e.g. it's used by a task run in an executor),
 and that's why you were getting that exception.


Apologies - the full error is as follows.  All I did here was remove the
@transient annotation from the sc variable in my class constructor.  In
addition, the full code for the classes and launching process is included
below.

Error traceback:
```
Exception in thread "pool-5-thread-1" java.lang.Error:
org.apache.spark.SparkException: Task not serializable
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1148)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: Task not serializable
at
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
at
org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1478)
at org.apache.spark.rdd.RDD.map(RDD.scala:288)
at
io.icebrg.analytics.spark.SecondRollup.run(ConnTransforms.scala:33)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
... 2 more
Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext
at
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at
java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at
org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
at
org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
at
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
... 7 more
```

Code:
```
class SecondRollup(sc: SparkContext, connector: CassandraConnector,
    scanPoint: DateTime) extends Runnable with Serializable {

  def run {
    val conn = sc.cassandraTable("alpha_test", "sensor_readings")
      .select("data")
      .where("timep = ?", scanPoint)
      .where("sensorid IN ?", System.sensors)
      .map(r => Json.parse(r.getString("data")))
      .cache()

    conn.flatMap(AffectedIp.fromJson)
      .map(a => (AffectedIp.key(a), a))
      .reduceByKey(AffectedIp.reduce)
      .map(_._2)
      .map(a => AffectedIp.reduceWithCurrent(connector, a))
      .saveToCassandra("alpha_test", "affected_hosts")

    conn.flatMap(ServiceSummary.fromnJson)
      .map(s => (ServiceSummary.key(s), s))
      .reduceByKey(ServiceSummary.reduce)
      .map(_._2)
      .saveToCassandra("alpha_test", "service_summary_rollup")
  }
}

object Transforms {
  private val appNameBase = "Transforms%s"
  private val dtFormatter = DateTimeFormat.forPattern("MMddHH")

  def main(args: Array[String]) {
    if (args.size < 2) {
      println("""Usage: ConnTransforms start end
    start DateTime to start processing at. Format: MMddHH
    end   DateTime to end processing at.  Format: MMddHH""")
      sys.exit(1)
    }

    // withZoneRetainFields gives us a UTC time as specified on the command line.
    val start = dtFormatter.parseDateTime(args(0)).withZoneRetainFields(DateTimeZone.UTC)
    val end = dtFormatter.parseDateTime(args(1)).withZoneRetainFields(DateTimeZone.UTC)

    println("Processing rollups from %s to %s".format(start, end))

    // Create the spark context.
    val conf = new SparkConf()
      .setAppName(appNameBase.format("Test"))

    val connector = CassandraConnector(conf)

    val sc = new SparkContext(conf)

    // Set up the threadpool for running Jobs.
    val pool = Executors.newFixedThreadPool(10)

    pool.execute(new SecondRollup(sc, connector, start))

    //for (dt <- new TimeRanger(start, end)) {
    //  // Always run the second rollups.
    //  pool.execute(new SecondRollup(sc, connector, dt))
    //  if (Granularity.Minute.isGranularity(dt)) pool.execute(new MinuteRollup(sc, connector, dt))
    //}

    // stop the pool from accepting new tasks
    pool.shutdown()

    // We've submitted all the tasks.
    while 

Re: SparkContext Threading

2015-06-05 Thread Lee McFadden
On Fri, Jun 5, 2015 at 2:05 PM Will Briggs wrbri...@gmail.com wrote:

 Your lambda expressions on the RDDs in the SecondRollup class are closing
 around the context, and Spark has special logic to ensure that all
 variables in a closure used on an RDD are Serializable - I hate linking to
 Quora, but there's a good explanation here:
 http://www.quora.com/What-does-Closure-cleaner-func-mean-in-Spark


Ah, I see!  So if I broke out the lambda expressions into a method on an
object it would prevent this issue.  Essentially, don't use lambda
expressions when using threads.
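
For reference, breaking the lambdas out would look something like the sketch
below (names are made up).  A function defined on a top-level object is
referenced statically, so it doesn't drag the enclosing class, or its
SparkContext, into the closure:

```
import org.apache.spark.SparkContext

// Made-up names, for illustration only.
object RowTransforms {
  def tag(n: Int): String = "row-" + n
}

class ThirdRollup(@transient sc: SparkContext) extends Runnable with Serializable {
  def run(): Unit = {
    // Passing a method on a singleton object does not capture `this`, so
    // nothing non-serializable ends up in the closure.
    sc.parallelize(1 to 10).map(RowTransforms.tag).collect()
  }
}
```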

Thanks again, I appreciate the help.


Re: Kafka stream fails: java.lang.NoClassDefFound com/yammer/metrics/core/Gauge

2015-05-12 Thread Lee McFadden
Thanks for the explanations, Sean and Cody - this makes sense now.  I'd like
to help improve this documentation so other Python users don't run into the
same thing; I'll look into that today.

On Tue, May 12, 2015 at 9:44 AM Cody Koeninger c...@koeninger.org wrote:

 One of the packages just contains the streaming-kafka code.  The other
 contains that code, plus everything it depends on.  That's what assembly
 typically means in JVM land.

 Java/Scala users are accustomed to using their own build tool to include
 necessary dependencies.  JVM dependency management is (thankfully)
 different from Python dependency management.

 As far as I can tell, there is no core issue, upstream or otherwise.

 On Tue, May 12, 2015 at 11:39 AM, Lee McFadden splee...@gmail.com wrote:

 Thanks again for all the help folks.

 I can confirm that simply switching to `--packages
 org.apache.spark:spark-streaming-kafka-assembly_2.10:1.3.1` makes
 everything work as intended.

 I'm not sure what the difference is between the two packages honestly, or
 why one should be used over the other, but the documentation is currently
 not intuitive in this matter.  If you follow the instructions, initially it
 will seem broken.  Is there any reason why the docs for Python users (or,
 in fact, all users - Java/Scala users will run into this too except they
 are armed with the ability to build their own jar with the dependencies
 included) should not be changed to using the assembly package by default?

 Additionally, after a few google searches yesterday combined with your
 help I'm wondering if the core issue is upstream in Kafka's dependency
 chain?

 On Tue, May 12, 2015 at 8:53 AM Ted Yu yuzhih...@gmail.com wrote:

 bq. it is already in the assembly

 Yes. Verified:

 $ jar tvf ~/Downloads/spark-streaming-kafka-assembly_2.10-1.3.1.jar | grep 
 yammer | grep Gauge
   1329 Sat Apr 11 04:25:50 PDT 2015 com/yammer/metrics/core/Gauge.class


 On Tue, May 12, 2015 at 8:05 AM, Sean Owen so...@cloudera.com wrote:

 It doesn't depend directly on yammer metrics; Kafka does. It wouldn't
 be correct to declare that it does; it is already in the assembly
 anyway.

 On Tue, May 12, 2015 at 3:50 PM, Ted Yu yuzhih...@gmail.com wrote:
  Currently external/kafka/pom.xml doesn't cite yammer metrics as
 dependency.
 
  $ ls -l
 
 ~/.m2/repository/com/yammer/metrics/metrics-core/2.2.0/metrics-core-2.2.0.jar
  -rw-r--r--  1 tyu  staff  82123 Dec 17  2013
 
 /Users/tyu/.m2/repository/com/yammer/metrics/metrics-core/2.2.0/metrics-core-2.2.0.jar
 
  Including the metrics-core jar would not increase the size of the
 final
  release artifact much.
 
  My two cents.


Re: Kafka stream fails: java.lang.NoClassDefFound com/yammer/metrics/core/Gauge

2015-05-12 Thread Lee McFadden
Thanks again for all the help folks.

I can confirm that simply switching to `--packages
org.apache.spark:spark-streaming-kafka-assembly_2.10:1.3.1` makes
everything work as intended.

I'm not sure what the difference is between the two packages honestly, or
why one should be used over the other, but the documentation is currently
not intuitive in this matter.  If you follow the instructions, initially it
will seem broken.  Is there any reason why the docs for Python users (or,
in fact, all users - Java/Scala users will run into this too except they
are armed with the ability to build their own jar with the dependencies
included) should not be changed to using the assembly package by default?
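
For the Java/Scala route, building the dependencies into an application jar
would look roughly like this in sbt (a sketch; the version numbers and the use
of the sbt-assembly plugin are assumptions on my part):

```
// build.sbt - illustrative only
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"            % "1.3.1" % "provided",
  "org.apache.spark" %% "spark-streaming"       % "1.3.1" % "provided",
  // Deliberately not "provided": this pulls Kafka and its transitive
  // dependencies (including com.yammer.metrics:metrics-core) into the app jar.
  "org.apache.spark" %% "spark-streaming-kafka" % "1.3.1"
)
```

With the sbt-assembly plugin, `sbt assembly` then produces a single jar that
spark-submit can run without extra --packages or --jars flags.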

Additionally, after a few google searches yesterday combined with your help
I'm wondering if the core issue is upstream in Kafka's dependency chain?

On Tue, May 12, 2015 at 8:53 AM Ted Yu yuzhih...@gmail.com wrote:

 bq. it is already in the assembly

 Yes. Verified:

 $ jar tvf ~/Downloads/spark-streaming-kafka-assembly_2.10-1.3.1.jar | grep 
 yammer | grep Gauge
   1329 Sat Apr 11 04:25:50 PDT 2015 com/yammer/metrics/core/Gauge.class


 On Tue, May 12, 2015 at 8:05 AM, Sean Owen so...@cloudera.com wrote:

 It doesn't depend directly on yammer metrics; Kafka does. It wouldn't
 be correct to declare that it does; it is already in the assembly
 anyway.

 On Tue, May 12, 2015 at 3:50 PM, Ted Yu yuzhih...@gmail.com wrote:
  Currently external/kafka/pom.xml doesn't cite yammer metrics as
 dependency.
 
  $ ls -l
 
 ~/.m2/repository/com/yammer/metrics/metrics-core/2.2.0/metrics-core-2.2.0.jar
  -rw-r--r--  1 tyu  staff  82123 Dec 17  2013
 
 /Users/tyu/.m2/repository/com/yammer/metrics/metrics-core/2.2.0/metrics-core-2.2.0.jar
 
  Including the metrics-core jar would not increase the size of the final
  release artifact much.
 
  My two cents.


Re: Kafka stream fails: java.lang.NoClassDefFound com/yammer/metrics/core/Gauge

2015-05-11 Thread Lee McFadden
Thanks Ted,

The issue is that I'm using packages (see spark-submit definition) and I do
not know how to add com.yammer.metrics:metrics-core to my classpath so
Spark can see it.

Should metrics-core not be part of
the org.apache.spark:spark-streaming-kafka_2.10:1.3.1 package so it can
work correctly?

If not, any clues as to how I can add metrics-core to my project (bearing
in mind that I'm using Python, not a JVM language) would be much
appreciated.

Thanks, and apologies for my newbness with Java/Scala.

On Mon, May 11, 2015 at 1:42 PM Ted Yu yuzhih...@gmail.com wrote:

 com.yammer.metrics.core.Gauge is in metrics-core jar
 e.g., in master branch:
 [INFO] |  \- org.apache.kafka:kafka_2.10:jar:0.8.1.1:compile
 [INFO] | +- com.yammer.metrics:metrics-core:jar:2.2.0:compile

 Please make sure metrics-core jar is on the classpath.

 On Mon, May 11, 2015 at 1:32 PM, Lee McFadden splee...@gmail.com wrote:

 Hi,

 We've been having some issues getting spark streaming running correctly
 using a Kafka stream, and we've been going around in circles trying to
 resolve this dependency.

 Details of our environment and the error below, if anyone can help
 resolve this it would be much appreciated.

 Submit command line:

 /home/ubuntu/spark/spark-1.3.1/bin/spark-submit \
 --packages
 TargetHolding/pyspark-cassandra:0.1.4,org.apache.spark:spark-streaming-kafka_2.10:1.3.1
 \
 --conf
 spark.cassandra.connection.host=10.10.103.172,10.10.102.160,10.10.101.79 \
 --master spark://127.0.0.1:7077 \
 affected_hosts.py

 When we run the streaming job everything starts just fine, then we see
 the following in the logs:

 15/05/11 19:50:46 WARN TaskSetManager: Lost task 0.0 in stage 2.0 (TID
 70, ip-10-10-102-53.us-west-2.compute.internal):
 java.lang.NoClassDefFoundError: com/yammer/metrics/core/Gauge
 at
 kafka.consumer.ZookeeperConsumerConnector.createFetcher(ZookeeperConsumerConnector.scala:151)
 at
 kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:115)
 at
 kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:128)
 at kafka.consumer.Consumer$.create(ConsumerConnector.scala:89)
 at
 org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:100)
 at
 org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)
 at
 org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)
 at
 org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:298)
 at
 org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:290)
 at
 org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498)
 at
 org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498)
 at
 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
 at org.apache.spark.scheduler.Task.run(Task.scala:64)
 at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745)
 Caused by: java.lang.ClassNotFoundException: com.yammer.metrics.core.Gauge
 at java.net.URLClassLoader$1.run(URLClassLoader.java:372)
 at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
 at java.security.AccessController.doPrivileged(Native Method)
 at java.net.URLClassLoader.findClass(URLClassLoader.java:360)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
 ... 17 more


Kafka stream fails: java.lang.NoClassDefFound com/yammer/metrics/core/Gauge

2015-05-11 Thread Lee McFadden
Hi,

We've been having some issues getting spark streaming running correctly
using a Kafka stream, and we've been going around in circles trying to
resolve this dependency.

Details of our environment and the error below, if anyone can help resolve
this it would be much appreciated.

Submit command line:

/home/ubuntu/spark/spark-1.3.1/bin/spark-submit \
--packages
TargetHolding/pyspark-cassandra:0.1.4,org.apache.spark:spark-streaming-kafka_2.10:1.3.1
\
--conf
spark.cassandra.connection.host=10.10.103.172,10.10.102.160,10.10.101.79 \
--master spark://127.0.0.1:7077 \
affected_hosts.py

When we run the streaming job everything starts just fine, then we see the
following in the logs:

15/05/11 19:50:46 WARN TaskSetManager: Lost task 0.0 in stage 2.0 (TID 70,
ip-10-10-102-53.us-west-2.compute.internal):
java.lang.NoClassDefFoundError: com/yammer/metrics/core/Gauge
at
kafka.consumer.ZookeeperConsumerConnector.createFetcher(ZookeeperConsumerConnector.scala:151)
at
kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:115)
at
kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:128)
at kafka.consumer.Consumer$.create(ConsumerConnector.scala:89)
at
org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:100)
at
org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)
at
org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)
at
org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:298)
at
org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:290)
at
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498)
at
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498)
at
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: com.yammer.metrics.core.Gauge
at java.net.URLClassLoader$1.run(URLClassLoader.java:372)
at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:360)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 17 more


Re: Kafka stream fails: java.lang.NoClassDefFound com/yammer/metrics/core/Gauge

2015-05-11 Thread Lee McFadden
Ted, many thanks.  I'm not used to Java dependencies so this was a real
head-scratcher for me.

Downloading the two metrics packages from the maven repository
(metrics-core, metrics-annotation) and supplying it on the spark-submit
command line worked.

My final spark-submit for a python project using Kafka as an input source:

/home/ubuntu/spark/spark-1.3.1/bin/spark-submit \
--packages
TargetHolding/pyspark-cassandra:0.1.4,org.apache.spark:spark-streaming-kafka_2.10:1.3.1
\
--jars
/home/ubuntu/jars/metrics-core-2.2.0.jar,/home/ubuntu/jars/metrics-annotation-2.2.0.jar
\
--conf
spark.cassandra.connection.host=10.10.103.172,10.10.102.160,10.10.101.79 \
--master spark://127.0.0.1:7077 \
affected_hosts.py

Now we're seeing data from the stream.  Thanks again!

On Mon, May 11, 2015 at 2:43 PM Sean Owen so...@cloudera.com wrote:

 Ah yes, the Kafka + streaming code isn't in the assembly, is it? you'd
 have to provide it and all its dependencies with your app. You could
 also build this into your own app jar. Tools like Maven will add in
 the transitive dependencies.

 On Mon, May 11, 2015 at 10:04 PM, Lee McFadden splee...@gmail.com wrote:
  Thanks Ted,
 
  The issue is that I'm using packages (see spark-submit definition) and I
 do
  not know how to add com.yammer.metrics:metrics-core to my classpath so
 Spark
  can see it.
 
  Should metrics-core not be part of the
  org.apache.spark:spark-streaming-kafka_2.10:1.3.1 package so it can work
  correctly?
 
  If not, any clues as to how I can add metrics-core to my project
 (bearing in
  mind that I'm using Python, not a JVM language) would be much
 appreciated.
 
  Thanks, and apologies for my newbness with Java/Scala.
 
  On Mon, May 11, 2015 at 1:42 PM Ted Yu yuzhih...@gmail.com wrote:
 
  com.yammer.metrics.core.Gauge is in metrics-core jar
  e.g., in master branch:
  [INFO] |  \- org.apache.kafka:kafka_2.10:jar:0.8.1.1:compile
  [INFO] | +- com.yammer.metrics:metrics-core:jar:2.2.0:compile
 
  Please make sure metrics-core jar is on the classpath.
 
  On Mon, May 11, 2015 at 1:32 PM, Lee McFadden splee...@gmail.com
 wrote:
 
  Hi,
 
  We've been having some issues getting spark streaming running correctly
  using a Kafka stream, and we've been going around in circles trying to
  resolve this dependency.
 
  Details of our environment and the error below, if anyone can help
  resolve this it would be much appreciated.
 
  Submit command line:
 
  /home/ubuntu/spark/spark-1.3.1/bin/spark-submit \
  --packages
 
 TargetHolding/pyspark-cassandra:0.1.4,org.apache.spark:spark-streaming-kafka_2.10:1.3.1
  \
  --conf
 
 spark.cassandra.connection.host=10.10.103.172,10.10.102.160,10.10.101.79 \
  --master spark://127.0.0.1:7077 \
  affected_hosts.py
 
  When we run the streaming job everything starts just fine, then we see
  the following in the logs:
 
  15/05/11 19:50:46 WARN TaskSetManager: Lost task 0.0 in stage 2.0 (TID
  70, ip-10-10-102-53.us-west-2.compute.internal):
  java.lang.NoClassDefFoundError: com/yammer/metrics/core/Gauge
  at
 
 kafka.consumer.ZookeeperConsumerConnector.createFetcher(ZookeeperConsumerConnector.scala:151)
  at
 
  kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:115)
  at
 
  kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:128)
  at kafka.consumer.Consumer$.create(ConsumerConnector.scala:89)
  at
 
 org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:100)
  at
 
 org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)
  at
 
 org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)
  at
 
 org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:298)
  at
 
 org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:290)
  at
 
 org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498)
  at
 
 org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498)
  at
  org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
  at org.apache.spark.scheduler.Task.run(Task.scala:64)
  at
  org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
  at
 
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at
 
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)
  Caused by: java.lang.ClassNotFoundException:
  com.yammer.metrics.core.Gauge
  at java.net.URLClassLoader$1.run(URLClassLoader.java:372)
  at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
  at java.security.AccessController.doPrivileged(Native Method

Re: Kafka stream fails: java.lang.NoClassDefFound com/yammer/metrics/core/Gauge

2015-05-11 Thread Lee McFadden
I opened a ticket on this (without posting here first - bad etiquette,
apologies) which was closed as 'fixed'.

https://issues.apache.org/jira/browse/SPARK-7538

Just because I have my script running doesn't mean this is fixed - I think it
is still an issue.

I downloaded the Spark source, ran `mvn -DskipTests clean package`, then
simply launched my Python script (which shouldn't be introducing additional
*Java* dependencies itself?).

Doesn't this mean these dependencies are missing from the Spark build, since I
didn't modify any files within the distribution and my application itself
can't be introducing Java dependency clashes?

On Mon, May 11, 2015, 4:34 PM Lee McFadden splee...@gmail.com wrote:

 Ted, many thanks.  I'm not used to Java dependencies so this was a real
 head-scratcher for me.

 Downloading the two metrics packages from the maven repository
 (metrics-core, metrics-annotation) and supplying it on the spark-submit
 command line worked.

 My final spark-submit for a python project using Kafka as an input source:

 /home/ubuntu/spark/spark-1.3.1/bin/spark-submit \
 --packages
 TargetHolding/pyspark-cassandra:0.1.4,org.apache.spark:spark-streaming-kafka_2.10:1.3.1
 \
 --jars
 /home/ubuntu/jars/metrics-core-2.2.0.jar,/home/ubuntu/jars/metrics-annotation-2.2.0.jar
 \
 --conf
 spark.cassandra.connection.host=10.10.103.172,10.10.102.160,10.10.101.79 \
 --master spark://127.0.0.1:7077 \
 affected_hosts.py

 Now we're seeing data from the stream.  Thanks again!

 On Mon, May 11, 2015 at 2:43 PM Sean Owen so...@cloudera.com wrote:

 Ah yes, the Kafka + streaming code isn't in the assembly, is it? you'd
 have to provide it and all its dependencies with your app. You could
 also build this into your own app jar. Tools like Maven will add in
 the transitive dependencies.

 On Mon, May 11, 2015 at 10:04 PM, Lee McFadden splee...@gmail.com
 wrote:
  Thanks Ted,
 
  The issue is that I'm using packages (see spark-submit definition) and
 I do
  not know how to add com.yammer.metrics:metrics-core to my classpath so
 Spark
  can see it.
 
  Should metrics-core not be part of the
  org.apache.spark:spark-streaming-kafka_2.10:1.3.1 package so it can work
  correctly?
 
  If not, any clues as to how I can add metrics-core to my project
 (bearing in
  mind that I'm using Python, not a JVM language) would be much
 appreciated.
 
  Thanks, and apologies for my newbness with Java/Scala.
 
  On Mon, May 11, 2015 at 1:42 PM Ted Yu yuzhih...@gmail.com wrote:
 
  com.yammer.metrics.core.Gauge is in metrics-core jar
  e.g., in master branch:
  [INFO] |  \- org.apache.kafka:kafka_2.10:jar:0.8.1.1:compile
  [INFO] | +- com.yammer.metrics:metrics-core:jar:2.2.0:compile
 
  Please make sure metrics-core jar is on the classpath.
 
  On Mon, May 11, 2015 at 1:32 PM, Lee McFadden splee...@gmail.com
 wrote:
 
  Hi,
 
  We've been having some issues getting spark streaming running
 correctly
  using a Kafka stream, and we've been going around in circles trying to
  resolve this dependency.
 
  Details of our environment and the error below, if anyone can help
  resolve this it would be much appreciated.
 
  Submit command line:
 
  /home/ubuntu/spark/spark-1.3.1/bin/spark-submit \
  --packages
 
 TargetHolding/pyspark-cassandra:0.1.4,org.apache.spark:spark-streaming-kafka_2.10:1.3.1
  \
  --conf
 
 spark.cassandra.connection.host=10.10.103.172,10.10.102.160,10.10.101.79 \
  --master spark://127.0.0.1:7077 \
  affected_hosts.py
 
  When we run the streaming job everything starts just fine, then we see
  the following in the logs:
 
  15/05/11 19:50:46 WARN TaskSetManager: Lost task 0.0 in stage 2.0 (TID
  70, ip-10-10-102-53.us-west-2.compute.internal):
  java.lang.NoClassDefFoundError: com/yammer/metrics/core/Gauge
  at
 
 kafka.consumer.ZookeeperConsumerConnector.createFetcher(ZookeeperConsumerConnector.scala:151)
  at
 
  kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:115)
  at
 
  kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:128)
  at kafka.consumer.Consumer$.create(ConsumerConnector.scala:89)
  at
 
 org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:100)
  at
 
 org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)
  at
 
 org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)
  at
 
 org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:298)
  at
 
 org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:290)
  at
 
 org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498)
  at
 
 org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498