Re: How to unpersist RDDs generated by ALS/MatrixFactorizationModel

2015-12-08 Thread Ewan Higgs

Sean,

Thanks.
It's a developer API and doesn't appear to be exposed.

Ewan

On 07/12/15 15:06, Sean Owen wrote:

I'm not sure if this is available in Python, but from 1.3 on you should
be able to call ALS.setFinalRDDStorageLevel with level "none" to ask it
to unpersist when it is done.

On Mon, Dec 7, 2015 at 1:42 PM, Ewan Higgs  wrote:

Jonathan,
Did you ever get to the bottom of this? I have some users working with Spark
in a classroom setting and our example notebooks run into problems where
there is so much spilled to disk that they run out of quota. A 1.5G input
set becomes >30G of spilled data on disk. I looked into how I could
unpersist the data so I could clean up the files, but I was unsuccessful.

We're using Spark 1.5.0

Yours,
Ewan

On 16/07/15 23:18, Stahlman, Jonathan wrote:

Hello all,

I am running the Spark recommendation algorithm in MLlib and I have been
studying its output with various model configurations.  Ideally I would like
to be able to run one job that trains the recommendation model with many
different configurations to try to optimize for performance.  Sample code in
Python is copied below.

The issue I have is that each new model that is trained caches a set of
RDDs, and eventually the executors run out of memory.  Is there any way in
PySpark to unpersist() these RDDs after each iteration?  The names of the
RDDs, which I gather from the UI, are:

itemInBlocks
itemOutBlocks
Products
ratingBlocks
userInBlocks
userOutBlocks
users

I am using Spark 1.3.  Thank you for any help!

Regards,
Jonathan




   import itertools
   from pyspark.mllib.recommendation import ALS, Rating

   # data, rating, and areaUnderCurve are defined elsewhere
   data_train, data_cv, data_test = data.randomSplit([99, 1, 1], 2)
   functions = [rating]
   ranks = [10, 20]
   iterations = [10, 20]
   lambdas = [0.01, 0.1]
   alphas = [1.0, 50.0]

   results = []
   for ratingFunction, rank, numIterations, m_lambda, m_alpha in \
           itertools.product(functions, ranks, iterations, lambdas, alphas):
       # train model
       ratings_train = data_train.map(
           lambda l: Rating(l.user, l.product, ratingFunction(l)))
       model = ALS.trainImplicit(ratings_train, rank, numIterations,
                                 lambda_=float(m_lambda), alpha=float(m_alpha))

       # test performance on CV data
       ratings_cv = data_cv.map(
           lambda l: Rating(l.user, l.product, ratingFunction(l)))
       auc = areaUnderCurve(ratings_cv, model.predictAll)

       # save results
       result = ",".join(str(l) for l in [ratingFunction.__name__, rank,
                                          numIterations, m_lambda, m_alpha, auc])
       results.append(result)
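
One possible Scala-side workaround, sketched here under the assumption that
SparkContext.getPersistentRDDs (which, as far as I know, has no public
PySpark equivalent) still lists the ALS block RDDs named above (untested):

   import org.apache.spark.SparkContext

   // Unpersist every RDD still registered as persistent with the driver.
   // Call between training runs to reclaim executor memory and disk.
   def unpersistAll(sc: SparkContext): Unit = {
     sc.getPersistentRDDs.values.foreach(_.unpersist(blocking = false))
   }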






Re: How to unpersist RDDs generated by ALS/MatrixFactorizationModel

2015-12-07 Thread Ewan Higgs

Jonathan,
Did you ever get to the bottom of this? I have some users working with 
Spark in a classroom setting and our example notebooks run into problems 
where there is so much spilled to disk that they run out of quota. A 
1.5G input set becomes >30G of spilled data on disk. I looked into how I 
could unpersist the data so I could clean up the files, but I was 
unsuccessful.


We're using Spark 1.5.0

Yours,
Ewan

On 16/07/15 23:18, Stahlman, Jonathan wrote:

Hello all,

I am running the Spark recommendation algorithm in MLlib and I have been
studying its output with various model configurations.  Ideally I would like
to be able to run one job that trains the recommendation model with many
different configurations to try to optimize for performance.  Sample code in
Python is copied below.

The issue I have is that each new model that is trained caches a set of
RDDs, and eventually the executors run out of memory.  Is there any way in
PySpark to unpersist() these RDDs after each iteration?  The names of the
RDDs, which I gather from the UI, are:


itemInBlocks
itemOutBlocks
Products
ratingBlocks
userInBlocks
userOutBlocks
users

I am using Spark 1.3.  Thank you for any help!

Regards,
Jonathan




   import itertools
   from pyspark.mllib.recommendation import ALS, Rating

   # data, rating, and areaUnderCurve are defined elsewhere
   data_train, data_cv, data_test = data.randomSplit([99, 1, 1], 2)
   functions = [rating]
   ranks = [10, 20]
   iterations = [10, 20]
   lambdas = [0.01, 0.1]
   alphas = [1.0, 50.0]

   results = []
   for ratingFunction, rank, numIterations, m_lambda, m_alpha in \
           itertools.product(functions, ranks, iterations, lambdas, alphas):
       # train model
       ratings_train = data_train.map(
           lambda l: Rating(l.user, l.product, ratingFunction(l)))
       model = ALS.trainImplicit(ratings_train, rank, numIterations,
                                 lambda_=float(m_lambda), alpha=float(m_alpha))

       # test performance on CV data
       ratings_cv = data_cv.map(
           lambda l: Rating(l.user, l.product, ratingFunction(l)))
       auc = areaUnderCurve(ratings_cv, model.predictAll)

       # save results
       result = ",".join(str(l) for l in [ratingFunction.__name__, rank,
                                          numIterations, m_lambda, m_alpha, auc])
       results.append(result)









Re: How to unpersist RDDs generated by ALS/MatrixFactorizationModel

2015-12-07 Thread Sean Owen
I'm not sure if this is available in Python, but from 1.3 on you should
be able to call ALS.setFinalRDDStorageLevel with level "none" to ask it
to unpersist when it is done.
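
For reference, a minimal Scala sketch of that call (untested; it assumes
the spark-shell `sc`, uses toy ratings made up for illustration, and relies
on setFinalRDDStorageLevel being a developer API on mllib's ALS as of 1.3):

   import org.apache.spark.mllib.recommendation.{ALS, Rating}
   import org.apache.spark.storage.StorageLevel

   // Toy ratings; in practice this would be the real training set.
   val ratings = sc.parallelize(Seq(
     Rating(1, 1, 1.0), Rating(1, 2, 3.0), Rating(2, 1, 2.0)))

   // Ask ALS not to keep the final user/item factor RDDs cached once
   // training finishes ("none" here is StorageLevel.NONE).
   val model = new ALS()
     .setRank(10)
     .setIterations(10)
     .setImplicitPrefs(true)
     .setFinalRDDStorageLevel(StorageLevel.NONE)
     .run(ratings)

Note that other messages in this thread (below) report that with
StorageLevel.NONE the intermediate block RDDs are not unpersisted, so this
may not actually free the in/out blocks.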

On Mon, Dec 7, 2015 at 1:42 PM, Ewan Higgs  wrote:
> Jonathan,
> Did you ever get to the bottom of this? I have some users working with Spark
> in a classroom setting and our example notebooks run into problems where
> there is so much spilled to disk that they run out of quota. A 1.5G input
> set becomes >30G of spilled data on disk. I looked into how I could
> unpersist the data so I could clean up the files, but I was unsuccessful.
>
> We're using Spark 1.5.0
>
> Yours,
> Ewan
>
> On 16/07/15 23:18, Stahlman, Jonathan wrote:
>
> Hello all,
>
> I am running the Spark recommendation algorithm in MLlib and I have been
> studying its output with various model configurations.  Ideally I would
> like to be able to run one job that trains the recommendation model with
> many different configurations to try to optimize for performance.  Sample
> code in Python is copied below.
>
> The issue I have is that each new model that is trained caches a set of
> RDDs, and eventually the executors run out of memory.  Is there any way in
> PySpark to unpersist() these RDDs after each iteration?  The names of the
> RDDs, which I gather from the UI, are:
>
> itemInBlocks
> itemOutBlocks
> Products
> ratingBlocks
> userInBlocks
> userOutBlocks
> users
>
> I am using Spark 1.3.  Thank you for any help!
>
> Regards,
> Jonathan
>
>
>
>
>   import itertools
>   from pyspark.mllib.recommendation import ALS, Rating
>
>   # data, rating, and areaUnderCurve are defined elsewhere
>   data_train, data_cv, data_test = data.randomSplit([99, 1, 1], 2)
>   functions = [rating]
>   ranks = [10, 20]
>   iterations = [10, 20]
>   lambdas = [0.01, 0.1]
>   alphas = [1.0, 50.0]
>
>   results = []
>   for ratingFunction, rank, numIterations, m_lambda, m_alpha in \
>           itertools.product(functions, ranks, iterations, lambdas, alphas):
>       # train model
>       ratings_train = data_train.map(
>           lambda l: Rating(l.user, l.product, ratingFunction(l)))
>       model = ALS.trainImplicit(ratings_train, rank, numIterations,
>                                 lambda_=float(m_lambda), alpha=float(m_alpha))
>
>       # test performance on CV data
>       ratings_cv = data_cv.map(
>           lambda l: Rating(l.user, l.product, ratingFunction(l)))
>       auc = areaUnderCurve(ratings_cv, model.predictAll)
>
>       # save results
>       result = ",".join(str(l) for l in [ratingFunction.__name__, rank,
>                                          numIterations, m_lambda, m_alpha, auc])
>       results.append(result)



Re: How to unpersist RDDs generated by ALS/MatrixFactorizationModel

2015-07-28 Thread Xiangrui Meng
Hi Stahlman,

finalRDDStorageLevel is the storage level for the final user/item
factors. It is not common to set it to StorageLevel.NONE, unless you
want to save the factors directly to disk. So if it is NONE, we cannot
unpersist the intermediate RDDs (in/out blocks), because the final
user/item factors returned are not materialized; otherwise we would have
to recompute from the very beginning (or the last checkpoint) when you
materialize the final user/item factors. If you want to have multiple
runs, you can try setting finalRDDStorageLevel to MEMORY_AND_DISK, or
clean up previous runs so the cached RDDs get garbage collected.
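
A rough Scala sketch of the multiple-runs setup described above (untested;
assumes the spark-shell `sc`, toy ratings made up for illustration, and the
1.3+ mllib ALS setters):

   import org.apache.spark.mllib.recommendation.{ALS, Rating}
   import org.apache.spark.storage.StorageLevel

   val ratings = sc.parallelize(Seq(
     Rating(1, 1, 1.0), Rating(1, 2, 3.0), Rating(2, 1, 2.0)))

   for (rank <- Seq(10, 20)) {
     // Materialize the final factors so that the intermediate in/out
     // block RDDs can be unpersisted once training finishes.
     val model = new ALS()
       .setRank(rank)
       .setIterations(10)
       .setFinalRDDStorageLevel(StorageLevel.MEMORY_AND_DISK)
       .run(ratings)

     // ... evaluate the model here ...

     // Drop the factors before the next run so the cached data can be
     // garbage collected.
     model.userFeatures.unpersist()
     model.productFeatures.unpersist()
   }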

Best,
Xiangrui

On Wed, Jul 22, 2015 at 11:35 AM, Ganelin, Ilya
ilya.gane...@capitalone.com wrote:
 To be unpersisted, the RDD must be persisted first. If it's set to None,
 then it's not persisted, and as such does not need to be freed. Does that
 make sense?



 Thank you,
 Ilya Ganelin




 -Original Message-
 From: Stahlman, Jonathan [jonathan.stahl...@capitalone.com]
 Sent: Wednesday, July 22, 2015 01:42 PM Eastern Standard Time
 To: user@spark.apache.org
 Subject: Re: How to unpersist RDDs generated by ALS/MatrixFactorizationModel

 Hello again,

 In trying to understand the caching of intermediate RDDs by ALS, I looked
 into the source code and found what may be a bug.  Looking here:

 https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala#L230

 you see that ALS.train() is being called with finalRDDStorageLevel =
 StorageLevel.NONE, which I would understand to mean that the intermediate
 RDDs will not be persisted.  Looking here:

 https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala#L631

 unpersist() is only being called on the intermediate RDDs (all the *Blocks
 RDDs listed in my first post) if finalRDDStorageLevel != StorageLevel.NONE.

 This doesn’t make sense to me – I would expect the RDDs to be removed from
 the cache if finalRDDStorageLevel == StorageLevel.NONE, not the other way
 around.

 Jonathan


 From: Stahlman, Stahlman Jonathan jonathan.stahl...@capitalone.com
 Date: Thursday, July 16, 2015 at 2:18 PM
 To: user@spark.apache.org
 Subject: How to unpersist RDDs generated by ALS/MatrixFactorizationModel

 Hello all,

 I am running the Spark recommendation algorithm in MLlib and I have been
 studying its output with various model configurations.  Ideally I would like
 to be able to run one job that trains the recommendation model with many
 different configurations to try to optimize for performance.  Sample code in
 Python is copied below.

 The issue I have is that each new model that is trained caches a set of
 RDDs, and eventually the executors run out of memory.  Is there any way in
 PySpark to unpersist() these RDDs after each iteration?  The names of the
 RDDs, which I gather from the UI, are:

 itemInBlocks
 itemOutBlocks
 Products
 ratingBlocks
 userInBlocks
 userOutBlocks
 users

 I am using Spark 1.3.  Thank you for any help!

 Regards,
 Jonathan




   import itertools
   from pyspark.mllib.recommendation import ALS, Rating

   # data, rating, and areaUnderCurve are defined elsewhere
   data_train, data_cv, data_test = data.randomSplit([99, 1, 1], 2)
   functions = [rating]
   ranks = [10, 20]
   iterations = [10, 20]
   lambdas = [0.01, 0.1]
   alphas = [1.0, 50.0]

   results = []
   for ratingFunction, rank, numIterations, m_lambda, m_alpha in \
           itertools.product(functions, ranks, iterations, lambdas, alphas):
       # train model
       ratings_train = data_train.map(
           lambda l: Rating(l.user, l.product, ratingFunction(l)))
       model = ALS.trainImplicit(ratings_train, rank, numIterations,
                                 lambda_=float(m_lambda), alpha=float(m_alpha))

       # test performance on CV data
       ratings_cv = data_cv.map(
           lambda l: Rating(l.user, l.product, ratingFunction(l)))
       auc = areaUnderCurve(ratings_cv, model.predictAll)

       # save results
       result = ",".join(str(l) for l in [ratingFunction.__name__, rank,
                                          numIterations, m_lambda, m_alpha, auc])
       results.append(result)

 


Re: How to unpersist RDDs generated by ALS/MatrixFactorizationModel

2015-07-22 Thread Stahlman, Jonathan
Hi Burak,

Looking at the source code, the intermediate RDDs used in ALS.train() are
persisted during the computation using intermediateRDDStorageLevel (default
value is StorageLevel.MEMORY_AND_DISK); see

https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala#L546
https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala#L548
https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala#L556

At the end of the ALS calculation these RDDs are no longer needed nor
returned, so I would assume the logical choice would be to unpersist() them.
The strategy in the code seems to be set by finalRDDStorageLevel, which for
some reason only calls unpersist() on the intermediate RDDs if
finalRDDStorageLevel != StorageLevel.NONE, which seems counter-intuitive to me.
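
In other words, the pattern is roughly the following (a toy Scala
paraphrase of the behaviour described above, not the actual ALS source):

   import org.apache.spark.rdd.RDD
   import org.apache.spark.storage.StorageLevel

   // Cleanup of the cached intermediate blocks is gated on the *final*
   // storage level, so requesting StorageLevel.NONE leaves them cached.
   def cleanupIntermediates(finalLevel: StorageLevel, blocks: Seq[RDD[_]]): Unit = {
     if (finalLevel != StorageLevel.NONE) {
       blocks.foreach(_.unpersist(blocking = false))
     }
   }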

Jonathan

From: Burak Yavuz brk...@gmail.com
Date: Wednesday, July 22, 2015 at 10:47 AM
To: Stahlman Jonathan jonathan.stahl...@capitalone.com
Cc: user@spark.apache.org
Subject: Re: How to unpersist RDDs generated by ALS/MatrixFactorizationModel

Hi Jonathan,

I believe calling persist with StorageLevel.NONE doesn't do anything. That's
why the unpersist call has an if statement before it.
Could you give more information about your setup please? Number of cores, 
memory, number of partitions of ratings_train?

Thanks,
Burak

On Wed, Jul 22, 2015 at 10:38 AM, Stahlman, Jonathan
jonathan.stahl...@capitalone.com wrote:
Hello again,

In trying to understand the caching of intermediate RDDs by ALS, I looked into 
the source code and found what may be a bug.  Looking here:

https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala#L230

you see that ALS.train() is being called with finalRDDStorageLevel = 
StorageLevel.NONE, which I would understand to mean that the intermediate RDDs 
will not be persisted.  Looking here:

https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala#L631

unpersist() is only being called on the intermediate RDDs (all the *Blocks RDDs 
listed in my first post) if finalRDDStorageLevel != StorageLevel.NONE.

This doesn’t make sense to me – I would expect the RDDs to be removed from the 
cache if finalRDDStorageLevel == StorageLevel.NONE, not the other way around.

Jonathan


From: Stahlman, Stahlman Jonathan jonathan.stahl...@capitalone.com
Date: Thursday, July 16, 2015 at 2:18 PM
To: user@spark.apache.org
Subject: How to unpersist RDDs generated by ALS/MatrixFactorizationModel

Hello all,

I am running the Spark recommendation algorithm in MLlib and I have been
studying its output with various model configurations.  Ideally I would like
to be able to run one job that trains the recommendation model with many
different configurations to try to optimize for performance.  Sample code in
Python is copied below.

The issue I have is that each new model that is trained caches a set of
RDDs, and eventually the executors run out of memory.  Is there any way in
PySpark to unpersist() these RDDs after each iteration?  The names of the
RDDs, which I gather from the UI, are:

itemInBlocks
itemOutBlocks
Products
ratingBlocks
userInBlocks
userOutBlocks
users

I am using Spark 1.3.  Thank you for any help!

Regards,
Jonathan




   import itertools
   from pyspark.mllib.recommendation import ALS, Rating

   # data, rating, and areaUnderCurve are defined elsewhere
   data_train, data_cv, data_test = data.randomSplit([99, 1, 1], 2)
   functions = [rating]
   ranks = [10, 20]
   iterations = [10, 20]
   lambdas = [0.01, 0.1]
   alphas = [1.0, 50.0]

   results = []
   for ratingFunction, rank, numIterations, m_lambda, m_alpha in \
           itertools.product(functions, ranks, iterations, lambdas, alphas):
       # train model
       ratings_train = data_train.map(
           lambda l: Rating(l.user, l.product, ratingFunction(l)))
       model = ALS.trainImplicit(ratings_train, rank, numIterations,
                                 lambda_=float(m_lambda), alpha=float(m_alpha))

       # test performance on CV data
       ratings_cv = data_cv.map(
           lambda l: Rating(l.user, l.product, ratingFunction(l)))
       auc = areaUnderCurve(ratings_cv, model.predictAll)

       # save results
       result = ",".join(str(l) for l in [ratingFunction.__name__, rank,
                                          numIterations, m_lambda, m_alpha, auc])
       results.append(result)




Re: How to unpersist RDDs generated by ALS/MatrixFactorizationModel

2015-07-22 Thread Burak Yavuz
Hi Jonathan,

I believe calling persist with StorageLevel.NONE doesn't do anything.
That's why the unpersist call has an if statement before it.
Could you give more information about your setup please? Number of cores,
memory, number of partitions of ratings_train?

Thanks,
Burak

On Wed, Jul 22, 2015 at 10:38 AM, Stahlman, Jonathan 
jonathan.stahl...@capitalone.com wrote:

 Hello again,

 In trying to understand the caching of intermediate RDDs by ALS, I looked
 into the source code and found what may be a bug.  Looking here:


 https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala#L230

 you see that ALS.train() is being called with finalRDDStorageLevel =
 StorageLevel.NONE, which I would understand to mean that the intermediate
 RDDs will not be persisted.  Looking here:


 https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala#L631

 unpersist() is only being called on the intermediate RDDs (all the *Blocks
 RDDs listed in my first post) if finalRDDStorageLevel != StorageLevel.NONE.


 This doesn’t make sense to me – I would expect the RDDs to be removed from
 the cache if finalRDDStorageLevel == StorageLevel.NONE, not the other way
 around.

 Jonathan


 From: Stahlman, Stahlman Jonathan jonathan.stahl...@capitalone.com
 Date: Thursday, July 16, 2015 at 2:18 PM
 To: user@spark.apache.org
 Subject: How to unpersist RDDs generated by ALS/MatrixFactorizationModel

 Hello all,

 I am running the Spark recommendation algorithm in MLlib and I have been
 studying its output with various model configurations.  Ideally I would
 like to be able to run one job that trains the recommendation model with
 many different configurations to try to optimize for performance.  Sample
 code in Python is copied below.

 The issue I have is that each new model that is trained caches a set of
 RDDs, and eventually the executors run out of memory.  Is there any way in
 PySpark to unpersist() these RDDs after each iteration?  The names of the
 RDDs, which I gather from the UI, are:

 itemInBlocks
 itemOutBlocks
 Products
 ratingBlocks
 userInBlocks
 userOutBlocks
 users

 I am using Spark 1.3.  Thank you for any help!

 Regards,
 Jonathan




   import itertools
   from pyspark.mllib.recommendation import ALS, Rating

   # data, rating, and areaUnderCurve are defined elsewhere
   data_train, data_cv, data_test = data.randomSplit([99, 1, 1], 2)
   functions = [rating]
   ranks = [10, 20]
   iterations = [10, 20]
   lambdas = [0.01, 0.1]
   alphas = [1.0, 50.0]

   results = []
   for ratingFunction, rank, numIterations, m_lambda, m_alpha in \
           itertools.product(functions, ranks, iterations, lambdas, alphas):
       # train model
       ratings_train = data_train.map(
           lambda l: Rating(l.user, l.product, ratingFunction(l)))
       model = ALS.trainImplicit(ratings_train, rank, numIterations,
                                 lambda_=float(m_lambda), alpha=float(m_alpha))

       # test performance on CV data
       ratings_cv = data_cv.map(
           lambda l: Rating(l.user, l.product, ratingFunction(l)))
       auc = areaUnderCurve(ratings_cv, model.predictAll)

       # save results
       result = ",".join(str(l) for l in [ratingFunction.__name__, rank,
                                          numIterations, m_lambda, m_alpha, auc])
       results.append(result)




RE: How to unpersist RDDs generated by ALS/MatrixFactorizationModel

2015-07-22 Thread Ganelin, Ilya
To be unpersisted, the RDD must be persisted first. If it's set to None,
then it's not persisted, and as such does not need to be freed. Does that
make sense?



Thank you,
Ilya Ganelin



-Original Message-
From: Stahlman, Jonathan [jonathan.stahl...@capitalone.com]
Sent: Wednesday, July 22, 2015 01:42 PM Eastern Standard Time
To: user@spark.apache.org
Subject: Re: How to unpersist RDDs generated by ALS/MatrixFactorizationModel

Hello again,

In trying to understand the caching of intermediate RDDs by ALS, I looked into 
the source code and found what may be a bug.  Looking here:

https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala#L230

you see that ALS.train() is being called with finalRDDStorageLevel = 
StorageLevel.NONE, which I would understand to mean that the intermediate RDDs 
will not be persisted.  Looking here:

https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala#L631

unpersist() is only being called on the intermediate RDDs (all the *Blocks RDDs 
listed in my first post) if finalRDDStorageLevel != StorageLevel.NONE.

This doesn’t make sense to me – I would expect the RDDs to be removed from the 
cache if finalRDDStorageLevel == StorageLevel.NONE, not the other way around.

Jonathan


From: Stahlman, Stahlman Jonathan jonathan.stahl...@capitalone.com
Date: Thursday, July 16, 2015 at 2:18 PM
To: user@spark.apache.org
Subject: How to unpersist RDDs generated by ALS/MatrixFactorizationModel

Hello all,

I am running the Spark recommendation algorithm in MLlib and I have been
studying its output with various model configurations.  Ideally I would like
to be able to run one job that trains the recommendation model with many
different configurations to try to optimize for performance.  Sample code in
Python is copied below.

The issue I have is that each new model that is trained caches a set of
RDDs, and eventually the executors run out of memory.  Is there any way in
PySpark to unpersist() these RDDs after each iteration?  The names of the
RDDs, which I gather from the UI, are:

itemInBlocks
itemOutBlocks
Products
ratingBlocks
userInBlocks
userOutBlocks
users

I am using Spark 1.3.  Thank you for any help!

Regards,
Jonathan




   import itertools
   from pyspark.mllib.recommendation import ALS, Rating

   # data, rating, and areaUnderCurve are defined elsewhere
   data_train, data_cv, data_test = data.randomSplit([99, 1, 1], 2)
   functions = [rating]
   ranks = [10, 20]
   iterations = [10, 20]
   lambdas = [0.01, 0.1]
   alphas = [1.0, 50.0]

   results = []
   for ratingFunction, rank, numIterations, m_lambda, m_alpha in \
           itertools.product(functions, ranks, iterations, lambdas, alphas):
       # train model
       ratings_train = data_train.map(
           lambda l: Rating(l.user, l.product, ratingFunction(l)))
       model = ALS.trainImplicit(ratings_train, rank, numIterations,
                                 lambda_=float(m_lambda), alpha=float(m_alpha))

       # test performance on CV data
       ratings_cv = data_cv.map(
           lambda l: Rating(l.user, l.product, ratingFunction(l)))
       auc = areaUnderCurve(ratings_cv, model.predictAll)

       # save results
       result = ",".join(str(l) for l in [ratingFunction.__name__, rank,
                                          numIterations, m_lambda, m_alpha, auc])
       results.append(result)





Re: How to unpersist RDDs generated by ALS/MatrixFactorizationModel

2015-07-22 Thread Stahlman, Jonathan
Hello again,

In trying to understand the caching of intermediate RDDs by ALS, I looked into 
the source code and found what may be a bug.  Looking here:

https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala#L230

you see that ALS.train() is being called with finalRDDStorageLevel = 
StorageLevel.NONE, which I would understand to mean that the intermediate RDDs 
will not be persisted.  Looking here:

https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala#L631

unpersist() is only being called on the intermediate RDDs (all the *Blocks RDDs 
listed in my first post) if finalRDDStorageLevel != StorageLevel.NONE.

This doesn’t make sense to me – I would expect the RDDs to be removed from the 
cache if finalRDDStorageLevel == StorageLevel.NONE, not the other way around.

Jonathan


From: Stahlman, Stahlman Jonathan jonathan.stahl...@capitalone.com
Date: Thursday, July 16, 2015 at 2:18 PM
To: user@spark.apache.org
Subject: How to unpersist RDDs generated by ALS/MatrixFactorizationModel

Hello all,

I am running the Spark recommendation algorithm in MLlib and I have been
studying its output with various model configurations.  Ideally I would like
to be able to run one job that trains the recommendation model with many
different configurations to try to optimize for performance.  Sample code in
Python is copied below.

The issue I have is that each new model that is trained caches a set of
RDDs, and eventually the executors run out of memory.  Is there any way in
PySpark to unpersist() these RDDs after each iteration?  The names of the
RDDs, which I gather from the UI, are:

itemInBlocks
itemOutBlocks
Products
ratingBlocks
userInBlocks
userOutBlocks
users

I am using Spark 1.3.  Thank you for any help!

Regards,
Jonathan




   import itertools
   from pyspark.mllib.recommendation import ALS, Rating

   # data, rating, and areaUnderCurve are defined elsewhere
   data_train, data_cv, data_test = data.randomSplit([99, 1, 1], 2)
   functions = [rating]
   ranks = [10, 20]
   iterations = [10, 20]
   lambdas = [0.01, 0.1]
   alphas = [1.0, 50.0]

   results = []
   for ratingFunction, rank, numIterations, m_lambda, m_alpha in \
           itertools.product(functions, ranks, iterations, lambdas, alphas):
       # train model
       ratings_train = data_train.map(
           lambda l: Rating(l.user, l.product, ratingFunction(l)))
       model = ALS.trainImplicit(ratings_train, rank, numIterations,
                                 lambda_=float(m_lambda), alpha=float(m_alpha))

       # test performance on CV data
       ratings_cv = data_cv.map(
           lambda l: Rating(l.user, l.product, ratingFunction(l)))
       auc = areaUnderCurve(ratings_cv, model.predictAll)

       # save results
       result = ",".join(str(l) for l in [ratingFunction.__name__, rank,
                                          numIterations, m_lambda, m_alpha, auc])
       results.append(result)

