That got rid of the logs, but the following problems still persist:

a) The program never terminates (I have pasted all output after the Hello
World statements below)

b) I am not seeing the word count

c) I tried adding [2] next to my looking at this post,
but that did not work either.

Any other suggestions are appreciated.

Thanks a lot for the time :)

14/12/29 08:46:38 INFO ZookeeperConsumerConnector:
starting auto committer every 60000 ms
14/12/29 08:46:38 INFO ZookeeperConsumerConnector:
[], begin
registering consumer in ZK
14/12/29 08:46:38 INFO ZookeeperConsumerConnector:
[], end
registering consumer in ZK
14/12/29 08:46:38 INFO ZookeeperConsumerConnector:
starting watcher executor thread for consumer
14/12/29 08:46:38 INFO ZookeeperConsumerConnector:
[], begin
rebalancing consumer try #0
14/12/29 08:46:39 INFO ConsumerFetcherManager:
[ConsumerFetcherManager-1419860798873] Stopping leader finder thread
14/12/29 08:46:39 INFO ConsumerFetcherManager:
[ConsumerFetcherManager-1419860798873] Stopping all fetchers
14/12/29 08:46:39 INFO ConsumerFetcherManager:
[ConsumerFetcherManager-1419860798873] All connections stopped
14/12/29 08:46:39 INFO ZookeeperConsumerConnector:
Cleared all relevant queues for this fetcher
14/12/29 08:46:39 INFO ZookeeperConsumerConnector:
Cleared the data chunks in all the consumer message iterators
14/12/29 08:46:39 INFO ZookeeperConsumerConnector:
Committing all offsets after clearing the fetcher queues
14/12/29 08:46:39 INFO ZookeeperConsumerConnector:
Releasing partition ownership
14/12/29 08:46:39 INFO RangeAssignor: Consumer
rebalancing the following partitions: ArrayBuffer(0) for topic test with
14/12/29 08:46:39 INFO RangeAssignor:
attempting to claim partition 0
14/12/29 08:46:39 INFO ZookeeperConsumerConnector:
successfully owned partition 0 for topic test
14/12/29 08:46:39 INFO ZookeeperConsumerConnector:
selected partitions : test:0: fetched offset = 221: consumed offset = 221
14/12/29 08:46:39 INFO ConsumerFetcherManager$LeaderFinderThread:
14/12/29 08:46:39 INFO ZookeeperConsumerConnector:
[], end
rebalancing consumer try #0
14/12/29 08:46:39 INFO VerifiableProperties: Verifying properties
14/12/29 08:46:39 INFO VerifiableProperties: Property is
overridden to c1
14/12/29 08:46:39 INFO VerifiableProperties: Property
is overridden to
14/12/29 08:46:39 INFO VerifiableProperties: Property is
overridden to 30000
14/12/29 08:46:39 INFO ClientUtils$: Fetching metadata from broker
id:0,,port:9092 with
correlation id 0 for 1 topic(s) Set(test)
14/12/29 08:46:39 INFO SyncProducer: Connected to for producing
14/12/29 08:46:39 INFO SyncProducer: Disconnecting from
14/12/29 08:46:39 INFO ConsumerFetcherThread:
14/12/29 08:46:39 INFO ConsumerFetcherManager:
[ConsumerFetcherManager-1419860798873] Added fetcher for partitions
ArrayBuffer([[test,0], initOffset 221 to broker
id:0,,port:9092] )
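For reference, the per-batch step of a word count like this one splits every received message into words and then counts each word. The plain-Java sketch below reproduces that step for a single batch. It uses no Spark, and BatchWordCount and countWords are hypothetical names. It can help confirm what the console should show once results are printed.

In the Spark job itself, my understanding is that the counts only reach the console if an output operation such as print() is registered on the counted stream before the context is started.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch of one batch of a word count (no Spark involved).
// Each string in `batch` stands for one Kafka message received during the batch.
final class BatchWordCount {

    // Splits each message on spaces and counts every word; roughly the
    // flatMap -> mapToPair -> reduceByKey steps of a streaming word count.
    static Map<String, Integer> countWords(List<String> batch) {
        Map<String, Integer> counts = new TreeMap<>(); // sorted, so printing is stable
        for (String message : batch) {
            for (String word : message.split(" ")) {
                if (word.isEmpty()) {
                    continue; // repeated spaces or empty messages yield empty tokens
                }
                counts.merge(word, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Hypothetical messages typed into the Kafka console producer during one batch.
        List<String> batch = Arrays.asList("hello world", "hello spark");
        for (Map.Entry<String, Integer> e : countWords(batch).entrySet()) {
            // Same (word,count) shape as the tuples printed by a DStream's print().
            System.out.println("(" + e.getKey() + "," + e.getValue() + ")");
        }
    }
}
```

Running main prints (hello,2), (spark,1) and (world,1), one per line.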

Suhas Shekar

University of California, Los Angeles
B.A. Economics, Specialization in Computing 2014

On Mon, Dec 29, 2014 at 12:43 AM, Akhil Das <>

> Now, add these lines to get rid of those logs:
>     import org.apache.log4j.Logger
>     import org.apache.log4j.Level
>     Logger.getLogger("org").setLevel(Level.OFF)
>     Logger.getLogger("akka").setLevel(Level.OFF)
> Thanks
> Best Regards
> On Mon, Dec 29, 2014 at 2:09 PM, Suhas Shekar <>
> wrote:
>> Hmm, so I added 10000 (10,000) to jssc.awaitTermination, however it
>> does not stop. When I am not pushing in any data, it gives me this:
>> 14/12/29 08:35:12 INFO ReceiverTracker: Stream 0 received 0 blocks
>> 14/12/29 08:35:12 INFO JobScheduler: Added jobs for time 1419860112000 ms
>> 14/12/29 08:35:14 INFO ReceiverTracker: Stream 0 received 0 blocks
>> 14/12/29 08:35:14 INFO JobScheduler: Added jobs for time 1419860114000 ms
>> 14/12/29 08:35:16 INFO ReceiverTracker: Stream 0 received 0 blocks
>> When I am pushing in data it does this:
>> 14/12/29 08:35:08 WARN BlockManager: Block input-0-1419860108200 already
>> exists on this machine; not re-adding it
>> 14/12/29 08:35:08 INFO BlockGenerator: Pushed block input-0-1419860108200
>> 14/12/29 08:35:09 INFO MemoryStore: ensureFreeSpace(80) called with
>> curMem=6515, maxMem=277842493
>> 14/12/29 08:35:09 INFO MemoryStore: Block input-0-1419860109200 stored as
>> bytes in memory (estimated size 80.0 B, free 265.0 MB)
>> 14/12/29 08:35:09 INFO BlockManagerInfo: Added input-0-1419860109200 in
>> memory on (size: 80.0 B,
>> free: 265.0 MB)
>> 14/12/29 08:35:09 INFO BlockManagerMaster: Updated info of block
>> input-0-1419860109200
>> 14/12/29 08:35:09 WARN BlockManager: Block input-0-1419860109200 already
>> exists on this machine; not re-adding it
>> 14/12/29 08:35:09 INFO BlockGenerator: Pushed block input-0-1419860109200
>> 14/12/29 08:35:10 INFO ReceiverTracker: Stream 0 received 2 blocks
>> I know I am close, as every time I enter a message in my Kafka producer
>> the console reacts as shown above. Do I have to place the
>> awaitTermination somewhere else? Or is the warning saying there is an
>> underlying problem?
>> Thank you for the help...hopefully I am as close as I think I am!
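On the awaitTermination question: as far as I know, in Spark 1.1 jssc.awaitTermination(10000) only waits up to 10 seconds and then returns. It does not stop the streaming context, so the receiver keeps running unless jssc.stop() is called afterwards.

The same wait-versus-stop distinction can be shown with plain java.util.concurrent. This is an analogy, not Spark code. WaitThenStop and Outcome are hypothetical names, and the timings are scaled down so the example finishes quickly.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Analogy only (no Spark): a timed wait returns, but periodic work keeps running
// until something explicitly stops it.
final class WaitThenStop {

    static final class Outcome {
        final boolean endedDuringWait; // did the work end on its own while we waited?
        final int batchesDuringWait;   // periodic "batches" that ran before the wait returned
        final boolean endedAfterStop;  // did the work end once we stopped it explicitly?

        Outcome(boolean endedDuringWait, int batchesDuringWait, boolean endedAfterStop) {
            this.endedDuringWait = endedDuringWait;
            this.batchesDuringWait = batchesDuringWait;
            this.endedAfterStop = endedAfterStop;
        }
    }

    static Outcome run(long batchMillis, long waitMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger batches = new AtomicInteger();
        // Stand-in for the streaming job: one "batch" every batchMillis.
        scheduler.scheduleAtFixedRate(batches::incrementAndGet, 0, batchMillis, TimeUnit.MILLISECONDS);
        try {
            // Comparable to awaitTermination(timeout): blocks up to waitMillis, stops nothing.
            boolean endedDuringWait = scheduler.awaitTermination(waitMillis, TimeUnit.MILLISECONDS);
            int batchesDuringWait = batches.get();

            // Comparable to jssc.stop(): the call that actually ends the periodic work.
            scheduler.shutdownNow();
            boolean endedAfterStop = scheduler.awaitTermination(1, TimeUnit.SECONDS);
            return new Outcome(endedDuringWait, batchesDuringWait, endedAfterStop);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } finally {
            scheduler.shutdownNow(); // never leave the scheduler thread running
        }
    }

    public static void main(String[] args) {
        // 2000 ms batches and a 10000 ms wait, scaled down by 100x.
        Outcome o = run(20, 100);
        System.out.println("ended during wait: " + o.endedDuringWait); // false
        System.out.println("batches during wait: " + o.batchesDuringWait);
        System.out.println("ended after stop: " + o.endedAfterStop);   // true
    }
}
```

Applied to this job, and assuming the Spark 1.1 API behaves as described above, the order would be:

1. Register the word count's print().
2. Call jssc.start().
3. Call jssc.awaitTermination(10000).
4. Call jssc.stop().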
>> On Mon, Dec 29, 2014 at 12:28 AM, Akhil Das <>
>> wrote:
>>> If you want to stop the streaming after 10 seconds, then use
>>> ssc.awaitTermination(10000). Make sure you push some data to kafka for the
>>> streaming to consume within the 10 seconds.
>>> Thanks
>>> Best Regards
>>> On Mon, Dec 29, 2014 at 1:53 PM, Suhas Shekar <>
>>> wrote:
>>>> I'm very close! So I added that and then I added this:
>>>> and it seems as though the stream is working, as it says Stream 0
>>>> received 1 or 2 blocks as I enter messages in my Kafka producer.
>>>> However, the Receiver seems to keep running every 2 seconds (I set the
>>>> batch duration to 2000 in my Java app). How can I stop the Receiver
>>>> from consuming messages after 10 seconds and output the word count to the
>>>> console?
>>>> Thanks a lot for all the help! I'm excited to see this word count :)
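The "Added jobs for time ..." lines quoted above (1419860112000, 1419860114000, ...) are exact multiples of the 2000 ms batch duration. A 10-second run therefore covers about five batches, and each batch is a point at which a registered output such as the word count would be printed.

Below is a small plain-Java sketch of that arithmetic. It assumes batch times are aligned to multiples of the duration, as those timestamps suggest. BatchSchedule is a hypothetical name and the start time is made up.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: which batch times fall inside a time window, assuming batch times are
// aligned to multiples of the batch duration (as the log timestamps suggest).
final class BatchSchedule {

    // First batch time strictly after startMillis that is a multiple of batchMillis.
    static long firstBatchAfter(long startMillis, long batchMillis) {
        return (startMillis / batchMillis + 1) * batchMillis;
    }

    // All aligned batch times t with firstBatchAfter(start) <= t <= start + windowMillis.
    static List<Long> batchTimes(long startMillis, long batchMillis, long windowMillis) {
        List<Long> times = new ArrayList<>();
        long end = startMillis + windowMillis;
        for (long t = firstBatchAfter(startMillis, batchMillis); t <= end; t += batchMillis) {
            times.add(t);
        }
        return times;
    }

    public static void main(String[] args) {
        // Hypothetical start time; 2000 ms batches over a 10000 ms window.
        List<Long> times = batchTimes(1419860110500L, 2000L, 10000L);
        System.out.println(times.size() + " batches: " + times);
    }
}
```

With these inputs main prints 5 batches: [1419860112000, 1419860114000, 1419860116000, 1419860118000, 1419860120000].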
>>>> On Mon, Dec 29, 2014 at 12:04 AM, Akhil Das <
>>>> > wrote:
>>>>> Add this jar in the dependency
>>>>> Thanks
>>>>> Best Regards
>>>>> On Mon, Dec 29, 2014 at 1:31 PM, Suhas Shekar <>
>>>>> wrote:
>>>>>> Hello Akhil,
>>>>>> I changed my Kafka dependency to 2.10 (which is the version of kafka
>>>>>> that was on ). I am getting a slightly different error, but at
>>>>>> the same place as the previous error (pasted below).
>>>>>> FYI, when I make these changes to the pom file, I run "mvn clean
>>>>>> package", then cp the new jar files from the repository to my lib
>>>>>> directory of jar files, which is an argument in my spark-submit script
>>>>>> (shown in my original post).
>>>>>> Thanks again for the time and help...much appreciated.
>>>>>> 14/12/29 07:56:00 INFO ReceiverSupervisorImpl: Starting receiver
>>>>>> 14/12/29 07:56:00 INFO KafkaReceiver: Starting Kafka Consumer Stream
>>>>>> with group: c1
>>>>>> 14/12/29 07:56:00 INFO KafkaReceiver: Connecting to Zookeeper:
>>>>>> 14/12/29 07:56:00 INFO BlockGenerator: Started block pushing thread
>>>>>> 14/12/29 07:56:00 INFO VerifiableProperties: Verifying properties
>>>>>> 14/12/29 07:56:00 INFO VerifiableProperties: Property is
>>>>>> overridden to c1
>>>>>> 14/12/29 07:56:00 INFO VerifiableProperties: Property
>>>>>> zookeeper.connect is overridden to
>>>>>> 14/12/29 07:56:00 INFO VerifiableProperties: Property
>>>>>> is overridden to 10000
>>>>>> 14/12/29 07:56:00 INFO ReceiverSupervisorImpl: Stopping receiver with
>>>>>> message: Error starting receiver 0: java.lang.NoClassDefFoundError:
>>>>>> com/yammer/metrics/Metrics
>>>>>> 14/12/29 07:56:00 INFO ReceiverSupervisorImpl: Called receiver onStop
>>>>>> 14/12/29 07:56:00 INFO ReceiverSupervisorImpl: Deregistering receiver
>>>>>> 0
>>>>>> 14/12/29 07:56:00 ERROR ReceiverTracker: Deregistered receiver for
>>>>>> stream 0: Error starting receiver 0 - java.lang.NoClassDefFoundError:
>>>>>> com/yammer/metrics/Metrics
>>>>>>         at
>>>>>> kafka.metrics.KafkaMetricsGroup$class.newMeter(KafkaMetricsGroup.scala:51)
>>>>>>         at
>>>>>> kafka.consumer.ZookeeperConsumerConnector.newMeter(ZookeeperConsumerConnector.scala:83)
>>>>>>         at
>>>>>> kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:107)
>>>>>>         at
>>>>>> kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:142)
>>>>>>         at kafka.consumer.Consumer$.create(ConsumerConnector.scala:89)
>>>>>>         at
>>>>>> org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:97)
>>>>>>         at
>>>>>> org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)
>>>>>>         at
>>>>>> org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)
>>>>>>         at
>>>>>> org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:264)
>>>>>>         at
>>>>>> org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:257)
>>>>>>         at
>>>>>> org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
>>>>>>         at
>>>>>> org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
>>>>>>         at
>>>>>> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
>>>>>>         at
>>>>>>         at
>>>>>> org.apache.spark.executor.Executor$
>>>>>>         at
>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(
>>>>>>         at
>>>>>> java.util.concurrent.ThreadPoolExecutor$
>>>>>>         at
>>>>>> Caused by: java.lang.ClassNotFoundException:
>>>>>> com.yammer.metrics.Metrics
>>>>>>         at$
>>>>>>         at$
>>>>>>         at Method)
>>>>>>         at
>>>>>>         at java.lang.ClassLoader.loadClass(
>>>>>>         at java.lang.ClassLoader.loadClass(
>>>>>>         ... 18 more
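NoClassDefFoundError: com/yammer/metrics/Metrics means that class was not on the classpath when the Kafka consumer was created. One Spark-independent way to check a jar set before submitting is to try loading the suspect classes by name with Class.forName.

Below is a minimal sketch. It assumes it is run with the same jars on its classpath (for example java -cp "lib/*:." ClasspathCheck). ClasspathCheck is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Reports which of the given class names cannot be loaded from this JVM's classpath.
final class ClasspathCheck {

    static List<String> missing(List<String> classNames) {
        List<String> absent = new ArrayList<>();
        ClassLoader loader = ClasspathCheck.class.getClassLoader();
        for (String name : classNames) {
            try {
                // initialize = false: only locate and load the class, don't run static initializers.
                Class.forName(name, false, loader);
            } catch (ClassNotFoundException | LinkageError e) {
                // LinkageError also covers NoClassDefFoundError for a missing dependency of the class.
                absent.add(name);
            }
        }
        return absent;
    }

    public static void main(String[] args) {
        // Class names taken from the two stack traces in this thread.
        List<String> suspects = Arrays.asList("com.yammer.metrics.Metrics", "scala.reflect.ClassManifest");
        List<String> absent = missing(suspects);
        System.out.println(absent.isEmpty() ? "all classes found" : "missing: " + absent);
    }
}
```

This only reflects what the local JVM sees. The executors get their jars from spark-submit's --jars list, so the same lib directory should be used for both.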
>>>>>> On Sun, Dec 28, 2014 at 11:52 PM, Suhas Shekar <
>>>>>> > wrote:
>>>>>>> I made both versions 1.1.1 and got the same error. I then tried
>>>>>>> making both 1.1.0, as that is the version of my Spark Core, but got the
>>>>>>> same error.
>>>>>>> I noticed my Kafka dependency is for Scala 2.9.2, while my Spark
>>>>>>> Streaming Kafka dependency is for 2.10.x. I will try changing that next,
>>>>>>> but I don't think that will solve the error, as I don't think the
>>>>>>> application had gotten to that level yet.
>>>>>>> Please let me know of any possible next steps.
>>>>>>> Thank you again for the time and the help!
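Scala libraries encode the Scala binary version in the artifactId suffix (kafka_2.9.2, spark-streaming-kafka_2.10). Jars built for Scala 2.9.x and 2.10.x are not binary compatible, which would be consistent with the earlier scala/reflect/ClassManifest error.

Below is a quick sketch that groups artifactIds by that suffix and exposes a mix. The parsing rule (digits after the last underscore, reduced to major.minor) is an assumption that fits these names but not every artifact. ScalaSuffixCheck is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Groups artifactIds by the Scala binary version in their suffix, e.g. "kafka_2.9.2" -> "2.9".
final class ScalaSuffixCheck {

    // Returns "major.minor" from a suffix like "_2.10" or "_2.9.2", or null if there is none.
    static String scalaBinaryVersion(String artifactId) {
        int underscore = artifactId.lastIndexOf('_');
        if (underscore < 0) {
            return null;
        }
        String[] parts = artifactId.substring(underscore + 1).split("\\.");
        if (parts.length < 2 || !parts[0].matches("\\d+") || !parts[1].matches("\\d+")) {
            return null;
        }
        return parts[0] + "." + parts[1];
    }

    // More than one key in the result means the build mixes Scala binary versions.
    static Map<String, List<String>> groupBySuffix(List<String> artifactIds) {
        Map<String, List<String>> groups = new TreeMap<>();
        for (String id : artifactIds) {
            String version = scalaBinaryVersion(id);
            if (version != null) {
                groups.computeIfAbsent(version, k -> new ArrayList<>()).add(id);
            }
        }
        return groups;
    }

    public static void main(String[] args) {
        // The combination described in this thread before the Kafka dependency was changed.
        List<String> ids = Arrays.asList("kafka_2.9.2", "spark-streaming-kafka_2.10", "spark-streaming_2.10");
        Map<String, List<String>> groups = groupBySuffix(ids);
        if (groups.size() > 1) {
            System.out.println("mixed Scala versions: " + groups);
        }
    }
}
```

For these three artifacts it prints mixed Scala versions: {2.10=[spark-streaming-kafka_2.10, spark-streaming_2.10], 2.9=[kafka_2.9.2]}.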
>>>>>>> On Sun, Dec 28, 2014 at 11:31 PM, Akhil Das <
>>>>>>>> wrote:
>>>>>>>> Just looked at the pom file that you are using, why are you having
>>>>>>>> different versions in it?
>>>>>>>> <dependency>
>>>>>>>> <groupId>org.apache.spark</groupId>
>>>>>>>> <artifactId>spark-streaming-kafka_2.10</artifactId>
>>>>>>>> <version>*1.1.1*</version>
>>>>>>>> </dependency>
>>>>>>>> <dependency>
>>>>>>>> <groupId>org.apache.spark</groupId>
>>>>>>>> <artifactId>spark-streaming_2.10</artifactId>
>>>>>>>> <version>*1.0.2*</version>
>>>>>>>> </dependency>
>>>>>>>> Can you make both the versions the same?
>>>>>>>> Thanks
>>>>>>>> Best Regards
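Akhil's point can also be checked mechanically: parse the pom's dependencies and confirm that every org.apache.spark artifact uses one version. The sketch below uses the JDK's built-in DOM parser (javax.xml.parsers). SparkVersionCheck is a hypothetical name. The two dependency blocks quoted above must be wrapped in a single root element, such as <dependencies>, before they will parse.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.Map;
import java.util.TreeMap;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;

// Collects artifactId -> version for every org.apache.spark dependency in a pom fragment.
final class SparkVersionCheck {

    static Map<String, String> sparkVersions(String xml) {
        Document doc;
        try {
            doc = DocumentBuilderFactory.newInstance().newDocumentBuilder()
                    .parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)));
        } catch (Exception e) {
            throw new IllegalArgumentException("could not parse XML", e);
        }
        Map<String, String> versions = new TreeMap<>();
        NodeList deps = doc.getElementsByTagName("dependency");
        for (int i = 0; i < deps.getLength(); i++) {
            Element dep = (Element) deps.item(i);
            String artifactId = firstText(dep, "artifactId");
            if ("org.apache.spark".equals(firstText(dep, "groupId")) && artifactId != null) {
                versions.put(artifactId, firstText(dep, "version"));
            }
        }
        return versions;
    }

    // Text of the first descendant with this tag; assumes the dependency's own
    // groupId/artifactId/version come before any nested <exclusions>.
    private static String firstText(Element parent, String tag) {
        NodeList nodes = parent.getElementsByTagName(tag);
        return nodes.getLength() == 0 ? null : nodes.item(0).getTextContent().trim();
    }

    // True when all Spark artifacts share one version (a missing <version> counts as its own value).
    static boolean consistent(Map<String, String> versions) {
        return new HashSet<>(versions.values()).size() <= 1;
    }

    public static void main(String[] args) {
        // The two dependencies quoted above, wrapped in one root element so they parse.
        String xml = "<dependencies>"
                + "<dependency><groupId>org.apache.spark</groupId>"
                + "<artifactId>spark-streaming-kafka_2.10</artifactId><version>1.1.1</version></dependency>"
                + "<dependency><groupId>org.apache.spark</groupId>"
                + "<artifactId>spark-streaming_2.10</artifactId><version>1.0.2</version></dependency>"
                + "</dependencies>";
        Map<String, String> versions = sparkVersions(xml);
        System.out.println(versions + (consistent(versions) ? " (consistent)" : " (MISMATCH)"));
    }
}
```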
>>>>>>>> On Mon, Dec 29, 2014 at 12:44 PM, Suhas Shekar <
>>>>>>>>> wrote:
>>>>>>>>> 1) Could you please clarify what you mean by checking that the Scala
>>>>>>>>> version is correct? In my pom.xml file it is 2.10.4 (which is the same
>>>>>>>>> as when I start spark-shell).
>>>>>>>>> 2) The Spark master URL is definitely correct, as I have run other
>>>>>>>>> apps that use Spark with the same script (like a word count with a
>>>>>>>>> local file).
>>>>>>>>> Thank you for the help!
>>>>>>>>> On Sun, Dec 28, 2014 at 11:04 PM, Akhil Das <
>>>>>>>>>> wrote:
>>>>>>>>>> Make sure you verify the following:
>>>>>>>>>> - Scala version: I think the correct version would be 2.10.x
>>>>>>>>>> - SparkMasterURL: Be sure that you copied the one displayed in the
>>>>>>>>>> top left corner of the web UI (running on port 8080)
>>>>>>>>>> Thanks
>>>>>>>>>> Best Regards
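A standalone master URL has the form spark://host:port, and copying it from the master web UI avoids typos. Below is a small sanity check with java.net.URI, offered as a sketch. It only checks the shape of the string, not whether a master is actually listening there. MasterUrlCheck and the host names are hypothetical.

```java
import java.net.URI;
import java.net.URISyntaxException;

// Shape check for a standalone master URL such as spark://host:7077 (does not contact the master).
final class MasterUrlCheck {

    // Returns null when the URL looks usable, otherwise a short description of the problem.
    static String problem(String masterUrl) {
        URI uri;
        try {
            uri = new URI(masterUrl.trim());
        } catch (URISyntaxException e) {
            return "not a valid URI: " + e.getReason();
        }
        if (!"spark".equals(uri.getScheme())) {
            return "scheme should be spark://";
        }
        if (uri.getHost() == null) {
            return "missing or unparseable host";
        }
        if (uri.getPort() == -1) {
            return "missing port (the standalone master uses 7077 unless configured otherwise)";
        }
        return null;
    }

    public static void main(String[] args) {
        // Hypothetical host names, for illustration only.
        String[] candidates = {
            "spark://ip-10-0-0-1.ec2.internal:7077", "spark://ip-10-0-0-1", "http://ip-10-0-0-1:8080"
        };
        for (String url : candidates) {
            String p = problem(url);
            System.out.println(url + " -> " + (p == null ? "ok" : p));
        }
    }
}
```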
>>>>>>>>>> On Mon, Dec 29, 2014 at 12:26 PM, suhshekar52 <
>>>>>>>>>>> wrote:
>>>>>>>>>>> Hello Everyone,
>>>>>>>>>>> Thank you for the time and the help :).
>>>>>>>>>>> My goal here is to get this program working:
>>>>>>>>>>> The only lines I do not have from the example are lines 62-67.
>>>>>>>>>>> pom.xml
>>>>>>>>>>> Background: I have EC2 instances running. Standalone Spark is running
>>>>>>>>>>> on top of Cloudera Manager 5.2.
>>>>>>>>>>> The pom file is attached and is the same for both clusters.
>>>>>>>>>>> pom.xml
>>>>>>>>>>> Here are a few different approaches I have taken and the issues
>>>>>>>>>>> I run into:
>>>>>>>>>>> *Standalone Mode*
>>>>>>>>>>> 1) Use the spark-submit script to run:
>>>>>>>>>>> /opt/cloudera/parcels/CDH-5.2.1-1.cdh5.2.1.p0.12/lib/spark/bin/spark-submit \
>>>>>>>>>>>   --class SimpleApp --master spark://  \
>>>>>>>>>>>   --jars $(echo /home/ec2-user/sparkApps/SimpleApp/lib/*.jar | tr ' ' ',') \
>>>>>>>>>>>   /home/ec2-user/sparkApps/SimpleApp/target/simple-project-1.0.jar
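The $(echo .../lib/*.jar | tr ' ' ',') idiom builds the comma-separated list that --jars expects. A Java equivalent, shown below as a sketch, can be handy for checking which jars actually end up in that list. JarsArgument is a hypothetical name.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Builds the comma-separated value that spark-submit's --jars option expects,
// like $(echo lib/*.jar | tr ' ' ',') does in the shell command.
final class JarsArgument {

    static String jarsArgument(Path libDir) {
        List<String> jars = new ArrayList<>();
        // The glob "*.jar" matches file names in libDir only (no recursion).
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(libDir, "*.jar")) {
            for (Path jar : stream) {
                jars.add(jar.toAbsolutePath().toString());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        Collections.sort(jars); // directory order is unspecified; sort for a reproducible list
        return String.join(",", jars);
    }

    public static void main(String[] args) {
        // Defaults to the lib directory used in this thread; pass another path as args[0].
        Path lib = Paths.get(args.length > 0 ? args[0] : "/home/ec2-user/sparkApps/SimpleApp/lib");
        System.out.println("--jars " + jarsArgument(lib));
    }
}
```

It differs from the echo | tr version in two ways:

- It does not split paths that contain spaces.
- It returns an empty string, rather than the literal *.jar pattern, when the directory has no jars.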
>>>>>>>>>>> Interestingly, I was previously getting this error: "Initial job has
>>>>>>>>>>> not accepted any resources; check your cluster UI".
>>>>>>>>>>> Now, when I run it, it prints out the 3 Hello World statements in
>>>>>>>>>>> my code:
>>>>>>>>>>> KafkaJavaConsumer.txt
>>>>>>>>>>> and then it seems to try to start the Kafka Stream, but fails:
>>>>>>>>>>> 14/12/29 05:58:05 INFO KafkaReceiver: Starting Kafka Consumer
>>>>>>>>>>> Stream with
>>>>>>>>>>> group: c1
>>>>>>>>>>> 14/12/29 05:58:05 INFO ReceiverTracker: Registered receiver for
>>>>>>>>>>> stream 0
>>>>>>>>>>> from akka://sparkDriver
>>>>>>>>>>> 14/12/29 05:58:05 INFO KafkaReceiver: Connecting to Zookeeper:
>>>>>>>>>>> 14/12/29 05:58:05 INFO BlockGenerator: Started block pushing
>>>>>>>>>>> thread
>>>>>>>>>>> 14/12/29 05:58:05 INFO ReceiverSupervisorImpl: Stopping receiver
>>>>>>>>>>> with
>>>>>>>>>>> message: Error starting receiver 0:
>>>>>>>>>>> java.lang.NoClassDefFoundError:
>>>>>>>>>>> scala/reflect/ClassManifest
>>>>>>>>>>> 14/12/29 05:58:05 INFO ReceiverSupervisorImpl: Called receiver
>>>>>>>>>>> onStop
>>>>>>>>>>> 14/12/29 05:58:05 INFO ReceiverSupervisorImpl: Deregistering
>>>>>>>>>>> receiver 0
>>>>>>>>>>> ^C14/12/29 05:58:05 ERROR ReceiverTracker: Deregistered receiver
>>>>>>>>>>> for stream
>>>>>>>>>>> 0: Error starting receiver 0 - java.lang.NoClassDefFoundError:
>>>>>>>>>>> scala/reflect/ClassManifest
>>>>>>>>>>>         at
>>>>>>>>>>> kafka.utils.Log4jController$.<init>(Log4jController.scala:29)
>>>>>>>>>>>         at
>>>>>>>>>>> kafka.utils.Log4jController$.<clinit>(Log4jController.scala)
>>>>>>>>>>>         at kafka.utils.Logging$class.$init$(Logging.scala:29)
>>>>>>>>>>>         at
>>>>>>>>>>> kafka.utils.VerifiableProperties.<init>(VerifiableProperties.scala:26)
>>>>>>>>>>>         at
>>>>>>>>>>> kafka.consumer.ConsumerConfig.<init>(ConsumerConfig.scala:94)
>>>>>>>>>>>         at
>>>>>>>>>>> org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:96)
>>>>>>>>>>>         at
>>>>>>>>>>> org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)
>>>>>>>>>>>         at
>>>>>>>>>>> org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)
>>>>>>>>>>>         at
>>>>>>>>>>> org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:264)
>>>>>>>>>>>         at
>>>>>>>>>>> org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:257)
>>>>>>>>>>>         at
>>>>>>>>>>> org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
>>>>>>>>>>>         at
>>>>>>>>>>> org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
>>>>>>>>>>>         at
>>>>>>>>>>> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
>>>>>>>>>>>         at
>>>>>>>>>>>         at
>>>>>>>>>>> org.apache.spark.executor.Executor$
>>>>>>>>>>>         at
>>>>>>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(
>>>>>>>>>>>         at
>>>>>>>>>>> java.util.concurrent.ThreadPoolExecutor$
>>>>>>>>>>>         at
>>>>>>>>>>> Caused by: java.lang.ClassNotFoundException:
>>>>>>>>>>> scala.reflect.ClassManifest
>>>>>>>>>>>         at$
>>>>>>>>>>>         at$
>>>>>>>>>>>         at
>>>>>>>>>>> Method)
>>>>>>>>>>>         at
>>>>>>>>>>>         at java.lang.ClassLoader.loadClass(
>>>>>>>>>>>         at java.lang.ClassLoader.loadClass(
>>>>>>>>>>>         ... 18 more
>>>>>>>>>>> 14/12/29 05:58:05 INFO ReceiverSupervisorImpl: Stopped receiver 0
>>>>>>>>>>> 14/12/29 05:58:05 INFO BlockGenerator: Stopping BlockGenerator
>>>>>>>>>>> I ran into a couple of other class-not-found errors and was able to
>>>>>>>>>>> solve them by adding dependencies to the pom file, but have not found
>>>>>>>>>>> such a solution for this error.
>>>>>>>>>>> On the Kafka side of things, I am simply typing in messages as soon
>>>>>>>>>>> as I start the Java app in another console. Is this okay?
>>>>>>>>>>> I have not set up an advertised host on the Kafka side, as I was still
>>>>>>>>>>> able to receive messages from other consoles by setting up a consumer
>>>>>>>>>>> to listen to the private ip:port. Is this okay?
>>>>>>>>>>> Lastly, is there a command, like --from-beginning for a consumer, in
>>>>>>>>>>> the Java application to get messages from the beginning?
>>>>>>>>>>> Thanks a lot for the help and happy holidays!
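On the --from-beginning question: the ZookeeperConsumerConnector in the logs suggests the Kafka 0.8 high-level consumer. That consumer reads auto.offset.reset, and "smallest" makes a consumer group with no committed offset start from the earliest retained message.

The log at the top of this thread shows group c1 already at committed offset 221. As I understand it, a new group.id would be needed for that setting to take effect.

Below is a sketch of the parameter map. KafkaParamsSketch, the ZooKeeper address and the group id are hypothetical. In the Java API, the map would be passed to the KafkaUtils.createStream overload that accepts a Map<String, String> of Kafka parameters.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Consumer settings for replaying a topic from the start with the Kafka 0.8 high-level consumer.
final class KafkaParamsSketch {

    static Map<String, String> fromBeginning(String zookeeperConnect, String groupId) {
        Map<String, String> params = new LinkedHashMap<>();
        params.put("zookeeper.connect", zookeeperConnect);
        // Offsets are committed per group, so a group that has already committed
        // (like c1 at offset 221 in the log) resumes there; use a new group id to replay.
        params.put("group.id", groupId);
        // Applies only when the group has no committed offset (or it is out of range):
        // "smallest" starts at the earliest retained message, "largest" at new ones only.
        params.put("auto.offset.reset", "smallest");
        return params;
    }

    public static void main(String[] args) {
        // Hypothetical ZooKeeper address and group id.
        Map<String, String> params = fromBeginning("zk-host:2181", "c1-replay");
        System.out.println(params);
    }
}
```

Running main prints {zookeeper.connect=zk-host:2181, group.id=c1-replay, auto.offset.reset=smallest}.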
