As suggested by Qiaou, looked at the UI:

1)  Under 'Stages' the only 'active' stage is:  runJob at
ReceiverTracker.scala:275
2)  Under 'Executors', there's only 1 active task, but I don't see any
output (or logs)
3)  Under 'Streaming', there's one receiver called, 'KafkaReciever-0', but
'Records in last batch' are 0.

Honestly, I think it's not connecting to my Kafka topic - possibly because
I need to pass the following parameter:

metadata.broker.list -> <machine>:9092

But I don't know how to pass this to KafkaUtils.createStream(...).  Could
that be the problem?



On Tue, Nov 4, 2014 at 11:12 PM, Akhil Das <ak...@sigmoidanalytics.com>
wrote:

> Your code doesn't trigger any action. How about the following?
>
> JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new
> Duration(60 * 1 * 1000));
>
>         JavaPairReceiverInputDStream tweets = KafkaUtils.createStream(ssc,
> "<machine>:2181", "1", map);
>
>         JavaDStream<String> statuses = tweets.map(
>                 new Function<String, String>() {
>                     public String call(String status) {
>                         System.out.println(status);
>                         return status;
>                     }
>                 }
>         );
>
>
> statuses​.print()
> ​;​
>  ​
>
> ​​
>
> Or you could use foreachRDD instead of map() if your intention is just
> printing.​​
>
> Thanks
> Best Regards
>
> On Wed, Nov 5, 2014 at 12:35 PM, Something Something <
> mailinglist...@gmail.com> wrote:
>
>> It's not local.  My spark url is something like this:
>>
>>         String sparkUrl = "spark://<host name>:7077";
>>
>>
>> On Tue, Nov 4, 2014 at 11:03 PM, Jain Rahul <ja...@ivycomptech.com>
>> wrote:
>>
>>>
>>>  I think you are running it locally.
>>> Do you have local[1] here for master url? If yes change it to local[2]
>>> or more number of threads.
>>> It may be due to topic name mismatch also.
>>>
>>>      sparkConf.setMaster(“local[1]");
>>>
>>>  Regards,
>>> Rahul
>>>
>>>   From: Something Something <mailinglist...@gmail.com>
>>> Date: Wednesday, November 5, 2014 at 12:23 PM
>>> To: "Shao, Saisai" <saisai.s...@intel.com>
>>> Cc: "user@spark.apache.org" <user@spark.apache.org>
>>>
>>> Subject: Re: Kafka Consumer in Spark Streaming
>>>
>>>   Added foreach as follows.  Still don't see any output on my console.
>>> Would this go to the worker logs as Jerry indicated?
>>>
>>>         JavaPairReceiverInputDStream tweets =
>>> KafkaUtils.createStream(ssc, "<mymachine>:2181", "1", map);
>>>         JavaDStream<String> statuses = tweets.map(
>>>                 new Function<String, String>() {
>>>                     public String call(String status) {
>>>                         return status;
>>>                     }
>>>                 }
>>>         );
>>>
>>>         statuses.foreach(new Function<JavaRDD<String>, Void>() {
>>>             @Override
>>>             public Void call(JavaRDD<String> stringJavaRDD) throws
>>> Exception {
>>>                 for (String str: stringJavaRDD.take(10)) {
>>>                     System.out.println("Message: " + str);
>>>                 }
>>>                 return null;
>>>             }
>>>         });
>>>
>>>
>>> On Tue, Nov 4, 2014 at 10:32 PM, Shao, Saisai <saisai.s...@intel.com>
>>> wrote:
>>>
>>>>  If you’re running on a standalone mode, the log is under
>>>> <SPAR_HOME>/work/ directory. I’m not sure for yarn or mesos, you can check
>>>> the document of Spark to see the details.
>>>>
>>>>
>>>>
>>>> Thanks
>>>>
>>>> Jerry
>>>>
>>>>
>>>>
>>>> *From:* Something Something [mailto:mailinglist...@gmail.com]
>>>> *Sent:* Wednesday, November 05, 2014 2:28 PM
>>>> *To:* Shao, Saisai
>>>> *Cc:* user@spark.apache.org
>>>> *Subject:* Re: Kafka Consumer in Spark Streaming
>>>>
>>>>
>>>>
>>>> The Kafka broker definitely has messages coming in.  But your #2 point
>>>> is valid.  Needless to say I am a newbie to Spark.  I can't figure out
>>>> where the 'executor' logs would be.  How would I find them?
>>>>
>>>> All I see printed on my screen is this:
>>>>
>>>> 14/11/04 22:21:23 INFO Slf4jLogger: Slf4jLogger started
>>>> 14/11/04 22:21:23 INFO Remoting: Starting remoting
>>>> 14/11/04 22:21:24 INFO Remoting: Remoting started; listening on
>>>> addresses :[akka.tcp://spark@mymachie:60743]
>>>> 14/11/04 22:21:24 INFO Remoting: Remoting now listens on addresses:
>>>> [akka.tcp://spark@mymachine:60743]
>>>> 14/11/04 22:21:24 WARN NativeCodeLoader: Unable to load native-hadoop
>>>> library for your platform... using builtin-java classes where applicable
>>>> 14/11/04 22:21:24 INFO JniBasedUnixGroupsMappingWithFallback: Falling
>>>> back to shell based
>>>> -------------------------------------------
>>>> Time: 1415168520000 ms
>>>> -------------------------------------------
>>>> -------------------------------------------
>>>> Time: 1415168520000 ms
>>>> -------------------------------------------
>>>>
>>>> Keeps repeating this...
>>>>
>>>>
>>>>
>>>> On Tue, Nov 4, 2014 at 10:14 PM, Shao, Saisai <saisai.s...@intel.com>
>>>> wrote:
>>>>
>>>>  Hi, would you mind describing your problem a little more specific.
>>>>
>>>>
>>>>
>>>> 1.      Is the Kafka broker currently has no data feed in?
>>>>
>>>> 2.      This code will print the lines, but not in the driver side,
>>>> the code is running in the executor side, so you can check the log in
>>>> worker dir to see if there’s any printing logs under this folder.
>>>>
>>>> 3.      Did you see any exceptions when running the app, this will
>>>> help to define the problem.
>>>>
>>>>
>>>>
>>>> Thanks
>>>>
>>>> Jerry
>>>>
>>>>
>>>>
>>>> *From:* Something Something [mailto:mailinglist...@gmail.com]
>>>> *Sent:* Wednesday, November 05, 2014 1:57 PM
>>>> *To:* user@spark.apache.org
>>>> *Subject:* Kafka Consumer in Spark Streaming
>>>>
>>>>
>>>>
>>>> I've following code in my program.  I don't get any error, but it's not
>>>> consuming the messages either.  Shouldn't the following code print the line
>>>> in the 'call' method?  What am I missing?
>>>>
>>>> Please help.  Thanks.
>>>>
>>>>
>>>>
>>>>         JavaStreamingContext ssc = new JavaStreamingContext(sparkConf,
>>>> new Duration(60 * 1 * 1000));
>>>>
>>>>         JavaPairReceiverInputDStream tweets =
>>>> KafkaUtils.createStream(ssc, "<machine>:2181", "1", map);
>>>>
>>>>         JavaDStream<String> statuses = tweets.map(
>>>>                 new Function<String, String>() {
>>>>                     public String call(String status) {
>>>>                         System.out.println(status);
>>>>                         return status;
>>>>                     }
>>>>                 }
>>>>         );
>>>>
>>>>
>>>>
>>>
>>>   This email and any attachments are confidential, and may be legally
>>> privileged and protected by copyright. If you are not the intended
>>> recipient dissemination or copying of this email is prohibited. If you have
>>> received this in error, please notify the sender by replying by email and
>>> then delete the email completely from your system. Any views or opinions
>>> are solely those of the sender. This communication is not intended to form
>>> a binding contract unless expressly indicated to the contrary and properly
>>> authorised. Any actions taken on the basis of this email are at the
>>> recipient's own risk.
>>>
>>
>>
>

Reply via email to