Re: how to use lit() in spark-java

2018-03-23 Thread Anil Langote
You have to import org.apache.spark.sql.functions:

dataset.withColumn(columnName, functions.lit("constant"))
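A minimal sketch for reference, assuming a Dataset<Row> named dataset and Spark 2.x; the column name "constant_col" is just an illustration:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.functions;

// Add a constant string column to an existing Dataset<Row>.
Dataset<Row> withConstant = dataset.withColumn("constant_col", functions.lit("constant"));

// Equivalent, with a static import so lit() can be called directly:
// import static org.apache.spark.sql.functions.lit;
// Dataset<Row> withConstant = dataset.withColumn("constant_col", lit("constant"));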

Thank you
Anil Langote

Sent from my iPhone
_
From: 崔苗 <cuim...@danale.com>
Sent: Friday, March 23, 2018 8:33 AM
Subject: how to use lit() in spark-java
To: <user@spark.apache.org>


Hi Guys,

I want to add a constant column to a dataset with the lit function in Java, like this:
 dataset.withColumn(columnName, lit("constant"))
but it seems that IDEA couldn't find the lit() function, so how do I use lit()
in Java?

thanks for any reply






Re: Spark Inner Join on pivoted datasets results empty dataset

2017-10-19 Thread Anil Langote
Is there any limit on the number of columns used in an inner join?

Thank you
Anil Langote

Sent from my iPhone
_
From: Anil Langote <anillangote0...@gmail.com<mailto:anillangote0...@gmail.com>>
Sent: Thursday, October 19, 2017 5:01 PM
Subject: Spark Inner Join on pivoted datasets results empty dataset
To: user <user@spark.apache.org<mailto:user@spark.apache.org>>


Hi All,

I have a requirement to pivot multiple columns using a single pivot column. The pivot 
API doesn't support doing that, so I have been pivoting each of the two columns 
separately and then trying to merge the datasets, but the result is an empty dataset. 
Below is the pseudocode.

Main dataset => 33 columns (30 string columns, 2 columns of type double array, 
let's say Vector1 and Vector2, and 1 column Decider which has 0 & 1 values)

String groupByColumns = "col1,col2,col3,col4,col5,col6...col30";
Vector columns: Vector1 and Vector2

I do the pivot like below:

List<Object> values = new ArrayList<>();
values.add("0");
values.add("1");

Dataset<Row> pivot1 = 
mainDataset.groupBy(groupByColumns).pivot("Decider", values).agg(functions.callUDF(CUSTOM_UDAF, mainDataset.col("Vector1")));
pivot1 = pivot1.withColumnRenamed("0", "Vector1_0");
pivot1 = pivot1.withColumnRenamed("1", "Vector1_1");

Count on pivot1 = 12856

Dataset<Row> pivot2 = 
mainDataset.groupBy(groupByColumns).pivot("Decider", values).agg(functions.callUDF(CUSTOM_UDAF, mainDataset.col("Vector2")));
pivot2 = pivot2.withColumnRenamed("0", "Vector2_0");
pivot2 = pivot2.withColumnRenamed("1", "Vector2_1");

Count on pivot2 = 12856

Dataset<Row> finalDataset = pivot1.join(pivot2, /* Seq of the group-by column names */);

Count on finalDataset = 0? Why? It should be 12856, right?

The same code works locally with fewer columns and 100 records.

Is there anything I am missing here? Is there any better way to pivot multiple 
columns? I cannot combine them because my aggregation columns are arrays of doubles.

The pivot1 & pivot2 datasets are derived from the same parent dataset and the group-by 
columns are the same; all I am doing is an inner join of these two datasets on the 
same group-by columns, so why doesn't it work?

Thank you
Anil Langote




Spark Inner Join on pivoted datasets results empty dataset

2017-10-19 Thread Anil Langote
Hi All,

I have a requirement to pivot multiple columns using a single pivot column. The
pivot API doesn't support doing that, so I have been pivoting each of the two
columns separately and then trying to merge the datasets, but the result is an
empty dataset. Below is the pseudocode.

Main dataset => 33 columns (30 string columns, 2 columns of type double array,
let's say Vector1 and Vector2, and 1 column Decider which has 0 & 1 values)

String groupByColumns = "col1,col2,col3,col4,col5,col6...col30";
Vector columns: Vector1 and Vector2

I do the pivot like below:

List<Object> values = new ArrayList<>();
values.add("0");
values.add("1");

Dataset<Row> pivot1 =
mainDataset.groupBy(groupByColumns).pivot("Decider", values).agg(functions.callUDF(CUSTOM_UDAF, mainDataset.col("Vector1")));
pivot1 = pivot1.withColumnRenamed("0", "Vector1_0");
pivot1 = pivot1.withColumnRenamed("1", "Vector1_1");

Count on pivot1 = 12856

Dataset<Row> pivot2 =
mainDataset.groupBy(groupByColumns).pivot("Decider", values).agg(functions.callUDF(CUSTOM_UDAF, mainDataset.col("Vector2")));
pivot2 = pivot2.withColumnRenamed("0", "Vector2_0");
pivot2 = pivot2.withColumnRenamed("1", "Vector2_1");

Count on pivot2 = 12856

Dataset<Row> finalDataset = pivot1.join(pivot2, /* Seq of the group-by column names */);

Count on finalDataset = 0? Why? It should be 12856, right?

The same code works locally with fewer columns and 100 records.

Is there anything I am missing here? Is there any better way to pivot multiple
columns? I cannot combine them because my aggregation columns are arrays of doubles.

The pivot1 & pivot2 datasets are derived from the same parent dataset and the
group-by columns are the same; all I am doing is an inner join of these two
datasets on the same group-by columns, so why doesn't it work?
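One possible way to avoid the join entirely, offered as a sketch rather than the thread's actual code: pivot once and pass both aggregations to agg(), which accepts multiple columns. This assumes Spark 2.x, that the UDAF is registered as CUSTOM_UDAF, and that mainDataset exists; the generated column names shown in the comment are what Spark typically derives from the aliases.

import java.util.Arrays;
import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.functions;

List<Object> deciderValues = Arrays.<Object>asList("0", "1");

// Single pass: group, pivot on Decider, and aggregate both vector columns at once,
// so no inner join between two separately pivoted datasets is needed.
Dataset<Row> pivoted = mainDataset
    .groupBy("col1", "col2" /* ..., "col30" */)
    .pivot("Decider", deciderValues)
    .agg(
        functions.callUDF("CUSTOM_UDAF", mainDataset.col("Vector1")).alias("Vector1"),
        functions.callUDF("CUSTOM_UDAF", mainDataset.col("Vector2")).alias("Vector2"));

// Aggregate columns come out named per pivot value and alias, e.g. 0_Vector1, 1_Vector1, 0_Vector2, 1_Vector2.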

Thank you
Anil Langote


Issue with caching

2017-01-27 Thread Anil Langote
Hi All 

I am trying to cache a large dataset with a serialized in-memory storage level and 
Kryo serialization enabled. When I run my Spark job multiple times I get different 
performance; at times, caching the dataset hangs and takes forever. What is wrong?

The best time I got is 20 mins, and sometimes with the same configuration it takes 
40 mins. Why is this happening?
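For context, a minimal sketch of the kind of configuration being described, assuming Spark 2.x, a SparkSession, and a hypothetical input path; not the actual job:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.storage.StorageLevel;

SparkSession spark = SparkSession.builder()
    .appName("cache-example")
    // Kryo serializer for cached/shuffled data
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .getOrCreate();

Dataset<Row> ds = spark.read().parquet("/path/to/input");  // hypothetical path

// Serialized in-memory caching, spilling to disk if it does not fit.
ds.persist(StorageLevel.MEMORY_AND_DISK_SER());
ds.count();  // materialize the cache once so later jobs reuse it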

Best Regards,
Anil Langote
+1-425-633-9747

Re: Efficient look up in Key Pair RDD

2017-01-08 Thread Anil Langote
Hi Ayan

 

Thanks a lot for the reply. What is GROUPING SET? I did try GROUP BY with a UDAF but 
it doesn't perform well: one combination takes 1.5 mins, and in my use case I 
have 400 combinations, which will take ~400 mins. I am looking for a solution 
that scales with the number of combinations.

 

Thank you

Anil Langote

+1-425-633-9747

 

 

From: ayan guha <guha.a...@gmail.com>
Date: Sunday, January 8, 2017 at 10:26 PM
To: Anil Langote <anillangote0...@gmail.com>
Cc: Holden Karau <hol...@pigscanfly.ca>, user <user@spark.apache.org>
Subject: Re: Efficient look up in Key Pair RDD

 

Have you tried something like GROUPING SET? That seems to be the exact thing 
you are looking for
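A sketch of what a GROUPING SETS query could look like for this schema, assuming the data is registered as RAW_TABLE, that SUM is the array-summing UDAF described later in the thread, and that spark is a SparkSession; one query covers several attribute combinations in a single pass:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession.builder().appName("grouping-sets-sketch").getOrCreate();

// Rows carry NULL in the attributes that are not part of a given grouping set.
Dataset<Row> result = spark.sql(
    "SELECT Attribute_0, Attribute_1, Attribute_2, COUNT(*) AS MATCHES, SUM(DoubleArray) " +
    "FROM RAW_TABLE " +
    "GROUP BY Attribute_0, Attribute_1, Attribute_2 " +
    "GROUPING SETS ((Attribute_0, Attribute_1), (Attribute_0, Attribute_2), (Attribute_1, Attribute_2)) " +
    "HAVING COUNT(*) > 1");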

 

On Mon, Jan 9, 2017 at 12:37 PM, Anil Langote <anillangote0...@gmail.com> wrote:

Sure. Let me explain my requirement: I have an input file which has 25 attribute 
columns, and the last column is an array of doubles (14500 elements in the original 
file).

 

Attribute_0  Attribute_1  Attribute_2  Attribute_3  DoubleArray
5            3            5            3            0.2938933463658645  0.0437040427073041  0.23002681025029648  0.18003221216680454
3            2            1            3            0.5353599620508771  0.026777650111232787  0.31473082754161674  0.2647786522276575
5            3            5            2            0.8803063581705307  0.8101324740101096  0.48523937757683544  0.5897714618376072
3            2            1            3            0.33960064683141955  0.46537001358164043  0.543428826489435  0.42653939565053034
2            2            0            5            0.5108235777360906  0.4368119043922922  0.8651556676944931  0.7451477943975504
 

Now I have to compute the element-wise addition of the double arrays for any given 
combination of attributes; for example, in the above file we will have the possible combinations below:

 

1.  Attribute_0, Attribute_1

2.  Attribute_0, Attribute_2

3.  Attribute_0, Attribute_3

4.  Attribute_1, Attribute_2

5.  Attribute_2, Attribute_3

6.  Attribute_1, Attribute_3

7.  Attribute_0, Attribute_1, Attribute_2

8.  Attribute_0, Attribute_1, Attribute_3

9.  Attribute_0, Attribute_2, Attribute_3

10.  Attribute_1, Attribute_2, Attribute_3

11.  Attribute_1, Attribute_2, Attribute_3, Attribute_4

 

Now if we process the Attribute_0, Attribute_1 combination we want the output 
below. In a similar way we have to process all the above combinations:

 

5_3 ==>  [1.1741997045363952, 0.8538365167174137, 0.7152661878271319, 
0.7698036740044117]

3_2 ==> [0.8749606088822967, 0.4921476636928732, 0.8581596540310518, 
0.6913180478781878]

 

Solution tried

 

I have created a parquet file with this schema, where the last column is an array 
of doubles. The parquet file is 276 GB and has 2.65 M 
records.

 

I have implemented a UDAF which has:

 

Input schema : array of doubles

Buffer schema : array of doubles 

Return schema : array of doubles

 

I load the data from the parquet file and then register the UDAF for use in the 
query below; note that SUM is the UDAF:

 

SELECT COUNT(*) AS MATCHES, SUM(DOUBLEARRAY), Attribute_0, Attribute_1 FROM 
RAW_TABLE GROUP BY Attribute_0, Attribute_1 HAVING COUNT(*)>1
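Since the thread never shows the UDAF itself, here is a minimal sketch of what an element-wise array-summing UDAF could look like in Java; the class name, the fixed array length, and the registration call are assumptions, not the author's code.

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.sql.Row;
import org.apache.spark.sql.expressions.MutableAggregationBuffer;
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

// Element-wise sum of an array<double> column.
public class DoubleArraySum extends UserDefinedAggregateFunction {

    private final int length;  // expected array length, e.g. 14500

    public DoubleArraySum(int length) { this.length = length; }

    @Override public StructType inputSchema() {
        return new StructType().add("value", DataTypes.createArrayType(DataTypes.DoubleType));
    }

    @Override public StructType bufferSchema() {
        return new StructType().add("sum", DataTypes.createArrayType(DataTypes.DoubleType));
    }

    @Override public DataType dataType() {
        return DataTypes.createArrayType(DataTypes.DoubleType);
    }

    @Override public boolean deterministic() { return true; }

    @Override public void initialize(MutableAggregationBuffer buffer) {
        List<Double> zeros = new ArrayList<>(length);
        for (int i = 0; i < length; i++) zeros.add(0.0);
        buffer.update(0, zeros);
    }

    @Override public void update(MutableAggregationBuffer buffer, Row input) {
        if (input.isNullAt(0)) return;
        List<Double> sum = new ArrayList<>(buffer.getList(0));   // copy, since the buffer list is read-only
        List<Double> value = input.getList(0);
        for (int i = 0; i < sum.size(); i++) sum.set(i, sum.get(i) + value.get(i));
        buffer.update(0, sum);
    }

    @Override public void merge(MutableAggregationBuffer buffer1, Row buffer2) {
        List<Double> sum = new ArrayList<>(buffer1.getList(0));
        List<Double> other = buffer2.getList(0);
        for (int i = 0; i < sum.size(); i++) sum.set(i, sum.get(i) + other.get(i));
        buffer1.update(0, sum);
    }

    @Override public Object evaluate(Row buffer) {
        return buffer.getList(0);
    }
}

// Registration under the name used in the query above (the thread calls it SUM):
// spark.udf().register("SUM", new DoubleArraySum(14500));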

 

This works fine and takes 1.2 mins for one combination. My use case will have 
400 combinations, which means 8 hours, and that does not meet the SLA; we want this 
to be below 1 hour. What is the best way to implement this use case?

 

Best Regards,

Anil Langote

+1-425-633-9747


On Jan 8, 2017, at 8:17 PM, Holden Karau <hol...@pigscanfly.ca> wrote:

To start with, caching and having a known partitioner will help a bit; then there 
is also the IndexedRDD project, but in general Spark might not be the best tool 
for the job. Have you considered having Spark output to something like 
memcache?

 

What's the goal you are trying to accomplish?

 

On Sun, Jan 8, 2017 at 5:04 PM Anil Langote <anillangote0...@gmail.com> wrote:

Hi All,

 

I have a requirement where I want to build a distributed HashMap which holds 
10M key-value pairs and provides very efficient lookups for each key. I tried 
loading the file into a JavaPairRDD and calling the lookup method, but it is very 
slow.

 

How can I achieve very fast lookups by a given key?

 

Thank you

Anil Langote 



 

-- 

Best Regards,
Ayan Guha



Re: Efficient look up in Key Pair RDD

2017-01-08 Thread Anil Langote
Sure. Let me explain my requirement: I have an input file which has 25 attribute 
columns, and the last column is an array of doubles (14500 elements in the original 
file).
 
Attribute_0  Attribute_1  Attribute_2  Attribute_3  DoubleArray
5            3            5            3            0.2938933463658645  0.0437040427073041  0.23002681025029648  0.18003221216680454
3            2            1            3            0.5353599620508771  0.026777650111232787  0.31473082754161674  0.2647786522276575
5            3            5            2            0.8803063581705307  0.8101324740101096  0.48523937757683544  0.5897714618376072
3            2            1            3            0.33960064683141955  0.46537001358164043  0.543428826489435  0.42653939565053034
2            2            0            5            0.5108235777360906  0.4368119043922922  0.8651556676944931  0.7451477943975504
 
Now I have to compute the element-wise addition of the double arrays for any given 
combination of attributes; for example, in the above file we will have the possible combinations below:
 
1.  Attribute_0, Attribute_1
2.  Attribute_0, Attribute_2
3.  Attribute_0, Attribute_3
4.  Attribute_1, Attribute_2
5.  Attribute_2, Attribute_3
6.  Attribute_1, Attribute_3
7.  Attribute_0, Attribute_1, Attribute_2
8.  Attribute_0, Attribute_1, Attribute_3
9.  Attribute_0, Attribute_2, Attribute_3
10.  Attribute_1, Attribute_2, Attribute_3
11.  Attribute_1, Attribute_2, Attribute_3, Attribute_4
 
Now if we process the Attribute_0, Attribute_1 combination we want the output 
below. In a similar way we have to process all the above combinations:
 
5_3 ==>  [1.1741997045363952, 0.8538365167174137, 0.7152661878271319, 
0.7698036740044117]
3_2 ==> [0.8749606088822967, 0.4921476636928732, 0.8581596540310518, 
0.6913180478781878]
 
Solution tried
 
I have created a parquet file with this schema, where the last column is an array 
of doubles. The parquet file is 276 GB and has 2.65 M 
records.
 
I have implemented a UDAF which has:
 
Input schema : array of doubles
Buffer schema : array of doubles 
Return schema : array of doubles
 
I load the data from the parquet file and then register the UDAF for use in the 
query below; note that SUM is the UDAF:
 
SELECT COUNT(*) AS MATCHES, SUM(DOUBLEARRAY), Attribute_0, Attribute_1 FROM 
RAW_TABLE GROUP BY Attribute_0, Attribute_1 HAVING COUNT(*)>1
 
This works fine and takes 1.2 mins for one combination. My use case will have 
400 combinations, which means 8 hours, and that does not meet the SLA; we want this 
to be below 1 hour. What is the best way to implement this use case?

Best Regards,
Anil Langote
+1-425-633-9747

> On Jan 8, 2017, at 8:17 PM, Holden Karau <hol...@pigscanfly.ca> wrote:
> 
> To start with, caching and having a known partitioner will help a bit; then 
> there is also the IndexedRDD project, but in general Spark might not be the 
> best tool for the job.  Have you considered having Spark output to something 
> like memcache?
> 
> What's the goal you are trying to accomplish?
> 
>> On Sun, Jan 8, 2017 at 5:04 PM Anil Langote <anillangote0...@gmail.com> 
>> wrote:
>> Hi All,
>> 
>> I have a requirement where I want to build a distributed HashMap which 
>> holds 10M key-value pairs and provides very efficient lookups for each key. 
>> I tried loading the file into a JavaPairRDD and calling the lookup method, 
>> but it is very slow.
>> 
>> How can I achieve very fast lookups by a given key?
>> 
>> Thank you
>> Anil Langote 


Efficient look up in Key Pair RDD

2017-01-08 Thread Anil Langote
Hi All,

I have a requirement where I want to build a distributed HashMap which
holds 10M key-value pairs and provides very efficient lookups for each key.
I tried loading the file into a JavaPairRDD and calling the lookup method,
but it is very slow.

How can I achieve very fast lookups by a given key?
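One common pattern when the map fits in memory, offered as a sketch rather than a definitive answer: collect the pairs on the driver and broadcast them, so each lookup is a local HashMap get instead of a Spark job. The names and the omitted parsing logic below are hypothetical.

import java.util.HashMap;
import java.util.Map;

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession.builder().appName("broadcast-lookup").getOrCreate();
JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());

// Build the 10M-entry map on the driver (only viable if it fits in driver/executor memory).
Map<String, double[]> lookup = new HashMap<>();
// ... populate lookup from the input file ...

// Ship one read-only copy to every executor.
Broadcast<Map<String, double[]>> broadcastLookup = jsc.broadcast(lookup);

// Inside any transformation, this is an in-memory get, not a per-key Spark job
// like JavaPairRDD.lookup():
// double[] value = broadcastLookup.value().get(someKey);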

Thank you
Anil Langote


Spark Aggregator for array of doubles

2017-01-04 Thread Anil Langote
Hi All,

I have been working on a use case where I have a DF with 25 columns: 24 columns are 
of type string and the last column is an array of doubles. For a given set of 
columns I have to apply a group-by and add the arrays of doubles. I have 
implemented a UDAF which works fine, but it's expensive. In order to tune the 
solution I came across Aggregators, which can be implemented and used with the agg 
function. My question is: how can we implement an Aggregator which takes an array of 
doubles as input and returns an array of doubles?

I learned that it's not possible to implement the Aggregator in Java and that it can 
be done in Scala only. How can I define an Aggregator which takes an array of doubles 
as input? Note that I have a parquet file as my input.

Any pointers are highly appreciated. I read that a Spark UDAF is slow and 
Aggregators are the way to go.

Best Regards,
Anil Langote
+1-425-633-9747

Re: Parquet with group by queries

2016-12-21 Thread Anil Langote
I tried caching the parent dataset but it slows down the execution time. The last 
column in the input dataset is a double array, and the requirement is to add up that 
double array column after doing the group-by. I have implemented an aggregation 
function which adds the last column. Hence the query is

Select count(*), col1, col2, col3, aggregationFunction(doublecol) from table 
group by col1,col2,col3 having count(*) >1

The group-by columns of the above query will change; similarly, I have to run 100 
queries on the same dataset.

Best Regards,
Anil Langote
+1-425-633-9747

> On Dec 21, 2016, at 11:41 AM, Anil Langote <anillangote0...@gmail.com> wrote:
> 
> Hi All,
> 
> I have a requirement where I have to run 100 group-by queries with different 
> columns. I have generated the parquet file, which has 30 columns; I see that every 
> parquet file has a different size and that 200 files are generated. My question is: 
> what is the best approach to run group-by queries on parquet files, are more files 
> recommended or should I create fewer files to get better performance?
> 
> Right now, with 2 cores and 65 executors on a 4-node cluster with 320 cores 
> available, Spark takes on average 1.4 mins to finish one query; we want to 
> tune the time to around 30 or 40 seconds per query. The HDFS block size is 128 MB, 
> Spark is launching 2400 tasks, and the number of partitions for the input dataset is 
> 2252.
> 
> I have implemented threading in the Spark driver to launch all these queries 
> at the same time with fair scheduling enabled; however, I see that most of the time 
> the jobs are running sequentially.
> 
> Any input in this regard is appreciated.
> 
> Best Regards,
> Anil Langote
> +1-425-633-9747


Parquet with group by queries

2016-12-21 Thread Anil Langote
Hi All,

I have a requirement where I have to run 100 group-by queries with different 
columns. I have generated the parquet file, which has 30 columns; I see that every 
parquet file has a different size and that 200 files are generated. My question is: 
what is the best approach to run group-by queries on parquet files, are more files 
recommended or should I create fewer files to get better performance?

Right now, with 2 cores and 65 executors on a 4-node cluster with 320 cores 
available, Spark takes on average 1.4 mins to finish one query; we want to tune 
the time to around 30 or 40 seconds per query. The HDFS block size is 128 MB, 
Spark is launching 2400 tasks, and the number of partitions for the input dataset is 2252.

I have implemented threading in the Spark driver to launch all these queries at 
the same time with fair scheduling enabled; however, I see that most of the time the 
jobs are running sequentially.
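A minimal sketch of the driver-side threading plus fair scheduling being described, with hypothetical paths, pool names, thread count, and query list; not the actual code:

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession.builder()
    .appName("concurrent-groupby")
    .config("spark.scheduler.mode", "FAIR")   // fair scheduling between concurrently submitted jobs
    .getOrCreate();

Dataset<Row> table = spark.read().parquet("/path/to/parquet");  // hypothetical path
table.createOrReplaceTempView("RAW_TABLE");
table.cache();

List<String> groupings = Arrays.asList("col1,col2,col3", "col2,col4,col5");  // hypothetical combinations
ExecutorService pool = Executors.newFixedThreadPool(8);
for (String cols : groupings) {
    pool.submit(() -> {
        // Each thread submits its own Spark job; all jobs share the cluster under FAIR scheduling.
        spark.sparkContext().setLocalProperty("spark.scheduler.pool", "pool-" + Math.abs(cols.hashCode()));
        spark.sql("SELECT COUNT(*) AS MATCHES, SUM(DOUBLEARRAY), " + cols
                + " FROM RAW_TABLE GROUP BY " + cols + " HAVING COUNT(*) > 1")
            .write().mode("overwrite").parquet("/tmp/out/" + Math.abs(cols.hashCode()));  // hypothetical output
    });
}
pool.shutdown();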

Any input in this regard is appreciated.

Best Regards,
Anil Langote
+1-425-633-9747

Re: DataSet is not able to handle 50,000 columns to sum

2016-11-11 Thread Anil Langote
All right, thanks for the inputs. Is there any way Spark can process all the 
combinations in parallel in one job?

Is it OK to load the input CSV file into a dataframe and use flatMap to create 
key pairs, then use reduceByKey to sum the double arrays? I believe that will 
work the same as the agg function you are suggesting.
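A sketch of that reduceByKey idea, assuming a plain-text input where the attribute values and the space-separated doubles can be split as shown; the field layout and path are assumptions:

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;

SparkSession spark = SparkSession.builder().appName("array-sum-by-key").getOrCreate();
JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());

JavaRDD<String> lines = jsc.textFile("/path/to/input.csv");  // hypothetical path

// Key = the attribute values of the current combination, value = the parsed double array.
JavaPairRDD<String, double[]> sums = lines
    .mapToPair(line -> {
        String[] fields = line.split(",");
        String key = fields[0] + "_" + fields[1];                  // e.g. Attribute_0, Attribute_1
        String[] raw = fields[fields.length - 1].trim().split("\\s+");
        double[] vec = new double[raw.length];
        for (int i = 0; i < raw.length; i++) vec[i] = Double.parseDouble(raw[i]);
        return new Tuple2<>(key, vec);
    })
    .reduceByKey((a, b) -> {
        double[] out = new double[a.length];
        for (int i = 0; i < a.length; i++) out[i] = a[i] + b[i];   // element-wise sum
        return out;
    });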

Best Regards,
Anil Langote
+1-425-633-9747

> On Nov 11, 2016, at 7:10 PM, ayan guha <guha.a...@gmail.com> wrote:
> 
> You can explore grouping sets in SQL and write an aggregate function to do the 
> array-wise sum.
> 
> It will boil down to something like
> 
> Select attr1,attr2...,yourAgg(Val)
> From t
> Group by attr1,attr2...
> Grouping sets((attr1,attr2),(aytr1))
> 
>> On 12 Nov 2016 04:57, "Anil Langote" <anillangote0...@gmail.com> wrote:
>> Hi All,
>> 
>>  
>> 
>> I have been working on one use case and couldn't come up with a better 
>> solution. I have seen you are very active on the Spark user list; please share your 
>> thoughts on the implementation. Below is the requirement.
>> 
>>  
>> 
>> I have tried using a dataset and splitting the double array column, but it fails 
>> when the array size grows. When I use a double-array schema data type, 
>> Spark doesn't allow me to sum the values because summing is done only on numeric 
>> types. If I think about storing one parquet file per combination, 
>> there will be too many parquet files.
>> 
>>  
>> 
>> Input: the input file will be like below; in the real data there will 
>> be 20 attributes & the double array will have 50,000 elements.
>> 
>>  
>> 
>>  
>> 
>> Attribute_0  Attribute_1  Attribute_2  Attribute_3  DoubleArray
>> 5            3            5            3            0.2938933463658645  0.0437040427073041  0.23002681025029648  0.18003221216680454
>> 3            2            1            3            0.5353599620508771  0.026777650111232787  0.31473082754161674  0.2647786522276575
>> 5            3            5            2            0.8803063581705307  0.8101324740101096  0.48523937757683544  0.5897714618376072
>> 3            2            1            3            0.33960064683141955  0.46537001358164043  0.543428826489435  0.42653939565053034
>> 2            2            0            5            0.5108235777360906  0.4368119043922922  0.8651556676944931  0.7451477943975504
>> 
>>  
>> 
>> Now below are all the possible combinations in the above data set:
>> 
>>  
>> 
>> 1.  Attribute_0, Attribute_1
>> 
>> 2.  Attribute_0, Attribute_2
>> 
>> 3.  Attribute_0, Attribute_3
>> 
>> 4.  Attribute_1, Attribute_2
>> 
>> 5.  Attribute_2, Attribute_3
>> 
>> 6.  Attribute_1, Attribute_3
>> 
>> 7.  Attribute_0, Attribute_1, Attribute_2
>> 
>> 8.  Attribute_0, Attribute_1, Attribute_3
>> 
>> 9.  Attribute_0, Attribute_2, Attribute_3
>> 
>> 10.  Attribute_1, Attribute_2, Attribute_3
>> 
>> 11.  Attribute_1, Attribute_2, Attribute_3, Attribute_4
>> 
>>  
>> 
>> Now we have to process all these combinations on the input data, preferably in 
>> parallel, to get good performance.
>> 
>>  
>> 
>> Attribute_0, Attribute_1
>> 
>>  
>> 
>> In this iteration the other attributes (Attribute_2, Attribute_3) are not 
>> required; all we need is Attribute_0, Attribute_1 & the double array column. If 
>> you look at the data there are two possible combinations, one is 5_3 
>> and the other is 3_2; we have to pick only those which have at least 2 
>> occurrences, and in the real data we will get thousands. 
>> 
>>  
>> 
>>  
>> 
>> Attribute_0  Attribute_1  Attribute_2  Attribute_3  DoubleArray
>> 5            3            5            3            0.2938933463658645  0.0437040427073041  0.23002681025029648  0.18003221216680454
>> 3            2            1            3            0.5353599620508771  0.026777650111232787  0.31473082754161674  0.2647786522276575
>> 5            3            5            ...

DataSet is not able to handle 50,000 columns to sum

2016-11-11 Thread Anil Langote
Hi All,

 

I have been working on one use case and couldn't come up with a better 
solution. I have seen you are very active on the Spark user list; please share your 
thoughts on the implementation. Below is the requirement.

 

I have tried using a dataset and splitting the double array column, but it fails 
when the array size grows. When I use a double-array schema data type, Spark 
doesn't allow me to sum the values because summing is done only on numeric types. If 
I think about storing one parquet file per combination, there will be 
too many parquet files.

 

Input: the input file will be like below; in the real data there will be 
20 attributes & the double array will have 50,000 elements.

 

 

Attribute_0  Attribute_1  Attribute_2  Attribute_3  DoubleArray
5            3            5            3            0.2938933463658645  0.0437040427073041  0.23002681025029648  0.18003221216680454
3            2            1            3            0.5353599620508771  0.026777650111232787  0.31473082754161674  0.2647786522276575
5            3            5            2            0.8803063581705307  0.8101324740101096  0.48523937757683544  0.5897714618376072
3            2            1            3            0.33960064683141955  0.46537001358164043  0.543428826489435  0.42653939565053034
2            2            0            5            0.5108235777360906  0.4368119043922922  0.8651556676944931  0.7451477943975504

 

Now below are all the possible combinations in the above data set:

1.  Attribute_0, Attribute_1
2.  Attribute_0, Attribute_2
3.  Attribute_0, Attribute_3
4.  Attribute_1, Attribute_2
5.  Attribute_2, Attribute_3
6.  Attribute_1, Attribute_3
7.  Attribute_0, Attribute_1, Attribute_2
8.  Attribute_0, Attribute_1, Attribute_3
9.  Attribute_0, Attribute_2, Attribute_3
10.  Attribute_1, Attribute_2, Attribute_3
11.  Attribute_1, Attribute_2, Attribute_3, Attribute_4

Now we have to process all these combinations on the input data, preferably in 
parallel, to get good performance.

Attribute_0, Attribute_1

In this iteration the other attributes (Attribute_2, Attribute_3) are not 
required; all we need is Attribute_0, Attribute_1 & the double array column. If 
you look at the data there are two possible combinations, one is 5_3 and the 
other is 3_2; we have to pick only those which have at least 2 occurrences, and 
in the real data we will get thousands. 
 
Attribute_0  Attribute_1  Attribute_2  Attribute_3  DoubleArray
5            3            5            3            0.2938933463658645  0.0437040427073041  0.23002681025029648  0.18003221216680454
3            2            1            3            0.5353599620508771  0.026777650111232787  0.31473082754161674  0.2647786522276575
5            3            5            2            0.8803063581705307  0.8101324740101096  0.48523937757683544  0.5897714618376072
3            2            1            3            0.33960064683141955  0.46537001358164043  0.543428826489435  0.42653939565053034
2            2            0            5            0.5108235777360906  0.4368119043922922  0.8651556676944931  0.7451477943975504

 

When we do the groupBy on the above dataset with columns Attribute_0, Attribute_1, 
we will get two records with keys 5_3 & 3_2, and each key will have two double 
arrays.

 

5_3 ==> 0.2938933463658645  0.0437040427073041  0.23002681025029648  
0.18003221216680454 & 0.8803063581705307  0.8101324740101096  
0.48523937757683544  0.5897714618376072

 

3_2 ==> 0.5353599620508771  0.026777650111232787  0.31473082754161674  
0.2647786522276575 & 0.33960064683141955  0.46537001358164043  
0.543428826489435  0.42653939565053034

 

Now we have to add these double arrays index-wise and produce one array:

 

5_3 ==>  [1.1741997045363952, 0.8538365167174137, 0.7152661878271319, 
0.7698036740044117]

3_2 ==> [0.8749606088822967, 0.4921476636928732, 0.8581596540310518, 
0.6913180478781878]

 

After adding, we have to compute the average, min, max, etc. on this vector and store 
the results against the keys.
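A trivial sketch of that per-key post-processing, assuming the element-wise sum for one key is available as a double[] (the values below are the 5_3 array from above):

// summed: the element-wise sum for one key.
double[] summed = {1.1741997045363952, 0.8538365167174137, 0.7152661878271319, 0.7698036740044117};

double min = Double.MAX_VALUE, max = -Double.MAX_VALUE, total = 0.0;
for (double v : summed) {
    min = Math.min(min, v);
    max = Math.max(max, v);
    total += v;
}
double avg = total / summed.length;  // average of the summed vector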

 

The same process will be repeated for the next combinations. 

 

 

 

Thank you

Anil Langote

+1-425-633-9747

 



Running yarn with spark not working with Java 8

2016-08-25 Thread Anil Langote
Hi All,

I have cluster with 1 master and 6 slaves which uses pre-built version of
hadoop 2.6.0 and spark 1.6.2. I was running hadoop MR and spark jobs
without any problem with openjdk 7 installed on all the nodes. However when
I upgraded openjdk 7 to openjdk 8 on all nodes, spark submit and
spark-shell with yarn caused error.

I have exported JAVA_HOME in .bashrc and set OpenJDK 8 as the default
Java using these commands:

sudo update-alternatives --config java
sudo update-alternatives --config javac

I have also tried with Oracle Java 8 and the same error comes up. The container
logs on the slave nodes all have the same error, shown below.

I have tried with spark 1.6.2 pre-built version, spark 2.0 pre-built
version and also tried with spark 2.0 by building it myself.

Hadoop jobs work perfectly even after upgrading to Java 8. When I switch
back to Java 7, Spark works fine.

My scala version is 2.11 and OS is Ubuntu 14.04.4 LTS .

It will be very great if someone can give me an idea to solve this problem.

Thanks!

PS: I have masked my IP addresses as xxx.xxx.xxx.xx in the logs.

Stack Trace with Oracle java 8

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in
[jar:file:/tmp/hadoop-hd_spark/nm-local-dir/usercache/hd_spark/filecache/17/__spark_libs__8247267244939901627.zip/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in
[jar:file:/usr/local/hadoop/share/hadoop/common/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
16/08/17 14:05:11 INFO executor.CoarseGrainedExecutorBackend: Started
daemon with process name: 23541@slave01
16/08/17 14:05:11 INFO util.SignalUtils: Registered signal handler for TERM
16/08/17 14:05:11 INFO util.SignalUtils: Registered signal handler for HUP
16/08/17 14:05:11 INFO util.SignalUtils: Registered signal handler for INT
16/08/17 14:05:11 WARN util.NativeCodeLoader: Unable to load
native-hadoop library for your platform... using builtin-java classes
where applicable
16/08/17 14:05:11 INFO spark.SecurityManager: Changing view acls to: hd_spark
16/08/17 14:05:11 INFO spark.SecurityManager: Changing modify acls to: hd_spark
16/08/17 14:05:11 INFO spark.SecurityManager: Changing view acls groups to:
16/08/17 14:05:11 INFO spark.SecurityManager: Changing modify acls groups to:
16/08/17 14:05:11 INFO spark.SecurityManager: SecurityManager:
authentication disabled; ui acls disabled; users  with view
permissions: Set(hd_spark); groups with view permissions: Set(); users
 with modify permissions: Set(hd_spark); groups with modify
permissions: Set()
16/08/17 14:05:12 INFO client.TransportClientFactory: Successfully
created connection to /xxx.xxx.xxx.xx:37417 after 78 ms (0 ms spent in
bootstraps)
16/08/17 14:05:12 INFO spark.SecurityManager: Changing view acls to: hd_spark
16/08/17 14:05:12 INFO spark.SecurityManager: Changing modify acls to: hd_spark
16/08/17 14:05:12 INFO spark.SecurityManager: Changing view acls groups to:
16/08/17 14:05:12 INFO spark.SecurityManager: Changing modify acls groups to:
16/08/17 14:05:12 INFO spark.SecurityManager: SecurityManager:
authentication disabled; ui acls disabled; users  with view
permissions: Set(hd_spark); groups with view permissions: Set(); users
 with modify permissions: Set(hd_spark); groups with modify
permissions: Set()
16/08/17 14:05:12 INFO client.TransportClientFactory: Successfully
created connection to /xxx.xxx.xxx.xx:37417 after 1 ms (0 ms spent in
bootstraps)
16/08/17 14:05:12 INFO storage.DiskBlockManager: Created local
directory at 
/tmp/hadoop-hd_spark/nm-local-dir/usercache/hd_spark/appcache/application_1471352972661_0005/blockmgr-d9f23a56-1420-4cd4-abfd-ae9e128c688c
16/08/17 14:05:12 INFO memory.MemoryStore: MemoryStore started with
capacity 366.3 MB
16/08/17 14:05:12 INFO executor.CoarseGrainedExecutorBackend:
Connecting to driver:
spark://coarsegrainedschedu...@xxx.xxx.xxx.xx:37417
16/08/17 14:05:13 ERROR executor.CoarseGrainedExecutorBackend:
RECEIVED SIGNAL TERM
16/08/17 14:05:13 INFO storage.DiskBlockManager: Shutdown hook called
16/08/17 14:05:13 INFO util.ShutdownHookManager: Shutdown hook called



Stack Trace:

16/08/17 14:06:22 ERROR client.TransportClient: Failed to send RPC
4688442384427245199 to /xxx.xxx.xxx.xx:42955:
java.nio.channels.ClosedChannelException
java.nio.channels.ClosedChannelException
16/08/17 14:06:22 WARN netty.NettyRpcEndpointRef: Error sending
message [message = RequestExecutors(0,0,Map())] in 1 attempts
org.apache.spark.SparkException: Exception thrown in awaitResult
at 
org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:77)
at 
org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:75)
at 
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at 

Append is not working with data frame

2016-04-20 Thread Anil Langote
Hi All,

We are pulling data from Oracle tables and writing it as partitioned parquet files. 
This is a daily process and it works fine up to the 18th day (18 days of loads work 
fine); however, on the 19th day's load the data frame load process hangs and the load 
action is called more than once. If we remove the existing data and run just the 19th 
day, it loads successfully, so why does the 19th day's load fail in APPEND mode when 
the 19th day on its own works fine? On the Spark UI we can see the first load job 
takes around 5 min and the duplicate load jobs take just a few seconds. We are stuck 
with this, and we want to process 60 days of data.
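For reference, a minimal sketch of the kind of daily partitioned append being described, with hypothetical connection details, partition column, and output path; not the actual job:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession.builder().appName("daily-append").getOrCreate();

// One day's extract from Oracle via JDBC (connection details are placeholders).
Dataset<Row> dailyData = spark.read()
    .format("jdbc")
    .option("url", "jdbc:oracle:thin:@//host:1521/service")   // hypothetical
    .option("dbtable", "SOME_TABLE")                          // hypothetical
    .load();

dailyData.write()
    .mode(SaveMode.Append)            // append this day's data to the existing output
    .partitionBy("load_date")         // hypothetical partition column
    .parquet("/data/oracle_export");  // hypothetical output path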

Thank you 
Anil Langote


> On Apr 20, 2016, at 1:12 PM, Wei Chen <wei.chen.ri...@gmail.com> wrote:
> 
> Found it. In case someone else is looking for this:
> cvModel.bestModel.asInstanceOf[org.apache.spark.ml.classification.LogisticRegressionModel].weights
> 
> On Tue, Apr 19, 2016 at 1:12 PM, Wei Chen <wei.chen.ri...@gmail.com 
> <mailto:wei.chen.ri...@gmail.com>> wrote:
> Hi All,
> 
> I am using the example of model selection via cross-validation from the 
> documentation here: http://spark.apache.org/docs/latest/ml-guide.html 
> <http://spark.apache.org/docs/latest/ml-guide.html>. After I get the 
> "cvModel", I would like to see the weights for each feature for the best 
> logistic regression model. I've been looking at the methods and attributes of 
> this "cvModel" and "cvModel.bestModel" and still can't figure out where these 
> weights are referred. It must be somewhere since we can use "cvModel" to 
> transform a new dataframe. Your help is much appreciated.
> 
> 
> Thank you,
> Wei
> 
> 
> 
> -- 
> Wei Chen, Ph.D.
> Astronomer and Data Scientist
> Phone: (832)646-7124
> Email: wei.chen.ri...@gmail.com <mailto:wei.chen.ri...@gmail.com>
> LinkedIn: https://www.linkedin.com/in/weichen1984 
> <https://www.linkedin.com/in/weichen1984>