Hi Jonathan,

I believe calling persist with StorageLevel.NONE is a no-op, so there is
nothing in the cache to release. That's why the unpersist call is guarded
by an if statement.
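For illustration, here is a minimal PySpark sketch of that pattern. The
names are mine, and since PySpark does not expose a StorageLevel.NONE
constant the way Scala does, a plain None sentinel stands in for it and
guards both calls:

  from pyspark import SparkContext

  sc = SparkContext("local[2]", "guard-demo")
  rdd = sc.parallelize(range(1000))

  # stand-in for Scala's StorageLevel.NONE, where persist() would be a no-op
  final_level = None

  if final_level is not None:
      rdd.persist(final_level)

  total = rdd.sum()  # an action; a cached RDD would be reused from here on

  # same shape as the guard in ALS.scala: skip unpersist() when nothing
  # was actually cached
  if final_level is not None:
      rdd.unpersist()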
Could you share more details about your setup: the number of cores, the
amount of memory, and the number of partitions of ratings_train?

Thanks,
Burak

On Wed, Jul 22, 2015 at 10:38 AM, Stahlman, Jonathan <
jonathan.stahl...@capitalone.com> wrote:

> Hello again,
>
> In trying to understand the caching of intermediate RDDs by ALS, I looked
> into the source code and found what may be a bug.  Looking here:
>
>
> https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala#L230
>
> you see that ALS.train() is being called with finalRDDStorageLevel =
> StorageLevel.NONE, which I would understand to mean that the intermediate
> RDDs will not be persisted.  Looking here:
>
>
> https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala#L631
>
> unpersist() is only being called on the intermediate RDDs (all the *Blocks
> RDDs listed in my first post) if finalRDDStorageLevel != StorageLevel.NONE.
>
>
> This doesn’t make sense to me – I would expect the RDDs to be removed from
> the cache if finalRDDStorageLevel == StorageLevel.NONE, not the other way
> around.
>
> Jonathan
>
>
> From: <Stahlman>, Stahlman Jonathan <jonathan.stahl...@capitalone.com>
> Date: Thursday, July 16, 2015 at 2:18 PM
> To: "user@spark.apache.org" <user@spark.apache.org>
> Subject: How to unpersist RDDs generated by ALS/MatrixFactorizationModel
>
> Hello all,
>
> I am running the Spark recommendation algorithm in MLlib and I have been
> studying its output with various model configurations.  Ideally I would
> like to be able to run one job that trains the recommendation model with
> many different configurations to try to optimize for performance.  Sample
> code in Python is copied below.
>
> The issue I have is that each new model that is trained caches a set of
> RDDs, and eventually the executors run out of memory.  Is there any way in
> PySpark to unpersist() these RDDs after each iteration?  (One possible
> workaround is sketched after the sample code below.)  The names of the
> RDDs, as I gather them from the UI, are:
>
> itemInBlocks
> itemOutBlocks
> Products
> ratingBlocks
> userInBlocks
> userOutBlocks
> users
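>
> For what it's worth, the same list seems to be reachable programmatically
> by going through sc._jsc, the private handle to the JVM-side
> JavaSparkContext (so presumably not a stable, public PySpark API):
>
>   # print the id, name, and storage level of every RDD the JVM still
>   # has marked persistent; should match the names shown in the UI
>   for rdd_id, jrdd in sc._jsc.getPersistentRDDs().items():
>       print(rdd_id, jrdd.name(), jrdd.getStorageLevel().toString())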
>
> I am using Spark 1.3.  Thank you for any help!
>
> Regards,
> Jonathan
>
>
>
>
>   import itertools
>
>   from pyspark.mllib.recommendation import ALS, Rating
>
>   # data is the RDD of rating events, loaded elsewhere; the weights passed
>   # to randomSplit are relative and get normalized by Spark
>   data_train, data_cv, data_test = data.randomSplit([99, 1, 1], 2)
>   functions  = [rating]  # rating() and areaUnderCurve() defined elsewhere
>   ranks      = [10, 20]
>   iterations = [10, 20]
>   lambdas    = [0.01, 0.1]
>   alphas     = [1.0, 50.0]
>
>   results = []
>   for ratingFunction, rank, numIterations, m_lambda, m_alpha in \
>       itertools.product(functions, ranks, iterations, lambdas, alphas):
>     # train model
>     ratings_train = data_train.map(
>         lambda l: Rating(l.user, l.product, ratingFunction(l)))
>     model = ALS.trainImplicit(ratings_train, rank, numIterations,
>                               lambda_=float(m_lambda), alpha=float(m_alpha))
>
>     # test performance on CV data
>     ratings_cv = data_cv.map(
>         lambda l: Rating(l.user, l.product, ratingFunction(l)))
>     auc = areaUnderCurve(ratings_cv, model.predictAll)
>
>     # save results as one CSV line per configuration
>     result = ",".join(str(x) for x in
>         [ratingFunction.__name__, rank, numIterations, m_lambda, m_alpha, auc])
>     results.append(result)
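>
> One possible workaround sketch for the loop above (again leaning on the
> private sc._jsc handle, so a guess rather than a supported API) is to
> drop everything that is still cached at the end of each iteration, before
> the next model is trained:
>
>     # end of each loop iteration: release this round's cached RDDs so
>     # the next trainImplicit() starts from a clean executor cache
>     ratings_train.unpersist()
>     for rdd_id, jrdd in list(sc._jsc.getPersistentRDDs().items()):
>         jrdd.unpersist()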
>
