I'm very close! So I added that, and then I added this:
http://mvnrepository.com/artifact/org.apache.kafka/kafka-clients/0.8.2-beta

It seems as though the stream is working, as it says "Stream 0 received 1 or 2 blocks" as I enter messages on my Kafka producer. However, the receiver keeps trying every 2 seconds (I have set 2000 as the duration in my Java app). How can I stop the receiver from consuming messages after 10 seconds and output the word count to the console? Thanks a lot for all the help! I'm excited to see this word count :)

Suhas Shekar
University of California, Los Angeles
B.A. Economics, Specialization in Computing 2014

On Mon, Dec 29, 2014 at 12:04 AM, Akhil Das <ak...@sigmoidanalytics.com> wrote:

> Add this jar in the dependency:
> http://mvnrepository.com/artifact/com.yammer.metrics/metrics-core/2.2.0
>
> Thanks
> Best Regards
>
> On Mon, Dec 29, 2014 at 1:31 PM, Suhas Shekar <suhsheka...@gmail.com> wrote:
>
>> Hello Akhil,
>>
>> I changed my Kafka dependency to 2.10 (which is the version of the Kafka
>> that is on 10.0.1.232). I am getting a slightly different error, but at
>> the same place as the previous error (pasted below).
>>
>> FYI, when I make these changes to the pom file, I run "mvn clean package",
>> then cp the new jar files from the repository to my lib of jar files, which
>> is an argument in my spark-submit script in my original post.
>>
>> Thanks again for the time and help...much appreciated.
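[Editor's aside on the 10-second question above: in the Spark 1.x Java API, the usual pattern is `jssc.start(); jssc.awaitTermination(10000); jssc.stop();` — worth checking against the JavaStreamingContext javadoc for your exact version. Since the thread doesn't include the app code, below is a self-contained plain-Java sketch of that same consume-until-deadline-then-print shape, with no Spark dependency; `TimedWordCount` and `consumeFor` are made-up names for illustration.]

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration class -- not part of the thread's actual app.
public class TimedWordCount {

    // Drain lines from the queue until `millis` have elapsed, tallying words.
    static Map<String, Integer> consumeFor(BlockingQueue<String> queue, long millis)
            throws InterruptedException {
        Map<String, Integer> counts = new HashMap<>();
        long deadline = System.currentTimeMillis() + millis;
        while (System.currentTimeMillis() < deadline) {
            // Short poll timeout so the loop re-checks the deadline regularly.
            String line = queue.poll(100, TimeUnit.MILLISECONDS);
            if (line == null) continue;
            for (String w : line.split("\\s+")) {
                if (!w.isEmpty()) counts.merge(w, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) throws Exception {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add("hello world hello");
        // Consume for 1 second, then stop and print the tally -- the same
        // shape as running a streaming job for 10 s and then stopping it.
        Map<String, Integer> counts = consumeFor(queue, 1000);
        System.out.println(counts);
    }
}
```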
>>
>> 14/12/29 07:56:00 INFO ReceiverSupervisorImpl: Starting receiver
>> 14/12/29 07:56:00 INFO KafkaReceiver: Starting Kafka Consumer Stream with group: c1
>> 14/12/29 07:56:00 INFO KafkaReceiver: Connecting to Zookeeper: 10.0.1.232:2181
>> 14/12/29 07:56:00 INFO BlockGenerator: Started block pushing thread
>> 14/12/29 07:56:00 INFO VerifiableProperties: Verifying properties
>> 14/12/29 07:56:00 INFO VerifiableProperties: Property group.id is overridden to c1
>> 14/12/29 07:56:00 INFO VerifiableProperties: Property zookeeper.connect is overridden to 10.0.1.232:2181
>> 14/12/29 07:56:00 INFO VerifiableProperties: Property zookeeper.connection.timeout.ms is overridden to 10000
>> 14/12/29 07:56:00 INFO ReceiverSupervisorImpl: Stopping receiver with message: Error starting receiver 0: java.lang.NoClassDefFoundError: com/yammer/metrics/Metrics
>> 14/12/29 07:56:00 INFO ReceiverSupervisorImpl: Called receiver onStop
>> 14/12/29 07:56:00 INFO ReceiverSupervisorImpl: Deregistering receiver 0
>> 14/12/29 07:56:00 ERROR ReceiverTracker: Deregistered receiver for stream 0: Error starting receiver 0 - java.lang.NoClassDefFoundError: com/yammer/metrics/Metrics
>>         at kafka.metrics.KafkaMetricsGroup$class.newMeter(KafkaMetricsGroup.scala:51)
>>         at kafka.consumer.ZookeeperConsumerConnector.newMeter(ZookeeperConsumerConnector.scala:83)
>>         at kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:107)
>>         at kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:142)
>>         at kafka.consumer.Consumer$.create(ConsumerConnector.scala:89)
>>         at org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:97)
>>         at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)
>>         at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)
>>         at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:264)
>>         at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:257)
>>         at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
>>         at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
>>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
>>         at org.apache.spark.scheduler.Task.run(Task.scala:54)
>>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:180)
>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>>         at java.lang.Thread.run(Thread.java:722)
>> Caused by: java.lang.ClassNotFoundException: com.yammer.metrics.Metrics
>>         at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>>         at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>>         at java.security.AccessController.doPrivileged(Native Method)
>>         at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>>         at java.lang.ClassLoader.loadClass(ClassLoader.java:423)
>>         at java.lang.ClassLoader.loadClass(ClassLoader.java:356)
>>         ... 18 more
>>
>> Suhas Shekar
>> University of California, Los Angeles
>> B.A. Economics, Specialization in Computing 2014
>>
>> On Sun, Dec 28, 2014 at 11:52 PM, Suhas Shekar <suhsheka...@gmail.com> wrote:
>>
>>> I made both versions 1.1.1 and I got the same error. I then tried making
>>> both 1.1.0, as that is the version of my Spark core, but I got the same
>>> error.
>>>
>>> I noticed my Kafka dependency is for Scala 2.9.2, while my Spark
>>> Streaming Kafka dependency is 2.10.x. I will try changing that next, but I
>>> don't think that will solve the error, as I don't think the application
>>> had gotten to that level yet.
>>>
>>> Please let me know of any possible next steps.
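[Editor's aside: combining the fixes suggested elsewhere in this thread — one Spark version with the same `_2.10` Scala suffix everywhere, a Scala-2.10 Kafka artifact, and Akhil's metrics-core jar — the relevant pom.xml section might look roughly like the sketch below. The version numbers are illustrative, not confirmed by the thread; match them to the Spark and Kafka actually installed.]

```xml
<!-- Illustrative sketch only: keep one Spark version and one Scala suffix. -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.10</artifactId>
  <version>1.1.0</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kafka_2.10</artifactId>
  <version>1.1.0</version>
</dependency>
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.10</artifactId>
  <version>0.8.1.1</version> <!-- match your broker version -->
</dependency>
<dependency>
  <groupId>com.yammer.metrics</groupId>
  <artifactId>metrics-core</artifactId>
  <version>2.2.0</version>
</dependency>
```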
>>>
>>> Thank you again for the time and the help!
>>>
>>> Suhas Shekar
>>> University of California, Los Angeles
>>> B.A. Economics, Specialization in Computing 2014
>>>
>>> On Sun, Dec 28, 2014 at 11:31 PM, Akhil Das <ak...@sigmoidanalytics.com> wrote:
>>>
>>>> Just looked at the pom file that you are using. Why do you have
>>>> different versions in it?
>>>>
>>>> <dependency>
>>>>   <groupId>org.apache.spark</groupId>
>>>>   <artifactId>spark-streaming-kafka_2.10</artifactId>
>>>>   <version>*1.1.1*</version>
>>>> </dependency>
>>>> <dependency>
>>>>   <groupId>org.apache.spark</groupId>
>>>>   <artifactId>spark-streaming_2.10</artifactId>
>>>>   <version>*1.0.2*</version>
>>>> </dependency>
>>>>
>>>> Can you make both the versions the same?
>>>>
>>>> Thanks
>>>> Best Regards
>>>>
>>>> On Mon, Dec 29, 2014 at 12:44 PM, Suhas Shekar <suhsheka...@gmail.com> wrote:
>>>>
>>>>> 1) Could you please clarify what you mean by checking that the Scala
>>>>> version is correct? In my pom.xml file it is 2.10.4 (which is the same
>>>>> as when I start spark-shell).
>>>>>
>>>>> 2) The Spark master URL is definitely correct, as I have run other apps
>>>>> with the same script that use Spark (like a word count on a local file).
>>>>>
>>>>> Thank you for the help!
>>>>>
>>>>> Suhas Shekar
>>>>> University of California, Los Angeles
>>>>> B.A. Economics, Specialization in Computing 2014
>>>>>
>>>>> On Sun, Dec 28, 2014 at 11:04 PM, Akhil Das <ak...@sigmoidanalytics.com> wrote:
>>>>>
>>>>>> Make sure you verify the following:
>>>>>>
>>>>>> - Scala version: I think the correct version would be 2.10.x
>>>>>> - Spark master URL: be sure that you copied the one displayed on the
>>>>>>   web UI's top left corner (running on port 8080)
>>>>>>
>>>>>> Thanks
>>>>>> Best Regards
>>>>>>
>>>>>> On Mon, Dec 29, 2014 at 12:26 PM, suhshekar52 <suhsheka...@gmail.com> wrote:
>>>>>>
>>>>>>> Hello Everyone,
>>>>>>>
>>>>>>> Thank you for the time and the help :).
>>>>>>>
>>>>>>> My goal here is to get this program working:
>>>>>>> https://github.com/apache/spark/blob/master/examples/scala-2.10/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java
>>>>>>>
>>>>>>> The only lines I do not have from the example are lines 62-67.
>>>>>>>
>>>>>>> Background: I have EC2 instances running. The standalone Spark is
>>>>>>> running on top of Cloudera Manager 5.2.
>>>>>>>
>>>>>>> The pom file is attached and is the same for both clusters:
>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/file/n20879/pom.xml
>>>>>>>
>>>>>>> Here are a few different approaches I have taken and the issues I
>>>>>>> run into:
>>>>>>>
>>>>>>> *Standalone Mode*
>>>>>>>
>>>>>>> 1) Use the spark-submit script to run:
>>>>>>>
>>>>>>> /opt/cloudera/parcels/CDH-5.2.1-1.cdh5.2.1.p0.12/lib/spark/bin/spark-submit \
>>>>>>>   --class SimpleApp --master spark://10.0.1.230:7077 \
>>>>>>>   --jars $(echo /home/ec2-user/sparkApps/SimpleApp/lib/*.jar | tr ' ' ',') \
>>>>>>>   /home/ec2-user/sparkApps/SimpleApp/target/simple-project-1.0.jar
>>>>>>>
>>>>>>> Interestingly, I was getting an error like this: "Initial job has not
>>>>>>> accepted any resources; check your cluster UI".
>>>>>>>
>>>>>>> Now, when I run, it prints out the 3 "Hello world" statements in my
>>>>>>> code:
>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/file/n20879/KafkaJavaConsumer.txt
>>>>>>>
>>>>>>> and then it seems to try to start the Kafka stream, but fails:
>>>>>>>
>>>>>>> 14/12/29 05:58:05 INFO KafkaReceiver: Starting Kafka Consumer Stream with group: c1
>>>>>>> 14/12/29 05:58:05 INFO ReceiverTracker: Registered receiver for stream 0 from akka://sparkDriver
>>>>>>> 14/12/29 05:58:05 INFO KafkaReceiver: Connecting to Zookeeper: 10.0.1.232:2181
>>>>>>> 14/12/29 05:58:05 INFO BlockGenerator: Started block pushing thread
>>>>>>> 14/12/29 05:58:05 INFO ReceiverSupervisorImpl: Stopping receiver with message: Error starting receiver 0: java.lang.NoClassDefFoundError: scala/reflect/ClassManifest
>>>>>>> 14/12/29 05:58:05 INFO ReceiverSupervisorImpl: Called receiver onStop
>>>>>>> 14/12/29 05:58:05 INFO ReceiverSupervisorImpl: Deregistering receiver 0
>>>>>>> ^C14/12/29 05:58:05 ERROR ReceiverTracker: Deregistered receiver for stream 0: Error starting receiver 0 - java.lang.NoClassDefFoundError: scala/reflect/ClassManifest
>>>>>>>         at kafka.utils.Log4jController$.<init>(Log4jController.scala:29)
>>>>>>>         at kafka.utils.Log4jController$.<clinit>(Log4jController.scala)
>>>>>>>         at kafka.utils.Logging$class.$init$(Logging.scala:29)
>>>>>>>         at kafka.utils.VerifiableProperties.<init>(VerifiableProperties.scala:26)
>>>>>>>         at kafka.consumer.ConsumerConfig.<init>(ConsumerConfig.scala:94)
>>>>>>>         at org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:96)
>>>>>>>         at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)
>>>>>>>         at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)
>>>>>>>         at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:264)
>>>>>>>         at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:257)
>>>>>>>         at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
>>>>>>>         at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
>>>>>>>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
>>>>>>>         at org.apache.spark.scheduler.Task.run(Task.scala:54)
>>>>>>>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:180)
>>>>>>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>>>>>>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>>>>>>>         at java.lang.Thread.run(Thread.java:722)
>>>>>>> Caused by: java.lang.ClassNotFoundException: scala.reflect.ClassManifest
>>>>>>>         at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>>>>>>>         at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>>>>>>>         at java.security.AccessController.doPrivileged(Native Method)
>>>>>>>         at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>>>>>>>         at java.lang.ClassLoader.loadClass(ClassLoader.java:423)
>>>>>>>         at java.lang.ClassLoader.loadClass(ClassLoader.java:356)
>>>>>>>         ... 18 more
>>>>>>>
>>>>>>> 14/12/29 05:58:05 INFO ReceiverSupervisorImpl: Stopped receiver 0
>>>>>>> 14/12/29 05:58:05 INFO BlockGenerator: Stopping BlockGenerator
>>>>>>>
>>>>>>> I ran into a couple of other class-not-found errors and was able to
>>>>>>> solve them by adding dependencies to the pom file, but I have not
>>>>>>> found such a solution for this error.
>>>>>>>
>>>>>>> On the Kafka side of things, I am simply typing in messages on
>>>>>>> another console as soon as I start the Java app. Is this okay?
>>>>>>>
>>>>>>> I have not set up an advertised host on the Kafka side, as I was
>>>>>>> still able to receive messages from other consoles by setting up a
>>>>>>> consumer to listen on the private ip:port. Is this okay?
>>>>>>>
>>>>>>> Lastly, is there a command, like --from-beginning for a console
>>>>>>> consumer, for the Java application to get messages from the beginning?
>>>>>>>
>>>>>>> Thanks a lot for the help and happy holidays!
>>>>>>>
>>>>>>> --
>>>>>>> View this message in context:
>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Setting-up-Simple-Kafka-Consumer-via-Spark-Java-app-tp20879.html
>>>>>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>>>>>
>>>>>>> ---------------------------------------------------------------------
>>>>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>>>>> For additional commands, e-mail: user-h...@spark.apache.org