Thanks Saisai,

Sure will do.

But just a quick note: when I set the master to "local[*]", Spark started
showing Kafka messages as expected, so the problem, in my view, was that
there were not enough threads to process the incoming data.
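For reference, a minimal sketch of the configuration change. The class name
matches the KafkaLogConsumer.java from the logs below, but the ZooKeeper
address, consumer group, topic, and batch interval are illustrative, not the
actual values from my setup:

```java
import java.util.Collections;

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

public class KafkaLogConsumer {
    public static void main(String[] args) throws InterruptedException {
        // "local[*]" runs one worker thread per CPU core. With "local" (or
        // "local[1]") the single thread is consumed by the Kafka receiver,
        // leaving no thread to process the received batches.
        SparkConf conf = new SparkConf()
                .setAppName("KafkaLogConsumer")
                .setMaster("local[*]");
        JavaStreamingContext jssc =
                new JavaStreamingContext(conf, Durations.seconds(5));

        // ZooKeeper quorum, group id, and topic map are illustrative.
        KafkaUtils.createStream(jssc, "localhost:2181", "my-group",
                Collections.singletonMap("my-topic", 1))
                .print();

        jssc.start();
        jssc.awaitTermination();
    }
}
```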

Thanks.


On Wed, Apr 1, 2015 at 10:53 AM, Saisai Shao <sai.sai.s...@gmail.com> wrote:

> Would you please share your code snippet, so we can identify whether there
> is anything wrong in your code?
>
> Also, would you please grep your driver's debug log for lines like "Stream
> xxx received block xxx"? Those indicate that Spark Streaming is still
> receiving data from sources such as Kafka.
>
>
> 2015-04-01 16:18 GMT+08:00 James King <jakwebin...@gmail.com>:
>
>> Thank you bit1129,
>>
>> From looking at the web UI, I can see 2 cores.
>>
>> Also looking at http://spark.apache.org/docs/1.2.1/configuration.html
>>
>> But I can't see an obvious configuration option for the number of
>> receivers. Can you help, please?
>>
>>
>> On Wed, Apr 1, 2015 at 9:39 AM, bit1...@163.com <bit1...@163.com> wrote:
>>
>>> Please make sure that you have given Spark more cores than the number of receivers.
>>>
>>>
>>>
>>>
>>> *From:* James King <jakwebin...@gmail.com>
>>> *Date:* 2015-04-01 15:21
>>> *To:* user <user@spark.apache.org>
>>> *Subject:* Spark + Kafka
>>> I have a simple setup/runtime of Kafka and Spark.
>>>
>>> I have a command-line consumer displaying arrivals to the Kafka topic, so
>>> I know messages are being received.
>>>
>>> But when I try to read from the Kafka topic I get no messages; some logs
>>> are below.
>>>
>>> I'm thinking there aren't enough threads. How do I check that?
>>>
>>> Thank you.
>>>
>>> 2015-04-01 08:56:50 INFO  JobScheduler:59 - Starting job streaming job
>>> 1427871410000 ms.0 from job set of time 1427871410000 ms
>>> 2015-04-01 08:56:50 INFO  JobScheduler:59 - Finished job streaming job
>>> 1427871410000 ms.0 from job set of time 1427871410000 ms
>>> 2015-04-01 08:56:50 INFO  JobScheduler:59 - Total delay: 0.002 s for
>>> time 1427871410000 ms (execution: 0.000 s)
>>> 2015-04-01 08:56:50 DEBUG JobGenerator:63 - Got event
>>> ClearMetadata(1427871410000 ms)
>>> 2015-04-01 08:56:50 DEBUG DStreamGraph:63 - Clearing metadata for time
>>> 1427871410000 ms
>>> 2015-04-01 08:56:50 DEBUG ForEachDStream:63 - Clearing references to old
>>> RDDs: []
>>> 2015-04-01 08:56:50 DEBUG ForEachDStream:63 - Unpersisting old RDDs:
>>> 2015-04-01 08:56:50 DEBUG ForEachDStream:63 - Cleared 0 RDDs that were
>>> older than 1427871405000 ms:
>>> 2015-04-01 08:56:50 DEBUG KafkaInputDStream:63 - Clearing references to
>>> old RDDs: [1427871405000 ms -> 8]
>>> 2015-04-01 08:56:50 DEBUG KafkaInputDStream:63 - Unpersisting old RDDs: 8
>>> 2015-04-01 08:56:50 INFO  BlockRDD:59 - Removing RDD 8 from persistence
>>> list
>>> 2015-04-01 08:56:50 DEBUG BlockManagerMasterActor:50 - [actor] received
>>> message RemoveRdd(8) from Actor[akka://sparkDriver/temp/$n]
>>> 2015-04-01 08:56:50 DEBUG BlockManagerSlaveActor:50 - [actor] received
>>> message RemoveRdd(8) from Actor[akka://sparkDriver/temp/$o]
>>> 2015-04-01 08:56:50 DEBUG BlockManagerMasterActor:56 - [actor] handled
>>> message (0.287257 ms) RemoveRdd(8) from Actor[akka://sparkDriver/temp/$n]
>>> 2015-04-01 08:56:50 DEBUG BlockManagerSlaveActor:63 - removing RDD 8
>>> 2015-04-01 08:56:50 INFO  BlockManager:59 - Removing RDD 8
>>> 2015-04-01 08:56:50 DEBUG BlockManagerSlaveActor:63 - Done removing RDD
>>> 8, response is 0
>>> 2015-04-01 08:56:50 DEBUG BlockManagerSlaveActor:56 - [actor] handled
>>> message (0.038047 ms) RemoveRdd(8) from Actor[akka://sparkDriver/temp/$o]
>>> 2015-04-01 08:56:50 INFO  KafkaInputDStream:59 - Removing blocks of RDD
>>> BlockRDD[8] at createStream at KafkaLogConsumer.java:53 of time
>>> 1427871410000 ms
>>> 2015-04-01 08:56:50 DEBUG KafkaInputDStream:63 - Cleared 1 RDDs that
>>> were older than 1427871405000 ms: 1427871405000 ms
>>> 2015-04-01 08:56:50 DEBUG DStreamGraph:63 - Cleared old metadata for
>>> time 1427871410000 ms
>>> 2015-04-01 08:56:50 INFO  ReceivedBlockTracker:59 - Deleting batches
>>> ArrayBuffer(1427871400000 ms)
>>> 2015-04-01 08:56:50 DEBUG BlockManagerSlaveActor:63 - Sent response: 0
>>> to Actor[akka://sparkDriver/temp/$o]
>>> 2015-04-01 08:56:50 DEBUG SparkDeploySchedulerBackend:50 - [actor]
>>> received message ReviveOffers from
>>> Actor[akka://sparkDriver/user/CoarseGrainedScheduler#1727295119]
>>> 2015-04-01 08:56:50 DEBUG TaskSchedulerImpl:63 - parentName: , name:
>>> TaskSet_0, runningTasks: 0
>>> 2015-04-01 08:56:50 DEBUG SparkDeploySchedulerBackend:56 - [actor]
>>> handled message (0.499181 ms) ReviveOffers from
>>> Actor[akka://sparkDriver/user/CoarseGrainedScheduler#1727295119]
>>> 2015-04-01 08:56:50 WARN  TaskSchedulerImpl:71 - Initial job has not
>>> accepted any resources; check your cluster UI to ensure that workers are
>>> registered and have sufficient resources
>>> 2015-04-01 08:56:51 DEBUG SparkDeploySchedulerBackend:50 - [actor]
>>> received message ReviveOffers from
>>> Actor[akka://sparkDriver/user/CoarseGrainedScheduler#1727295119]
>>> 2015-04-01 08:56:51 DEBUG TaskSchedulerImpl:63 - parentName: , name:
>>> TaskSet_0, runningTasks: 0
>>> 2015-04-01 08:56:51 DEBUG SparkDeploySchedulerBackend:56 - [actor]
>>> handled message (0.886121 ms) ReviveOffers from
>>> Actor[akka://sparkDriver/user/CoarseGrainedScheduler#1727295119]
>>> 2015-04-01 08:56:52 DEBUG AppClient$ClientActor:50 - [actor] received
>>> message ExecutorUpdated(0,EXITED,Some(Command exited with code 1),Some(1))
>>> from Actor[akka.tcp://sparkMaster@somesparkhost
>>> :7077/user/Master#336117298]
>>> 2015-04-01 08:56:52 INFO  AppClient$ClientActor:59 - Executor updated:
>>> app-20150401065621-0007/0 is now EXITED (Command exited with code 1)
>>> 2015-04-01 08:56:52 INFO  SparkDeploySchedulerBackend:59 - Executor
>>> app-20150401065621-0007/0 removed: Command exited with code 1
>>> 2015-04-01 08:56:52 DEBUG SparkDeploySchedulerBackend:50 - [actor]
>>> received message RemoveExecutor(0,Unknown executor exit code (1)) from
>>> Actor[akka://sparkDriver/temp/$p]
>>> 2015-04-01 08:56:52 ERROR SparkDeploySchedulerBackend:75 - Asked to
>>> remove non-existent executor 0
>>> 2015-04-01 08:56:52 DEBUG SparkDeploySchedulerBackend:56 - [actor]
>>> handled message (1.394052 ms) RemoveExecutor(0,Unknown executor exit code
>>> (1)) from Actor[akka://sparkDriver/temp/$p]
>>> 2015-04-01 08:56:52 DEBUG AppClient$ClientActor:56 - [actor] handled
>>> message (6.653705 ms) ExecutorUpdated(0,EXITED,Some(Command exited with
>>> code 1),Some(1)) from Actor[akka.tcp://sparkMaster@somesparkhost
>>> :7077/user/Master#336117298]
>>> 2015-04-01 08:56:52 DEBUG AppClient$ClientActor:50 - [actor] received
>>> message
>>> ExecutorAdded(1,worker-20150331133159-somesparkhost-49556,somesparkhost:49556,2,512)
>>> from Actor[akka.tcp://sparkMaster@somesparkhost
>>> :7077/user/Master#336117298]
>>> 2015-04-01 08:56:52 INFO  AppClient$ClientActor:59 - Executor added:
>>> app-20150401065621-0007/1 on worker-20150331133159-somesparkhost-49556
>>> (somesparkhost:49556) with 2 cores
>>> 2015-04-01 08:56:52 INFO  SparkDeploySchedulerBackend:59 - Granted
>>> executor ID app-20150401065621-0007/1 on hostPort somesparkhost:49556 with
>>> 2 cores, 512.0 MB RAM
>>> 2015-04-01 08:56:52 DEBUG AppClient$ClientActor:56 - [actor] handled
>>> message (1.119731 ms)
>>> ExecutorAdded(1,worker-20150331133159-somesparkhost-49556,somesparkhost:49556,2,512)
>>> from Actor[akka.tcp://sparkMaster@somesparkhost
>>> :7077/user/Master#336117298]
>>> 2015-04-01 08:56:52 DEBUG AppClient$ClientActor:50 - [actor] received
>>> message ExecutorUpdated(1,LOADING,None,None) from
>>> Actor[akka.tcp://sparkMaster@somesparkhost:7077/user/Master#336117298]
>>> 2015-04-01 08:56:52 INFO  AppClient$ClientActor:59 - Executor updated:
>>> app-20150401065621-0007/1 is now LOADING
>>> 2015-04-01 08:56:52 DEBUG AppClient$ClientActor:56 - [actor] handled
>>> message (0.516301 ms) ExecutorUpdated(1,LOADING,None,None) from
>>> Actor[akka.tcp://sparkMaster@somesparkhost:7077/user/Master#336117298]
>>> 2015-04-01 08:56:52 DEBUG SparkDeploySchedulerBackend:50 - [actor]
>>> received message ReviveOffers from
>>> Actor[akka://sparkDriver/user/CoarseGrainedScheduler#1727295119]
>>> 2015-04-01 08:56:52 DEBUG TaskSchedulerImpl:63 - parentName: , name:
>>> TaskSet_0, runningTasks: 0
>>> 2015-04-01 08:56:52 DEBUG SparkDeploySchedulerBackend:56 - [actor]
>>> handled message (0.652891 ms) ReviveOffers from
>>> Actor[akka://sparkDriver/user/CoarseGrainedScheduler#1727295119]
>>> 2015-04-01 08:56:52 DEBUG AppClient$ClientActor:50 - [actor] received
>>> message ExecutorUpdated(1,RUNNING,None,None) from
>>> Actor[akka.tcp://sparkMaster@somesparkhost:7077/user/Master#336117298]
>>> 2015-04-01 08:56:52 INFO  AppClient$ClientActor:59 - Executor updated:
>>> app-20150401065621-0007/1 is now RUNNING
>>> 2015-04-01 08:56:52 DEBUG AppClient$ClientActor:56 - [actor] handled
>>> message (0.381614 ms) ExecutorUpdated(1,RUNNING,None,None) from
>>> Actor[akka.tcp://sparkMaster@somesparkhost:7077/user/Master#336117298]
>>> 2015-04-01 08:56:53 DEBUG SparkDeploySchedulerBackend:50 - [actor]
>>> received message ReviveOffers from
>>> Actor[akka://sparkDriver/user/CoarseGrainedScheduler#1727295119]
>>> 2015-04-01 08:56:53 DEBUG TaskSchedulerImpl:63 - parentName: , name:
>>> TaskSet_0, runningTasks: 0
>>> 2015-04-01 08:56:53 DEBUG SparkDeploySchedulerBackend:56 - [actor]
>>> handled message (0.417759 ms) ReviveOffers from
>>> Actor[akka://sparkDriver/user/CoarseGrainedScheduler#1727295119]
>>> 2015-04-01 08:56:54 DEBUG SparkDeploySchedulerBackend:50 - [actor]
>>> received message ReviveOffers from
>>> Actor[akka://sparkDriver/user/CoarseGrainedScheduler#1727295119]
>>> 2015-04-01 08:56:54 DEBUG TaskSchedulerImpl:63 - parentName: , name:
>>> TaskSet_0, runningTasks: 0
>>> 2015-04-01 08:56:54 DEBUG SparkDeploySchedulerBackend:56 - [actor]
>>> handled message (1.426392 ms) ReviveOffers from
>>> Actor[akka://sparkDriver/user/CoarseGrainedScheduler#1727295119]
>>> 2015-04-01 08:56:55 DEBUG RecurringTimer:63 - Callback for JobGenerator
>>> called at time 1427871415000
>>> 2015-04-01 08:56:55 DEBUG JobGenerator:63 - Got event
>>> GenerateJobs(1427871415000 ms)
>>> 2015-04-01 08:56:55 DEBUG DStreamGraph:63 - Generating jobs for time
>>> 1427871415000 ms
>>> 2015-04-01 08:56:55 DEBUG KafkaInputDStream:63 - Time 1427871415000 ms
>>> is valid
>>> 2015-04-01 08:56:55 DEBUG DStreamGraph:63 - Generated 1 jobs for time
>>> 1427871415000 ms
>>> 2015-04-01 08:56:55 INFO  JobScheduler:59 - Added jobs for time
>>> 1427871415000 ms
>>> 2015-04-01 08:56:55 DEBUG JobGenerator:63 - Got event
>>> DoCheckpoint(1427871415000 ms)
>>> -------------------------------------------
>>> Time: 1427871415000 ms
>>> -------------------------------------------
>>>
>>>
>>
>
