Here is the code. I couldn't find anything like the log line you suggested.
public KafkaLogConsumer(int duration, String master) {
    JavaStreamingContext spark = createSparkContext(duration, master);
    Map<String, Integer> topics = new HashMap<String, Integer>();
    topics.put("test", 1);
    JavaPairDStream<String, String> input =
        KafkaUtils.createStream(spark, "somesparkhost:2181", "groupid", topics);
    input.print();
    spark.start();
    spark.awaitTermination();
}

private JavaStreamingContext createSparkContext(int duration, String master) {
    SparkConf sparkConf = new SparkConf()
        .setAppName(this.getClass().getSimpleName())
        .setMaster(master);
    JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(duration));
    return ssc;
}

On Wed, Apr 1, 2015 at 11:37 AM, James King <jakwebin...@gmail.com> wrote:

> Thanks Saisai,
>
> Sure, will do.
>
> Just a quick note: when I set the master to "local[*]", Spark started
> showing Kafka messages as expected, so the problem in my view was not
> having enough threads to process the incoming data.
>
> Thanks.
>
>
> On Wed, Apr 1, 2015 at 10:53 AM, Saisai Shao <sai.sai.s...@gmail.com>
> wrote:
>
>> Would you please share your code snippet, so we can identify whether
>> there is anything wrong in your code?
>>
>> Also, would you please grep your driver's debug log to see if there is
>> any line like "Stream xxx received block xxx"? That would mean Spark
>> Streaming is receiving data from sources like Kafka.
>>
>>
>> 2015-04-01 16:18 GMT+08:00 James King <jakwebin...@gmail.com>:
>>
>>> Thank you bit1129,
>>>
>>> From looking at the web UI I can see 2 cores.
>>>
>>> I also looked at http://spark.apache.org/docs/1.2.1/configuration.html
>>> but can't see an obvious configuration for the number of receivers. Can
>>> you help please?
>>>
>>>
>>> On Wed, Apr 1, 2015 at 9:39 AM, bit1...@163.com <bit1...@163.com> wrote:
>>>
>>>> Please make sure that you have given more cores than Receiver numbers.
>>>>
>>>>
>>>>
>>>>
>>>> *From:* James King <jakwebin...@gmail.com>
>>>> *Date:* 2015-04-01 15:21
>>>> *To:* user <user@spark.apache.org>
>>>> *Subject:* Spark + Kafka
>>>> I have a simple setup/runtime of Kafka and Spark.
>>>>
>>>> I have a command-line consumer displaying arrivals to the Kafka topic,
>>>> so I know messages are being received.
>>>>
>>>> But when I try to read from the Kafka topic I get no messages; here are
>>>> some logs below.
>>>>
>>>> I'm thinking there aren't enough threads. How do I check that?
>>>>
>>>> Thank you.
>>>>
>>>> 2015-04-01 08:56:50 INFO JobScheduler:59 - Starting job streaming job
>>>> 1427871410000 ms.0 from job set of time 1427871410000 ms
>>>> 2015-04-01 08:56:50 INFO JobScheduler:59 - Finished job streaming job
>>>> 1427871410000 ms.0 from job set of time 1427871410000 ms
>>>> 2015-04-01 08:56:50 INFO JobScheduler:59 - Total delay: 0.002 s for
>>>> time 1427871410000 ms (execution: 0.000 s)
>>>> 2015-04-01 08:56:50 DEBUG JobGenerator:63 - Got event
>>>> ClearMetadata(1427871410000 ms)
>>>> 2015-04-01 08:56:50 DEBUG DStreamGraph:63 - Clearing metadata for time
>>>> 1427871410000 ms
>>>> 2015-04-01 08:56:50 DEBUG ForEachDStream:63 - Clearing references to
>>>> old RDDs: []
>>>> 2015-04-01 08:56:50 DEBUG ForEachDStream:63 - Unpersisting old RDDs:
>>>> 2015-04-01 08:56:50 DEBUG ForEachDStream:63 - Cleared 0 RDDs that were
>>>> older than 1427871405000 ms:
>>>> 2015-04-01 08:56:50 DEBUG KafkaInputDStream:63 - Clearing references to
>>>> old RDDs: [1427871405000 ms -> 8]
>>>> 2015-04-01 08:56:50 DEBUG KafkaInputDStream:63 - Unpersisting old RDDs:
>>>> 8
>>>> 2015-04-01 08:56:50 INFO BlockRDD:59 - Removing RDD 8 from persistence
>>>> list
>>>> 2015-04-01 08:56:50 DEBUG BlockManagerMasterActor:50 - [actor] received
>>>> message RemoveRdd(8) from Actor[akka://sparkDriver/temp/$n]
>>>> 2015-04-01 08:56:50 DEBUG BlockManagerSlaveActor:50 - [actor] received
>>>> message RemoveRdd(8) from Actor[akka://sparkDriver/temp/$o]
>>>> 2015-04-01 08:56:50 DEBUG BlockManagerMasterActor:56 - [actor] handled
>>>> message (0.287257 ms) RemoveRdd(8) from Actor[akka://sparkDriver/temp/$n]
>>>> 2015-04-01 08:56:50 DEBUG BlockManagerSlaveActor:63 - removing RDD 8
>>>> 2015-04-01 08:56:50 INFO BlockManager:59 - Removing RDD 8
>>>> 2015-04-01 08:56:50 DEBUG BlockManagerSlaveActor:63 - Done removing RDD
>>>> 8, response is 0
>>>> 2015-04-01 08:56:50 DEBUG BlockManagerSlaveActor:56 - [actor] handled
>>>> message (0.038047 ms) RemoveRdd(8) from Actor[akka://sparkDriver/temp/$o]
>>>> 2015-04-01 08:56:50 INFO KafkaInputDStream:59 - Removing blocks of RDD
>>>> BlockRDD[8] at createStream at KafkaLogConsumer.java:53 of time
>>>> 1427871410000 ms
>>>> 2015-04-01 08:56:50 DEBUG KafkaInputDStream:63 - Cleared 1 RDDs that
>>>> were older than 1427871405000 ms: 1427871405000 ms
>>>> 2015-04-01 08:56:50 DEBUG DStreamGraph:63 - Cleared old metadata for
>>>> time 1427871410000 ms
>>>> 2015-04-01 08:56:50 INFO ReceivedBlockTracker:59 - Deleting batches
>>>> ArrayBuffer(1427871400000 ms)
>>>> 2015-04-01 08:56:50 DEBUG BlockManagerSlaveActor:63 - Sent response: 0
>>>> to Actor[akka://sparkDriver/temp/$o]
>>>> 2015-04-01 08:56:50 DEBUG SparkDeploySchedulerBackend:50 - [actor]
>>>> received message ReviveOffers from
>>>> Actor[akka://sparkDriver/user/CoarseGrainedScheduler#1727295119]
>>>> 2015-04-01 08:56:50 DEBUG TaskSchedulerImpl:63 - parentName: , name:
>>>> TaskSet_0, runningTasks: 0
>>>> 2015-04-01 08:56:50 DEBUG SparkDeploySchedulerBackend:56 - [actor]
>>>> handled message (0.499181 ms) ReviveOffers from
>>>> Actor[akka://sparkDriver/user/CoarseGrainedScheduler#1727295119]
>>>> 2015-04-01 08:56:50 WARN TaskSchedulerImpl:71 - Initial job has not
>>>> accepted any resources; check your cluster UI to ensure that workers are
>>>> registered and have sufficient resources
>>>> 2015-04-01 08:56:51 DEBUG SparkDeploySchedulerBackend:50 - [actor]
>>>> received message ReviveOffers from
>>>> Actor[akka://sparkDriver/user/CoarseGrainedScheduler#1727295119]
>>>> 2015-04-01 08:56:51 DEBUG TaskSchedulerImpl:63 - parentName: , name:
>>>> TaskSet_0, runningTasks: 0
>>>> 2015-04-01 08:56:51 DEBUG SparkDeploySchedulerBackend:56 - [actor]
>>>> handled message (0.886121 ms) ReviveOffers from
>>>> Actor[akka://sparkDriver/user/CoarseGrainedScheduler#1727295119]
>>>> 2015-04-01 08:56:52 DEBUG AppClient$ClientActor:50 - [actor] received
>>>> message ExecutorUpdated(0,EXITED,Some(Command exited with code 1),Some(1))
>>>> from Actor[akka.tcp://sparkMaster@somesparkhost:7077/user/Master#336117298]
>>>> 2015-04-01 08:56:52 INFO AppClient$ClientActor:59 - Executor updated:
>>>> app-20150401065621-0007/0 is now EXITED (Command exited with code 1)
>>>> 2015-04-01 08:56:52 INFO SparkDeploySchedulerBackend:59 - Executor
>>>> app-20150401065621-0007/0 removed: Command exited with code 1
>>>> 2015-04-01 08:56:52 DEBUG SparkDeploySchedulerBackend:50 - [actor]
>>>> received message RemoveExecutor(0,Unknown executor exit code (1)) from
>>>> Actor[akka://sparkDriver/temp/$p]
>>>> 2015-04-01 08:56:52 ERROR SparkDeploySchedulerBackend:75 - Asked to
>>>> remove non-existent executor 0
>>>> 2015-04-01 08:56:52 DEBUG SparkDeploySchedulerBackend:56 - [actor]
>>>> handled message (1.394052 ms) RemoveExecutor(0,Unknown executor exit code
>>>> (1)) from Actor[akka://sparkDriver/temp/$p]
>>>> 2015-04-01 08:56:52 DEBUG AppClient$ClientActor:56 - [actor] handled
>>>> message (6.653705 ms) ExecutorUpdated(0,EXITED,Some(Command exited with
>>>> code 1),Some(1)) from
>>>> Actor[akka.tcp://sparkMaster@somesparkhost:7077/user/Master#336117298]
>>>> 2015-04-01 08:56:52 DEBUG AppClient$ClientActor:50 - [actor] received
>>>> message
>>>> ExecutorAdded(1,worker-20150331133159-somesparkhost-49556,somesparkhost:49556,2,512)
>>>> from Actor[akka.tcp://sparkMaster@somesparkhost:7077/user/Master#336117298]
>>>> 2015-04-01 08:56:52 INFO AppClient$ClientActor:59 - Executor added:
>>>> app-20150401065621-0007/1 on worker-20150331133159-somesparkhost-49556
>>>> (somesparkhost:49556) with 2 cores
>>>> 2015-04-01 08:56:52 INFO SparkDeploySchedulerBackend:59 - Granted
>>>> executor ID app-20150401065621-0007/1 on hostPort somesparkhost:49556 with
>>>> 2 cores, 512.0 MB RAM
>>>> 2015-04-01 08:56:52 DEBUG AppClient$ClientActor:56 - [actor] handled
>>>> message (1.119731 ms)
>>>> ExecutorAdded(1,worker-20150331133159-somesparkhost-49556,somesparkhost:49556,2,512)
>>>> from Actor[akka.tcp://sparkMaster@somesparkhost:7077/user/Master#336117298]
>>>> 2015-04-01 08:56:52 DEBUG AppClient$ClientActor:50 - [actor] received
>>>> message ExecutorUpdated(1,LOADING,None,None) from
>>>> Actor[akka.tcp://sparkMaster@somesparkhost:7077/user/Master#336117298]
>>>> 2015-04-01 08:56:52 INFO AppClient$ClientActor:59 - Executor updated:
>>>> app-20150401065621-0007/1 is now LOADING
>>>> 2015-04-01 08:56:52 DEBUG AppClient$ClientActor:56 - [actor] handled
>>>> message (0.516301 ms) ExecutorUpdated(1,LOADING,None,None) from
>>>> Actor[akka.tcp://sparkMaster@somesparkhost:7077/user/Master#336117298]
>>>> 2015-04-01 08:56:52 DEBUG SparkDeploySchedulerBackend:50 - [actor]
>>>> received message ReviveOffers from
>>>> Actor[akka://sparkDriver/user/CoarseGrainedScheduler#1727295119]
>>>> 2015-04-01 08:56:52 DEBUG TaskSchedulerImpl:63 - parentName: , name:
>>>> TaskSet_0, runningTasks: 0
>>>> 2015-04-01 08:56:52 DEBUG SparkDeploySchedulerBackend:56 - [actor]
>>>> handled message (0.652891 ms) ReviveOffers from
>>>> Actor[akka://sparkDriver/user/CoarseGrainedScheduler#1727295119]
>>>> 2015-04-01 08:56:52 DEBUG AppClient$ClientActor:50 - [actor] received
>>>> message ExecutorUpdated(1,RUNNING,None,None) from
>>>> Actor[akka.tcp://sparkMaster@somesparkhost:7077/user/Master#336117298]
>>>> 2015-04-01 08:56:52 INFO AppClient$ClientActor:59 - Executor updated:
>>>> app-20150401065621-0007/1 is now RUNNING
>>>> 2015-04-01 08:56:52 DEBUG AppClient$ClientActor:56 - [actor] handled
>>>> message (0.381614 ms) ExecutorUpdated(1,RUNNING,None,None) from
>>>> Actor[akka.tcp://sparkMaster@somesparkhost:7077/user/Master#336117298]
>>>> 2015-04-01 08:56:53 DEBUG SparkDeploySchedulerBackend:50 - [actor]
>>>> received message ReviveOffers from
>>>> Actor[akka://sparkDriver/user/CoarseGrainedScheduler#1727295119]
>>>> 2015-04-01 08:56:53 DEBUG TaskSchedulerImpl:63 - parentName: , name:
>>>> TaskSet_0, runningTasks: 0
>>>> 2015-04-01 08:56:53 DEBUG SparkDeploySchedulerBackend:56 - [actor]
>>>> handled message (0.417759 ms) ReviveOffers from
>>>> Actor[akka://sparkDriver/user/CoarseGrainedScheduler#1727295119]
>>>> 2015-04-01 08:56:54 DEBUG SparkDeploySchedulerBackend:50 - [actor]
>>>> received message ReviveOffers from
>>>> Actor[akka://sparkDriver/user/CoarseGrainedScheduler#1727295119]
>>>> 2015-04-01 08:56:54 DEBUG TaskSchedulerImpl:63 - parentName: , name:
>>>> TaskSet_0, runningTasks: 0
>>>> 2015-04-01 08:56:54 DEBUG SparkDeploySchedulerBackend:56 - [actor]
>>>> handled message (1.426392 ms) ReviveOffers from
>>>> Actor[akka://sparkDriver/user/CoarseGrainedScheduler#1727295119]
>>>> 2015-04-01 08:56:55 DEBUG RecurringTimer:63 - Callback for JobGenerator
>>>> called at time 1427871415000
>>>> 2015-04-01 08:56:55 DEBUG JobGenerator:63 - Got event
>>>> GenerateJobs(1427871415000 ms)
>>>> 2015-04-01 08:56:55 DEBUG DStreamGraph:63 - Generating jobs for time
>>>> 1427871415000 ms
>>>> 2015-04-01 08:56:55 DEBUG KafkaInputDStream:63 - Time 1427871415000 ms
>>>> is valid
>>>> 2015-04-01 08:56:55 DEBUG DStreamGraph:63 - Generated 1 jobs for time
>>>> 1427871415000 ms
>>>> 2015-04-01 08:56:55 INFO JobScheduler:59 - Added jobs for time
>>>> 1427871415000 ms
>>>> 2015-04-01 08:56:55 DEBUG JobGenerator:63 - Got event
>>>> DoCheckpoint(1427871415000 ms)
>>>> -------------------------------------------
>>>> Time: 1427871415000 ms
>>>> -------------------------------------------
>>>>
>>>>
>>>
>>
>