This is the code, and I couldn't find anything like the log message you suggested.

import java.util.HashMap;
import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

public class KafkaLogConsumer {

    public KafkaLogConsumer(int duration, String master) {
        JavaStreamingContext spark = createSparkContext(duration, master);

        // Topic name -> number of consumer threads for that topic
        Map<String, Integer> topics = new HashMap<String, Integer>();
        topics.put("test", 1);

        // Receiver-based stream, connecting via the Kafka cluster's ZooKeeper
        JavaPairDStream<String, String> input = KafkaUtils.createStream(spark,
                "somesparkhost:2181", "groupid", topics);
        input.print();

        spark.start();
        spark.awaitTermination();
    }

    private JavaStreamingContext createSparkContext(int duration, String master) {
        SparkConf sparkConf = new SparkConf()
                .setAppName(this.getClass().getSimpleName())
                .setMaster(master);
        return new JavaStreamingContext(sparkConf, Durations.seconds(duration));
    }
}
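For completeness, a minimal sketch of how the class above could be launched (the 5-second batch duration and the "local[2]" master are illustrative assumptions, not necessarily what I run):

public static void main(String[] args) {
    // The receiver created by KafkaUtils.createStream permanently occupies
    // one core, so the master must grant at least one more core than there
    // are receivers; with a single core, data may be received but batches
    // are never processed -- the symptom discussed below.
    new KafkaLogConsumer(5, "local[2]");
}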

On Wed, Apr 1, 2015 at 11:37 AM, James King <jakwebin...@gmail.com> wrote:

> Thanks Saisai,
>
> Sure will do.
>
> But just a quick note: when I set the master to "local[*]", Spark started
> showing Kafka messages as expected, so the problem, in my view, was that
> there were not enough threads to process the incoming data.
>
> Thanks.
>
>
> On Wed, Apr 1, 2015 at 10:53 AM, Saisai Shao <sai.sai.s...@gmail.com>
> wrote:
>
>> Would you please share your code snippet, so we can identify whether
>> there is anything wrong in your code?
>>
>> Besides, would you please grep your driver's debug log to see if there's
>> any debug entry like "Stream xxx received block xxx"? That message means that
>> Spark Streaming is still receiving data from sources like Kafka.
>>
>>
>> 2015-04-01 16:18 GMT+08:00 James King <jakwebin...@gmail.com>:
>>
>>> Thank you bit1129,
>>>
>>> From looking at the web UI I can see 2 cores.
>>>
>>> Also looking at http://spark.apache.org/docs/1.2.1/configuration.html
>>>
>>> But I can't see an obvious configuration option for the number of receivers.
>>> Can you help, please?
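For the record: as far as I can tell there is no single configuration property for this; the number of receivers is simply the number of input streams you create, and these can be unioned into one DStream. A rough sketch against the Spark 1.2 Java API (reusing the spark and topics variables from the code above, plus java.util.List/ArrayList; the receiver count here is an arbitrary example):

int numReceivers = 2; // keep this below the number of cores given to the app
List<JavaPairDStream<String, String>> streams =
        new ArrayList<JavaPairDStream<String, String>>();
for (int i = 0; i < numReceivers; i++) {
    // Each createStream call starts its own long-running receiver
    streams.add(KafkaUtils.createStream(spark,
            "somesparkhost:2181", "groupid", topics));
}
// Merge all receiver streams into a single DStream for processing
JavaPairDStream<String, String> input =
        spark.union(streams.get(0), streams.subList(1, streams.size()));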
>>>
>>>
>>> On Wed, Apr 1, 2015 at 9:39 AM, bit1...@163.com <bit1...@163.com> wrote:
>>>
>>>> Please make sure that you have allocated more cores than the number of receivers.
>>>>
>>>>
>>>> *From:* James King <jakwebin...@gmail.com>
>>>> *Date:* 2015-04-01 15:21
>>>> *To:* user <user@spark.apache.org>
>>>> *Subject:* Spark + Kafka
>>>> I have a simple setup/runtime of Kafka and Spark.
>>>>
>>>> I have a command-line consumer displaying arrivals to the Kafka topic, so I
>>>> know messages are being received.
>>>>
>>>> But when I try to read from the Kafka topic I get no messages; here are
>>>> some logs below.
>>>>
>>>> I'm thinking there aren't enough threads. How do I check that?
>>>>
>>>> Thank you.
>>>>
>>>> 2015-04-01 08:56:50 INFO  JobScheduler:59 - Starting job streaming job
>>>> 1427871410000 ms.0 from job set of time 1427871410000 ms
>>>> 2015-04-01 08:56:50 INFO  JobScheduler:59 - Finished job streaming job
>>>> 1427871410000 ms.0 from job set of time 1427871410000 ms
>>>> 2015-04-01 08:56:50 INFO  JobScheduler:59 - Total delay: 0.002 s for
>>>> time 1427871410000 ms (execution: 0.000 s)
>>>> 2015-04-01 08:56:50 DEBUG JobGenerator:63 - Got event
>>>> ClearMetadata(1427871410000 ms)
>>>> 2015-04-01 08:56:50 DEBUG DStreamGraph:63 - Clearing metadata for time
>>>> 1427871410000 ms
>>>> 2015-04-01 08:56:50 DEBUG ForEachDStream:63 - Clearing references to
>>>> old RDDs: []
>>>> 2015-04-01 08:56:50 DEBUG ForEachDStream:63 - Unpersisting old RDDs:
>>>> 2015-04-01 08:56:50 DEBUG ForEachDStream:63 - Cleared 0 RDDs that were
>>>> older than 1427871405000 ms:
>>>> 2015-04-01 08:56:50 DEBUG KafkaInputDStream:63 - Clearing references to
>>>> old RDDs: [1427871405000 ms -> 8]
>>>> 2015-04-01 08:56:50 DEBUG KafkaInputDStream:63 - Unpersisting old RDDs:
>>>> 8
>>>> 2015-04-01 08:56:50 INFO  BlockRDD:59 - Removing RDD 8 from persistence
>>>> list
>>>> 2015-04-01 08:56:50 DEBUG BlockManagerMasterActor:50 - [actor] received
>>>> message RemoveRdd(8) from Actor[akka://sparkDriver/temp/$n]
>>>> 2015-04-01 08:56:50 DEBUG BlockManagerSlaveActor:50 - [actor] received
>>>> message RemoveRdd(8) from Actor[akka://sparkDriver/temp/$o]
>>>> 2015-04-01 08:56:50 DEBUG BlockManagerMasterActor:56 - [actor] handled
>>>> message (0.287257 ms) RemoveRdd(8) from Actor[akka://sparkDriver/temp/$n]
>>>> 2015-04-01 08:56:50 DEBUG BlockManagerSlaveActor:63 - removing RDD 8
>>>> 2015-04-01 08:56:50 INFO  BlockManager:59 - Removing RDD 8
>>>> 2015-04-01 08:56:50 DEBUG BlockManagerSlaveActor:63 - Done removing RDD
>>>> 8, response is 0
>>>> 2015-04-01 08:56:50 DEBUG BlockManagerSlaveActor:56 - [actor] handled
>>>> message (0.038047 ms) RemoveRdd(8) from Actor[akka://sparkDriver/temp/$o]
>>>> 2015-04-01 08:56:50 INFO  KafkaInputDStream:59 - Removing blocks of RDD
>>>> BlockRDD[8] at createStream at KafkaLogConsumer.java:53 of time
>>>> 1427871410000 ms
>>>> 2015-04-01 08:56:50 DEBUG KafkaInputDStream:63 - Cleared 1 RDDs that
>>>> were older than 1427871405000 ms: 1427871405000 ms
>>>> 2015-04-01 08:56:50 DEBUG DStreamGraph:63 - Cleared old metadata for
>>>> time 1427871410000 ms
>>>> 2015-04-01 08:56:50 INFO  ReceivedBlockTracker:59 - Deleting batches
>>>> ArrayBuffer(1427871400000 ms)
>>>> 2015-04-01 08:56:50 DEBUG BlockManagerSlaveActor:63 - Sent response: 0
>>>> to Actor[akka://sparkDriver/temp/$o]
>>>> 2015-04-01 08:56:50 DEBUG SparkDeploySchedulerBackend:50 - [actor]
>>>> received message ReviveOffers from
>>>> Actor[akka://sparkDriver/user/CoarseGrainedScheduler#1727295119]
>>>> 2015-04-01 08:56:50 DEBUG TaskSchedulerImpl:63 - parentName: , name:
>>>> TaskSet_0, runningTasks: 0
>>>> 2015-04-01 08:56:50 DEBUG SparkDeploySchedulerBackend:56 - [actor]
>>>> handled message (0.499181 ms) ReviveOffers from
>>>> Actor[akka://sparkDriver/user/CoarseGrainedScheduler#1727295119]
>>>> 2015-04-01 08:56:50 WARN  TaskSchedulerImpl:71 - Initial job has not
>>>> accepted any resources; check your cluster UI to ensure that workers are
>>>> registered and have sufficient resources
>>>> 2015-04-01 08:56:51 DEBUG SparkDeploySchedulerBackend:50 - [actor]
>>>> received message ReviveOffers from
>>>> Actor[akka://sparkDriver/user/CoarseGrainedScheduler#1727295119]
>>>> 2015-04-01 08:56:51 DEBUG TaskSchedulerImpl:63 - parentName: , name:
>>>> TaskSet_0, runningTasks: 0
>>>> 2015-04-01 08:56:51 DEBUG SparkDeploySchedulerBackend:56 - [actor]
>>>> handled message (0.886121 ms) ReviveOffers from
>>>> Actor[akka://sparkDriver/user/CoarseGrainedScheduler#1727295119]
>>>> 2015-04-01 08:56:52 DEBUG AppClient$ClientActor:50 - [actor] received
>>>> message ExecutorUpdated(0,EXITED,Some(Command exited with code 1),Some(1))
>>>> from Actor[akka.tcp://sparkMaster@somesparkhost
>>>> :7077/user/Master#336117298]
>>>> 2015-04-01 08:56:52 INFO  AppClient$ClientActor:59 - Executor updated:
>>>> app-20150401065621-0007/0 is now EXITED (Command exited with code 1)
>>>> 2015-04-01 08:56:52 INFO  SparkDeploySchedulerBackend:59 - Executor
>>>> app-20150401065621-0007/0 removed: Command exited with code 1
>>>> 2015-04-01 08:56:52 DEBUG SparkDeploySchedulerBackend:50 - [actor]
>>>> received message RemoveExecutor(0,Unknown executor exit code (1)) from
>>>> Actor[akka://sparkDriver/temp/$p]
>>>> 2015-04-01 08:56:52 ERROR SparkDeploySchedulerBackend:75 - Asked to
>>>> remove non-existent executor 0
>>>> 2015-04-01 08:56:52 DEBUG SparkDeploySchedulerBackend:56 - [actor]
>>>> handled message (1.394052 ms) RemoveExecutor(0,Unknown executor exit code
>>>> (1)) from Actor[akka://sparkDriver/temp/$p]
>>>> 2015-04-01 08:56:52 DEBUG AppClient$ClientActor:56 - [actor] handled
>>>> message (6.653705 ms) ExecutorUpdated(0,EXITED,Some(Command exited with
>>>> code 1),Some(1)) from Actor[akka.tcp://sparkMaster@somesparkhost
>>>> :7077/user/Master#336117298]
>>>> 2015-04-01 08:56:52 DEBUG AppClient$ClientActor:50 - [actor] received
>>>> message
>>>> ExecutorAdded(1,worker-20150331133159-somesparkhost-49556,somesparkhost:49556,2,512)
>>>> from Actor[akka.tcp://sparkMaster@somesparkhost
>>>> :7077/user/Master#336117298]
>>>> 2015-04-01 08:56:52 INFO  AppClient$ClientActor:59 - Executor added:
>>>> app-20150401065621-0007/1 on worker-20150331133159-somesparkhost-49556
>>>> (somesparkhost:49556) with 2 cores
>>>> 2015-04-01 08:56:52 INFO  SparkDeploySchedulerBackend:59 - Granted
>>>> executor ID app-20150401065621-0007/1 on hostPort somesparkhost:49556 with
>>>> 2 cores, 512.0 MB RAM
>>>> 2015-04-01 08:56:52 DEBUG AppClient$ClientActor:56 - [actor] handled
>>>> message (1.119731 ms)
>>>> ExecutorAdded(1,worker-20150331133159-somesparkhost-49556,somesparkhost:49556,2,512)
>>>> from Actor[akka.tcp://sparkMaster@somesparkhost
>>>> :7077/user/Master#336117298]
>>>> 2015-04-01 08:56:52 DEBUG AppClient$ClientActor:50 - [actor] received
>>>> message ExecutorUpdated(1,LOADING,None,None) from
>>>> Actor[akka.tcp://sparkMaster@somesparkhost:7077/user/Master#336117298]
>>>> 2015-04-01 08:56:52 INFO  AppClient$ClientActor:59 - Executor updated:
>>>> app-20150401065621-0007/1 is now LOADING
>>>> 2015-04-01 08:56:52 DEBUG AppClient$ClientActor:56 - [actor] handled
>>>> message (0.516301 ms) ExecutorUpdated(1,LOADING,None,None) from
>>>> Actor[akka.tcp://sparkMaster@somesparkhost:7077/user/Master#336117298]
>>>> 2015-04-01 08:56:52 DEBUG SparkDeploySchedulerBackend:50 - [actor]
>>>> received message ReviveOffers from
>>>> Actor[akka://sparkDriver/user/CoarseGrainedScheduler#1727295119]
>>>> 2015-04-01 08:56:52 DEBUG TaskSchedulerImpl:63 - parentName: , name:
>>>> TaskSet_0, runningTasks: 0
>>>> 2015-04-01 08:56:52 DEBUG SparkDeploySchedulerBackend:56 - [actor]
>>>> handled message (0.652891 ms) ReviveOffers from
>>>> Actor[akka://sparkDriver/user/CoarseGrainedScheduler#1727295119]
>>>> 2015-04-01 08:56:52 DEBUG AppClient$ClientActor:50 - [actor] received
>>>> message ExecutorUpdated(1,RUNNING,None,None) from
>>>> Actor[akka.tcp://sparkMaster@somesparkhost:7077/user/Master#336117298]
>>>> 2015-04-01 08:56:52 INFO  AppClient$ClientActor:59 - Executor updated:
>>>> app-20150401065621-0007/1 is now RUNNING
>>>> 2015-04-01 08:56:52 DEBUG AppClient$ClientActor:56 - [actor] handled
>>>> message (0.381614 ms) ExecutorUpdated(1,RUNNING,None,None) from
>>>> Actor[akka.tcp://sparkMaster@somesparkhost:7077/user/Master#336117298]
>>>> 2015-04-01 08:56:53 DEBUG SparkDeploySchedulerBackend:50 - [actor]
>>>> received message ReviveOffers from
>>>> Actor[akka://sparkDriver/user/CoarseGrainedScheduler#1727295119]
>>>> 2015-04-01 08:56:53 DEBUG TaskSchedulerImpl:63 - parentName: , name:
>>>> TaskSet_0, runningTasks: 0
>>>> 2015-04-01 08:56:53 DEBUG SparkDeploySchedulerBackend:56 - [actor]
>>>> handled message (0.417759 ms) ReviveOffers from
>>>> Actor[akka://sparkDriver/user/CoarseGrainedScheduler#1727295119]
>>>> 2015-04-01 08:56:54 DEBUG SparkDeploySchedulerBackend:50 - [actor]
>>>> received message ReviveOffers from
>>>> Actor[akka://sparkDriver/user/CoarseGrainedScheduler#1727295119]
>>>> 2015-04-01 08:56:54 DEBUG TaskSchedulerImpl:63 - parentName: , name:
>>>> TaskSet_0, runningTasks: 0
>>>> 2015-04-01 08:56:54 DEBUG SparkDeploySchedulerBackend:56 - [actor]
>>>> handled message (1.426392 ms) ReviveOffers from
>>>> Actor[akka://sparkDriver/user/CoarseGrainedScheduler#1727295119]
>>>> 2015-04-01 08:56:55 DEBUG RecurringTimer:63 - Callback for JobGenerator
>>>> called at time 1427871415000
>>>> 2015-04-01 08:56:55 DEBUG JobGenerator:63 - Got event
>>>> GenerateJobs(1427871415000 ms)
>>>> 2015-04-01 08:56:55 DEBUG DStreamGraph:63 - Generating jobs for time
>>>> 1427871415000 ms
>>>> 2015-04-01 08:56:55 DEBUG KafkaInputDStream:63 - Time 1427871415000 ms
>>>> is valid
>>>> 2015-04-01 08:56:55 DEBUG DStreamGraph:63 - Generated 1 jobs for time
>>>> 1427871415000 ms
>>>> 2015-04-01 08:56:55 INFO  JobScheduler:59 - Added jobs for time
>>>> 1427871415000 ms
>>>> 2015-04-01 08:56:55 DEBUG JobGenerator:63 - Got event
>>>> DoCheckpoint(1427871415000 ms)
>>>> -------------------------------------------
>>>> Time: 1427871415000 ms
>>>> -------------------------------------------
>>>>
>>>>
>>>
>>
>
