Unsubscribe

2021-07-06 Thread Ramkumar V
Unsubscribe


Re: Spark SQL query for List

2016-04-26 Thread Ramkumar V
I'm getting the following exception if I form a query like this. It never even
reaches the point where get(0) or get(1) is called.

Exception in thread "main" java.lang.RuntimeException: [1.22] failure:
``*'' expected but `cities' found


*Thanks*,
<https://in.linkedin.com/in/ramkumarcs31>


On Tue, Apr 26, 2016 at 4:41 PM, Hyukjin Kwon <gurwls...@gmail.com> wrote:

> Doesn't get(0) give you the Array[String] for CITY (am I missing
> something?)
> On 26 Apr 2016 11:02 p.m., "Ramkumar V" <ramkumar.c...@gmail.com> wrote:
>
> JavaSparkContext ctx = new JavaSparkContext(sparkConf);
>
> SQLContext sqlContext = new SQLContext(ctx);
>
> DataFrame parquetFile = sqlContext.parquetFile(
> "hdfs:/XYZ:8020/user/hdfs/parquet/*.parquet");
>
>parquetFile.registerTempTable("parquetFile");
>
> DataFrame tempDF = sqlContext.sql("SELECT TOUR.CITIES, BUDJET from
> parquetFile");
>
> JavaRDD<Row> jRDD = tempDF.toJavaRDD();
>
>  JavaRDD<String> ones = jRDD.map(new Function<Row, String>() {
>
>   public String call(Row row) throws Exception {
>
> return row.getString(1);
>
>   }
>
> });
>
> *Thanks*,
> <https://in.linkedin.com/in/ramkumarcs31>
>
>
> On Tue, Apr 26, 2016 at 3:48 PM, Hyukjin Kwon <gurwls...@gmail.com> wrote:
>
>> Could you maybe share your codes?
>> On 26 Apr 2016 9:51 p.m., "Ramkumar V" <ramkumar.c...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I had loaded JSON file in parquet format into SparkSQL. I can't able to
>>> read List which is inside JSON.
>>>
>>> Sample JSON
>>>
>>> {
>>> "TOUR" : {
>>>  "CITIES" : ["Paris","Berlin","Prague"]
>>> },
>>> "BUDJET" : 100
>>> }
>>>
>>> I want to read value of CITIES.
>>>
>>> *Thanks*,
>>> <https://in.linkedin.com/in/ramkumarcs31>
>>>
>>>
>
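
Not a verified fix for the parser error above, but a minimal Java sketch (Spark 1.x) of one way to pull the nested CITIES array out through the DataFrame API instead of a raw SQL string. The schema comes from the sample JSON in the thread; the class name, HDFS path and the printing at the end are illustrative assumptions.

import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;

public class ReadNestedCities {
    public static void main(String[] args) {
        JavaSparkContext ctx = new JavaSparkContext(new SparkConf());
        SQLContext sqlContext = new SQLContext(ctx);

        DataFrame parquetFile =
                sqlContext.parquetFile("hdfs:/XYZ:8020/user/hdfs/parquet/*.parquet");

        // "TOUR.CITIES" resolves the array<string> field nested inside the TOUR struct.
        DataFrame tempDF = parquetFile.select("TOUR.CITIES", "BUDJET");

        JavaRDD<String> cities = tempDF.toJavaRDD().map(new Function<Row, String>() {
            public String call(Row row) throws Exception {
                List<String> cityList = row.getList(0);   // column 0 = CITIES
                return cityList.toString();               // e.g. [Paris, Berlin, Prague]
            }
        });

        for (String c : cities.collect()) {
            System.out.println(c);
        }
    }
}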


Re: Spark SQL query for List

2016-04-26 Thread Ramkumar V
JavaSparkContext ctx = new JavaSparkContext(sparkConf);

SQLContext sqlContext = new SQLContext(ctx);

DataFrame parquetFile = sqlContext.parquetFile(
"hdfs:/XYZ:8020/user/hdfs/parquet/*.parquet");

   parquetFile.registerTempTable("parquetFile");

DataFrame tempDF = sqlContext.sql("SELECT TOUR.CITIES, BUDJET from
parquetFile");

JavaRDD<Row> jRDD = tempDF.toJavaRDD();

 JavaRDD<String> ones = jRDD.map(new Function<Row, String>() {

  public String call(Row row) throws Exception {

return row.getString(1);

  }

});

*Thanks*,
<https://in.linkedin.com/in/ramkumarcs31>


On Tue, Apr 26, 2016 at 3:48 PM, Hyukjin Kwon <gurwls...@gmail.com> wrote:

> Could you maybe share your codes?
> On 26 Apr 2016 9:51 p.m., "Ramkumar V" <ramkumar.c...@gmail.com> wrote:
>
>> Hi,
>>
>> I had loaded JSON file in parquet format into SparkSQL. I can't able to
>> read List which is inside JSON.
>>
>> Sample JSON
>>
>> {
>> "TOUR" : {
>>  "CITIES" : ["Paris","Berlin","Prague"]
>> },
>> "BUDJET" : 100
>> }
>>
>> I want to read value of CITIES.
>>
>> *Thanks*,
>> <https://in.linkedin.com/in/ramkumarcs31>
>>
>>


Spark SQL query for List

2016-04-26 Thread Ramkumar V
Hi,

I have loaded a JSON file in parquet format into Spark SQL. I'm not able to
read a List that is inside the JSON.

Sample JSON

{
"TOUR" : {
 "CITIES" : ["Paris","Berlin","Prague"]
},
"BUDJET" : 100
}

I want to read value of CITIES.

*Thanks*,



Re: Read Parquet in Java Spark

2016-04-18 Thread Ramkumar V
Hi,

Any idea on this ?

*Thanks*,
<https://in.linkedin.com/in/ramkumarcs31>


On Mon, Apr 4, 2016 at 2:47 PM, Akhil Das <ak...@sigmoidanalytics.com>
wrote:

> I didn't know you had a parquet file containing JSON data.
>
> Thanks
> Best Regards
>
> On Mon, Apr 4, 2016 at 2:44 PM, Ramkumar V <ramkumar.c...@gmail.com>
> wrote:
>
>> Hi Akhil,
>>
>> Thanks for your help. Why do you put separator as "," ?
>>
>> I have a parquet file which contains only json in each line.
>>
>> *Thanks*,
>> <https://in.linkedin.com/in/ramkumarcs31>
>>
>>
>> On Mon, Apr 4, 2016 at 2:34 PM, Akhil Das <ak...@sigmoidanalytics.com>
>> wrote:
>>
>>> Something like this (in scala):
>>>
>>> val rdd = parquetFile.javaRDD().map(row => row.mkString(","))
>>>
>>> You can create a map operation over your javaRDD to convert the
>>> org.apache.spark.sql.Row
>>> <https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/sql/Row.html>
>>> to String (the Row.mkString() operation)
>>>
>>> Thanks
>>> Best Regards
>>>
>>> On Mon, Apr 4, 2016 at 12:02 PM, Ramkumar V <ramkumar.c...@gmail.com>
>>> wrote:
>>>
>>>> Any idea on this ? How to convert parquet file into JavaRDD ?
>>>>
>>>> *Thanks*,
>>>> <https://in.linkedin.com/in/ramkumarcs31>
>>>>
>>>>
>>>> On Thu, Mar 31, 2016 at 4:30 PM, Ramkumar V <ramkumar.c...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Thanks for the reply. I tried this. It's returning JavaRDD<Row>
>>>>> instead of JavaRDD<String>. How do I get a JavaRDD<String>?
>>>>>
>>>>> Error :
>>>>> incompatible types:
>>>>> org.apache.spark.api.java.JavaRDD<org.apache.spark.sql.Row> cannot be
>>>>> converted to org.apache.spark.api.java.JavaRDD<java.lang.String>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> *Thanks*,
>>>>> <https://in.linkedin.com/in/ramkumarcs31>
>>>>>
>>>>>
>>>>> On Thu, Mar 31, 2016 at 2:57 PM, UMESH CHAUDHARY <umesh9...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> From Spark Documentation:
>>>>>>
>>>>>> DataFrame parquetFile = sqlContext.read().parquet("people.parquet");
>>>>>>
>>>>>> JavaRDD<Row> jRDD = parquetFile.javaRDD();
>>>>>>
>>>>>> javaRDD() method will convert the DF to RDD
>>>>>>
>>>>>> On Thu, Mar 31, 2016 at 2:51 PM, Ramkumar V <ramkumar.c...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I'm trying to read parquet log files in Java Spark. Parquet log
>>>>>>> files are stored in hdfs. I want to read and convert that parquet file 
>>>>>>> into
>>>>>>> JavaRDD. I could able to find Sqlcontext dataframe api. How can I read 
>>>>>>> if
>>>>>>> it is sparkcontext and rdd ? what is the best way to read it ?
>>>>>>>
>>>>>>> *Thanks*,
>>>>>>> <https://in.linkedin.com/in/ramkumarcs31>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
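
A minimal Java sketch of the Row.mkString suggestion above (Spark 1.x), assuming the goal is a JavaRDD<String> with one delimited line per row; the paths and the class name are illustrative, not from the thread.

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;

public class ParquetToStringRdd {
    public static void main(String[] args) {
        JavaSparkContext ctx = new JavaSparkContext(new SparkConf());
        SQLContext sqlContext = new SQLContext(ctx);

        DataFrame parquetFile =
                sqlContext.read().parquet("hdfs:/XYZ:8020/user/hdfs/parquet/*.parquet");

        // DataFrame -> JavaRDD<Row>, then Row -> String.
        JavaRDD<String> lines = parquetFile.javaRDD().map(new Function<Row, String>() {
            public String call(Row row) throws Exception {
                // If the file holds a single JSON column, row.getString(0) would
                // return it unchanged; mkString(",") simply joins all columns.
                return row.mkString(",");
            }
        });

        lines.saveAsTextFile("hdfs:/XYZ:8020/user/hdfs/parquet_as_text");
    }
}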


Re: Read Parquet in Java Spark

2016-04-04 Thread Ramkumar V
Hi Akhil,

Thanks for your help. Why did you choose "," as the separator?

I have a parquet file that contains only JSON, one record per line.

*Thanks*,
<https://in.linkedin.com/in/ramkumarcs31>


On Mon, Apr 4, 2016 at 2:34 PM, Akhil Das <ak...@sigmoidanalytics.com>
wrote:

> Something like this (in scala):
>
> val rdd = parquetFile.javaRDD().map(row => row.mkString(","))
>
> You can create a map operation over your javaRDD to convert the
> org.apache.spark.sql.Row
> <https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/sql/Row.html>
> to String (the Row.mkString() operation)
>
> Thanks
> Best Regards
>
> On Mon, Apr 4, 2016 at 12:02 PM, Ramkumar V <ramkumar.c...@gmail.com>
> wrote:
>
>> Any idea on this ? How to convert parquet file into JavaRDD ?
>>
>> *Thanks*,
>> <https://in.linkedin.com/in/ramkumarcs31>
>>
>>
>> On Thu, Mar 31, 2016 at 4:30 PM, Ramkumar V <ramkumar.c...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> Thanks for the reply. I tried this. It's returning JavaRDD<Row> instead
>>> of JavaRDD<String>. How do I get a JavaRDD<String>?
>>>
>>> Error :
>>> incompatible types:
>>> org.apache.spark.api.java.JavaRDD<org.apache.spark.sql.Row> cannot be
>>> converted to org.apache.spark.api.java.JavaRDD<java.lang.String>
>>>
>>>
>>>
>>>
>>>
>>> *Thanks*,
>>> <https://in.linkedin.com/in/ramkumarcs31>
>>>
>>>
>>> On Thu, Mar 31, 2016 at 2:57 PM, UMESH CHAUDHARY <umesh9...@gmail.com>
>>> wrote:
>>>
>>>> From Spark Documentation:
>>>>
>>>> DataFrame parquetFile = sqlContext.read().parquet("people.parquet");
>>>>
>>>> JavaRDD<Row> jRDD = parquetFile.javaRDD();
>>>>
>>>> javaRDD() method will convert the DF to RDD
>>>>
>>>> On Thu, Mar 31, 2016 at 2:51 PM, Ramkumar V <ramkumar.c...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I'm trying to read parquet log files in Java Spark. Parquet log files
>>>>> are stored in hdfs. I want to read and convert that parquet file into
>>>>> JavaRDD. I could able to find Sqlcontext dataframe api. How can I read if
>>>>> it is sparkcontext and rdd ? what is the best way to read it ?
>>>>>
>>>>> *Thanks*,
>>>>> <https://in.linkedin.com/in/ramkumarcs31>
>>>>>
>>>>>
>>>>
>>>
>>
>


Re: Read Parquet in Java Spark

2016-04-04 Thread Ramkumar V
Any idea on this? How do I convert a parquet file into a JavaRDD?

*Thanks*,
<https://in.linkedin.com/in/ramkumarcs31>


On Thu, Mar 31, 2016 at 4:30 PM, Ramkumar V <ramkumar.c...@gmail.com> wrote:

> Hi,
>
> Thanks for the reply. I tried this. It's returning JavaRDD<Row> instead
> of JavaRDD<String>. How do I get a JavaRDD<String>?
>
> Error :
> incompatible types:
> org.apache.spark.api.java.JavaRDD<org.apache.spark.sql.Row> cannot be
> converted to org.apache.spark.api.java.JavaRDD<java.lang.String>
>
>
>
>
>
> *Thanks*,
> <https://in.linkedin.com/in/ramkumarcs31>
>
>
> On Thu, Mar 31, 2016 at 2:57 PM, UMESH CHAUDHARY <umesh9...@gmail.com>
> wrote:
>
>> From Spark Documentation:
>>
>> DataFrame parquetFile = sqlContext.read().parquet("people.parquet");
>>
>> JavaRDD<Row> jRDD = parquetFile.javaRDD();
>>
>> javaRDD() method will convert the DF to RDD
>>
>> On Thu, Mar 31, 2016 at 2:51 PM, Ramkumar V <ramkumar.c...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> I'm trying to read parquet log files in Java Spark. Parquet log files
>>> are stored in hdfs. I want to read and convert that parquet file into
>>> JavaRDD. I could able to find Sqlcontext dataframe api. How can I read if
>>> it is sparkcontext and rdd ? what is the best way to read it ?
>>>
>>> *Thanks*,
>>> <https://in.linkedin.com/in/ramkumarcs31>
>>>
>>>
>>
>


Re: Read Parquet in Java Spark

2016-03-31 Thread Ramkumar V
Hi,

Thanks for the reply. I tried this. It's returning JavaRDD<Row> instead of
JavaRDD<String>. How do I get a JavaRDD<String>?

Error :
incompatible types:
org.apache.spark.api.java.JavaRDD<org.apache.spark.sql.Row> cannot be
converted to org.apache.spark.api.java.JavaRDD<java.lang.String>





*Thanks*,
<https://in.linkedin.com/in/ramkumarcs31>


On Thu, Mar 31, 2016 at 2:57 PM, UMESH CHAUDHARY <umesh9...@gmail.com>
wrote:

> From Spark Documentation:
>
> DataFrame parquetFile = sqlContext.read().parquet("people.parquet");
>
> JavaRDD<Row> jRDD = parquetFile.javaRDD();
>
> javaRDD() method will convert the DF to RDD
>
> On Thu, Mar 31, 2016 at 2:51 PM, Ramkumar V <ramkumar.c...@gmail.com>
> wrote:
>
>> Hi,
>>
>> I'm trying to read parquet log files in Java Spark. Parquet log files are
>> stored in hdfs. I want to read and convert that parquet file into JavaRDD.
>> I could able to find Sqlcontext dataframe api. How can I read if it
>> is sparkcontext and rdd ? what is the best way to read it ?
>>
>> *Thanks*,
>> <https://in.linkedin.com/in/ramkumarcs31>
>>
>>
>


Read Parquet in Java Spark

2016-03-31 Thread Ramkumar V
Hi,

I'm trying to read parquet log files in Java Spark. The parquet log files are
stored in HDFS. I want to read and convert the parquet files into a JavaRDD.
I could only find the SQLContext DataFrame API. Is there a way to read them
with just a SparkContext and RDDs? What is the best way to read them?

*Thanks*,



Re: Spark with MapDB

2015-12-08 Thread Ramkumar V
I'm running a Spark batch job in cluster mode every hour, and it runs for 15
minutes. I have certain unique keys in the dataset, and I don't want to process
those keys again during the next hour's batch.

*Thanks*,
<https://in.linkedin.com/in/ramkumarcs31>


On Tue, Dec 8, 2015 at 1:42 PM, Fengdong Yu <fengdo...@everstring.com>
wrote:

> Can you detail your question?  what looks like your previous batch and the
> current batch?
>
>
>
>
>
> On Dec 8, 2015, at 3:52 PM, Ramkumar V <ramkumar.c...@gmail.com> wrote:
>
> Hi,
>
> I'm running java over spark in cluster mode. I want to apply filter on
> javaRDD based on some previous batch values. if i store those values in
> mapDB, is it possible to apply filter during the current batch ?
>
> *Thanks*,
> <https://in.linkedin.com/in/ramkumarcs31>
>
>
>


Re: Spark with MapDB

2015-12-08 Thread Ramkumar V
Pipe-separated values. I know broadcast and join would work, but I would like to
know whether MapDB works or not.

*Thanks*,
<https://in.linkedin.com/in/ramkumarcs31>


On Tue, Dec 8, 2015 at 2:22 PM, Fengdong Yu <fengdo...@everstring.com>
wrote:

>
> what’s your data format? ORC or CSV or others?
>
> // Scala sketch: collect the previous batch's keys, broadcast them,
> // and filter the current batch against the broadcast set.
> val keys = sqlContext.read.orc("your previous batch data path")
>   .select($"uniq_key").map(_.getString(0)).collect().toSet
> val broadcastKeys = sc.broadcast(keys)
>
> val rdd = your_current_batch_data
> val filtered = rdd.filter(line => !broadcastKeys.value.contains(line.key))
>
>
>
>
>
>
> On Dec 8, 2015, at 4:44 PM, Ramkumar V <ramkumar.c...@gmail.com> wrote:
>
> Im running spark batch job in cluster mode every hour and it runs for 15
> minutes. I have certain unique keys in the dataset. i dont want to process
> those keys during my next hour batch.
>
> *Thanks*,
> <https://in.linkedin.com/in/ramkumarcs31>
>
>
> On Tue, Dec 8, 2015 at 1:42 PM, Fengdong Yu <fengdo...@everstring.com>
> wrote:
>
>> Can you detail your question?  what looks like your previous batch and
>> the current batch?
>>
>>
>>
>>
>>
>> On Dec 8, 2015, at 3:52 PM, Ramkumar V <ramkumar.c...@gmail.com> wrote:
>>
>> Hi,
>>
>> I'm running java over spark in cluster mode. I want to apply filter on
>> javaRDD based on some previous batch values. if i store those values in
>> mapDB, is it possible to apply filter during the current batch ?
>>
>> *Thanks*,
>> <https://in.linkedin.com/in/ramkumarcs31>
>>
>>
>>
>
>


Re: Spark with MapDB

2015-12-08 Thread Ramkumar V
Yes, I agree, but the data is in the form of an RDD, and I'm also running in
cluster mode, so the data is distributed across all the machines in the
cluster. A bloom filter or MapDB, however, is not distributed. How will it
work in this case?

*Thanks*,
<https://in.linkedin.com/in/ramkumarcs31>


On Tue, Dec 8, 2015 at 5:30 PM, Jörn Franke <jornfra...@gmail.com> wrote:

> You may want to use a bloom filter for this, but make sure that you
> understand how it works
>
> On 08 Dec 2015, at 09:44, Ramkumar V <ramkumar.c...@gmail.com> wrote:
>
> Im running spark batch job in cluster mode every hour and it runs for 15
> minutes. I have certain unique keys in the dataset. i dont want to process
> those keys during my next hour batch.
>
> *Thanks*,
> <https://in.linkedin.com/in/ramkumarcs31>
>
>
> On Tue, Dec 8, 2015 at 1:42 PM, Fengdong Yu <fengdo...@everstring.com>
> wrote:
>
>> Can you detail your question?  what looks like your previous batch and
>> the current batch?
>>
>>
>>
>>
>>
>> On Dec 8, 2015, at 3:52 PM, Ramkumar V <ramkumar.c...@gmail.com> wrote:
>>
>> Hi,
>>
>> I'm running java over spark in cluster mode. I want to apply filter on
>> javaRDD based on some previous batch values. if i store those values in
>> mapDB, is it possible to apply filter during the current batch ?
>>
>> *Thanks*,
>> <https://in.linkedin.com/in/ramkumarcs31>
>>
>>
>>
>
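
A hedged alternative sketch in Java (plain Spark, not MapDB or a bloom filter): the usual way to apply a single-machine lookup structure to a distributed RDD is to build it on the driver and broadcast a read-only copy to every executor. The paths, the pipe-separated key layout and the class name below are illustrative assumptions, not the thread's actual job.

import java.util.HashSet;
import java.util.Set;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.broadcast.Broadcast;

public class FilterByPreviousBatch {
    public static void main(String[] args) {
        JavaSparkContext ctx = new JavaSparkContext();

        // Keys produced by the previous hourly batch, stored as plain text in HDFS.
        JavaRDD<String> previousKeys =
                ctx.textFile("hdfs:/XYZ:8020/user/hdfs/previous_batch_keys");
        final Broadcast<Set<String>> seen =
                ctx.broadcast(new HashSet<String>(previousKeys.collect()));

        // Current batch: pipe-separated lines whose first field is the unique key.
        JavaRDD<String> currentBatch =
                ctx.textFile("hdfs:/XYZ:8020/user/hdfs/current_batch");
        JavaRDD<String> unseen = currentBatch.filter(new Function<String, Boolean>() {
            public Boolean call(String line) throws Exception {
                String key = line.split("\\|", 2)[0];
                return !seen.value().contains(key);
            }
        });

        unseen.saveAsTextFile("hdfs:/XYZ:8020/user/hdfs/current_batch_unseen");
    }
}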


Spark with MapDB

2015-12-07 Thread Ramkumar V
Hi,

I'm running Java over Spark in cluster mode. I want to apply a filter on a
JavaRDD based on some values from a previous batch. If I store those values in
MapDB, is it possible to apply the filter during the current batch?

*Thanks*,



Re: Distinct on key-value pair of JavaRDD

2015-11-19 Thread Ramkumar V
I thought there would be a specific function for this, but I'm using reduceByKey
now. It's working fine. Thanks a lot.

*Thanks*,
<https://in.linkedin.com/in/ramkumarcs31>


On Tue, Nov 17, 2015 at 6:21 PM, ayan guha <guha.a...@gmail.com> wrote:

> How about using reducebykey?
> On 17 Nov 2015 22:00, "Ramkumar V" <ramkumar.c...@gmail.com> wrote:
>
>> Hi,
>>
>> I have JavaRDD<String,String>. I would like to do distinct only on key
>> but the normal distinct applies on both key and value. i want to apply only
>> on key. How to do that ?
>>
>> Any help is appreciated.
>>
>> *Thanks*,
>> <https://in.linkedin.com/in/ramkumarcs31>
>>
>>
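
A short Java sketch of the reduceByKey approach mentioned above; keeping one arbitrary value per key is effectively a distinct on the key only (the String types and method name are just an illustration).

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function2;

public class DistinctByKey {
    public static JavaPairRDD<String, String> distinctByKey(JavaPairRDD<String, String> pairs) {
        return pairs.reduceByKey(new Function2<String, String, String>() {
            public String call(String v1, String v2) throws Exception {
                return v1;   // arbitrarily keep the first value seen for each key
            }
        });
    }
}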


Distinct on key-value pair of JavaRDD

2015-11-17 Thread Ramkumar V
Hi,

I have a JavaRDD of key-value pairs. I would like to do distinct only on the key,
but the normal distinct applies to both key and value. I want to apply it only on
the key. How do I do that?

Any help is appreciated.

*Thanks*,



Re: Exception while reading from kafka stream

2015-11-03 Thread Ramkumar V
Thanks a lot, it worked for me. I'm using a single direct stream that
retrieves data from all the topics.

*Thanks*,
<https://in.linkedin.com/in/ramkumarcs31>


On Mon, Nov 2, 2015 at 8:13 PM, Cody Koeninger <c...@koeninger.org> wrote:

> combine topicsSet_1 and topicsSet_2 in a single createDirectStream call.
> Then you can use hasOffsetRanges to see what the topic for a given
> partition is.
>
> On Mon, Nov 2, 2015 at 7:26 AM, Ramkumar V <ramkumar.c...@gmail.com>
> wrote:
>
>> if i try like below code snippet , it shows exception , how to avoid this
>> exception ? how to switch processing based on topic ?
>>
>> JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
>> Durations.seconds(30));
>> HashSet topicsSet_1 = new
>> HashSet(Arrays.asList(topics.split(",")));
>> HashSet topicsSet_2 = new
>> HashSet(Arrays.asList(topics.split(",")));
>> HashMap<String, String> kafkaParams = new HashMap<String, String>();
>> kafkaParams.put("metadata.broker.list", brokers);
>> JavaPairInputDStream<String, String> messages_1 =
>> KafkaUtils.createDirectStream(
>>jssc,
>>String.class,
>>String.class,
>>StringDecoder.class,
>>StringDecoder.class,
>>kafkaParams,
>>topicsSet_1
>>);
>>
>> JavaPairInputDStream<String, String> messages_2 =
>> KafkaUtils.createDirectStream(
>>jssc,
>>String.class,
>>String.class,
>>StringDecoder.class,
>>StringDecoder.class,
>>kafkaParams,
>> topicsSet_2
>>);
>>
>> * Log Trace* :
>>
>> [ERROR] [11/02/2015 12:59:08.107] [Executor task launch worker-0]
>> [akka.tcp://sparkDriver@10.125.4.200:34251/user/CoarseGrainedScheduler]
>> swallowing exception during message send
>> (akka.remote.RemoteTransportExceptionNoStackTrace)
>> [ERROR] [11/02/2015 12:59:08.104] [Executor task launch worker-0]
>> [akka.tcp://sparkDriver@10.125.4.200:34251/user/CoarseGrainedScheduler]
>> swallowing exception during message send
>> (akka.remote.RemoteTransportExceptionNoStackTrace)
>> [ERROR] [11/02/2015 13:01:13.812] [Executor task launch worker-0]
>> [akka.tcp://sparkDriver@10.125.4.200:41039/user/CoarseGrainedScheduler]
>> swallowing exception during message send
>> (akka.remote.RemoteTransportExceptionNoStackTrace)
>> 15/11/02 12:59:05 ERROR yarn.ApplicationMaster: User class threw
>> exception: java.io.IOException: Failed to delete
>> somedomain/user/hdfs/spark_output/kyt_req/part-00055
>> 15/11/02 12:59:05 INFO yarn.ApplicationMaster: Final app status: FAILED,
>> exitCode: 15, (reason: User class threw exception: java.io.IOException:
>> Failed to delete somedomain/user/hdfs/spark_output/kyt_req/part-00055)
>> java.io.IOException: Failed on local exception:
>> java.io.InterruptedIOException: Interruped while waiting for IO on channel
>> java.nio.channels.SocketChannel[connected local=/10.125.4.200:40770
>> remote=somedomain]. 59994 millis timeout left.; Host Details : local host
>> is: "somedomain"; destination host is: "somedomain":8020;
>> java.io.IOException: Failed on local exception:
>> java.io.InterruptedIOException: Interruped while waiting for IO on channel
>> java.nio.channels.SocketChannel[connected local=/10.125.4.200:41898
>> remote=somedomain]. 59998 millis timeout left.; Host Details : local host
>> is: "somedomain"; destination host is: "somedomain;
>> 15/11/02 13:01:11 ERROR yarn.ApplicationMaster: User class threw
>> exception: java.lang.NullPointerException
>> 15/11/02 13:01:11 INFO yarn.ApplicationMaster: Final app status: FAILED,
>> exitCode: 15, (reason: User class threw exception:
>> java.lang.NullPointerException)
>> 15/11/02 13:01:13 INFO yarn.ApplicationMaster: Unregistering
>> ApplicationMaster with FAILED (diag message: User class threw exception:
>> java.lang.NullPointerException)
>> java.io.IOException: Failed on local exception:
>> java.io.InterruptedIOException: Interruped while waiting for IO on channel
>> java.nio.channels.SocketChannel[connected local=/10.125.4.224:40482
>> remote=somedomain]. 59991 millis timeout left.; Host Details : local host
>> is: "somedomain"; destination host is: "somedomain":8020;
>> [ERROR] [11/02/2015 12:59:08.102] [Executor task launch worker-0]
>> [akka.tcp://sparkDriver@10.125.4.200:34251/user/CoarseGrainedScheduler]
>> swallowing exception dur
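
A hedged Java sketch of the single-stream approach suggested above (Spark 1.4 with the spark-streaming-kafka direct API): subscribe to all topics with one createDirectStream call, then recover each partition's topic through HasOffsetRanges and branch the processing there. Broker addresses, topic names and the HDFS path are illustrative assumptions.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;

import kafka.serializer.StringDecoder;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.HasOffsetRanges;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.kafka.OffsetRange;

import scala.Tuple2;

public class SingleDirectStream {
    public static void main(String[] args) throws Exception {
        JavaStreamingContext jssc =
                new JavaStreamingContext(new SparkConf(), Durations.seconds(30));

        HashMap<String, String> kafkaParams = new HashMap<String, String>();
        kafkaParams.put("metadata.broker.list", "ip:port,ip:port");
        HashSet<String> topics =
                new HashSet<String>(Arrays.asList("log_a,log_b,log_c".split(",")));

        JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
                jssc, String.class, String.class, StringDecoder.class, StringDecoder.class,
                kafkaParams, topics);

        messages.foreachRDD(new Function<JavaPairRDD<String, String>, Void>() {
            public Void call(JavaPairRDD<String, String> rdd) throws Exception {
                // Must be called on the RDD that came straight out of the direct stream.
                final OffsetRange[] ranges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
                rdd.mapPartitionsWithIndex(
                        new Function2<Integer, Iterator<Tuple2<String, String>>, Iterator<String>>() {
                            public Iterator<String> call(Integer idx,
                                                          Iterator<Tuple2<String, String>> it) {
                                String topic = ranges[idx].topic();  // partition i maps to ranges[i]
                                // ...branch per-topic processing here; tag and pass through for the demo.
                                List<String> out = new ArrayList<String>();
                                while (it.hasNext()) {
                                    out.add(topic + "|" + it.next()._2());
                                }
                                return out.iterator();
                            }
                        }, true)
                        .saveAsTextFile("hdfs:/XYZ:8020/user/hdfs/output");  // illustrative path
                return null;
            }
        });

        jssc.start();
        jssc.awaitTermination();
    }
}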

Re: Exception while reading from kafka stream

2015-11-02 Thread Ramkumar V
If I try the code snippet below, it throws an exception. How do I avoid this
exception? How do I switch processing based on the topic?

JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
Durations.seconds(30));
HashSet<String> topicsSet_1 = new
HashSet<String>(Arrays.asList(topics.split(",")));
HashSet<String> topicsSet_2 = new
HashSet<String>(Arrays.asList(topics.split(",")));
HashMap<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("metadata.broker.list", brokers);
JavaPairInputDStream<String, String> messages_1 =
KafkaUtils.createDirectStream(
   jssc,
   String.class,
   String.class,
   StringDecoder.class,
   StringDecoder.class,
   kafkaParams,
   topicsSet_1
   );

JavaPairInputDStream<String, String> messages_2 =
KafkaUtils.createDirectStream(
   jssc,
   String.class,
   String.class,
   StringDecoder.class,
   StringDecoder.class,
   kafkaParams,
topicsSet_2
   );

* Log Trace* :

[ERROR] [11/02/2015 12:59:08.107] [Executor task launch worker-0]
[akka.tcp://sparkDriver@10.125.4.200:34251/user/CoarseGrainedScheduler]
swallowing exception during message send
(akka.remote.RemoteTransportExceptionNoStackTrace)
[ERROR] [11/02/2015 12:59:08.104] [Executor task launch worker-0]
[akka.tcp://sparkDriver@10.125.4.200:34251/user/CoarseGrainedScheduler]
swallowing exception during message send
(akka.remote.RemoteTransportExceptionNoStackTrace)
[ERROR] [11/02/2015 13:01:13.812] [Executor task launch worker-0]
[akka.tcp://sparkDriver@10.125.4.200:41039/user/CoarseGrainedScheduler]
swallowing exception during message send
(akka.remote.RemoteTransportExceptionNoStackTrace)
15/11/02 12:59:05 ERROR yarn.ApplicationMaster: User class threw exception:
java.io.IOException: Failed to delete
somedomain/user/hdfs/spark_output/kyt_req/part-00055
15/11/02 12:59:05 INFO yarn.ApplicationMaster: Final app status: FAILED,
exitCode: 15, (reason: User class threw exception: java.io.IOException:
Failed to delete somedomain/user/hdfs/spark_output/kyt_req/part-00055)
java.io.IOException: Failed on local exception:
java.io.InterruptedIOException: Interruped while waiting for IO on channel
java.nio.channels.SocketChannel[connected local=/10.125.4.200:40770
remote=somedomain]. 59994 millis timeout left.; Host Details : local host
is: "somedomain"; destination host is: "somedomain":8020;
java.io.IOException: Failed on local exception:
java.io.InterruptedIOException: Interruped while waiting for IO on channel
java.nio.channels.SocketChannel[connected local=/10.125.4.200:41898
remote=somedomain]. 59998 millis timeout left.; Host Details : local host
is: "somedomain"; destination host is: "somedomain;
15/11/02 13:01:11 ERROR yarn.ApplicationMaster: User class threw exception:
java.lang.NullPointerException
15/11/02 13:01:11 INFO yarn.ApplicationMaster: Final app status: FAILED,
exitCode: 15, (reason: User class threw exception:
java.lang.NullPointerException)
15/11/02 13:01:13 INFO yarn.ApplicationMaster: Unregistering
ApplicationMaster with FAILED (diag message: User class threw exception:
java.lang.NullPointerException)
java.io.IOException: Failed on local exception:
java.io.InterruptedIOException: Interruped while waiting for IO on channel
java.nio.channels.SocketChannel[connected local=/10.125.4.224:40482
remote=somedomain]. 59991 millis timeout left.; Host Details : local host
is: "somedomain"; destination host is: "somedomain":8020;
[ERROR] [11/02/2015 12:59:08.102] [Executor task launch worker-0]
[akka.tcp://sparkDriver@10.125.4.200:34251/user/CoarseGrainedScheduler]
swallowing exception during message send
(akka.remote.RemoteTransportExceptionNoStackTrace)



*Thanks*,
<https://in.linkedin.com/in/ramkumarcs31>


On Fri, Oct 30, 2015 at 7:34 PM, Cody Koeninger <c...@koeninger.org> wrote:

> Just put them all in one stream and switch processing based on the topic
>
> On Fri, Oct 30, 2015 at 6:29 AM, Ramkumar V <ramkumar.c...@gmail.com>
> wrote:
>
>> i want to join all those logs in some manner. That's what i'm trying to
>> do.
>>
>> *Thanks*,
>> <https://in.linkedin.com/in/ramkumarcs31>
>>
>>
>> On Fri, Oct 30, 2015 at 4:57 PM, Saisai Shao <sai.sai.s...@gmail.com>
>> wrote:
>>
>>> I don't think Spark Streaming supports multiple streaming context in one
>>> jvm, you cannot use in such way. Instead you could run multiple streaming
>>> applications, since you're using Yarn.
>>>
>>> 2015年10月30日星期五,Ramkumar V <ramkumar.c...@gmail.com> 写道:
>>>
>>>> I found NPE is mainly because of im using the same JavaStreamingContext
>>>> for some other kafka stream. if i change the name , its running
>>>

Re: Exception while reading from kafka stream

2015-10-30 Thread Ramkumar V
spark version - spark 1.4.1

my code snippet:

String brokers = "ip:port,ip:port";
String topics = "x,y,z";
HashSet<String> TopicsSet = new
HashSet<String>(Arrays.asList(topics.split(",")));
HashMap<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("metadata.broker.list", brokers);

JavaPairInputDStream<String, String> messages =
KafkaUtils.createDirectStream(
   jssc,
   String.class,
   String.class,
   StringDecoder.class,
   StringDecoder.class,
   kafkaParams,
TopicsSet
   );

messages.foreachRDD(new Function<JavaPairRDD<String, String>, Void>() {
public Void call(JavaPairRDD<String, String> tuple) {
JavaRDD<String> rdd = tuple.values();
rdd.saveAsTextFile("hdfs://myuser:8020/user/hdfs/output");
return null;
}
   });


*Thanks*,
<https://in.linkedin.com/in/ramkumarcs31>


On Fri, Oct 30, 2015 at 1:57 PM, Saisai Shao <sai.sai.s...@gmail.com> wrote:

> What Spark version are you using, also a small code snippet of how you use
> Spark Streaming would be greatly helpful.
>
> On Fri, Oct 30, 2015 at 3:57 PM, Ramkumar V <ramkumar.c...@gmail.com>
> wrote:
>
>> I can able to read and print few lines. Afterthat i'm getting this
>> exception. Any idea for this ?
>>
>> *Thanks*,
>> <https://in.linkedin.com/in/ramkumarcs31>
>>
>>
>> On Thu, Oct 29, 2015 at 6:14 PM, Ramkumar V <ramkumar.c...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> I'm trying to read from kafka stream and printing it textfile. I'm using
>>> java over spark. I dont know why i'm getting the following exception.
>>> Also exception message is very abstract.  can anyone please help me ?
>>>
>>> Log Trace :
>>>
>>> 15/10/29 12:15:09 ERROR scheduler.JobScheduler: Error in job generator
>>> java.lang.NullPointerException
>>> at
>>> org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
>>> at
>>> org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
>>> at
>>> scala.collection.TraversableOnce$$anonfun$maxBy$1.apply(TraversableOnce.scala:225)
>>> at
>>> scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
>>> at
>>> scala.collection.IndexedSeqOptimized$class.reduceLeft(IndexedSeqOptimized.scala:68)
>>> at
>>> scala.collection.mutable.ArrayBuffer.reduceLeft(ArrayBuffer.scala:47)
>>> at
>>> scala.collection.TraversableOnce$class.maxBy(TraversableOnce.scala:225)
>>> at
>>> scala.collection.AbstractTraversable.maxBy(Traversable.scala:105)
>>> at
>>> org.apache.spark.streaming.DStreamGraph.getMaxInputStreamRememberDuration(DStreamGraph.scala:172)
>>> at
>>> org.apache.spark.streaming.scheduler.JobGenerator.clearMetadata(JobGenerator.scala:267)
>>> at org.apache.spark.streaming.scheduler.JobGenerator.org
>>> $apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:178)
>>> at
>>> org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:83)
>>> at
>>> org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:82)
>>> at
>>> org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>>> 15/10/29 12:15:09 ERROR yarn.ApplicationMaster: User class threw
>>> exception: java.lang.NullPointerException
>>> java.lang.NullPointerException
>>> at
>>> org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
>>> at
>>> org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
>>> at
>>> scala.collection.TraversableOnce$$anonfun$maxBy$1.apply(TraversableOnce.scala:225)
>>> at
>>> scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
>>> at
>>> scala.collection.IndexedSeqOptimized$class.reduceLeft(IndexedSeqOptimized.scala:68)
>>> at
>>> scala.collection.mutable.ArrayBuffer.reduceLeft(ArrayBuffer.scala:47)
>>> at
>>> scala.collection.TraversableOnce$class.maxBy(TraversableOnce.scala:225)
>>> at
>>> scala.collection.AbstractTraversable.maxBy(Traversa

Re: Exception while reading from kafka stream

2015-10-30 Thread Ramkumar V
I am able to read and print a few lines. After that, I'm getting this
exception. Any idea what causes it?

*Thanks*,
<https://in.linkedin.com/in/ramkumarcs31>


On Thu, Oct 29, 2015 at 6:14 PM, Ramkumar V <ramkumar.c...@gmail.com> wrote:

> Hi,
>
> I'm trying to read from kafka stream and printing it textfile. I'm using
> java over spark. I dont know why i'm getting the following exception.
> Also exception message is very abstract.  can anyone please help me ?
>
> Log Trace :
>
> 15/10/29 12:15:09 ERROR scheduler.JobScheduler: Error in job generator
> java.lang.NullPointerException
> at
> org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
> at
> org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
> at
> scala.collection.TraversableOnce$$anonfun$maxBy$1.apply(TraversableOnce.scala:225)
> at
> scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
> at
> scala.collection.IndexedSeqOptimized$class.reduceLeft(IndexedSeqOptimized.scala:68)
> at
> scala.collection.mutable.ArrayBuffer.reduceLeft(ArrayBuffer.scala:47)
> at
> scala.collection.TraversableOnce$class.maxBy(TraversableOnce.scala:225)
> at
> scala.collection.AbstractTraversable.maxBy(Traversable.scala:105)
> at
> org.apache.spark.streaming.DStreamGraph.getMaxInputStreamRememberDuration(DStreamGraph.scala:172)
> at
> org.apache.spark.streaming.scheduler.JobGenerator.clearMetadata(JobGenerator.scala:267)
> at org.apache.spark.streaming.scheduler.JobGenerator.org
> $apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:178)
> at
> org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:83)
> at
> org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:82)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> 15/10/29 12:15:09 ERROR yarn.ApplicationMaster: User class threw
> exception: java.lang.NullPointerException
> java.lang.NullPointerException
> at
> org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
> at
> org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
> at
> scala.collection.TraversableOnce$$anonfun$maxBy$1.apply(TraversableOnce.scala:225)
> at
> scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
> at
> scala.collection.IndexedSeqOptimized$class.reduceLeft(IndexedSeqOptimized.scala:68)
> at
> scala.collection.mutable.ArrayBuffer.reduceLeft(ArrayBuffer.scala:47)
> at
> scala.collection.TraversableOnce$class.maxBy(TraversableOnce.scala:225)
> at
> scala.collection.AbstractTraversable.maxBy(Traversable.scala:105)
> at
> org.apache.spark.streaming.DStreamGraph.getMaxInputStreamRememberDuration(DStreamGraph.scala:172)
> at
> org.apache.spark.streaming.scheduler.JobGenerator.clearMetadata(JobGenerator.scala:267)
> at org.apache.spark.streaming.scheduler.JobGenerator.org
> $apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:178)
> at
> org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:83)
> at
> org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:82)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>
>
>
> *Thanks*,
> <https://in.linkedin.com/in/ramkumarcs31>
>
>


Re: Exception while reading from kafka stream

2015-10-30 Thread Ramkumar V
No, this is the only exception, and I'm getting it multiple times in my log.
Also, I was reading some other topics earlier and did not face this NPE.

*Thanks*,
<https://in.linkedin.com/in/ramkumarcs31>


On Fri, Oct 30, 2015 at 2:50 PM, Saisai Shao <sai.sai.s...@gmail.com> wrote:

> I just did a local test with your code, seems everything is fine, the only
> difference is that I use the master branch, but I don't think it changes a
> lot in this part. Do you met any other exceptions or errors beside this
> one? Probably this is due to other exceptions that makes this system
> unstable.
>
> On Fri, Oct 30, 2015 at 5:13 PM, Ramkumar V <ramkumar.c...@gmail.com>
> wrote:
>
>> No, i dont have any special settings. if i keep only reading line in my
>> code, it's throwing NPE.
>>
>> *Thanks*,
>> <https://in.linkedin.com/in/ramkumarcs31>
>>
>>
>> On Fri, Oct 30, 2015 at 2:14 PM, Saisai Shao <sai.sai.s...@gmail.com>
>> wrote:
>>
>>> Do you have any special settings, from your code, I don't think it will
>>> incur NPE at that place.
>>>
>>> On Fri, Oct 30, 2015 at 4:32 PM, Ramkumar V <ramkumar.c...@gmail.com>
>>> wrote:
>>>
>>>> spark version - spark 1.4.1
>>>>
>>>> my code snippet:
>>>>
>>>> String brokers = "ip:port,ip:port";
>>>> String topics = "x,y,z";
>>>> HashSet TopicsSet = new
>>>> HashSet(Arrays.asList(topics.split(",")));
>>>> HashMap<String, String> kafkaParams = new HashMap<String, String>();
>>>> kafkaParams.put("metadata.broker.list", brokers);
>>>>
>>>> JavaPairInputDStream<String, String> messages =
>>>> KafkaUtils.createDirectStream(
>>>>jssc,
>>>>String.class,
>>>>String.class,
>>>>StringDecoder.class,
>>>>StringDecoder.class,
>>>>kafkaParams,
>>>> TopicsSet
>>>>);
>>>>
>>>> messages.foreachRDD(new Function<JavaPairRDD,Void> () {
>>>> public Void call(JavaPairRDD tuple) {
>>>> JavaRDDrdd = tuple.values();
>>>>
>>>> rdd.saveAsTextFile("hdfs://myuser:8020/user/hdfs/output");
>>>> return null;
>>>> }
>>>>});
>>>>
>>>>
>>>> *Thanks*,
>>>> <https://in.linkedin.com/in/ramkumarcs31>
>>>>
>>>>
>>>> On Fri, Oct 30, 2015 at 1:57 PM, Saisai Shao <sai.sai.s...@gmail.com>
>>>> wrote:
>>>>
>>>>> What Spark version are you using, also a small code snippet of how you
>>>>> use Spark Streaming would be greatly helpful.
>>>>>
>>>>> On Fri, Oct 30, 2015 at 3:57 PM, Ramkumar V <ramkumar.c...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> I can able to read and print few lines. Afterthat i'm getting this
>>>>>> exception. Any idea for this ?
>>>>>>
>>>>>> *Thanks*,
>>>>>> <https://in.linkedin.com/in/ramkumarcs31>
>>>>>>
>>>>>>
>>>>>> On Thu, Oct 29, 2015 at 6:14 PM, Ramkumar V <ramkumar.c...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I'm trying to read from kafka stream and printing it textfile. I'm
>>>>>>> using java over spark. I dont know why i'm getting the following 
>>>>>>> exception.
>>>>>>> Also exception message is very abstract.  can anyone please help me ?
>>>>>>>
>>>>>>> Log Trace :
>>>>>>>
>>>>>>> 15/10/29 12:15:09 ERROR scheduler.JobScheduler: Error in job
>>>>>>> generator
>>>>>>> java.lang.NullPointerException
>>>>>>> at
>>>>>>> org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
>>>>>>> at
>>>>>>> org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
>>>>>>> at
>>>>>>> scala.collection.TraversableOnce$$anonfun$maxBy

Re: Exception while reading from kafka stream

2015-10-30 Thread Ramkumar V
I found the NPE is mainly because I'm using the same JavaStreamingContext for
some other Kafka stream. If I change the name, it runs successfully. How do I
run multiple JavaStreamingContexts in one program? I'm getting the following
exception if I run multiple JavaStreamingContexts in a single file.

15/10/30 11:04:29 INFO yarn.ApplicationMaster: Final app status: FAILED,
exitCode: 15, (reason: User class threw exception:
java.lang.IllegalStateException: Only one StreamingContext may be started
in this JVM. Currently running StreamingContext was started
at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:622)


*Thanks*,
<https://in.linkedin.com/in/ramkumarcs31>


On Fri, Oct 30, 2015 at 3:25 PM, Saisai Shao <sai.sai.s...@gmail.com> wrote:

> From the code, I think this field "rememberDuration" shouldn't be null,
> it will be verified at the start, unless some place changes it's value in
> the runtime that makes it null, but I cannot image how this happened. Maybe
> you could add some logs around the place where exception happens if you
> could reproduce it.
>
> On Fri, Oct 30, 2015 at 5:31 PM, Ramkumar V <ramkumar.c...@gmail.com>
> wrote:
>
>> No. this is the only exception that im getting multiple times in my log.
>> Also i was reading some other topics earlier but im not faced this NPE.
>>
>> *Thanks*,
>> <https://in.linkedin.com/in/ramkumarcs31>
>>
>>
>> On Fri, Oct 30, 2015 at 2:50 PM, Saisai Shao <sai.sai.s...@gmail.com>
>> wrote:
>>
>>> I just did a local test with your code, seems everything is fine, the
>>> only difference is that I use the master branch, but I don't think it
>>> changes a lot in this part. Do you met any other exceptions or errors
>>> beside this one? Probably this is due to other exceptions that makes this
>>> system unstable.
>>>
>>> On Fri, Oct 30, 2015 at 5:13 PM, Ramkumar V <ramkumar.c...@gmail.com>
>>> wrote:
>>>
>>>> No, i dont have any special settings. if i keep only reading line in my
>>>> code, it's throwing NPE.
>>>>
>>>> *Thanks*,
>>>> <https://in.linkedin.com/in/ramkumarcs31>
>>>>
>>>>
>>>> On Fri, Oct 30, 2015 at 2:14 PM, Saisai Shao <sai.sai.s...@gmail.com>
>>>> wrote:
>>>>
>>>>> Do you have any special settings, from your code, I don't think it
>>>>> will incur NPE at that place.
>>>>>
>>>>> On Fri, Oct 30, 2015 at 4:32 PM, Ramkumar V <ramkumar.c...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> spark version - spark 1.4.1
>>>>>>
>>>>>> my code snippet:
>>>>>>
>>>>>> String brokers = "ip:port,ip:port";
>>>>>> String topics = "x,y,z";
>>>>>> HashSet TopicsSet = new
>>>>>> HashSet(Arrays.asList(topics.split(",")));
>>>>>> HashMap<String, String> kafkaParams = new HashMap<String, String>();
>>>>>> kafkaParams.put("metadata.broker.list", brokers);
>>>>>>
>>>>>> JavaPairInputDStream<String, String> messages =
>>>>>> KafkaUtils.createDirectStream(
>>>>>>jssc,
>>>>>>String.class,
>>>>>>String.class,
>>>>>>StringDecoder.class,
>>>>>>    StringDecoder.class,
>>>>>>kafkaParams,
>>>>>> TopicsSet
>>>>>>);
>>>>>>
>>>>>> messages.foreachRDD(new Function<JavaPairRDD,Void>
>>>>>> () {
>>>>>> public Void call(JavaPairRDD tuple) {
>>>>>> JavaRDDrdd = tuple.values();
>>>>>>
>>>>>> rdd.saveAsTextFile("hdfs://myuser:8020/user/hdfs/output");
>>>>>> return null;
>>>>>> }
>>>>>>});
>>>>>>
>>>>>>
>>>>>> *Thanks*,
>>>>>> <https://in.linkedin.com/in/ramkumarcs31>
>>>>>>
>>>>>>
>>>>>> On Fri, Oct 30, 2015 at 1:57 PM, Saisai Shao <sai.sai.s...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> What Spark version are you using, also a small code snippet of how
>>>>>>

Re: Exception while reading from kafka stream

2015-10-30 Thread Ramkumar V
No, I don't have any special settings. Even if I keep only the reading line in my
code, it's throwing the NPE.

*Thanks*,
<https://in.linkedin.com/in/ramkumarcs31>


On Fri, Oct 30, 2015 at 2:14 PM, Saisai Shao <sai.sai.s...@gmail.com> wrote:

> Do you have any special settings, from your code, I don't think it will
> incur NPE at that place.
>
> On Fri, Oct 30, 2015 at 4:32 PM, Ramkumar V <ramkumar.c...@gmail.com>
> wrote:
>
>> spark version - spark 1.4.1
>>
>> my code snippet:
>>
>> String brokers = "ip:port,ip:port";
>> String topics = "x,y,z";
>> HashSet TopicsSet = new
>> HashSet(Arrays.asList(topics.split(",")));
>> HashMap<String, String> kafkaParams = new HashMap<String, String>();
>> kafkaParams.put("metadata.broker.list", brokers);
>>
>> JavaPairInputDStream<String, String> messages =
>> KafkaUtils.createDirectStream(
>>jssc,
>>String.class,
>>String.class,
>>StringDecoder.class,
>>StringDecoder.class,
>>kafkaParams,
>> TopicsSet
>>);
>>
>> messages.foreachRDD(new Function<JavaPairRDD,Void> () {
>> public Void call(JavaPairRDD tuple) {
>> JavaRDDrdd = tuple.values();
>> rdd.saveAsTextFile("hdfs://myuser:8020/user/hdfs/output");
>> return null;
>> }
>>});
>>
>>
>> *Thanks*,
>> <https://in.linkedin.com/in/ramkumarcs31>
>>
>>
>> On Fri, Oct 30, 2015 at 1:57 PM, Saisai Shao <sai.sai.s...@gmail.com>
>> wrote:
>>
>>> What Spark version are you using, also a small code snippet of how you
>>> use Spark Streaming would be greatly helpful.
>>>
>>> On Fri, Oct 30, 2015 at 3:57 PM, Ramkumar V <ramkumar.c...@gmail.com>
>>> wrote:
>>>
>>>> I can able to read and print few lines. Afterthat i'm getting this
>>>> exception. Any idea for this ?
>>>>
>>>> *Thanks*,
>>>> <https://in.linkedin.com/in/ramkumarcs31>
>>>>
>>>>
>>>> On Thu, Oct 29, 2015 at 6:14 PM, Ramkumar V <ramkumar.c...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I'm trying to read from kafka stream and printing it textfile. I'm
>>>>> using java over spark. I dont know why i'm getting the following 
>>>>> exception.
>>>>> Also exception message is very abstract.  can anyone please help me ?
>>>>>
>>>>> Log Trace :
>>>>>
>>>>> 15/10/29 12:15:09 ERROR scheduler.JobScheduler: Error in job generator
>>>>> java.lang.NullPointerException
>>>>> at
>>>>> org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
>>>>> at
>>>>> org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
>>>>> at
>>>>> scala.collection.TraversableOnce$$anonfun$maxBy$1.apply(TraversableOnce.scala:225)
>>>>> at
>>>>> scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
>>>>> at
>>>>> scala.collection.IndexedSeqOptimized$class.reduceLeft(IndexedSeqOptimized.scala:68)
>>>>> at
>>>>> scala.collection.mutable.ArrayBuffer.reduceLeft(ArrayBuffer.scala:47)
>>>>> at
>>>>> scala.collection.TraversableOnce$class.maxBy(TraversableOnce.scala:225)
>>>>> at
>>>>> scala.collection.AbstractTraversable.maxBy(Traversable.scala:105)
>>>>> at
>>>>> org.apache.spark.streaming.DStreamGraph.getMaxInputStreamRememberDuration(DStreamGraph.scala:172)
>>>>> at
>>>>> org.apache.spark.streaming.scheduler.JobGenerator.clearMetadata(JobGenerator.scala:267)
>>>>> at org.apache.spark.streaming.scheduler.JobGenerator.org
>>>>> $apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:178)
>>>>> at
>>>>> org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:83)
>>>>> at
>>>>> org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:82)
>>>

Re: Exception while reading from kafka stream

2015-10-30 Thread Ramkumar V
In general, I need to consume five different types of logs from Kafka in
Spark. I have a different set of topics for each log. How do I start five
different streams in Spark?

*Thanks*,
<https://in.linkedin.com/in/ramkumarcs31>


On Fri, Oct 30, 2015 at 4:40 PM, Ramkumar V <ramkumar.c...@gmail.com> wrote:

> I found NPE is mainly because of im using the same JavaStreamingContext
> for some other kafka stream. if i change the name , its running
> successfully. how to run multiple JavaStreamingContext in a program ?  I'm
> getting following exception if i run multiple JavaStreamingContext in
> single file.
>
> 15/10/30 11:04:29 INFO yarn.ApplicationMaster: Final app status: FAILED,
> exitCode: 15, (reason: User class threw exception:
> java.lang.IllegalStateException: Only one StreamingContext may be started
> in this JVM. Currently running StreamingContext was started
> at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:622)
>
>
> *Thanks*,
> <https://in.linkedin.com/in/ramkumarcs31>
>
>
> On Fri, Oct 30, 2015 at 3:25 PM, Saisai Shao <sai.sai.s...@gmail.com>
> wrote:
>
>> From the code, I think this field "rememberDuration" shouldn't be null,
>> it will be verified at the start, unless some place changes it's value in
>> the runtime that makes it null, but I cannot image how this happened. Maybe
>> you could add some logs around the place where exception happens if you
>> could reproduce it.
>>
>> On Fri, Oct 30, 2015 at 5:31 PM, Ramkumar V <ramkumar.c...@gmail.com>
>> wrote:
>>
>>> No. this is the only exception that im getting multiple times in my log.
>>> Also i was reading some other topics earlier but im not faced this NPE.
>>>
>>> *Thanks*,
>>> <https://in.linkedin.com/in/ramkumarcs31>
>>>
>>>
>>> On Fri, Oct 30, 2015 at 2:50 PM, Saisai Shao <sai.sai.s...@gmail.com>
>>> wrote:
>>>
>>>> I just did a local test with your code, seems everything is fine, the
>>>> only difference is that I use the master branch, but I don't think it
>>>> changes a lot in this part. Do you met any other exceptions or errors
>>>> beside this one? Probably this is due to other exceptions that makes this
>>>> system unstable.
>>>>
>>>> On Fri, Oct 30, 2015 at 5:13 PM, Ramkumar V <ramkumar.c...@gmail.com>
>>>> wrote:
>>>>
>>>>> No, i dont have any special settings. if i keep only reading line in
>>>>> my code, it's throwing NPE.
>>>>>
>>>>> *Thanks*,
>>>>> <https://in.linkedin.com/in/ramkumarcs31>
>>>>>
>>>>>
>>>>> On Fri, Oct 30, 2015 at 2:14 PM, Saisai Shao <sai.sai.s...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Do you have any special settings, from your code, I don't think it
>>>>>> will incur NPE at that place.
>>>>>>
>>>>>> On Fri, Oct 30, 2015 at 4:32 PM, Ramkumar V <ramkumar.c...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> spark version - spark 1.4.1
>>>>>>>
>>>>>>> my code snippet:
>>>>>>>
>>>>>>> String brokers = "ip:port,ip:port";
>>>>>>> String topics = "x,y,z";
>>>>>>> HashSet TopicsSet = new
>>>>>>> HashSet(Arrays.asList(topics.split(",")));
>>>>>>> HashMap<String, String> kafkaParams = new HashMap<String, String>();
>>>>>>> kafkaParams.put("metadata.broker.list", brokers);
>>>>>>>
>>>>>>> JavaPairInputDStream<String, String> messages =
>>>>>>> KafkaUtils.createDirectStream(
>>>>>>>jssc,
>>>>>>>String.class,
>>>>>>>String.class,
>>>>>>>StringDecoder.class,
>>>>>>>StringDecoder.class,
>>>>>>>kafkaParams,
>>>>>>> TopicsSet
>>>>>>>);
>>>>>>>
>>>>>>> messages.foreachRDD(new Function<JavaPairRDD,Void>
>>>>>>> () {
>>>>>>> public Void call(JavaPairRDD tuple) {
>>>>>>> JavaRDDrdd = tuple.values();
>>>>>>>
>>>>
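
A hedged Java sketch of "five log types, one context": the IllegalStateException above comes from starting more than one StreamingContext per JVM, but a single JavaStreamingContext can own several direct Kafka DStreams, one per log type, each with its own topic set and processing. The log-type and topic names below are illustrative assumptions.

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;

import kafka.serializer.StringDecoder;

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

public class FiveStreamsOneContext {
    public static void main(String[] args) throws Exception {
        JavaStreamingContext jssc =
                new JavaStreamingContext(new SparkConf(), Durations.seconds(30));

        HashMap<String, String> kafkaParams = new HashMap<String, String>();
        kafkaParams.put("metadata.broker.list", "ip:port,ip:port");

        String[] logTypes = {"access", "error", "audit", "click", "payment"};  // illustrative
        for (String logType : logTypes) {
            HashSet<String> topics = new HashSet<String>(Arrays.asList(logType + "_topic"));
            JavaPairInputDStream<String, String> stream = KafkaUtils.createDirectStream(
                    jssc, String.class, String.class, StringDecoder.class, StringDecoder.class,
                    kafkaParams, topics);
            // ...per-log-type transformations or joins on `stream` go here...
            stream.print();   // placeholder output so each DStream has an action
        }

        jssc.start();              // start the one and only context, once
        jssc.awaitTermination();
    }
}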

Re: Exception while reading from kafka stream

2015-10-30 Thread Ramkumar V
I want to join all those logs in some manner. That's what I'm trying to do.

*Thanks*,
<https://in.linkedin.com/in/ramkumarcs31>


On Fri, Oct 30, 2015 at 4:57 PM, Saisai Shao <sai.sai.s...@gmail.com> wrote:

> I don't think Spark Streaming supports multiple streaming context in one
> jvm, you cannot use in such way. Instead you could run multiple streaming
> applications, since you're using Yarn.
>
> On Friday, 30 October 2015, Ramkumar V <ramkumar.c...@gmail.com> wrote:
>
>> I found NPE is mainly because of im using the same JavaStreamingContext
>> for some other kafka stream. if i change the name , its running
>> successfully. how to run multiple JavaStreamingContext in a program ?  I'm
>> getting following exception if i run multiple JavaStreamingContext in
>> single file.
>>
>> 15/10/30 11:04:29 INFO yarn.ApplicationMaster: Final app status: FAILED,
>> exitCode: 15, (reason: User class threw exception:
>> java.lang.IllegalStateException: Only one StreamingContext may be started
>> in this JVM. Currently running StreamingContext was started
>> at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:622)
>>
>>
>> *Thanks*,
>> <https://in.linkedin.com/in/ramkumarcs31>
>>
>>
>> On Fri, Oct 30, 2015 at 3:25 PM, Saisai Shao <sai.sai.s...@gmail.com>
>> wrote:
>>
>>> From the code, I think this field "rememberDuration" shouldn't be null,
>>> it will be verified at the start, unless some place changes it's value in
>>> the runtime that makes it null, but I cannot image how this happened. Maybe
>>> you could add some logs around the place where exception happens if you
>>> could reproduce it.
>>>
>>> On Fri, Oct 30, 2015 at 5:31 PM, Ramkumar V <ramkumar.c...@gmail.com>
>>> wrote:
>>>
>>>> No. this is the only exception that im getting multiple times in my
>>>> log. Also i was reading some other topics earlier but im not faced this 
>>>> NPE.
>>>>
>>>> *Thanks*,
>>>> <https://in.linkedin.com/in/ramkumarcs31>
>>>>
>>>>
>>>> On Fri, Oct 30, 2015 at 2:50 PM, Saisai Shao <sai.sai.s...@gmail.com>
>>>> wrote:
>>>>
>>>>> I just did a local test with your code, seems everything is fine, the
>>>>> only difference is that I use the master branch, but I don't think it
>>>>> changes a lot in this part. Do you met any other exceptions or errors
>>>>> beside this one? Probably this is due to other exceptions that makes this
>>>>> system unstable.
>>>>>
>>>>> On Fri, Oct 30, 2015 at 5:13 PM, Ramkumar V <ramkumar.c...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> No, i dont have any special settings. if i keep only reading line in
>>>>>> my code, it's throwing NPE.
>>>>>>
>>>>>> *Thanks*,
>>>>>> <https://in.linkedin.com/in/ramkumarcs31>
>>>>>>
>>>>>>
>>>>>> On Fri, Oct 30, 2015 at 2:14 PM, Saisai Shao <sai.sai.s...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Do you have any special settings, from your code, I don't think it
>>>>>>> will incur NPE at that place.
>>>>>>>
>>>>>>> On Fri, Oct 30, 2015 at 4:32 PM, Ramkumar V <ramkumar.c...@gmail.com
>>>>>>> > wrote:
>>>>>>>
>>>>>>>> spark version - spark 1.4.1
>>>>>>>>
>>>>>>>> my code snippet:
>>>>>>>>
>>>>>>>> String brokers = "ip:port,ip:port";
>>>>>>>> String topics = "x,y,z";
>>>>>>>> HashSet TopicsSet = new
>>>>>>>> HashSet(Arrays.asList(topics.split(",")));
>>>>>>>> HashMap<String, String> kafkaParams = new HashMap<String, String>();
>>>>>>>> kafkaParams.put("metadata.broker.list", brokers);
>>>>>>>>
>>>>>>>> JavaPairInputDStream<String, String> messages =
>>>>>>>> KafkaUtils.createDirectStream(
>>>>>>>>jssc,
>>>>>>>>String.class,
>>>>>>>>String.class,
>>>>>>>>StringDecoder.class,
>>>>>

Exception while reading from kafka stream

2015-10-29 Thread Ramkumar V
Hi,

I'm trying to read from a Kafka stream and write it to a text file. I'm using
Java over Spark. I don't know why I'm getting the following exception.
Also, the exception message is very abstract. Can anyone please help me?

Log Trace :

15/10/29 12:15:09 ERROR scheduler.JobScheduler: Error in job generator
java.lang.NullPointerException
at
org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
at
org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
at
scala.collection.TraversableOnce$$anonfun$maxBy$1.apply(TraversableOnce.scala:225)
at
scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
at
scala.collection.IndexedSeqOptimized$class.reduceLeft(IndexedSeqOptimized.scala:68)
at
scala.collection.mutable.ArrayBuffer.reduceLeft(ArrayBuffer.scala:47)
at
scala.collection.TraversableOnce$class.maxBy(TraversableOnce.scala:225)
at scala.collection.AbstractTraversable.maxBy(Traversable.scala:105)
at
org.apache.spark.streaming.DStreamGraph.getMaxInputStreamRememberDuration(DStreamGraph.scala:172)
at
org.apache.spark.streaming.scheduler.JobGenerator.clearMetadata(JobGenerator.scala:267)
at org.apache.spark.streaming.scheduler.JobGenerator.org
$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:178)
at
org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:83)
at
org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:82)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
15/10/29 12:15:09 ERROR yarn.ApplicationMaster: User class threw exception:
java.lang.NullPointerException
java.lang.NullPointerException
at
org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
at
org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
at
scala.collection.TraversableOnce$$anonfun$maxBy$1.apply(TraversableOnce.scala:225)
at
scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
at
scala.collection.IndexedSeqOptimized$class.reduceLeft(IndexedSeqOptimized.scala:68)
at
scala.collection.mutable.ArrayBuffer.reduceLeft(ArrayBuffer.scala:47)
at
scala.collection.TraversableOnce$class.maxBy(TraversableOnce.scala:225)
at scala.collection.AbstractTraversable.maxBy(Traversable.scala:105)
at
org.apache.spark.streaming.DStreamGraph.getMaxInputStreamRememberDuration(DStreamGraph.scala:172)
at
org.apache.spark.streaming.scheduler.JobGenerator.clearMetadata(JobGenerator.scala:267)
at org.apache.spark.streaming.scheduler.JobGenerator.org
$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:178)
at
org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:83)
at
org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:82)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)



*Thanks*,



Saving offset while reading from kafka

2015-10-22 Thread Ramkumar V
Hi,

I have written a Spark Streaming application that reads from a Kafka stream and
writes to HDFS every hour (the batch interval). I would like to know how to get
or commit the offsets of the Kafka stream while writing to HDFS, so that if
there is any issue or a redeployment, I can start from the point of the last
successful offset commit. I want to store the offsets in an external DB or
something like that, not in ZooKeeper. If I want to resume the Kafka stream
from a particular offset, how do I resume from that offset in Spark?

*Thanks*,
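
A hedged Java sketch (Spark 1.4 direct Kafka API) of the pattern this usually follows: read each batch's OffsetRange[] via HasOffsetRanges inside foreachRDD, persist them to the external store only after the HDFS write succeeds, and on restart pass the stored offsets to the fromOffsets overload of createDirectStream. The loadOffsets/saveOffsets helpers and the output path are placeholders, not an existing API.

import java.util.HashMap;
import java.util.Map;

import kafka.common.TopicAndPartition;
import kafka.message.MessageAndMetadata;
import kafka.serializer.StringDecoder;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.HasOffsetRanges;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.kafka.OffsetRange;

public class ResumeFromStoredOffsets {

    // Placeholder: load {topic, partition -> next offset} from your external store.
    static Map<TopicAndPartition, Long> loadOffsets() {
        return new HashMap<TopicAndPartition, Long>();
    }

    // Placeholder: persist the ranges only after the batch's output was written.
    static void saveOffsets(OffsetRange[] ranges) { /* write to your DB */ }

    public static JavaInputDStream<String> build(JavaStreamingContext jssc,
                                                 Map<String, String> kafkaParams) {
        JavaInputDStream<String> lines = KafkaUtils.createDirectStream(
                jssc, String.class, String.class, StringDecoder.class, StringDecoder.class,
                String.class, kafkaParams, loadOffsets(),
                new Function<MessageAndMetadata<String, String>, String>() {
                    public String call(MessageAndMetadata<String, String> mmd) {
                        return mmd.message();
                    }
                });

        lines.foreachRDD(new Function<JavaRDD<String>, Void>() {
            public Void call(JavaRDD<String> rdd) throws Exception {
                OffsetRange[] ranges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
                rdd.saveAsTextFile("hdfs:/XYZ:8020/user/hdfs/output/" + System.currentTimeMillis());
                saveOffsets(ranges);   // only after the write above succeeded
                return null;
            }
        });
        return lines;
    }
}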



Conf setting for Java Spark

2015-10-13 Thread Ramkumar V
Hi,

I'm using Java over Spark to process 30 GB of data every hour. I'm running
spark-submit in cluster mode. I have a cluster of 11 machines (9 with 64 GB of
memory and 2 with 32 GB), but it takes 30 minutes to process the 30 GB of data
each hour. How can I optimize this? How should I compute the driver and
executor memory from the machine configuration? I'm using the following
Spark configuration.

 sparkConf.setMaster("yarn-cluster");
 sparkConf.set("spark.serializer",
"org.apache.spark.serializer.KryoSerializer");
 sparkConf.set("spark.driver.memory", "2g");
 sparkConf.set("spark.executor.memory", "2g");
 sparkConf.set("spark.storage.memoryFraction", "0.5");
 sparkConf.set("spark.shuffle.memoryFraction", "0.4" );

*Thanks*,
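
A hedged sizing sketch in Java, not a tuning guarantee: with 9 x 64 GB and 2 x 32 GB nodes, a common YARN layout is a few mid-sized executors per node rather than many 2 GB ones, and in yarn-cluster mode the driver memory has to be passed to spark-submit (for example --driver-memory 4g) because the driver JVM already exists by the time this code runs. All numbers below are illustrative assumptions.

import org.apache.spark.SparkConf;

public class SizingSketch {
    public static SparkConf build() {
        return new SparkConf()
                .setMaster("yarn-cluster")
                .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                .set("spark.executor.instances", "22")             // roughly 2 per node, leave room for the AM
                .set("spark.executor.cores", "4")
                .set("spark.executor.memory", "8g")                // heap per executor
                .set("spark.yarn.executor.memoryOverhead", "1024") // off-heap headroom, in MB
                .set("spark.default.parallelism", "176");          // ~2x total executor cores
    }
}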



Re: OutOfMemoryError

2015-10-09 Thread Ramkumar V
How do I increase the Xmx of the workers?

*Thanks*,
<https://in.linkedin.com/in/ramkumarcs31>


On Mon, Oct 5, 2015 at 3:48 PM, Ramkumar V <ramkumar.c...@gmail.com> wrote:

> No. I didn't try to increase xmx.
>
> *Thanks*,
> <https://in.linkedin.com/in/ramkumarcs31>
>
>
> On Mon, Oct 5, 2015 at 1:36 PM, Jean-Baptiste Onofré <j...@nanthrax.net>
> wrote:
>
>> Hi Ramkumar,
>>
>> did you try to increase Xmx of the workers ?
>>
>> Regards
>> JB
>>
>> On 10/05/2015 08:56 AM, Ramkumar V wrote:
>>
>>> Hi,
>>>
>>> When i submit java spark job in cluster mode, i'm getting following
>>> exception.
>>>
>>> *LOG TRACE :*
>>>
>>> INFO yarn.ExecutorRunnable: Setting up executor with commands:
>>> List({{JAVA_HOME}}/bin/java, -server, -XX:OnOutOfMemoryError='kill
>>>   %p', -Xms1024m, -Xmx1024m, -Djava.io.tmpdir={{PWD}}/tmp,
>>> '-Dspark.ui.port=0', '-Dspark.driver.port=48309',
>>> -Dspark.yarn.app.container.log.dir=>> _DIR>, org.apache.spark.executor.CoarseGrainedExecutorBackend,
>>> --driver-url, akka.tcp://sparkDriver@ip
>>> :port/user/CoarseGrainedScheduler,
>>>   --executor-id, 2, --hostname, hostname , --cores, 1, --app-id,
>>> application_1441965028669_9009, --user-class-path, file:$PWD
>>> /__app__.jar, --user-class-path, file:$PWD/json-20090211.jar, 1>,
>>> /stdout, 2>, /stderr).
>>>
>>> I have a cluster of 11 machines (9 - 64 GB memory and 2 - 32 GB memory
>>> ). my input data of size 128 GB.
>>>
>>> How to solve this exception ? is it depends on driver.memory and
>>> execuitor.memory setting ?
>>>
>>>
>>> *Thanks*,
>>> <https://in.linkedin.com/in/ramkumarcs31>
>>>
>>>
>> --
>> Jean-Baptiste Onofré
>> jbono...@apache.org
>> http://blog.nanthrax.net
>> Talend - http://www.talend.com
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


OutOfMemoryError

2015-10-05 Thread Ramkumar V
Hi,

When I submit a Java Spark job in cluster mode, I'm getting the following
exception.

*LOG TRACE :*

INFO yarn.ExecutorRunnable: Setting up executor with commands:
List({{JAVA_HOME}}/bin/java, -server, -XX:OnOutOfMemoryError='kill
 %p', -Xms1024m, -Xmx1024m, -Djava.io.tmpdir={{PWD}}/tmp,
'-Dspark.ui.port=0', '-Dspark.driver.port=48309',
-Dspark.yarn.app.container.log.dir=<LOG_DIR>, org.apache.spark.executor.CoarseGrainedExecutorBackend,
--driver-url, akka.tcp://sparkDriver@ip:port/user/CoarseGrainedScheduler,
 --executor-id, 2, --hostname, hostname , --cores, 1, --app-id,
application_1441965028669_9009, --user-class-path, file:$PWD
/__app__.jar, --user-class-path, file:$PWD/json-20090211.jar, 1>,
<LOG_DIR>/stdout, 2>, <LOG_DIR>/stderr).

I have a cluster of 11 machines (9 with 64 GB memory and 2 with 32 GB memory),
and my input data is 128 GB in size.

How do I solve this exception? Does it depend on the driver.memory and
executor.memory settings?


*Thanks*,



Re: OutOfMemoryError

2015-10-05 Thread Ramkumar V
No. I didn't try to increase xmx.

*Thanks*,
<https://in.linkedin.com/in/ramkumarcs31>


On Mon, Oct 5, 2015 at 1:36 PM, Jean-Baptiste Onofré <j...@nanthrax.net>
wrote:

> Hi Ramkumar,
>
> did you try to increase Xmx of the workers ?
>
> Regards
> JB
>
> On 10/05/2015 08:56 AM, Ramkumar V wrote:
>
>> Hi,
>>
>> When i submit java spark job in cluster mode, i'm getting following
>> exception.
>>
>> *LOG TRACE :*
>>
>> INFO yarn.ExecutorRunnable: Setting up executor with commands:
>> List({{JAVA_HOME}}/bin/java, -server, -XX:OnOutOfMemoryError='kill
>>   %p', -Xms1024m, -Xmx1024m, -Djava.io.tmpdir={{PWD}}/tmp,
>> '-Dspark.ui.port=0', '-Dspark.driver.port=48309',
>> -Dspark.yarn.app.container.log.dir=<LOG_DIR>, org.apache.spark.executor.CoarseGrainedExecutorBackend,
>> --driver-url, akka.tcp://sparkDriver@ip:port/user/CoarseGrainedScheduler,
>>   --executor-id, 2, --hostname, hostname , --cores, 1, --app-id,
>> application_1441965028669_9009, --user-class-path, file:$PWD
>> /__app__.jar, --user-class-path, file:$PWD/json-20090211.jar, 1>,
>> <LOG_DIR>/stdout, 2>, <LOG_DIR>/stderr).
>>
>> I have a cluster of 11 machines (9 - 64 GB memory and 2 - 32 GB memory
>> ). my input data of size 128 GB.
>>
>> How to solve this exception ? is it depends on driver.memory and
>> execuitor.memory setting ?
>>
>>
>> *Thanks*,
>> <https://in.linkedin.com/in/ramkumarcs31>
>>
>>
> --
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Combine key-value pair in spark java

2015-09-30 Thread Ramkumar V
Hi,

I have a key-value pair JavaRDD (JavaPairRDD<String, String> rdd), but I want
to concatenate it into one RDD of String (JavaRDD<String> result).

How can I do that? What do I have to use (map, flatMap)? Can anyone please
give me the syntax for this in Java?

*Thanks*,



Re: Combine key-value pair in spark java

2015-09-30 Thread Ramkumar V
Thanks man. It works for me.
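
For anyone hitting this later in the archive, the working code ended up
looking roughly like this (a minimal sketch based on the suggested mapping;
the separator and the pre-Java-8 anonymous class are my own choices):

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import scala.Tuple2;

// rdd is the existing JavaPairRDD<String, String>.
// Each record becomes a single concatenated string (separator is my choice).
JavaRDD<String> result = rdd.map(
        new Function<Tuple2<String, String>, String>() {
            public String call(Tuple2<String, String> tuple) {
                return tuple._1() + "," + tuple._2();
            }
        });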

*Thanks*,
<https://in.linkedin.com/in/ramkumarcs31>


On Wed, Sep 30, 2015 at 4:31 PM, Andy Dang <nam...@gmail.com> wrote:

> You should be able to use a simple mapping:
>
> rdd.map(tuple -> tuple._1() + tuple._2())
>
> ---
> Regards,
> Andy (Nam) Dang
>
> On Wed, Sep 30, 2015 at 10:34 AM, Ramkumar V <ramkumar.c...@gmail.com>
> wrote:
>
>> Hi,
>>
>> I have key value pair of JavaRDD (JavaPairRDD<String, String> rdd) but i
>> want to concatenate into one RDD String (JavaRDD result ).
>>
>> How can i do that ? What i have to use (map,flatmap)? can anyone please
>> give me the syntax for this in java ?
>>
>> *Thanks*,
>> <https://in.linkedin.com/in/ramkumarcs31>
>>
>>
>


GroupBy Java objects in Java Spark

2015-09-24 Thread Ramkumar V
Hi,

I want to know whether grouping by Java class objects is possible or not in
Java Spark.

I have Tuple2<JavaObject, JavaObject>. I want to groupByKey and then do some
operations on the values after grouping.
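
My current understanding is that this should work as long as the key class is
serializable and defines consistent equals()/hashCode(), since groupByKey
hash-partitions on the key. A minimal sketch of what I mean, with a made-up
key class (CityKey and its fields are hypothetical):

import java.io.Serializable;
import java.util.Objects;

// Hypothetical key class used only to illustrate the requirements.
public class CityKey implements Serializable {
    private final String country;
    private final String city;

    public CityKey(String country, String city) {
        this.country = country;
        this.city = city;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof CityKey)) return false;
        CityKey other = (CityKey) o;
        return Objects.equals(country, other.country)
                && Objects.equals(city, other.city);
    }

    @Override
    public int hashCode() {
        return Objects.hash(country, city);
    }
}

// With a JavaPairRDD<CityKey, SomeValue> pairs built elsewhere:
// JavaPairRDD<CityKey, Iterable<SomeValue>> grouped = pairs.groupByKey();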


*Thanks*,



Re: Spark Streaming failing on YARN Cluster

2015-08-25 Thread Ramkumar V
Yes, when I looked at my YARN logs for that particular failed app_id, I got
the following error.

ERROR yarn.ApplicationMaster: SparkContext did not initialize after waiting
for 10 ms. Please check earlier log output for errors. Failing the
application

To fix this error, I needed to change the SparkContext and set the master to
yarn-cluster (setMaster("yarn-cluster")). It's working fine in cluster mode
now. Thanks, everyone.
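
Concretely, the change looked roughly like this (a sketch; the app name and
the one-minute batch interval are placeholders):

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

// Setting the master in code; alternatively it can be left out here and
// supplied on the command line via spark-submit --master yarn-cluster.
SparkConf sparkConf = new SparkConf()
        .setAppName("KafkaToHdfsStreaming")   // placeholder app name
        .setMaster("yarn-cluster");
JavaStreamingContext jssc =
        new JavaStreamingContext(sparkConf, Durations.minutes(1));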

*Thanks*,
https://in.linkedin.com/in/ramkumarcs31


On Fri, Aug 21, 2015 at 6:41 AM, Jeff Zhang zjf...@gmail.com wrote:

 AM fails to launch, could you check the yarn app logs ? You can use
 command yarn logs -your_app_id to get the yarn app logs.



 On Thu, Aug 20, 2015 at 1:15 AM, Ramkumar V ramkumar.c...@gmail.com
 wrote:

 I'm getting some spark exception. Please look this log trace ( 
 *http://pastebin.com/xL9jaRUa
 http://pastebin.com/xL9jaRUa* ).

 *Thanks*,
 https://in.linkedin.com/in/ramkumarcs31


 On Wed, Aug 19, 2015 at 10:20 PM, Hari Shreedharan 
 hshreedha...@cloudera.com wrote:

 It looks like you are having issues with the files getting distributed
 to the cluster. What is the exception you are getting now?


 On Wednesday, August 19, 2015, Ramkumar V ramkumar.c...@gmail.com
 wrote:

 Thanks a lot for your suggestion. I had modified HADOOP_CONF_DIR in
 spark-env.sh so that core-site.xml is under HADOOP_CONF_DIR. i can
 able to see the logs like that you had shown above. Now i can able to run
 for 3 minutes and store results between every minutes. After sometimes,
 there is an exception. How to fix this exception ? and Can you please
 explain where its going wrong ?

 *Log Link : http://pastebin.com/xL9jaRUa
 http://pastebin.com/xL9jaRUa *


 *Thanks*,
 https://in.linkedin.com/in/ramkumarcs31


 On Wed, Aug 19, 2015 at 1:54 PM, Jeff Zhang zjf...@gmail.com wrote:

 HADOOP_CONF_DIR is the environment variable point to the hadoop conf
 directory.  Not sure how CDH organize that, make sure core-site.xml is
 under HADOOP_CONF_DIR.

 On Wed, Aug 19, 2015 at 4:06 PM, Ramkumar V ramkumar.c...@gmail.com
 wrote:

 We are using Cloudera-5.3.1. since it is one of the earlier version
 of CDH, it doesnt supports the latest version of spark. So i installed
 spark-1.4.1 separately in my machine. I couldnt able to do spark-submit 
 in
 cluster mode. How to core-site.xml under classpath ? it will be very
 helpful if you could explain in detail to solve this issue.

 *Thanks*,
 https://in.linkedin.com/in/ramkumarcs31


 On Fri, Aug 14, 2015 at 8:25 AM, Jeff Zhang zjf...@gmail.com wrote:


1. 15/08/12 13:24:49 INFO Client: Source and destination file
systems are the same. Not copying

 file:/home/hdfs/spark-1.4.1/assembly/target/scala-2.10/spark-assembly-1.4.1-hadoop2.5.0-cdh5.3.5.jar
2. 15/08/12 13:24:49 INFO Client: Source and destination file
systems are the same. Not copying

 file:/home/hdfs/spark-1.4.1/external/kafka-assembly/target/spark-streaming-kafka-assembly_2.10-1.4.1.jar
3. 15/08/12 13:24:49 INFO Client: Source and destination file
systems are the same. Not copying
file:/home/hdfs/spark-1.4.1/python/lib/pyspark.zip
4. 15/08/12 13:24:49 INFO Client: Source and destination file
systems are the same. Not copying
file:/home/hdfs/spark-1.4.1/python/lib/py4j-0.8.2.1-src.zip
5. 15/08/12 13:24:49 INFO Client: Source and destination file
systems are the same. Not copying
file:/home/hdfs/spark-1.4.1/examples/src/main/python/streaming/kyt.py
6.


1. diagnostics: Application application_1437639737006_3808
failed 2 times due to AM Container for 
 appattempt_1437639737006_3808_02
exited with  exitCode: -1000 due to: File
file:/home/hdfs/spark-1.4.1/python/lib/pyspark.zip does not exist
2. .Failing this attempt.. Failing the application.



 The machine you run spark is the client machine, while the yarn AM
 is running on another machine. And the yarn AM complains that the files 
 are
 not found as your logs shown.
 From the logs, its seems that these files are not copied to the HDFS
 as local resources. I doubt that you didn't put core-site.xml under your
 classpath, so that spark can not detect your remote file system and 
 won't
 copy the files to hdfs as local resources. Usually in yarn-cluster mode,
 you should be able to see the logs like following.

  15/08/14 10:48:49 INFO yarn.Client: Preparing resources for our AM
 container
  15/08/14 10:48:49 INFO yarn.Client: Uploading resource
 file:/Users/abc/github/spark/assembly/target/scala-2.10/spark-assembly-1.5.0-SNAPSHOT-hadoop2.6.0.jar
 - hdfs://
 0.0.0.0:9000/user/abc/.sparkStaging/application_1439432662178_0019/spark-assembly-1.5.0-SNAPSHOT-hadoop2.6.0.jar
  15/08/14 10:48:50 INFO yarn.Client: Uploading resource
 file:/Users/abc/github/spark/spark.py - hdfs://
 0.0.0.0:9000/user/abc/.sparkStaging/application_1439432662178_0019/spark.py
  15/08/14 10:48:50 INFO yarn.Client: Uploading resource
 file:/Users/abc/github/spark/python/lib/pyspark.zip - hdfs

Re: Spark Streaming failing on YARN Cluster

2015-08-19 Thread Ramkumar V
I'm getting a Spark exception. Please have a look at this log trace
(*http://pastebin.com/xL9jaRUa*).

*Thanks*,
https://in.linkedin.com/in/ramkumarcs31


On Wed, Aug 19, 2015 at 10:20 PM, Hari Shreedharan 
hshreedha...@cloudera.com wrote:

 It looks like you are having issues with the files getting distributed to
 the cluster. What is the exception you are getting now?


 On Wednesday, August 19, 2015, Ramkumar V ramkumar.c...@gmail.com wrote:

 Thanks a lot for your suggestion. I had modified HADOOP_CONF_DIR in
 spark-env.sh so that core-site.xml is under HADOOP_CONF_DIR. i can able
 to see the logs like that you had shown above. Now i can able to run for 3
 minutes and store results between every minutes. After sometimes, there is
 an exception. How to fix this exception ? and Can you please explain where
 its going wrong ?

 *Log Link : http://pastebin.com/xL9jaRUa http://pastebin.com/xL9jaRUa *


 *Thanks*,
 https://in.linkedin.com/in/ramkumarcs31


 On Wed, Aug 19, 2015 at 1:54 PM, Jeff Zhang zjf...@gmail.com wrote:

 HADOOP_CONF_DIR is the environment variable point to the hadoop conf
 directory.  Not sure how CDH organize that, make sure core-site.xml is
 under HADOOP_CONF_DIR.

 On Wed, Aug 19, 2015 at 4:06 PM, Ramkumar V ramkumar.c...@gmail.com
 wrote:

 We are using Cloudera-5.3.1. since it is one of the earlier version of
 CDH, it doesnt supports the latest version of spark. So i installed
 spark-1.4.1 separately in my machine. I couldnt able to do spark-submit in
 cluster mode. How to core-site.xml under classpath ? it will be very
 helpful if you could explain in detail to solve this issue.

 *Thanks*,
 https://in.linkedin.com/in/ramkumarcs31


 On Fri, Aug 14, 2015 at 8:25 AM, Jeff Zhang zjf...@gmail.com wrote:


1. 15/08/12 13:24:49 INFO Client: Source and destination file
systems are the same. Not copying

 file:/home/hdfs/spark-1.4.1/assembly/target/scala-2.10/spark-assembly-1.4.1-hadoop2.5.0-cdh5.3.5.jar
2. 15/08/12 13:24:49 INFO Client: Source and destination file
systems are the same. Not copying

 file:/home/hdfs/spark-1.4.1/external/kafka-assembly/target/spark-streaming-kafka-assembly_2.10-1.4.1.jar
3. 15/08/12 13:24:49 INFO Client: Source and destination file
systems are the same. Not copying
file:/home/hdfs/spark-1.4.1/python/lib/pyspark.zip
4. 15/08/12 13:24:49 INFO Client: Source and destination file
systems are the same. Not copying
file:/home/hdfs/spark-1.4.1/python/lib/py4j-0.8.2.1-src.zip
5. 15/08/12 13:24:49 INFO Client: Source and destination file
systems are the same. Not copying
file:/home/hdfs/spark-1.4.1/examples/src/main/python/streaming/kyt.py
6.


1. diagnostics: Application application_1437639737006_3808 failed
2 times due to AM Container for appattempt_1437639737006_3808_02 
 exited
with  exitCode: -1000 due to: File
file:/home/hdfs/spark-1.4.1/python/lib/pyspark.zip does not exist
2. .Failing this attempt.. Failing the application.



 The machine you run spark is the client machine, while the yarn AM is
 running on another machine. And the yarn AM complains that the files are
 not found as your logs shown.
 From the logs, its seems that these files are not copied to the HDFS
 as local resources. I doubt that you didn't put core-site.xml under your
 classpath, so that spark can not detect your remote file system and won't
 copy the files to hdfs as local resources. Usually in yarn-cluster mode,
 you should be able to see the logs like following.

  15/08/14 10:48:49 INFO yarn.Client: Preparing resources for our AM
 container
  15/08/14 10:48:49 INFO yarn.Client: Uploading resource
 file:/Users/abc/github/spark/assembly/target/scala-2.10/spark-assembly-1.5.0-SNAPSHOT-hadoop2.6.0.jar
 - hdfs://
 0.0.0.0:9000/user/abc/.sparkStaging/application_1439432662178_0019/spark-assembly-1.5.0-SNAPSHOT-hadoop2.6.0.jar
  15/08/14 10:48:50 INFO yarn.Client: Uploading resource
 file:/Users/abc/github/spark/spark.py - hdfs://
 0.0.0.0:9000/user/abc/.sparkStaging/application_1439432662178_0019/spark.py
  15/08/14 10:48:50 INFO yarn.Client: Uploading resource
 file:/Users/abc/github/spark/python/lib/pyspark.zip - hdfs://
 0.0.0.0:9000/user/abc/.sparkStaging/application_1439432662178_0019/pyspark.zip

 On Thu, Aug 13, 2015 at 2:50 PM, Ramkumar V ramkumar.c...@gmail.com
 wrote:

 Hi,

 I have a cluster of 1 master and 2 slaves. I'm running a spark
 streaming in master and I want to utilize all nodes in my cluster. i had
 specified some parameters like driver memory and executor memory in my
 code. when i give --deploy-mode cluster --master yarn-cluster in my
 spark-submit, it gives the following error.

 Log link : *http://pastebin.com/kfyVWDGR
 http://pastebin.com/kfyVWDGR*

 How to fix this issue ? Please help me if i'm doing wrong.


 *Thanks*,
 Ramkumar V




 --
 Best Regards

 Jeff Zhang





 --
 Best Regards

 Jeff Zhang




 --

 Thanks,
 Hari




Re: Spark Streaming failing on YARN Cluster

2015-08-19 Thread Ramkumar V
Thanks a lot for your suggestion. I modified HADOOP_CONF_DIR in spark-env.sh
so that core-site.xml is under HADOOP_CONF_DIR, and I can now see logs like
the ones you showed above. The job now runs for 3 minutes and stores results
every minute; after some time, though, there is an exception. How do I fix
this exception? Can you please explain where it's going wrong?

*Log Link: http://pastebin.com/xL9jaRUa*


*Thanks*,
https://in.linkedin.com/in/ramkumarcs31


On Wed, Aug 19, 2015 at 1:54 PM, Jeff Zhang zjf...@gmail.com wrote:

 HADOOP_CONF_DIR is the environment variable point to the hadoop conf
 directory.  Not sure how CDH organize that, make sure core-site.xml is
 under HADOOP_CONF_DIR.

 On Wed, Aug 19, 2015 at 4:06 PM, Ramkumar V ramkumar.c...@gmail.com
 wrote:

 We are using Cloudera-5.3.1. since it is one of the earlier version of
 CDH, it doesnt supports the latest version of spark. So i installed
 spark-1.4.1 separately in my machine. I couldnt able to do spark-submit in
 cluster mode. How to core-site.xml under classpath ? it will be very
 helpful if you could explain in detail to solve this issue.

 *Thanks*,
 https://in.linkedin.com/in/ramkumarcs31


 On Fri, Aug 14, 2015 at 8:25 AM, Jeff Zhang zjf...@gmail.com wrote:


1. 15/08/12 13:24:49 INFO Client: Source and destination file
systems are the same. Not copying

 file:/home/hdfs/spark-1.4.1/assembly/target/scala-2.10/spark-assembly-1.4.1-hadoop2.5.0-cdh5.3.5.jar
2. 15/08/12 13:24:49 INFO Client: Source and destination file
systems are the same. Not copying

 file:/home/hdfs/spark-1.4.1/external/kafka-assembly/target/spark-streaming-kafka-assembly_2.10-1.4.1.jar
3. 15/08/12 13:24:49 INFO Client: Source and destination file
systems are the same. Not copying
file:/home/hdfs/spark-1.4.1/python/lib/pyspark.zip
4. 15/08/12 13:24:49 INFO Client: Source and destination file
systems are the same. Not copying
file:/home/hdfs/spark-1.4.1/python/lib/py4j-0.8.2.1-src.zip
5. 15/08/12 13:24:49 INFO Client: Source and destination file
systems are the same. Not copying
file:/home/hdfs/spark-1.4.1/examples/src/main/python/streaming/kyt.py
6.


1. diagnostics: Application application_1437639737006_3808 failed 2
times due to AM Container for appattempt_1437639737006_3808_02 exited
with  exitCode: -1000 due to: File
file:/home/hdfs/spark-1.4.1/python/lib/pyspark.zip does not exist
2. .Failing this attempt.. Failing the application.



 The machine you run spark is the client machine, while the yarn AM is
 running on another machine. And the yarn AM complains that the files are
 not found as your logs shown.
 From the logs, its seems that these files are not copied to the HDFS as
 local resources. I doubt that you didn't put core-site.xml under your
 classpath, so that spark can not detect your remote file system and won't
 copy the files to hdfs as local resources. Usually in yarn-cluster mode,
 you should be able to see the logs like following.

  15/08/14 10:48:49 INFO yarn.Client: Preparing resources for our AM
 container
  15/08/14 10:48:49 INFO yarn.Client: Uploading resource
 file:/Users/abc/github/spark/assembly/target/scala-2.10/spark-assembly-1.5.0-SNAPSHOT-hadoop2.6.0.jar
 - hdfs://
 0.0.0.0:9000/user/abc/.sparkStaging/application_1439432662178_0019/spark-assembly-1.5.0-SNAPSHOT-hadoop2.6.0.jar
  15/08/14 10:48:50 INFO yarn.Client: Uploading resource
 file:/Users/abc/github/spark/spark.py - hdfs://
 0.0.0.0:9000/user/abc/.sparkStaging/application_1439432662178_0019/spark.py
  15/08/14 10:48:50 INFO yarn.Client: Uploading resource
 file:/Users/abc/github/spark/python/lib/pyspark.zip - hdfs://
 0.0.0.0:9000/user/abc/.sparkStaging/application_1439432662178_0019/pyspark.zip

 On Thu, Aug 13, 2015 at 2:50 PM, Ramkumar V ramkumar.c...@gmail.com
 wrote:

 Hi,

 I have a cluster of 1 master and 2 slaves. I'm running a spark
 streaming in master and I want to utilize all nodes in my cluster. i had
 specified some parameters like driver memory and executor memory in my
 code. when i give --deploy-mode cluster --master yarn-cluster in my
 spark-submit, it gives the following error.

 Log link : *http://pastebin.com/kfyVWDGR
 http://pastebin.com/kfyVWDGR*

 How to fix this issue ? Please help me if i'm doing wrong.


 *Thanks*,
 Ramkumar V




 --
 Best Regards

 Jeff Zhang





 --
 Best Regards

 Jeff Zhang



Re: Spark Streaming failing on YARN Cluster

2015-08-19 Thread Ramkumar V
We are using Cloudera 5.3.1. Since it is one of the earlier versions of CDH,
it doesn't support the latest version of Spark, so I installed spark-1.4.1
separately on my machine. I couldn't do spark-submit in cluster mode. How do
I get core-site.xml onto the classpath? It would be very helpful if you could
explain in detail how to solve this issue.

*Thanks*,
https://in.linkedin.com/in/ramkumarcs31


On Fri, Aug 14, 2015 at 8:25 AM, Jeff Zhang zjf...@gmail.com wrote:


1. 15/08/12 13:24:49 INFO Client: Source and destination file systems
are the same. Not copying

 file:/home/hdfs/spark-1.4.1/assembly/target/scala-2.10/spark-assembly-1.4.1-hadoop2.5.0-cdh5.3.5.jar
2. 15/08/12 13:24:49 INFO Client: Source and destination file systems
are the same. Not copying

 file:/home/hdfs/spark-1.4.1/external/kafka-assembly/target/spark-streaming-kafka-assembly_2.10-1.4.1.jar
3. 15/08/12 13:24:49 INFO Client: Source and destination file systems
are the same. Not copying 
 file:/home/hdfs/spark-1.4.1/python/lib/pyspark.zip
4. 15/08/12 13:24:49 INFO Client: Source and destination file systems
are the same. Not copying
file:/home/hdfs/spark-1.4.1/python/lib/py4j-0.8.2.1-src.zip
5. 15/08/12 13:24:49 INFO Client: Source and destination file systems
are the same. Not copying
file:/home/hdfs/spark-1.4.1/examples/src/main/python/streaming/kyt.py
6.


1. diagnostics: Application application_1437639737006_3808 failed 2
times due to AM Container for appattempt_1437639737006_3808_02 exited
with  exitCode: -1000 due to: File
file:/home/hdfs/spark-1.4.1/python/lib/pyspark.zip does not exist
2. .Failing this attempt.. Failing the application.



 The machine you run spark is the client machine, while the yarn AM is
 running on another machine. And the yarn AM complains that the files are
 not found as your logs shown.
 From the logs, its seems that these files are not copied to the HDFS as
 local resources. I doubt that you didn't put core-site.xml under your
 classpath, so that spark can not detect your remote file system and won't
 copy the files to hdfs as local resources. Usually in yarn-cluster mode,
 you should be able to see the logs like following.

  15/08/14 10:48:49 INFO yarn.Client: Preparing resources for our AM
 container
  15/08/14 10:48:49 INFO yarn.Client: Uploading resource
 file:/Users/abc/github/spark/assembly/target/scala-2.10/spark-assembly-1.5.0-SNAPSHOT-hadoop2.6.0.jar
 - hdfs://
 0.0.0.0:9000/user/abc/.sparkStaging/application_1439432662178_0019/spark-assembly-1.5.0-SNAPSHOT-hadoop2.6.0.jar
  15/08/14 10:48:50 INFO yarn.Client: Uploading resource
 file:/Users/abc/github/spark/spark.py - hdfs://
 0.0.0.0:9000/user/abc/.sparkStaging/application_1439432662178_0019/spark.py
  15/08/14 10:48:50 INFO yarn.Client: Uploading resource
 file:/Users/abc/github/spark/python/lib/pyspark.zip - hdfs://
 0.0.0.0:9000/user/abc/.sparkStaging/application_1439432662178_0019/pyspark.zip

 On Thu, Aug 13, 2015 at 2:50 PM, Ramkumar V ramkumar.c...@gmail.com
 wrote:

 Hi,

 I have a cluster of 1 master and 2 slaves. I'm running a spark streaming
 in master and I want to utilize all nodes in my cluster. i had specified
 some parameters like driver memory and executor memory in my code. when i
 give --deploy-mode cluster --master yarn-cluster in my spark-submit, it
 gives the following error.

 Log link : *http://pastebin.com/kfyVWDGR http://pastebin.com/kfyVWDGR*

 How to fix this issue ? Please help me if i'm doing wrong.


 *Thanks*,
 Ramkumar V




 --
 Best Regards

 Jeff Zhang



Spark Streaming failing on YARN Cluster

2015-08-13 Thread Ramkumar V
Hi,

I have a cluster of 1 master and 2 slaves. I'm running a Spark Streaming job
on the master and I want to utilize all nodes in my cluster. I had specified
some parameters like driver memory and executor memory in my code. When I
give --deploy-mode cluster --master yarn-cluster in my spark-submit, it gives
the following error.

Log link: *http://pastebin.com/kfyVWDGR*

How do I fix this issue? Please help me if I'm doing something wrong.


*Thanks*,
Ramkumar V


Re: Spark Streaming failing on YARN Cluster

2015-08-13 Thread Ramkumar V
Yes, this file is available at this path on the same machine where I'm
running Spark. I later moved the spark-1.4.1 folder to all other machines in
my cluster, but I'm still facing the same issue.


*Thanks*,
https://in.linkedin.com/in/ramkumarcs31


On Thu, Aug 13, 2015 at 1:17 PM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 Just make sure this file is available:

 appattempt_1437639737006_3808_02 exited with  exitCode: -1000 due to:
 File *file:/home/hdfs/spark-1.4.1/python/lib/pyspark.zip* does not exist

 Thanks
 Best Regards

 On Thu, Aug 13, 2015 at 12:20 PM, Ramkumar V ramkumar.c...@gmail.com
 wrote:

 Hi,

 I have a cluster of 1 master and 2 slaves. I'm running a spark streaming
 in master and I want to utilize all nodes in my cluster. i had specified
 some parameters like driver memory and executor memory in my code. when i
 give --deploy-mode cluster --master yarn-cluster in my spark-submit, it
 gives the following error.

 Log link : *http://pastebin.com/kfyVWDGR http://pastebin.com/kfyVWDGR*

 How to fix this issue ? Please help me if i'm doing wrong.


 *Thanks*,
 Ramkumar V