Re: How to unpersist RDDs generated by ALS/MatrixFactorizationModel
Sean,

Thanks. It's a developer API and doesn't appear to be exposed.

Ewan

On 07/12/15 15:06, Sean Owen wrote:
> I'm not sure if this is available in Python, but from 1.3 on you should be
> able to call ALS.setFinalRDDStorageLevel with level "none" to ask it to
> unpersist when it is done.
>
> On Mon, Dec 7, 2015 at 1:42 PM, Ewan Higgs wrote:
> > Jonathan,
> >
> > Did you ever get to the bottom of this? I have some users working with
> > Spark in a classroom setting, and our example notebooks run into problems
> > where there is so much spilled to disk that they run out of quota. A 1.5G
> > input set becomes >30G of spilled data on disk. I looked into how I could
> > unpersist the data so I could clean up the files, but I was unsuccessful.
> >
> > We're using Spark 1.5.0.
> >
> > Yours,
> > Ewan
> >
> > On 16/07/15 23:18, Stahlman, Jonathan wrote:
> > > Hello all,
> > >
> > > I am running the Spark recommendation algorithm in MLlib and I have been
> > > studying its output with various model configurations. Ideally I would
> > > like to be able to run one job that trains the recommendation model with
> > > many different configurations to try to optimize for performance. Sample
> > > code in Python is copied below.
> > >
> > > The issue I have is that each new model which is trained caches a set of
> > > RDDs, and eventually the executors run out of memory. Is there any way in
> > > PySpark to unpersist() these RDDs after each iteration? The names of the
> > > RDDs, which I gather from the UI, are:
> > >
> > > itemInBlocks
> > > itemOutBlocks
> > > Products
> > > ratingBlocks
> > > userInBlocks
> > > userOutBlocks
> > > users
> > >
> > > I am using Spark 1.3. Thank you for any help!
> > >
> > > Regards,
> > > Jonathan
> > >
> > >     import itertools
> > >     from pyspark.mllib.recommendation import ALS, Rating
> > >
> > >     # `data` and `areaUnderCurve` are not shown in this post
> > >     data_train, data_cv, data_test = data.randomSplit([99, 1, 1], 2)
> > >     functions = [rating]  # defined elsewhere
> > >     ranks = [10, 20]
> > >     iterations = [10, 20]
> > >     lambdas = [0.01, 0.1]
> > >     alphas = [1.0, 50.0]
> > >
> > >     results = []
> > >     for ratingFunction, rank, numIterations, m_lambda, m_alpha in \
> > >             itertools.product(functions, ranks, iterations, lambdas, alphas):
> > >         # train model
> > >         ratings_train = data_train.map(
> > >             lambda l: Rating(l.user, l.product, ratingFunction(l)))
> > >         model = ALS.trainImplicit(ratings_train, rank, numIterations,
> > >                                   lambda_=float(m_lambda),
> > >                                   alpha=float(m_alpha))
> > >
> > >         # test performance on CV data
> > >         ratings_cv = data_cv.map(
> > >             lambda l: Rating(l.user, l.product, ratingFunction(l)))
> > >         auc = areaUnderCurve(ratings_cv, model.predictAll)
> > >
> > >         # save results
> > >         result = ",".join(str(l) for l in
> > >                           [ratingFunction.__name__, rank, numIterations,
> > >                            m_lambda, m_alpha, auc])
> > >         results.append(result)
> > >
> > > The information contained in this e-mail is confidential and/or
> > > proprietary to Capital One and/or its affiliates and may only be used
> > > solely in performance of work or services for Capital One. The
> > > information transmitted herewith is intended only for use by the
> > > individual or entity to which it is addressed. If the reader of this
> > > message is not the intended recipient, you are hereby notified that any
> > > review, retransmission, dissemination, distribution, copying or other use
> > > of, or taking of any action in reliance upon this information is strictly
> > > prohibited. If you have received this communication in error, please
> > > contact the sender and delete the material from your computer.

To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
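For the parameter sweep in the original post, one way to make room for a per-iteration cleanup step is to pull the loop into a small helper that always runs a cleanup hook after each configuration has been scored. This is only a sketch in plain Python: `grid_search` and its `train`, `evaluate` and `cleanup` arguments are hypothetical names, not part of Spark. In a PySpark job, `train` would wrap ALS.trainImplicit, `evaluate` would wrap the AUC computation, and `cleanup` would be whatever releases the cached RDDs in your setup.

```python
import itertools


def grid_search(param_grid, train, evaluate, cleanup):
    """Train and score one model per parameter combination.

    param_grid: dict mapping parameter name -> list of candidate values.
    train, evaluate, cleanup: caller-supplied callables (hypothetical hooks).
    Returns a list of (params, score) tuples in iteration order.
    """
    names = list(param_grid)
    results = []
    for values in itertools.product(*(param_grid[n] for n in names)):
        params = dict(zip(names, values))
        model = train(**params)
        try:
            score = evaluate(model)
        finally:
            # Drop our reference to the model and release cached data,
            # even if evaluation raised.
            del model
            cleanup()
        results.append((params, score))
    return results
```

Running the cleanup in a `finally` block means a failed evaluation does not leave the previous configuration's cached data behind for the next one.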
Re: How to unpersist RDDs generated by ALS/MatrixFactorizationModel
Jonathan,

Did you ever get to the bottom of this? I have some users working with Spark in a classroom setting and our example notebooks run into problems where there is so much spilled to disk that they run out of quota. A 1.5G input set becomes >30G of spilled data on disk. I looked into how I could unpersist the data so I could clean up the files, but I was unsuccessful.

We're using Spark 1.5.0

Yours,
Ewan
Re: How to unpersist RDDs generated by ALS/MatrixFactorizationModel
I'm not sure if this is available in Python but from 1.3 on you should be able to call ALS.setFinalRDDStorageLevel with level "none" to ask it to unpersist when it is done.
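If the storage-level setter is not reachable from Python, a blunt workaround that is sometimes used is to ask the JVM-side context for every RDD it currently tracks as persistent and unpersist each one. The sketch below assumes `sc` is a PySpark SparkContext and relies on `sc._jsc`, a private handle to the underlying JavaSparkContext, so it is not a supported API and may behave differently between Spark versions. `unpersist_all_cached` is a hypothetical helper name.

```python
def unpersist_all_cached(sc):
    """Unpersist every RDD the SparkContext currently tracks as persistent.

    Assumption: `sc` is a PySpark SparkContext. `sc._jsc` is a private
    attribute (the JavaSparkContext), so this is a workaround, not a
    supported API. Returns the number of RDDs that were unpersisted.
    """
    persistent = sc._jsc.getPersistentRDDs()  # map: RDD id -> Java RDD handle
    released = 0
    # Copy the entries first so the loop does not depend on a live view.
    for _rdd_id, java_rdd in list(persistent.items()):
        java_rdd.unpersist()
        released += 1
    return released
```

Calling this at the end of each loop iteration releases everything that is cached, including RDDs you persisted yourself (for example the training split), so those would need to be persisted again afterwards. It only drops persisted blocks and may not remove shuffle files that have been written to disk.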
Re: How to unpersist RDDs generated by ALS/MatrixFactorizationModel
Hi Stahlman,

finalRDDStorageLevel is the storage level for the final user/item factors. It is not common to set it to StorageLevel.NONE, unless you want to save the factors directly to disk. So if it is NONE, we cannot unpersist the intermediate RDDs (in/out blocks), because the final user/item factors returned are not materialized. Otherwise, we would have to recompute from the very beginning (or the last checkpoint) when you materialize the final user/item factors. If you want to have multiple runs, you can try setting finalRDDStorageLevel to MEMORY_AND_DISK, or clean up previous runs so the cached RDDs get garbage collected.

Best,
Xiangrui

On Wed, Jul 22, 2015 at 11:35 AM, Ganelin, Ilya ilya.gane...@capitalone.com wrote:
> To be unpersisted, the RDD must be persisted first. If it's set to NONE,
> then it's not persisted, and as such does not need to be freed. Does that
> make sense?
>
> Thank you,
> Ilya Ganelin
>
> -----Original Message-----
> From: Stahlman, Jonathan [jonathan.stahl...@capitalone.com]
> Sent: Wednesday, July 22, 2015 01:42 PM Eastern Standard Time
> To: user@spark.apache.org
> Subject: Re: How to unpersist RDDs generated by ALS/MatrixFactorizationModel
>
> Hello again,
>
> In trying to understand the caching of intermediate RDDs by ALS, I looked
> into the source code and found what may be a bug. Looking here:
>
> https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala#L230
>
> you see that ALS.train() is being called with finalRDDStorageLevel =
> StorageLevel.NONE, which I would understand to mean that the intermediate
> RDDs will not be persisted. Looking here:
>
> https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala#L631
>
> unpersist() is only being called on the intermediate RDDs (all the *Blocks
> RDDs listed in my first post) if finalRDDStorageLevel != StorageLevel.NONE.
>
> This doesn’t make sense to me – I would expect the RDDs to be removed from
> the cache if finalRDDStorageLevel == StorageLevel.NONE, not the other way
> around.
>
> Jonathan
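The behaviour discussed in this message (intermediate blocks are only unpersisted when finalRDDStorageLevel is not NONE) can be illustrated with a toy model. This is plain Python and is not Spark's code: `ToyRDD`, `toy_train`, and the "userFactors"/"itemFactors" labels are hypothetical stand-ins, and "persisting" the factors here stands in for persisting and materializing them.

```python
NONE = "NONE"
MEMORY_AND_DISK = "MEMORY_AND_DISK"

# Intermediate RDD names as reported in the first post of this thread.
BLOCK_NAMES = ("ratingBlocks", "userInBlocks", "userOutBlocks",
               "itemInBlocks", "itemOutBlocks")


class ToyRDD:
    """Stand-in for an RDD that only tracks its storage level."""

    def __init__(self, name):
        self.name = name
        self.level = NONE

    def persist(self, level):
        self.level = level
        return self

    def unpersist(self):
        self.level = NONE
        return self

    @property
    def is_cached(self):
        return self.level != NONE


def toy_train(intermediate_level=MEMORY_AND_DISK, final_level=MEMORY_AND_DISK):
    """Mimic the conditional cleanup described in the thread."""
    blocks = [ToyRDD(n).persist(intermediate_level) for n in BLOCK_NAMES]
    factors = [ToyRDD("userFactors"), ToyRDD("itemFactors")]
    if final_level != NONE:
        # Factors are persisted (and, in the real code, materialized),
        # so the intermediate blocks are no longer needed.
        for f in factors:
            f.persist(final_level)
        for b in blocks:
            b.unpersist()
    # With final_level == NONE the factors are still lazy and depend on the
    # blocks, so the blocks are left cached.
    return factors, blocks
```

With `final_level=NONE` the blocks stay cached after `toy_train` returns, which matches the accumulation seen across loop iterations. With `final_level=MEMORY_AND_DISK` the blocks are released and only the two factor RDDs remain cached.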
Re: How to unpersist RDDs generated by ALS/MatrixFactorizationModel
Hi Burak,

Looking at the source code, the intermediate RDDs used in ALS.train() are persisted during the computation using intermediateRDDStorageLevel (default value is StorageLevel.MEMORY_AND_DISK). See here:

https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala#L546
https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala#L548
https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala#L556

At the end of the ALS calculation, these RDDs are neither needed nor returned, so I would assume the logical choice would be to unpersist() them. The strategy in the code seems to be set by finalRDDStorageLevel, which for some reason only calls unpersist() on the intermediate RDDs if finalRDDStorageLevel != StorageLevel.NONE, which seems counter-intuitive to me.

Jonathan

From: Burak Yavuz brk...@gmail.com
Date: Wednesday, July 22, 2015 at 10:47 AM
To: Stahlman Jonathan jonathan.stahl...@capitalone.com
Cc: user@spark.apache.org
Subject: Re: How to unpersist RDDs generated by ALS/MatrixFactorizationModel

Hi Jonathan,

I believe calling persist with StorageLevel.NONE doesn't do anything. That's why the unpersist has an if statement before it. Could you give more information about your setup please? Number of cores, memory, number of partitions of ratings_train?

Thanks,
Burak
Re: How to unpersist RDDs generated by ALS/MatrixFactorizationModel
Hi Jonathan,

I believe calling persist with StorageLevel.NONE doesn't do anything. That's why the unpersist has an if statement before it. Could you give more information about your setup please? Number of cores, memory, number of partitions of ratings_train?

Thanks,
Burak
RE: How to unpersist RDDs generated by ALS/MatrixFactorizationModel
To be Unpersisted the RDD must be persisted first. If it's set to None, then it's not persisted, and as such does not need to be freed. Does that make sense ?

Thank you,
Ilya Ganelin
Re: How to unpersist RDDs generated by ALS/MatrixFactorizationModel
Hello again,

In trying to understand the caching of intermediate RDDs by ALS, I looked into the source code and found what may be a bug. Looking here:

https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala#L230

you see that ALS.train() is being called with finalRDDStorageLevel = StorageLevel.NONE, which I would understand to mean that the intermediate RDDs will not be persisted. Looking here:

https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala#L631

unpersist() is only being called on the intermediate RDDs (all the *Blocks RDDs listed in my first post) if finalRDDStorageLevel != StorageLevel.NONE.

This doesn’t make sense to me – I would expect the RDDs to be removed from the cache if finalRDDStorageLevel == StorageLevel.NONE, not the other way around.

Jonathan