Loading a large parquet file how much memory do I need

2017-11-27 Thread Alexander Czech
I want to load a 10TB Parquet file from S3 and I'm trying to decide which
EC2 instances to use.

Should I go for instances that in total have more memory than 10TB? Or is
it enough that they have enough SSD storage in total so that everything
can be spilled to disk?
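
To make the question concrete, here is roughly the shape of the job; the
path, memory size and local directory below are placeholders, not an actual
tested configuration:

from pyspark.sql import SparkSession
from pyspark import StorageLevel

spark = (SparkSession.builder
         .appName("load-10tb-parquet")
         .config("spark.executor.memory", "55g")       # per-executor heap (placeholder)
         .config("spark.local.dir", "/mnt/ssd/spark")  # shuffle and spill files go to local SSD
         .getOrCreate())

df = spark.read.parquet("s3a://my-bucket/big-dataset/")  # hypothetical location

# Partitions that do not fit in memory are written to disk instead of failing.
df.persist(StorageLevel.MEMORY_AND_DISK)
print(df.count())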

thanks


Re: Loading a large parquet file how much memory do I need

2017-11-27 Thread Georg Heiler
How many columns do you need from the big file?

Also, how CPU- or memory-intensive are the computations you want to perform?
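
The column count matters because Parquet is columnar: Spark only has to read
the columns that are actually selected. A minimal PySpark sketch, where the
path and column names are placeholders:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("column-pruning").getOrCreate()

# Selecting columns up front means only those columns are read from S3,
# not the whole 10TB file.
df = spark.read.parquet("s3a://my-bucket/big-dataset/")   # placeholder path
needed = df.select("uri", "link_list")                    # placeholder column names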


Re: Loading a large parquet file how much memory do I need

2017-11-27 Thread Alexander Czech
I have a temporary result file (the 10TB one) that looks like this:
I have around 3 billion rows of (url, url_list, language, vector, text). The
bulk of the data is in url_list, and at the moment I can only guess how large
url_list is. I want to give an ID to every url, and then give this ID to every
url in url_list, to get an ID-to-ID graph. The columns language, vector and
text only have values for 1% of all rows, so they only play a very minor
role.

The idea at the moment is to load the url and url_list columns from the
parquet and give every row an ID. Then I explode url_list and join the
IDs onto the now exploded rows. After that I drop the URLs from the
url_list column. For the rest of the computation I only load those rows
from the parquet that have values in (language, vector and text) and join
them with the ID table.

In the end I will create 3 tables:
1. url, ID
2. ID, ID
3. ID, language, vector, text

Basically there is one very big shuffle going on; the rest is not that
heavy. The CPU-intensive lifting was done before that.



Re: Loading a large parquet file how much memory do I need

2017-11-27 Thread Gourav Sengupta
Hi,

it would be much simpler if you could just provide two tables with samples
of the input and output. Going through the verbose text and trying to read
and figure out what is happening is a bit daunting.

Personally, given that you have your entire data in Parquet, I do not think
that you will need a large cluster size at all. You can do it with a small
cluster as well, but depending on the cluster size you might want to create
intermediate staging tables or persist the data.

It would also help if you could kindly provide the EMR version that you are
using.


On another note, please also mention the AWS Region you are in. If Redshift
Spectrum is available, or you can use Athena or Presto, then running massive
aggregates over huge data sets at a fraction of the cost and at least 10x
the speed may be handy as well.

Let me know in case you need any further help.

Regards,
Gourav



Cosine Similarity between documents - Rows

2017-11-27 Thread Donni Khan
I have a Spark job to compute the similarity between text documents:

RowMatrix rowMatrix = new RowMatrix(vectorsRDD.rdd());
CoordinateMatrix rowsimilarity = rowMatrix.columnSimilarities(0.5);
JavaRDD<MatrixEntry> entries = rowsimilarity.entries().toJavaRDD();
List<MatrixEntry> list = entries.collect();
for (MatrixEntry s : list) System.out.println(s);

Each MatrixEntry(i, j, value) represents the similarity between columns
(let's say the features of the documents).
But how can I show the similarity between rows? Suppose I have five
documents Doc1, ..., Doc5; we would like to show the similarity between all
those documents.
How do I get that? Any help?

Thank you
Donni


Re: Cosine Similarity between documents - Rows

2017-11-27 Thread Ge, Yao (Y.)
You are essentially doing document clustering. K-means will do it, but you do
have to specify the number of clusters up front.
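
A minimal PySpark sketch of that approach. The feature vectors below are toy
placeholders; in practice they would be the document vectors already in
vectorsRDD, and k has to be picked in advance:

from pyspark.sql import SparkSession
from pyspark.ml.linalg import Vectors
from pyspark.ml.clustering import KMeans

spark = SparkSession.builder.appName("doc-clustering").getOrCreate()

# Toy stand-ins for document feature vectors (e.g. TF-IDF or embeddings).
docs = spark.createDataFrame(
    [(0, Vectors.dense([1.0, 0.1])), (1, Vectors.dense([0.9, 0.2])),
     (2, Vectors.dense([0.1, 1.0])), (3, Vectors.dense([0.2, 0.9]))],
    ["doc_id", "features"])

# k (the number of clusters) must be chosen up front.
kmeans = KMeans(k=2, seed=1, featuresCol="features", predictionCol="cluster")
model = kmeans.fit(docs)
model.transform(docs).show()   # each document gets a cluster id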



Re: Loading a large parquet file how much memory do I need

2017-11-27 Thread Alexander Czech
I don't use EMR; I spin my clusters up using flintrock (being a student, my
budget is slim). My code is written in PySpark and my data is in the
us-east-1 region (N. Virginia). I will do my best to explain it with tables:

My input, with a total size of about 10TB, sits in multiple (~150) parquets on S3:

+-----------+--------------------------+-------+------+-------+
|        uri|                 link_list|lang_id|vector|content|
+-----------+--------------------------+-------+------+-------+
|www.123.com|[www.123.com,www.abc.com,]|   null|  null|   null|
|www.abc.com|[www.opq.com,www.456.com,]|   null|  null|   null|
|www.456.com|[www.xyz.com,www.abc.com,]|   null|  null|   null|


*(link_list is an ArrayType(StringType()))*

Step 1: I only load the uri and link_list columns (but they make up the
bulk of the data). Then every uri is given a unique ID with
df.withColumn('uri_id', func.monotonically_increasing_id()),
resulting in a dataframe looking like this:

*DF_A*:

+-----------+--------------------------+------+
|        uri|                 link_list|uri_id|
+-----------+--------------------------+------+
|www.123.com|[www.123.com,www.abc.com,]|     1|
|www.abc.com|[www.opq.com,www.456.com,]|     2|
|www.456.com|[www.xyz.com,www.abc.com,]|     3|

Step 2: I create another dataframe containing only the uri and uri_id
columns, with uri_id renamed to link_id:

*DF_B*:
+-----------+-------+
|        uri|link_id|
+-----------+-------+
|www.123.com|      1|
|www.abc.com|      2|
|www.456.com|      3|

Step 3: Now I explode the link_list field in *DF_A* with
*DF_A*.select("uri_id", func.explode("link_list").alias("link"))
This gives me

*DF_C*:
+-----------+------+
|       link|uri_id|
+-----------+------+
|www.123.com|     1|
|www.abc.com|     1|
|www.opq.com|     2|
|www.456.com|     2|
|www.xyz.com|     3|
|www.abc.com|     3|


Lastly I join *DF_C* with *DF_B* using *DF_C*.join(*DF_B*,
*DF_C*.link==*DF_B*.uri, "left_outer").drop("uri"), which results in the
final dataframe:


+-----------+------+-------+
|       link|uri_id|link_id|
+-----------+------+-------+
|www.123.com|     1|      1|
|www.abc.com|     1|      2|
|www.opq.com|     2|   null|
|www.456.com|     2|      3|
|www.xyz.com|     3|   null|
|www.abc.com|     3|      1|

(in the code the link field is also dropped, but keeping it here hopefully
makes this more intelligible)


The rest is just joining uri_id with the lang_id, vector and content
rows that are not null, which is trivial.
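
Putting the steps together, a condensed PySpark sketch of the whole job. The
bucket paths are placeholders, and this is only the shape of the pipeline,
not the exact code:

from pyspark.sql import SparkSession
import pyspark.sql.functions as func

spark = SparkSession.builder.appName("url-graph").getOrCreate()

# Step 1: read only the two heavy columns and give every row an ID.
df_a = (spark.read.parquet("s3a://my-bucket/input/")          # placeholder path
        .select("uri", "link_list")
        .withColumn("uri_id", func.monotonically_increasing_id()))

# Step 2: uri -> ID lookup table.
df_b = df_a.select("uri", func.col("uri_id").alias("link_id"))

# Step 3: explode link_list so every (page, outgoing link) pair is one row.
df_c = df_a.select("uri_id", func.explode("link_list").alias("link"))

# Step 4: resolve each link to its ID; links without a known uri stay null.
edges = (df_c.join(df_b, df_c.link == df_b.uri, "left_outer")
         .drop("uri").drop("link"))

edges.write.parquet("s3a://my-bucket/id_to_id/")              # placeholder path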

I hope this makes it more readable. If there is an AWS service that
makes it easier for me to deal with the data, since it is basically
"just" database operations, I'm also happy to hear about it.
I have a few days on my hands until the preprocessing is done, but I'm
not sure if the explode in step 3 can be done in another AWS service.

thanks!



Re: Loading a large parquet file how much memory do I need

2017-11-27 Thread Gourav Sengupta
Hi,

I think that I have mentioned all the required alternatives. However, I am
quite curious how you concluded that processing using EMR is going to be
more expensive than using any other stack. I have been using EMR for the
last 6 years (almost since it came out), and have always found it cheap,
reliable, safe and stable (of course, like fire, if you are not careful it
can end up burning you financially).

Regards,
Gourav Sengupta


[Spark R]: dapply only works for very small datasets

2017-11-27 Thread Kunft, Andreas
Hello,


I tried to execute some user-defined functions with R using the airline
arrival performance dataset.

While the examples from the documentation for the `<-` apply operator work
perfectly fine on a size of ~9GB, the `dapply` operator fails to finish even
after ~4 hours.

I'm using a function similar to the one from the documentation:

df1 <- dapply(df, function(x) { x <- cbind(x, x$waiting * 60) }, schema)

I checked Stack Overflow and even asked the question there as well, but so
far the only answer I got was: "Avoid using dapply, gapply."

So, am I missing some parameters, or is there a general limitation?
I'm using Spark 2.2.0, read the data from HDFS 2.7.1, and have played with
several degrees of parallelism.

Best
Andreas



Re: [Spark R]: dapply only works for very small datasets

2017-11-27 Thread Felix Cheung
What's the number of executors and/or number of partitions you are working with?

I'm afraid most of the problem is the serialization/deserialization overhead
between the JVM and R...





Re: Loading a large parquet file how much memory do I need

2017-11-27 Thread Alexander Czech
Hi,
Simply because you have to pay on top of every instance hour. I currently
need about 4800 hours of r3.2xlarge; EMR adds $0.18 per instance hour, so
that would be $864 just in EMR costs (spot prices are around $0.12/h).

Just to stay on topic: I thought about getting 40 i2.xlarge instances,
which have about 1TB of combined RAM and 32TB of combined SSD space. Would
this be enough to load a 10TB parquet, or do I need more RAM/disk spill
space?


Using MatrixFactorizationModel as a feature extractor

2017-11-27 Thread Corey Nolet
I'm trying to use the MatrixFactorizationModel to, for instance, determine
the latent factors of a user or item that were not used in the training
data of the model. I'm not as concerned about the rating as I am with the
latent factors for the user/item.

Thanks!


Re: Using MatrixFactorizationModel as a feature extractor

2017-11-27 Thread Corey Nolet
I know that the algorithm itself is not able to extract features for a user
that it was not trained on. However, I'm trying to find a way to compare
users for similarity, so that when I find a user that's really similar to
another user, I can just use the similar user's recommendations until the
other user gets worked into the model.
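
For users that are already in the model, the latent factors it exposes can be
compared directly. A hedged PySpark sketch of that comparison, with toy
ratings standing in for the real training data; cosine similarity is my own
choice of metric, not something the ALS API provides, and a user not yet in
the model would have to be compared on raw ratings instead:

import numpy as np
from pyspark import SparkContext
from pyspark.mllib.recommendation import ALS, Rating

sc = SparkContext.getOrCreate()

# Toy ratings; in practice this is the existing training RDD.
ratings = sc.parallelize([Rating(1, 10, 5.0), Rating(1, 20, 3.0),
                          Rating(2, 10, 4.0), Rating(3, 30, 1.0)])
model = ALS.train(ratings, rank=8, iterations=5)

# userFeatures() yields (userId, latent factor vector) for every trained user.
# Collecting is fine for a small user set; for many users keep it distributed.
factors = dict(model.userFeatures().collect())

def cosine(a, b):
    a, b = np.asarray(a), np.asarray(b)
    return float(a.dot(b) / (np.linalg.norm(a) * np.linalg.norm(b)))

target = 1
# Find the trained user most similar to `target` and borrow their recommendations.
nearest = max((u for u in factors if u != target),
              key=lambda u: cosine(factors[target], factors[u]))
print(nearest, model.recommendProducts(nearest, 2))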



Re: Loading a large parquet file how much memory do I need

2017-11-27 Thread Gourav Sengupta
Hi,

Scanning 10 TB in Athena would cost $50 (Athena bills about $5 per TB
scanned). If your data is in Parquet, it will cost even less because of
columnar striping. So I am genuinely not quite sure what you are speaking
about. Also, what do you mean by "I currently need"? Are you already
processing the data?

Since you mentioned that you are a student, may I please ask which
university and college you are studying at? I may be able to provide some
additional information.


Regards,
Gourav Sengupta



Re: Custom Data Source for getting data from Rest based services

2017-11-27 Thread smazumder
@sathich

Here are my thoughts on your points:

1. Yes, this should be able to handle any complex JSON structure returned by
the target REST API. Essentially what it returns is Rows of that complex
structure. One can then use Spark SQL to flatten it further using functions
like inline, explode, etc. (see the sketch after this list).

2. In my current implementation I have kept an option called
"callStrictlyOnce". This ensures that the REST API is called only once for
each set of parameter values, and the result is persisted/cached for
subsequent use.

3. I'm not sure what exactly you have in mind regarding extending this to
Spark Streaming. As such, this cannot be used as a Spark Streaming receiver
right now, because it does not implement the necessary interfaces for a
custom streaming receiver. But you can use it within your Spark Streaming
application as a regular data source, to merge with the data you are
receiving from the streaming source.
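
A minimal PySpark sketch of the flattening mentioned in point 1, with a
hard-coded nested record standing in for what the REST data source would
return (the data source itself is not shown):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("flatten-rest-result").getOrCreate()

# Stand-in for the data source output: one Row per call, holding an
# array of structs as the payload.
df = spark.read.json(spark.sparkContext.parallelize([
    '{"request_id": 1, "results": [{"name": "a", "score": 0.9},'
    ' {"name": "b", "score": 0.4}]}'
]))
df.createOrReplaceTempView("rest_output")

# LATERAL VIEW explode() turns each array element into its own row;
# inline(results) is a one-step alternative that also splits struct fields
# into separate columns.
spark.sql("""
    SELECT request_id, r.name, r.score
    FROM rest_output
    LATERAL VIEW explode(results) exploded AS r
""").show()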

Regards,
Sourav






Re: Custom Data Source for getting data from Rest based services

2017-11-27 Thread Sourav Mazumder
It would be great if you could elaborate on the bulk provisioning use case.

Regards,
Sourav

On Sun, Nov 26, 2017 at 11:53 PM, shankar.roy  wrote:

> This would be a useful feature.
> We can leverage it while doing bulk provisioning.


How to kill a query job when using spark thrift-server?

2017-11-27 Thread 张万新
Hi,

I intend to use the Spark thrift-server as a service to support concurrent
SQL queries. But in our situation we need a way to kill an arbitrary query
job. Is there an API we can use for that?