Spark structured streaming with periodic persist and unpersist
I am currently building a Spark Structured Streaming application that does a stream-static (batch) join, and the source of the batch data is updated periodically. So I plan to persist/unpersist that batch data periodically. Below is the sample code I am using to persist and unpersist the batch data.

Flow:
-> Read the batch data
-> Persist the batch data
-> Every hour, unpersist the data, read the batch data again, and persist it again.

But I am not seeing the batch data getting refreshed every hour.

Code:

    var batchDF = handler.readBatchDF(sparkSession)
    batchDF.persist(StorageLevel.MEMORY_AND_DISK)
    var refreshedTime: Instant = Instant.now()

    if (Duration.between(refreshedTime, Instant.now()).getSeconds > refreshTime) {
      refreshedTime = Instant.now()
      batchDF.unpersist(false)
      batchDF = handler.readBatchDF(sparkSession)
        .persist(StorageLevel.MEMORY_AND_DISK)
    }

Is there a better way to achieve this scenario in Spark Structured Streaming jobs?
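[Editorial note] For reference, a minimal sketch of one pattern sometimes used for this: running the refresh check inside a foreachBatch callback so it executes on every micro-batch rather than once at start-up. This assumes Spark 2.4+ (for foreachBatch); handler.readBatchDF, refreshTime, streamDF, the join key "id" and outputPath are placeholders carried over from or added to the question, not a confirmed fix from this thread.

    import java.time.{Duration, Instant}
    import org.apache.spark.sql.DataFrame
    import org.apache.spark.storage.StorageLevel

    // Cached static side, refreshed from inside each micro-batch
    var staticDF: DataFrame = handler.readBatchDF(sparkSession).persist(StorageLevel.MEMORY_AND_DISK)
    var refreshedTime: Instant = Instant.now()

    streamDF.writeStream
      .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
        // This check now runs once per micro-batch, not once per application start
        if (Duration.between(refreshedTime, Instant.now()).getSeconds > refreshTime) {
          staticDF.unpersist()
          staticDF = handler.readBatchDF(sparkSession).persist(StorageLevel.MEMORY_AND_DISK)
          refreshedTime = Instant.now()
        }
        // Join the micro-batch against the (possibly refreshed) static data
        batchDF.join(staticDF, "id").write.mode("append").parquet(outputPath)
      }
      .start()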
Broadcast variables: destroy/unpersist unexpected behaviour
I ran into the two cases below when unpersisting or destroying broadcast variables in PySpark, but the same works fine in the Spark Scala shell. Any clue why this happens? Is it a bug in PySpark?

Case 1:

    >>> b1 = sc.broadcast([1,2,3])
    >>> b1.value
    [1, 2, 3]
    >>> b1.destroy()
    >>> b1.value
    [1, 2, 3]

I can still access the value on the driver.

Case 2:

    >>> b = sc.broadcast([1,2,3])
    >>> b.destroy()
    >>> b.value
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
      File "/home/sdh/Downloads/spark-2.2.1-bin-hadoop2.7/python/pyspark/broadcast.py", line 109, in value
        self._value = self.load(self._path)
      File "/home/sdh/Downloads/spark-2.2.1-bin-hadoop2.7/python/pyspark/broadcast.py", line 95, in load
        with open(path, 'rb', 1 << 20) as f:
    IOError: [Errno 2] No such file or directory: u'/tmp/spark-eef352c0-6470-4b89-999f-923493a27bc4/pyspark-17d3a9a3-b5c1-4331-b408-8447f078789e/tmpzq4kv0'

Instead I would expect a message along the lines of "Attempted to use broadcast variable after it was destroyed".
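[Editorial note] For comparison, this is roughly what the Scala shell behaviour referred to above looks like. This is a sketch from memory, not output captured from this thread, and the exact exception message may differ by version.

    // Scala shell, for comparison with the PySpark cases above
    val b = sc.broadcast(Seq(1, 2, 3))
    b.value       // Seq(1, 2, 3)
    b.destroy()   // releases the broadcast data on the driver and the executors
    b.value       // throws org.apache.spark.SparkException
                  // ("Attempted to use Broadcast(0) after it was destroyed")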
Unpersist all from memory in spark 2.2
Is there a way to unpersist all DataFrames, Datasets, and/or RDDs in Spark 2.2 in a single call? Thanks -- Cesar Flores
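[Editorial note] Not an answer from this thread, but for reference, a sketch of the two handles that exist in the Spark 2.x API for this, to my knowledge: the SQL catalog for cached DataFrames/Datasets, and the SparkContext's registry of persistent RDDs.

    // Clear every cached table / DataFrame / Dataset tracked by the SQL catalog
    spark.catalog.clearCache()

    // Unpersist every RDD that is still marked as persistent on this SparkContext
    spark.sparkContext.getPersistentRDDs.values.foreach(_.unpersist())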
Re: Unpersist RDD in Graphx
Hi, Please call "Graph#unpersist" that releases two RDDs, vertex and edge ones. "Graph#unpersist" just invokes "Graph#unpersistVertices" and "Graph#edges#unpersist"; "Graph#unpersistVertices" releases memory for vertices and "Graph#edges#unpersist" does memory for edges. If blocking = true, unpersist() waits until memory released from BlockManager. On Mon, Feb 1, 2016 at 8:35 AM, Zhang, Jingyu <jingyu.zh...@news.com.au> wrote: > Hi, What is he best way to unpersist the RDD in graphx to release memory? > RDD.unpersist > or > RDD.unpersistVertices and RDD..edges.unpersist > > I study the source code of Pregel.scala, Both of above were used between > line 148 and line 150. Can anyone please tell me what the different? In > addition, what is the difference to set blocking = false and blocking = > true? > > oldMessages.unpersist(blocking = false) > prevG.unpersistVertices(blocking = false) > prevG.edges.unpersist(blocking = false) > > Thanks, > > Jingyu > > This message and its attachments may contain legally privileged or > confidential information. It is intended solely for the named addressee. If > you are not the addressee indicated in this message or responsible for > delivery of the message to the addressee, you may not copy or deliver this > message or its attachments to anyone. Rather, you should permanently delete > this message and its attachments and kindly notify the sender by reply > e-mail. Any content of this message and its attachments which does not > relate to the official business of the sending company must be taken not to > have been sent or endorsed by that company or any of its related entities. > No warranty is made that the e-mail or attachments are free from computer > virus or other defect. -- --- Takeshi Yamamuro
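[Editorial note] A short sketch of the two equivalent forms described above, assuming a Pregel-style loop that holds the previous graph (prevG) and message RDD (oldMessages):

    // Either release the whole graph in one call ...
    prevG.unpersist(blocking = false)

    // ... or release the vertex and edge RDDs separately, as Pregel.scala does
    oldMessages.unpersist(blocking = false)
    prevG.unpersistVertices(blocking = false)
    prevG.edges.unpersist(blocking = false)

    // blocking = true would make each call wait until the BlockManager has
    // actually dropped the blocks; blocking = false returns immediately.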
Unpersist RDD in Graphx
Hi,

What is the best way to unpersist the RDDs in GraphX to release memory?

Graph.unpersist
or
Graph.unpersistVertices and Graph.edges.unpersist

I studied the source code of Pregel.scala; both of the above are used between line 148 and line 150. Can anyone please tell me what the difference is? In addition, what is the difference between setting blocking = false and blocking = true?

oldMessages.unpersist(blocking = false)
prevG.unpersistVertices(blocking = false)
prevG.edges.unpersist(blocking = false)

Thanks,

Jingyu
Re: automatically unpersist RDDs which are not used for 24 hours?
Hi Alex, Yes, you can set `spark.cleaner.ttl`: http://spark.apache.org/docs/1.6.0/configuration.html, but I would not recommend it! We are actually removing this property in Spark 2.0 because it has caused problems for many users in the past. In particular, if you accidentally use a variable that has been automatically cleaned, then you will run into problems like shuffle fetch failures or broadcast variable not found etc, which may fail your job. Alternatively, Spark already automatically cleans up all variables that have been garbage collected, including RDDs, shuffle dependencies, broadcast variables and accumulators. This context-based cleaning has been enabled by default for many versions by now so it should be reliable. The only caveat is that it may not work super well in a shell environment, where some variables may never exit the scope. Please let me know if you have more questions, -Andrew 2016-01-13 11:36 GMT-08:00 Alexander Pivovarov <apivova...@gmail.com>: > Is it possible to automatically unpersist RDDs which are not used for 24 > hours? >
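[Editorial note] To illustrate the two options Andrew describes, a small sketch: explicit unpersist at a point you control, versus dropping the reference and relying on the context-based cleaning of garbage-collected variables. "data.txt" is a placeholder input.

    var rdd = sc.textFile("data.txt").cache()
    rdd.count()        // materialise the cache

    // Option 1: release it explicitly at a point you control
    rdd.unpersist()

    // Option 2: drop the reference and rely on context cleaning -- once the
    // driver-side RDD object is garbage collected, the ContextCleaner removes
    // its cached blocks automatically (this is the part that may not work well
    // in a shell, where top-level references never go out of scope)
    rdd = null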
automatically unpersist RDDs which are not used for 24 hours?
Is it possible to automatically unpersist RDDs which are not used for 24 hours?
Re: How to unpersist RDDs generated by ALS/MatrixFactorizationModel
Sean, Thanks. It's a developer API and doesn't appear to be exposed. Ewan On 07/12/15 15:06, Sean Owen wrote: I'm not sure if this is available in Python but from 1.3 on you should be able to call ALS.setFinalRDDStorageLevel with level "none" to ask it to unpersist when it is done. On Mon, Dec 7, 2015 at 1:42 PM, Ewan Higgs <ewan.hi...@ugent.be> wrote: Jonathan, Did you ever get to the bottom of this? I have some users working with Spark in a classroom setting and our example notebooks run into problems where there is so much spilled to disk that they run out of quota. A 1.5G input set becomes >30G of spilled data on disk. I looked into how I could unpersist the data so I could clean up the files, but I was unsuccessful. We're using Spark 1.5.0 Yours, Ewan On 16/07/15 23:18, Stahlman, Jonathan wrote: Hello all, I am running the Spark recommendation algorithm in MLlib and I have been studying its output with various model configurations. Ideally I would like to be able to run one job that trains the recommendation model with many different configurations to try to optimize for performance. A sample code in python is copied below. The issue I have is that each new model which is trained caches a set of RDDs and eventually the executors run out of memory. Is there any way in Pyspark to unpersist() these RDDs after each iteration? The names of the RDDs which I gather from the UI is: itemInBlocks itemOutBlocks Products ratingBlocks userInBlocks userOutBlocks users I am using Spark 1.3. Thank you for any help! Regards, Jonathan data_train, data_cv, data_test = data.randomSplit([99,1,1], 2) functions = [rating] #defined elsewhere ranks = [10,20] iterations = [10,20] lambdas = [0.01,0.1] alphas = [1.0,50.0] results = [] for ratingFunction, rank, numIterations, m_lambda, m_alpha in itertools.product( functions, ranks, iterations, lambdas, alphas ): #train model ratings_train = data_train.map(lambda l: Rating( l.user, l.product, ratingFunction(l) ) ) model = ALS.trainImplicit( ratings_train, rank, numIterations, lambda_=float(m_lambda), alpha=float(m_alpha) ) #test performance on CV data ratings_cv = data_cv.map(lambda l: Rating( l.uesr, l.product, ratingFunction(l) ) ) auc = areaUnderCurve( ratings_cv, model.predictAll ) #save results result = ",".join(str(l) for l in [ratingFunction.__name__,rank,numIterations,m_lambda,m_alpha,auc]) results.append(result) The information contained in this e-mail is confidential and/or proprietary to Capital One and/or its affiliates and may only be used solely in performance of work or services for Capital One. The information transmitted herewith is intended only for use by the individual or entity to which it is addressed. If the reader of this message is not the intended recipient, you are hereby notified that any review, retransmission, dissemination, distribution, copying or other use of, or taking of any action in reliance upon this information is strictly prohibited. If you have received this communication in error, please contact the sender and delete the material from your computer. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: How to unpersist RDDs generated by ALS/MatrixFactorizationModel
Jonathan, Did you ever get to the bottom of this? I have some users working with Spark in a classroom setting and our example notebooks run into problems where there is so much spilled to disk that they run out of quota. A 1.5G input set becomes >30G of spilled data on disk. I looked into how I could unpersist the data so I could clean up the files, but I was unsuccessful. We're using Spark 1.5.0 Yours, Ewan On 16/07/15 23:18, Stahlman, Jonathan wrote: Hello all, I am running the Spark recommendation algorithm in MLlib and I have been studying its output with various model configurations. Ideally I would like to be able to run one job that trains the recommendation model with many different configurations to try to optimize for performance. A sample code in python is copied below. The issue I have is that each new model which is trained caches a set of RDDs and eventually the executors run out of memory. Is there any way in Pyspark to unpersist() these RDDs after each iteration? The names of the RDDs which I gather from the UI is: itemInBlocks itemOutBlocks Products ratingBlocks userInBlocks userOutBlocks users I am using Spark 1.3. Thank you for any help! Regards, Jonathan data_train, data_cv, data_test = data.randomSplit([99,1,1], 2) functions = [rating] #defined elsewhere ranks = [10,20] iterations = [10,20] lambdas = [0.01,0.1] alphas = [1.0,50.0] results = [] for ratingFunction, rank, numIterations, m_lambda, m_alpha in itertools.product( functions, ranks, iterations, lambdas, alphas ): #train model ratings_train = data_train.map(lambda l: Rating( l.user, l.product, ratingFunction(l) ) ) model = ALS.trainImplicit( ratings_train, rank, numIterations, lambda_=float(m_lambda), alpha=float(m_alpha) ) #test performance on CV data ratings_cv = data_cv.map(lambda l: Rating( l.uesr, l.product, ratingFunction(l) ) ) auc = areaUnderCurve( ratings_cv, model.predictAll ) #save results result = ",".join(str(l) for l in [ratingFunction.__name__,rank,numIterations,m_lambda,m_alpha,auc]) results.append(result) The information contained in this e-mail is confidential and/or proprietary to Capital One and/or its affiliates and may only be used solely in performance of work or services for Capital One. The information transmitted herewith is intended only for use by the individual or entity to which it is addressed. If the reader of this message is not the intended recipient, you are hereby notified that any review, retransmission, dissemination, distribution, copying or other use of, or taking of any action in reliance upon this information is strictly prohibited. If you have received this communication in error, please contact the sender and delete the material from your computer.
Re: How to unpersist RDDs generated by ALS/MatrixFactorizationModel
I'm not sure if this is available in Python but from 1.3 on you should be able to call ALS.setFinalRDDStorageLevel with level "none" to ask it to unpersist when it is done. On Mon, Dec 7, 2015 at 1:42 PM, Ewan Higgs <ewan.hi...@ugent.be> wrote: > Jonathan, > Did you ever get to the bottom of this? I have some users working with Spark > in a classroom setting and our example notebooks run into problems where > there is so much spilled to disk that they run out of quota. A 1.5G input > set becomes >30G of spilled data on disk. I looked into how I could > unpersist the data so I could clean up the files, but I was unsuccessful. > > We're using Spark 1.5.0 > > Yours, > Ewan > > On 16/07/15 23:18, Stahlman, Jonathan wrote: > > Hello all, > > I am running the Spark recommendation algorithm in MLlib and I have been > studying its output with various model configurations. Ideally I would like > to be able to run one job that trains the recommendation model with many > different configurations to try to optimize for performance. A sample code > in python is copied below. > > The issue I have is that each new model which is trained caches a set of > RDDs and eventually the executors run out of memory. Is there any way in > Pyspark to unpersist() these RDDs after each iteration? The names of the > RDDs which I gather from the UI is: > > itemInBlocks > itemOutBlocks > Products > ratingBlocks > userInBlocks > userOutBlocks > users > > I am using Spark 1.3. Thank you for any help! > > Regards, > Jonathan > > > > > data_train, data_cv, data_test = data.randomSplit([99,1,1], 2) > functions = [rating] #defined elsewhere > ranks = [10,20] > iterations = [10,20] > lambdas = [0.01,0.1] > alphas = [1.0,50.0] > > results = [] > for ratingFunction, rank, numIterations, m_lambda, m_alpha in > itertools.product( functions, ranks, iterations, lambdas, alphas ): > #train model > ratings_train = data_train.map(lambda l: Rating( l.user, l.product, > ratingFunction(l) ) ) > model = ALS.trainImplicit( ratings_train, rank, numIterations, > lambda_=float(m_lambda), alpha=float(m_alpha) ) > > #test performance on CV data > ratings_cv = data_cv.map(lambda l: Rating( l.uesr, l.product, > ratingFunction(l) ) ) > auc = areaUnderCurve( ratings_cv, model.predictAll ) > > #save results > result = ",".join(str(l) for l in > [ratingFunction.__name__,rank,numIterations,m_lambda,m_alpha,auc]) > results.append(result) > > > > The information contained in this e-mail is confidential and/or proprietary > to Capital One and/or its affiliates and may only be used solely in > performance of work or services for Capital One. The information transmitted > herewith is intended only for use by the individual or entity to which it is > addressed. If the reader of this message is not the intended recipient, you > are hereby notified that any review, retransmission, dissemination, > distribution, copying or other use of, or taking of any action in reliance > upon this information is strictly prohibited. If you have received this > communication in error, please contact the sender and delete the material > from your computer. > > - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
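[Editorial note] In the Scala/MLlib API, the setter Sean refers to looks roughly like this. This is a sketch; as noted above, whether and when it is exposed in the Python API is unconfirmed, and "ratings" is assumed to be an RDD[Rating] prepared elsewhere.

    import org.apache.spark.mllib.recommendation.ALS
    import org.apache.spark.storage.StorageLevel

    val als = new ALS()
      .setRank(10)
      .setIterations(10)
      .setLambda(0.01)
      .setAlpha(1.0)
      .setImplicitPrefs(true)
      // Per the reply above: use StorageLevel.NONE for the final factor RDDs
      // so ALS does not leave them persisted when training is done
      .setFinalRDDStorageLevel(StorageLevel.NONE)

    val model = als.run(ratings)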
Re: How to unpersist a DStream in Spark Streaming
Do we have any guarantees on the maximum duration? I've seen RDDs kept around for 7-10 minutes on batches of 20 secs and a checkpoint interval of 100 secs. No windows, just updateStateByKey. It's not a memory issue, but on checkpoint recovery it goes back to Kafka for 10 minutes of data, any idea why?

-adrian

Sent from my iPhone

On 06 Nov 2015, at 09:45, Tathagata Das <t...@databricks.com> wrote:

Spark streaming automatically takes care of unpersisting any RDDs generated by DStream. You can set the StreamingContext.remember() to set the minimum persistence duration. Any persisted RDD older than that will be automatically unpersisted

On Thu, Nov 5, 2015 at 9:12 AM, swetha kasireddy <swethakasire...@gmail.com> wrote:

Its just in the same thread for a particular RDD, I need to uncache it every 2 minutes to clear out the data that is present in a Map inside that.

On Wed, Nov 4, 2015 at 11:54 PM, Saisai Shao <sai.sai.s...@gmail.com> wrote:

Hi Swetha,

Would you mind elaborating your usage scenario of DStream unpersisting?

From my understanding:

1. Spark Streaming will automatically unpersist outdated data (you already mentioned about the configurations).
2. If streaming job is started, I think you may lose the control of the job, when do you call this unpersist, how to call this unpersist (from another thread)?

Thanks
Saisai

On Thu, Nov 5, 2015 at 3:13 PM, swetha kasireddy <swethakasire...@gmail.com> wrote:

Other than setting the following.

sparkConf.set("spark.streaming.unpersist", "true")
sparkConf.set("spark.cleaner.ttl", "7200s")

On Wed, Nov 4, 2015 at 5:03 PM, swetha <swethakasire...@gmail.com> wrote:

Hi,

How to unpersist a DStream in Spark Streaming? I know that we can persist using dStream.persist() or dStream.cache. But, I don't see any method to unPersist.

Thanks,
Swetha
Re: How to unpersist a DStream in Spark Streaming
Its just in the same thread for a particular RDD, I need to uncache it every 2 minutes to clear out the data that is present in a Map inside that. On Wed, Nov 4, 2015 at 11:54 PM, Saisai Shao <sai.sai.s...@gmail.com> wrote: > Hi Swetha, > > Would you mind elaborating your usage scenario of DStream unpersisting? > > From my understanding: > > 1. Spark Streaming will automatically unpersist outdated data (you already > mentioned about the configurations). > 2. If streaming job is started, I think you may lose the control of the > job, when do you call this unpersist, how to call this unpersist (from > another thread)? > > Thanks > Saisai > > > On Thu, Nov 5, 2015 at 3:13 PM, swetha kasireddy < > swethakasire...@gmail.com> wrote: > >> Other than setting the following. >> >> sparkConf.set("spark.streaming.unpersist", "true") >> sparkConf.set("spark.cleaner.ttl", "7200s") >> >> >> On Wed, Nov 4, 2015 at 5:03 PM, swetha <swethakasire...@gmail.com> wrote: >> >>> Hi, >>> >>> How to unpersist a DStream in Spark Streaming? I know that we can persist >>> using dStream.persist() or dStream.cache. But, I don't see any method to >>> unPersist. >>> >>> Thanks, >>> Swetha >>> >>> >>> >>> -- >>> View this message in context: >>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-unpersist-a-DStream-in-Spark-Streaming-tp25281.html >>> Sent from the Apache Spark User List mailing list archive at Nabble.com. >>> >>> - >>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org >>> For additional commands, e-mail: user-h...@spark.apache.org >>> >>> >> >
Re: How to unpersist a DStream in Spark Streaming
Spark streaming automatically takes care of unpersisting any RDDs generated by DStream. You can set the StreamingContext.remember() to set the minimum persistence duration. Any persisted RDD older than that will be automatically unpersisted On Thu, Nov 5, 2015 at 9:12 AM, swetha kasireddy <swethakasire...@gmail.com> wrote: > Its just in the same thread for a particular RDD, I need to uncache it > every 2 minutes to clear out the data that is present in a Map inside that. > > On Wed, Nov 4, 2015 at 11:54 PM, Saisai Shao <sai.sai.s...@gmail.com> > wrote: > >> Hi Swetha, >> >> Would you mind elaborating your usage scenario of DStream unpersisting? >> >> From my understanding: >> >> 1. Spark Streaming will automatically unpersist outdated data (you >> already mentioned about the configurations). >> 2. If streaming job is started, I think you may lose the control of the >> job, when do you call this unpersist, how to call this unpersist (from >> another thread)? >> >> Thanks >> Saisai >> >> >> On Thu, Nov 5, 2015 at 3:13 PM, swetha kasireddy < >> swethakasire...@gmail.com> wrote: >> >>> Other than setting the following. >>> >>> sparkConf.set("spark.streaming.unpersist", "true") >>> sparkConf.set("spark.cleaner.ttl", "7200s") >>> >>> >>> On Wed, Nov 4, 2015 at 5:03 PM, swetha <swethakasire...@gmail.com> >>> wrote: >>> >>>> Hi, >>>> >>>> How to unpersist a DStream in Spark Streaming? I know that we can >>>> persist >>>> using dStream.persist() or dStream.cache. But, I don't see any method to >>>> unPersist. >>>> >>>> Thanks, >>>> Swetha >>>> >>>> >>>> >>>> -- >>>> View this message in context: >>>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-unpersist-a-DStream-in-Spark-Streaming-tp25281.html >>>> Sent from the Apache Spark User List mailing list archive at Nabble.com. >>>> >>>> - >>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org >>>> For additional commands, e-mail: user-h...@spark.apache.org >>>> >>>> >>> >> >
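[Editorial note] A sketch of the remember() knob mentioned above. The batch interval and duration values are just examples, and sparkConf is assumed to be defined elsewhere.

    import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}

    val ssc = new StreamingContext(sparkConf, Seconds(20))
    // Keep generated RDDs around for at least 5 minutes before Spark Streaming
    // is allowed to automatically unpersist them
    ssc.remember(Minutes(5))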
How to unpersist a DStream in Spark Streaming
Hi, How to unpersist a DStream in Spark Streaming? I know that we can persist using dStream.persist() or dStream.cache. But, I don't see any method to unPersist. Thanks, Swetha -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-unpersist-a-DStream-in-Spark-Streaming-tp25281.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: How to unpersist a DStream in Spark Streaming
Hi Swetha, Would you mind elaborating your usage scenario of DStream unpersisting? >From my understanding: 1. Spark Streaming will automatically unpersist outdated data (you already mentioned about the configurations). 2. If streaming job is started, I think you may lose the control of the job, when do you call this unpersist, how to call this unpersist (from another thread)? Thanks Saisai On Thu, Nov 5, 2015 at 3:13 PM, swetha kasireddy <swethakasire...@gmail.com> wrote: > Other than setting the following. > > sparkConf.set("spark.streaming.unpersist", "true") > sparkConf.set("spark.cleaner.ttl", "7200s") > > > On Wed, Nov 4, 2015 at 5:03 PM, swetha <swethakasire...@gmail.com> wrote: > >> Hi, >> >> How to unpersist a DStream in Spark Streaming? I know that we can persist >> using dStream.persist() or dStream.cache. But, I don't see any method to >> unPersist. >> >> Thanks, >> Swetha >> >> >> >> -- >> View this message in context: >> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-unpersist-a-DStream-in-Spark-Streaming-tp25281.html >> Sent from the Apache Spark User List mailing list archive at Nabble.com. >> >> - >> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org >> For additional commands, e-mail: user-h...@spark.apache.org >> >> >
Re: How to unpersist a DStream in Spark Streaming
Hi,

DStreams (Discretized Streams) are made up of multiple RDDs. You can unpersist each RDD by accessing the individual RDDs with foreachRDD:

    dstream.foreachRDD { rdd =>
      rdd.unpersist()
    }
Re: How to unpersist a DStream in Spark Streaming
Other than setting the following. sparkConf.set("spark.streaming.unpersist", "true") sparkConf.set("spark.cleaner.ttl", "7200s") On Wed, Nov 4, 2015 at 5:03 PM, swetha <swethakasire...@gmail.com> wrote: > Hi, > > How to unpersist a DStream in Spark Streaming? I know that we can persist > using dStream.persist() or dStream.cache. But, I don't see any method to > unPersist. > > Thanks, > Swetha > > > > -- > View this message in context: > http://apache-spark-user-list.1001560.n3.nabble.com/How-to-unpersist-a-DStream-in-Spark-Streaming-tp25281.html > Sent from the Apache Spark User List mailing list archive at Nabble.com. > > - > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > For additional commands, e-mail: user-h...@spark.apache.org > >
unpersist RDD from another thread
Hi, What is the behavior when calling rdd.unpersist() from a different thread while another thread is using that rdd. Below is a simple case for this: 1) create rdd and load data 2) call rdd.cache() to bring data into memory 3) create another thread and pass rdd for a long computation 4) call rdd.unpersist while 3. is still running Questions: * Will the computation in 3) finish properly even if unpersist was called on the rdd while running? * What happens if a part of the computation fails and the rdd needs to reconstruct based on DAG lineage, will this still work even though unpersist has been called? thanks, -paul
Re: unpersist RDD from another thread
So in order to not incur any performance issues I should really wait for all usage of the rdd to complete before calling unpersist, correct? On Wed, Sep 16, 2015 at 4:08 PM, Tathagata Das <tathagata.das1...@gmail.com> wrote: > unpredictable. I think it will be safe (as in nothing should fail), but > the performance will be unpredictable (some partition may use cache, some > may not be able to use the cache). > > On Wed, Sep 16, 2015 at 1:06 PM, Paul Weiss <paulweiss@gmail.com> > wrote: > >> Hi, >> >> What is the behavior when calling rdd.unpersist() from a different thread >> while another thread is using that rdd. Below is a simple case for this: >> >> 1) create rdd and load data >> 2) call rdd.cache() to bring data into memory >> 3) create another thread and pass rdd for a long computation >> 4) call rdd.unpersist while 3. is still running >> >> Questions: >> >> * Will the computation in 3) finish properly even if unpersist was called >> on the rdd while running? >> * What happens if a part of the computation fails and the rdd needs to >> reconstruct based on DAG lineage, will this still work even though >> unpersist has been called? >> >> thanks, >> -paul >> > >
Re: unpersist RDD from another thread
Yes. On Wed, Sep 16, 2015 at 1:12 PM, Paul Weiss <paulweiss@gmail.com> wrote: > So in order to not incur any performance issues I should really wait for > all usage of the rdd to complete before calling unpersist, correct? > > On Wed, Sep 16, 2015 at 4:08 PM, Tathagata Das < > tathagata.das1...@gmail.com> wrote: > >> unpredictable. I think it will be safe (as in nothing should fail), but >> the performance will be unpredictable (some partition may use cache, some >> may not be able to use the cache). >> >> On Wed, Sep 16, 2015 at 1:06 PM, Paul Weiss <paulweiss@gmail.com> >> wrote: >> >>> Hi, >>> >>> What is the behavior when calling rdd.unpersist() from a different >>> thread while another thread is using that rdd. Below is a simple case for >>> this: >>> >>> 1) create rdd and load data >>> 2) call rdd.cache() to bring data into memory >>> 3) create another thread and pass rdd for a long computation >>> 4) call rdd.unpersist while 3. is still running >>> >>> Questions: >>> >>> * Will the computation in 3) finish properly even if unpersist was >>> called on the rdd while running? >>> * What happens if a part of the computation fails and the rdd needs to >>> reconstruct based on DAG lineage, will this still work even though >>> unpersist has been called? >>> >>> thanks, >>> -paul >>> >> >> >
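[Editorial note] To make the "wait for all usage to complete" point concrete, a small sketch with simplified thread handling; "data.txt" is a placeholder input.

    val rdd = sc.textFile("data.txt").cache()
    rdd.count()   // populate the cache

    // Long computation running in another thread
    val worker = new Thread(new Runnable {
      override def run(): Unit = {
        val result = rdd.map(_.length).reduce(_ + _)
        println(s"result = $result")
      }
    })
    worker.start()

    // Safe point to unpersist: only after the other thread is done with the RDD
    worker.join()
    rdd.unpersist()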
Re: How to unpersist RDDs generated by ALS/MatrixFactorizationModel
Hi Stahlman, finalRDDStorageLevel is the storage level for the final user/item factors. It is not common to set it to StorageLevel.NONE, unless you want to save the factors directly to disk. So if it is NONE, we cannot unpersist the intermediate RDDs (in/out blocks) because the final user/item factors returned are not materialized. Otherwise, we have to recompute from the very beginning (or last checkpoint) when you materialize the final user/item factors. If you need want to have multiple runs, you can try to set finalRDDStorageLevel to MEMORY_AND_DISK, or clean previous runs so the cached RDDs get garbage collected. Best, Xiangrui On Wed, Jul 22, 2015 at 11:35 AM, Ganelin, Ilya ilya.gane...@capitalone.com wrote: To be Unpersisted the RDD must be persisted first. If it's set to None, then it's not persisted, and as such does not need to be freed. Does that make sense ? Thank you, Ilya Ganelin -Original Message- From: Stahlman, Jonathan [jonathan.stahl...@capitalone.com] Sent: Wednesday, July 22, 2015 01:42 PM Eastern Standard Time To: user@spark.apache.org Subject: Re: How to unpersist RDDs generated by ALS/MatrixFactorizationModel Hello again, In trying to understand the caching of intermediate RDDs by ALS, I looked into the source code and found what may be a bug. Looking here: https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala#L230 you see that ALS.train() is being called with finalRDDStorageLevel = StorageLevel.NONE, which I would understand to mean that the intermediate RDDs will not be persisted. Looking here: https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala#L631 unpersist() is only being called on the intermediate RDDs (all the *Blocks RDDs listed in my first post) if finalRDDStorageLevel != StorageLevel.NONE. This doesn’t make sense to me – I would expect the RDDs to be removed from the cache if finalRDDStorageLevel == StorageLevel.NONE, not the other way around. Jonathan From: Stahlman, Stahlman Jonathan jonathan.stahl...@capitalone.com Date: Thursday, July 16, 2015 at 2:18 PM To: user@spark.apache.org user@spark.apache.org Subject: How to unpersist RDDs generated by ALS/MatrixFactorizationModel Hello all, I am running the Spark recommendation algorithm in MLlib and I have been studying its output with various model configurations. Ideally I would like to be able to run one job that trains the recommendation model with many different configurations to try to optimize for performance. A sample code in python is copied below. The issue I have is that each new model which is trained caches a set of RDDs and eventually the executors run out of memory. Is there any way in Pyspark to unpersist() these RDDs after each iteration? The names of the RDDs which I gather from the UI is: itemInBlocks itemOutBlocks Products ratingBlocks userInBlocks userOutBlocks users I am using Spark 1.3. Thank you for any help! 
Regards, Jonathan data_train, data_cv, data_test = data.randomSplit([99,1,1], 2) functions = [rating] #defined elsewhere ranks = [10,20] iterations = [10,20] lambdas = [0.01,0.1] alphas = [1.0,50.0] results = [] for ratingFunction, rank, numIterations, m_lambda, m_alpha in itertools.product( functions, ranks, iterations, lambdas, alphas ): #train model ratings_train = data_train.map(lambda l: Rating( l.user, l.product, ratingFunction(l) ) ) model = ALS.trainImplicit( ratings_train, rank, numIterations, lambda_=float(m_lambda), alpha=float(m_alpha) ) #test performance on CV data ratings_cv = data_cv.map(lambda l: Rating( l.uesr, l.product, ratingFunction(l) ) ) auc = areaUnderCurve( ratings_cv, model.predictAll ) #save results result = ,.join(str(l) for l in [ratingFunction.__name__,rank,numIterations,m_lambda,m_alpha,auc]) results.append(result) The information contained in this e-mail is confidential and/or proprietary to Capital One and/or its affiliates and may only be used solely in performance of work or services for Capital One. The information transmitted herewith is intended only for use by the individual or entity to which it is addressed. If the reader of this message is not the intended recipient, you are hereby notified that any review, retransmission, dissemination, distribution, copying or other use of, or taking of any action in reliance upon this information is strictly prohibited. If you have received this communication in error, please contact the sender and delete the material from your computer. The information contained in this e-mail is confidential and/or proprietary to Capital One and/or its affiliates and may only be used solely in performance of work or services for Capital One. The information
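[Editorial note] Following Xiangrui's suggestion of cleaning up previous runs, one way a multi-configuration loop is sometimes written against the Scala MLlib API is sketched below; the PySpark equivalents of these calls are not confirmed here, and ratingsTrain is assumed to be an RDD[Rating] prepared as in the original post.

    import org.apache.spark.mllib.recommendation.ALS

    for (rank <- Seq(10, 20); numIterations <- Seq(10, 20)) {
      val model = ALS.trainImplicit(ratingsTrain, rank, numIterations, lambda = 0.01, alpha = 1.0)

      // ... evaluate the model ...

      // Release the cached user/item factor RDDs before the next configuration
      // is trained, so the executors do not accumulate cached blocks across runs
      model.userFeatures.unpersist()
      model.productFeatures.unpersist()
    }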
Re: How to unpersist RDDs generated by ALS/MatrixFactorizationModel
Hi Burak, Looking at the source code, the intermediate RDDs used in ALS.train() are persisted during the computation using intermediateRDDStorageLevel (default value is StorageLevel.MEMORY_AND_DISK) - see herehttps://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala#L546, herehttps://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala#L548, and herehttps://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala#L556. At the end of the ALS calculation, these RDDs are no longer needed nor returned, so I would assume the logical choice would be to unpersist() these RDDs. The strategy in the code seems to be set by finalRDDStorageLevel, which for some reason only calls unpersist() on the intermediate RDDs if finalRDDStorageLevel != StorageLevel.NONE, which seems counter-intuitive to me. Jonathan From: Burak Yavuz brk...@gmail.commailto:brk...@gmail.com Date: Wednesday, July 22, 2015 at 10:47 AM To: Stahlman Jonathan jonathan.stahl...@capitalone.commailto:jonathan.stahl...@capitalone.com Cc: user@spark.apache.orgmailto:user@spark.apache.org user@spark.apache.orgmailto:user@spark.apache.org Subject: Re: How to unpersist RDDs generated by ALS/MatrixFactorizationModel Hi Jonathan, I believe calling persist with StorageLevel.NONE doesn't do anything. That's why the unpersist has an if statement before it. Could you give more information about your setup please? Number of cores, memory, number of partitions of ratings_train? Thanks, Burak On Wed, Jul 22, 2015 at 10:38 AM, Stahlman, Jonathan jonathan.stahl...@capitalone.commailto:jonathan.stahl...@capitalone.com wrote: Hello again, In trying to understand the caching of intermediate RDDs by ALS, I looked into the source code and found what may be a bug. Looking here: https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala#L230 you see that ALS.train() is being called with finalRDDStorageLevel = StorageLevel.NONE, which I would understand to mean that the intermediate RDDs will not be persisted. Looking here: https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala#L631 unpersist() is only being called on the intermediate RDDs (all the *Blocks RDDs listed in my first post) if finalRDDStorageLevel != StorageLevel.NONE. This doesn’t make sense to me – I would expect the RDDs to be removed from the cache if finalRDDStorageLevel == StorageLevel.NONE, not the other way around. Jonathan From: Stahlman, Stahlman Jonathan jonathan.stahl...@capitalone.commailto:jonathan.stahl...@capitalone.com Date: Thursday, July 16, 2015 at 2:18 PM To: user@spark.apache.orgmailto:user@spark.apache.org user@spark.apache.orgmailto:user@spark.apache.org Subject: How to unpersist RDDs generated by ALS/MatrixFactorizationModel Hello all, I am running the Spark recommendation algorithm in MLlib and I have been studying its output with various model configurations. Ideally I would like to be able to run one job that trains the recommendation model with many different configurations to try to optimize for performance. A sample code in python is copied below. The issue I have is that each new model which is trained caches a set of RDDs and eventually the executors run out of memory. Is there any way in Pyspark to unpersist() these RDDs after each iteration? 
The names of the RDDs which I gather from the UI is: itemInBlocks itemOutBlocks Products ratingBlocks userInBlocks userOutBlocks users I am using Spark 1.3. Thank you for any help! Regards, Jonathan data_train, data_cv, data_test = data.randomSplit([99,1,1], 2) functions = [rating] #defined elsewhere ranks = [10,20] iterations = [10,20] lambdas = [0.01,0.1] alphas = [1.0,50.0] results = [] for ratingFunction, rank, numIterations, m_lambda, m_alpha in itertools.product( functions, ranks, iterations, lambdas, alphas ): #train model ratings_train = data_train.map(lambda l: Rating( l.user, l.product, ratingFunction(l) ) ) model = ALS.trainImplicit( ratings_train, rank, numIterations, lambda_=float(m_lambda), alpha=float(m_alpha) ) #test performance on CV data ratings_cv = data_cv.map(lambda l: Rating( l.uesr, l.product, ratingFunction(l) ) ) auc = areaUnderCurve( ratings_cv, model.predictAll ) #save results result = ,.join(str(l) for l in [ratingFunction.__name__,rank,numIterations,m_lambda,m_alpha,auc]) results.append(result) The information contained in this e-mail is confidential and/or proprietary to Capital One and/or its affiliates and may only be used solely in performance of work or services for Capital One. The information transmitted herewith is intended only for use by the individual or entity to which it is addressed
Re: How to unpersist RDDs generated by ALS/MatrixFactorizationModel
Hi Jonathan, I believe calling persist with StorageLevel.NONE doesn't do anything. That's why the unpersist has an if statement before it. Could you give more information about your setup please? Number of cores, memory, number of partitions of ratings_train? Thanks, Burak On Wed, Jul 22, 2015 at 10:38 AM, Stahlman, Jonathan jonathan.stahl...@capitalone.com wrote: Hello again, In trying to understand the caching of intermediate RDDs by ALS, I looked into the source code and found what may be a bug. Looking here: https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala#L230 you see that ALS.train() is being called with finalRDDStorageLevel = StorageLevel.NONE, which I would understand to mean that the intermediate RDDs will not be persisted. Looking here: https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala#L631 unpersist() is only being called on the intermediate RDDs (all the *Blocks RDDs listed in my first post) if finalRDDStorageLevel != StorageLevel.NONE. This doesn’t make sense to me – I would expect the RDDs to be removed from the cache if finalRDDStorageLevel == StorageLevel.NONE, not the other way around. Jonathan From: Stahlman, Stahlman Jonathan jonathan.stahl...@capitalone.com Date: Thursday, July 16, 2015 at 2:18 PM To: user@spark.apache.org user@spark.apache.org Subject: How to unpersist RDDs generated by ALS/MatrixFactorizationModel Hello all, I am running the Spark recommendation algorithm in MLlib and I have been studying its output with various model configurations. Ideally I would like to be able to run one job that trains the recommendation model with many different configurations to try to optimize for performance. A sample code in python is copied below. The issue I have is that each new model which is trained caches a set of RDDs and eventually the executors run out of memory. Is there any way in Pyspark to unpersist() these RDDs after each iteration? The names of the RDDs which I gather from the UI is: itemInBlocks itemOutBlocks Products ratingBlocks userInBlocks userOutBlocks users I am using Spark 1.3. Thank you for any help! Regards, Jonathan data_train, data_cv, data_test = data.randomSplit([99,1,1], 2) functions = [rating] #defined elsewhere ranks = [10,20] iterations = [10,20] lambdas = [0.01,0.1] alphas = [1.0,50.0] results = [] for ratingFunction, rank, numIterations, m_lambda, m_alpha in itertools.product( functions, ranks, iterations, lambdas, alphas ): #train model ratings_train = data_train.map(lambda l: Rating( l.user, l.product, ratingFunction(l) ) ) model = ALS.trainImplicit( ratings_train, rank, numIterations, lambda_=float(m_lambda), alpha=float(m_alpha) ) #test performance on CV data ratings_cv = data_cv.map(lambda l: Rating( l.uesr, l.product, ratingFunction(l) ) ) auc = areaUnderCurve( ratings_cv, model.predictAll ) #save results result = ,.join(str(l) for l in [ratingFunction.__name__,rank,numIterations,m_lambda,m_alpha,auc]) results.append(result) -- The information contained in this e-mail is confidential and/or proprietary to Capital One and/or its affiliates and may only be used solely in performance of work or services for Capital One. The information transmitted herewith is intended only for use by the individual or entity to which it is addressed. 
If the reader of this message is not the intended recipient, you are hereby notified that any review, retransmission, dissemination, distribution, copying or other use of, or taking of any action in reliance upon this information is strictly prohibited. If you have received this communication in error, please contact the sender and delete the material from your computer.
RE: How to unpersist RDDs generated by ALS/MatrixFactorizationModel
To be Unpersisted the RDD must be persisted first. If it's set to None, then it's not persisted, and as such does not need to be freed. Does that make sense ? Thank you, Ilya Ganelin -Original Message- From: Stahlman, Jonathan [jonathan.stahl...@capitalone.commailto:jonathan.stahl...@capitalone.com] Sent: Wednesday, July 22, 2015 01:42 PM Eastern Standard Time To: user@spark.apache.org Subject: Re: How to unpersist RDDs generated by ALS/MatrixFactorizationModel Hello again, In trying to understand the caching of intermediate RDDs by ALS, I looked into the source code and found what may be a bug. Looking here: https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala#L230 you see that ALS.train() is being called with finalRDDStorageLevel = StorageLevel.NONE, which I would understand to mean that the intermediate RDDs will not be persisted. Looking here: https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala#L631 unpersist() is only being called on the intermediate RDDs (all the *Blocks RDDs listed in my first post) if finalRDDStorageLevel != StorageLevel.NONE. This doesn’t make sense to me – I would expect the RDDs to be removed from the cache if finalRDDStorageLevel == StorageLevel.NONE, not the other way around. Jonathan From: Stahlman, Stahlman Jonathan jonathan.stahl...@capitalone.commailto:jonathan.stahl...@capitalone.com Date: Thursday, July 16, 2015 at 2:18 PM To: user@spark.apache.orgmailto:user@spark.apache.org user@spark.apache.orgmailto:user@spark.apache.org Subject: How to unpersist RDDs generated by ALS/MatrixFactorizationModel Hello all, I am running the Spark recommendation algorithm in MLlib and I have been studying its output with various model configurations. Ideally I would like to be able to run one job that trains the recommendation model with many different configurations to try to optimize for performance. A sample code in python is copied below. The issue I have is that each new model which is trained caches a set of RDDs and eventually the executors run out of memory. Is there any way in Pyspark to unpersist() these RDDs after each iteration? The names of the RDDs which I gather from the UI is: itemInBlocks itemOutBlocks Products ratingBlocks userInBlocks userOutBlocks users I am using Spark 1.3. Thank you for any help! Regards, Jonathan data_train, data_cv, data_test = data.randomSplit([99,1,1], 2) functions = [rating] #defined elsewhere ranks = [10,20] iterations = [10,20] lambdas = [0.01,0.1] alphas = [1.0,50.0] results = [] for ratingFunction, rank, numIterations, m_lambda, m_alpha in itertools.product( functions, ranks, iterations, lambdas, alphas ): #train model ratings_train = data_train.map(lambda l: Rating( l.user, l.product, ratingFunction(l) ) ) model = ALS.trainImplicit( ratings_train, rank, numIterations, lambda_=float(m_lambda), alpha=float(m_alpha) ) #test performance on CV data ratings_cv = data_cv.map(lambda l: Rating( l.uesr, l.product, ratingFunction(l) ) ) auc = areaUnderCurve( ratings_cv, model.predictAll ) #save results result = ,.join(str(l) for l in [ratingFunction.__name__,rank,numIterations,m_lambda,m_alpha,auc]) results.append(result) The information contained in this e-mail is confidential and/or proprietary to Capital One and/or its affiliates and may only be used solely in performance of work or services for Capital One. 
The information transmitted herewith is intended only for use by the individual or entity to which it is addressed. If the reader of this message is not the intended recipient, you are hereby notified that any review, retransmission, dissemination, distribution, copying or other use of, or taking of any action in reliance upon this information is strictly prohibited. If you have received this communication in error, please contact the sender and delete the material from your computer. The information contained in this e-mail is confidential and/or proprietary to Capital One and/or its affiliates and may only be used solely in performance of work or services for Capital One. The information transmitted herewith is intended only for use by the individual or entity to which it is addressed. If the reader of this message is not the intended recipient, you are hereby notified that any review, retransmission, dissemination, distribution, copying or other use of, or taking of any action in reliance upon this information is strictly prohibited. If you have received this communication in error, please contact the sender and delete the material from your computer.
Re: How to unpersist RDDs generated by ALS/MatrixFactorizationModel
Hello again, In trying to understand the caching of intermediate RDDs by ALS, I looked into the source code and found what may be a bug. Looking here: https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala#L230 you see that ALS.train() is being called with finalRDDStorageLevel = StorageLevel.NONE, which I would understand to mean that the intermediate RDDs will not be persisted. Looking here: https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala#L631 unpersist() is only being called on the intermediate RDDs (all the *Blocks RDDs listed in my first post) if finalRDDStorageLevel != StorageLevel.NONE. This doesn’t make sense to me – I would expect the RDDs to be removed from the cache if finalRDDStorageLevel == StorageLevel.NONE, not the other way around. Jonathan From: Stahlman, Stahlman Jonathan jonathan.stahl...@capitalone.commailto:jonathan.stahl...@capitalone.com Date: Thursday, July 16, 2015 at 2:18 PM To: user@spark.apache.orgmailto:user@spark.apache.org user@spark.apache.orgmailto:user@spark.apache.org Subject: How to unpersist RDDs generated by ALS/MatrixFactorizationModel Hello all, I am running the Spark recommendation algorithm in MLlib and I have been studying its output with various model configurations. Ideally I would like to be able to run one job that trains the recommendation model with many different configurations to try to optimize for performance. A sample code in python is copied below. The issue I have is that each new model which is trained caches a set of RDDs and eventually the executors run out of memory. Is there any way in Pyspark to unpersist() these RDDs after each iteration? The names of the RDDs which I gather from the UI is: itemInBlocks itemOutBlocks Products ratingBlocks userInBlocks userOutBlocks users I am using Spark 1.3. Thank you for any help! Regards, Jonathan data_train, data_cv, data_test = data.randomSplit([99,1,1], 2) functions = [rating] #defined elsewhere ranks = [10,20] iterations = [10,20] lambdas = [0.01,0.1] alphas = [1.0,50.0] results = [] for ratingFunction, rank, numIterations, m_lambda, m_alpha in itertools.product( functions, ranks, iterations, lambdas, alphas ): #train model ratings_train = data_train.map(lambda l: Rating( l.user, l.product, ratingFunction(l) ) ) model = ALS.trainImplicit( ratings_train, rank, numIterations, lambda_=float(m_lambda), alpha=float(m_alpha) ) #test performance on CV data ratings_cv = data_cv.map(lambda l: Rating( l.uesr, l.product, ratingFunction(l) ) ) auc = areaUnderCurve( ratings_cv, model.predictAll ) #save results result = ,.join(str(l) for l in [ratingFunction.__name__,rank,numIterations,m_lambda,m_alpha,auc]) results.append(result) The information contained in this e-mail is confidential and/or proprietary to Capital One and/or its affiliates and may only be used solely in performance of work or services for Capital One. The information transmitted herewith is intended only for use by the individual or entity to which it is addressed. If the reader of this message is not the intended recipient, you are hereby notified that any review, retransmission, dissemination, distribution, copying or other use of, or taking of any action in reliance upon this information is strictly prohibited. If you have received this communication in error, please contact the sender and delete the material from your computer.
How to unpersist RDDs generated by ALS/MatrixFactorizationModel
Hello all,

I am running the Spark recommendation algorithm in MLlib and I have been studying its output with various model configurations. Ideally I would like to be able to run one job that trains the recommendation model with many different configurations to try to optimize for performance. Sample code in Python is copied below.

The issue I have is that each new model which is trained caches a set of RDDs and eventually the executors run out of memory. Is there any way in PySpark to unpersist() these RDDs after each iteration? The names of the RDDs which I gather from the UI are:

itemInBlocks
itemOutBlocks
Products
ratingBlocks
userInBlocks
userOutBlocks
users

I am using Spark 1.3. Thank you for any help!

Regards,
Jonathan

    data_train, data_cv, data_test = data.randomSplit([99,1,1], 2)
    functions = [rating]  # defined elsewhere
    ranks = [10,20]
    iterations = [10,20]
    lambdas = [0.01,0.1]
    alphas = [1.0,50.0]

    results = []
    for ratingFunction, rank, numIterations, m_lambda, m_alpha in itertools.product( functions, ranks, iterations, lambdas, alphas ):
        # train model
        ratings_train = data_train.map(lambda l: Rating( l.user, l.product, ratingFunction(l) ) )
        model = ALS.trainImplicit( ratings_train, rank, numIterations, lambda_=float(m_lambda), alpha=float(m_alpha) )

        # test performance on CV data
        ratings_cv = data_cv.map(lambda l: Rating( l.user, l.product, ratingFunction(l) ) )
        auc = areaUnderCurve( ratings_cv, model.predictAll )

        # save results
        result = ",".join(str(l) for l in [ratingFunction.__name__, rank, numIterations, m_lambda, m_alpha, auc])
        results.append(result)
Re: Making Unpersist Lazy
RDDs which are no longer required will be removed from memory by Spark itself (which you can consider as lazy?).

Thanks
Best Regards

On Wed, Jul 1, 2015 at 7:48 PM, Jem Tucker <jem.tuc...@gmail.com> wrote:

Hi,

The current behavior of rdd.unpersist() appears to not be lazily executed and therefore must be placed after an action. Is there any way to emulate lazy execution of this function so it is added to the task queue?

Thanks,
Jem
Re: Making Unpersist Lazy
Hi,

After running some tests it appears that unpersist is called as soon as it is reached, so any tasks using this RDD later on will have to recalculate it. This is fine for simple programs, but when an RDD is created within a function and its reference is then lost while children of it continue to be used, persist/unpersist does not work effectively.

Thanks
Jem

On Thu, 2 Jul 2015 at 08:18, Akhil Das <ak...@sigmoidanalytics.com> wrote:

rdd's which are no longer required will be removed from memory by spark itself (which you can consider as lazy?).

Thanks
Best Regards

On Wed, Jul 1, 2015 at 7:48 PM, Jem Tucker <jem.tuc...@gmail.com> wrote:

Hi,

The current behavior of rdd.unpersist() appears to not be lazily executed and therefore must be placed after an action. Is there any way to emulate lazy execution of this function so it is added to the task queue?

Thanks,
Jem
RE: Making Unpersist Lazy
You may pass an optional parameter (blocking = false) to make it lazy.

Thank you,
Ilya Ganelin

-----Original Message-----
From: Jem Tucker [jem.tuc...@gmail.com]
Sent: Thursday, July 02, 2015 04:06 AM Eastern Standard Time
To: Akhil Das
Cc: user
Subject: Re: Making Unpersist Lazy

Hi,

After running some tests it appears the unpersist is called as soon as it is reached, so any tasks using this rdd later on will have to re calculate it. This is fine for simple programs but when an rdd is created within a function and its reference is then lost but children of it continue to be used the persist/unpersist does not work effectively

Thanks
Jem

On Thu, 2 Jul 2015 at 08:18, Akhil Das <ak...@sigmoidanalytics.com> wrote:

rdd's which are no longer required will be removed from memory by spark itself (which you can consider as lazy?).

Thanks
Best Regards

On Wed, Jul 1, 2015 at 7:48 PM, Jem Tucker <jem.tuc...@gmail.com> wrote:

Hi,

The current behavior of rdd.unpersist() appears to not be lazily executed and therefore must be placed after an action. Is there any way to emulate lazy execution of this function so it is added to the task queue?

Thanks,
Jem
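[Editorial note] In code, the parameter referred to above looks like this; someRDD is a placeholder.

    // blocking = false: the call returns immediately and block removal happens
    // asynchronously (note it is still triggered right away, not deferred until
    // after later actions on the RDD, which matches Jem's observation above)
    someRDD.unpersist(blocking = false)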
Making Unpersist Lazy
Hi, The current behavior of rdd.unpersist() appears to not be lazily executed and therefore must be placed after an action. Is there any way to emulate lazy execution of this function so it is added to the task queue? Thanks, Jem
Re: when cached RDD will unpersist its data
If memory cannot hold all of the cached RDDs, the BlockManager will evict some older blocks to make room for new RDD blocks. Hope that helps.

2015-06-24 13:22 GMT+08:00 bit1...@163.com <bit1...@163.com>:

I am kind of confused about when a cached RDD will unpersist its data. I know we can explicitly unpersist it with RDD.unpersist, but can it be unpersisted automatically by the Spark framework? Thanks.

--
bit1...@163.com

--
王海华
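[Editorial note] A related sketch: the storage level chosen at persist time decides what eviction means for a block (drop and recompute from lineage later, versus spill to local disk). The input paths are placeholders.

    import org.apache.spark.storage.StorageLevel

    // MEMORY_ONLY: blocks evicted under memory pressure are dropped and
    // recomputed from lineage the next time they are needed
    val hot = sc.textFile("hot.txt").persist(StorageLevel.MEMORY_ONLY)

    // MEMORY_AND_DISK: blocks that no longer fit in memory are spilled to
    // local disk instead of being dropped, so they are not recomputed
    val large = sc.textFile("large.txt").persist(StorageLevel.MEMORY_AND_DISK)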
when cached RDD will unpersist its data
I am kind of confused about when a cached RDD will unpersist its data. I know we can explicitly unpersist it with RDD.unpersist, but can it be unpersisted automatically by the Spark framework? Thanks. bit1...@163.com
Re: Futures timed out during unpersist
What is the data size? Have you tried increasing the driver memory?? Thanks Best Regards On Sat, Jan 17, 2015 at 1:01 PM, Kevin (Sangwoo) Kim kevin...@apache.org wrote: Hi experts, I got an error during unpersist RDD. Any ideas? java.util.concurrent.TimeoutException: Futures timed out after [30 seconds] at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) at scala.concurrent.Await$.result(package.scala:107) at org.apache.spark.storage.BlockManagerMaster.removeRdd(BlockManagerMaster.scala:103) at org.apache.spark.SparkContext.unpersistRDD(SparkContext.scala:951) at org.apache.spark.rdd.RDD.unpersist(RDD.scala:168)
Re: Futures timed out during unpersist
The data size is about 300~400GB; I'm using an 800GB cluster and have set driver memory to 50GB. On Sat Jan 17 2015 at 6:01:46 PM Akhil Das ak...@sigmoidanalytics.com wrote: What is the data size? Have you tried increasing the driver memory? Thanks Best Regards On Sat, Jan 17, 2015 at 1:01 PM, Kevin (Sangwoo) Kim kevin...@apache.org wrote: Hi experts, I got an error during unpersist RDD. Any ideas? java.util.concurrent.TimeoutException: Futures timed out after [30 seconds] at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) at scala.concurrent.Await$.result(package.scala:107) at org.apache.spark.storage.BlockManagerMaster.removeRdd(BlockManagerMaster.scala:103) at org.apache.spark.SparkContext.unpersistRDD(SparkContext.scala:951) at org.apache.spark.rdd.RDD.unpersist(RDD.scala:168)
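One workaround sometimes used for this kind of timeout (a sketch only, not something suggested in this thread) is to make the unpersist call non-blocking, so the driver does not wait on the BlockManagerMaster future at all and the blocks are removed asynchronously. The RDD below is a made-up stand-in for the large cached dataset:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

object AsyncUnpersist {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("async-unpersist").setMaster("local[*]"))

    val bigRdd = sc.parallelize(1 to 10000000).persist(StorageLevel.MEMORY_AND_DISK)
    bigRdd.count() // materialize the cache

    // Fire-and-forget removal: no future for the driver to await, so no 30-second timeout.
    bigRdd.unpersist(blocking = false)

    sc.stop()
  }
}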
Futures timed out during unpersist
Hi experts, I got an error during unpersist RDD. Any ideas? java.util.concurrent.TimeoutException: Futures timed out after [30 seconds] at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) at scala.concurrent.Await$.result(package.scala:107) at org.apache.spark.storage.BlockManagerMaster.removeRdd(BlockManagerMaster.scala:103) at org.apache.spark.SparkContext.unpersistRDD(SparkContext.scala:951) at org.apache.spark.rdd.RDD.unpersist(RDD.scala:168)
Unpersist
I want to create a temporary variable in my Spark code. Can I do this? for (i <- num) { val temp = ... { do something } temp.unpersist() } Thank You
Re: Unpersist
like this? var temp = ... for (i <- num) { temp = ... { do something } temp.unpersist() } Thanks Best Regards On Thu, Sep 11, 2014 at 3:26 PM, Deep Pradhan pradhandeep1...@gmail.com wrote: I want to create a temporary variable in my Spark code. Can I do this? for (i <- num) { val temp = ... { do something } temp.unpersist() } Thank You
Re: Unpersist
After every loop iteration I want the temp variable to cease to exist. On Thu, Sep 11, 2014 at 4:33 PM, Akhil Das ak...@sigmoidanalytics.com wrote: like this? var temp = ... for (i <- num) { temp = ... { do something } temp.unpersist() } Thanks Best Regards On Thu, Sep 11, 2014 at 3:26 PM, Deep Pradhan pradhandeep1...@gmail.com wrote: I want to create a temporary variable in my Spark code. Can I do this? for (i <- num) { val temp = ... { do something } temp.unpersist() } Thank You
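Putting the two messages together, one way to write this is to keep temp scoped to the loop body, run an action on it, and unpersist it before the next iteration. This is only a sketch; num, the base RDD, and the per-iteration computation are made-up placeholders:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object TempPerIteration {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("temp-per-iteration").setMaster("local[*]"))
    val base = sc.parallelize(1 to 1000)
    val num = 5

    for (i <- 0 until num) {
      // `temp` exists only inside this iteration; a fresh RDD is created each time.
      val temp: RDD[Int] = base.map(_ + i).persist()
      val n = temp.count() // the "do something" part: an action that uses the cached data
      println(s"iteration $i -> $n rows")
      temp.unpersist() // release the cache before the reference goes out of scope
    }
    sc.stop()
  }
}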
Regarding function unpersist on rdd
Hello, Can someone enlighten me as to whether calling unpersist on an RDD is expensive? What is the best way to uncache a cached RDD? Thanks Edwin
RE: Question about RDD cache, unpersist, materialization
Maybe it would be nice if unpersist() 'triggered' the computation of the other RDDs that depend on this RDD but have not yet been computed. The pseudo code could be as follows:

unpersist() {
  if (this rdd has not been persisted) return;
  for (all rdds that depend on this rdd but are not yet computed)
    compute_that_rdd;
  do_actual_unpersist();
}

From: Daniel Siegmann [mailto:daniel.siegm...@velos.io] Sent: Friday, June 13, 2014 5:38 AM To: user@spark.apache.org Subject: Re: Question about RDD cache, unpersist, materialization I've run into this issue. The goal of caching / persist seems to be to avoid recomputing an RDD when its data will be needed multiple times. However, once the following RDDs are computed, the cache is no longer needed. The current design provides no obvious way to detect when the cache is no longer needed so it can be discarded. In the case of caching in memory, it may be handled by partitions being dropped (in LRU order) when memory fills up. I need to do some more experimentation to see if this really works well, or if allowing memory to fill up causes performance issues or possibly OOM errors if data isn't correctly freed. In the case of persisting to disk, I'm not sure if there's a way to limit the disk space used for caching. Does anyone know if there is such a configuration option? This is a pressing issue for me - I have had jobs fail because nodes ran out of disk space. -- Daniel Siegmann, Software Developer Velos Accelerating Machine Learning 440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001 E: daniel.siegm...@velos.io W: www.velos.io
Re: Question about RDD cache, unpersist, materialization
FYI: Here is a related discussion http://apache-spark-user-list.1001560.n3.nabble.com/Persist-and-unpersist-td6437.html about this.
RE: Question about RDD cache, unpersist, materialization
Currently I use rdd.count() for forceful computation, as Nick Pentreath suggested. I think it would be nice to have a method that forcefully computes an RDD, so that RDDs which are no longer necessary can be safely unpersist()ed. Consider a case where rdd_a is a parent of both: (1) a short-term rdd_s that depends only on rdd_a (a transformation of rdd_a), and (2) a long-term rdd_t that depends on RDDs that may be computed later (maybe for some aggregation). Usually rdd_s is computed early, and rdd_a is computed for it. But rdd_a has to remain in memory until rdd_t is eventually computed (or be recomputed when rdd_t is computed). It would be nice if rdd_t could be computed when rdd_s is computed, so that rdd_a could be unpersist()ed, since it will not be used anymore. (Currently I use rdd_t.count() for that.) sc.prune(), which was suggested in the related discussion you provided, is not helpful for this case, since rdd_a is 'referenced' by the 'lazy' rdd_t. (That is: get rdd_t computed together with rdd_s, while rdd_a is still here for rdd_s.) From: Nicholas Chammas [mailto:nicholas.cham...@gmail.com] Sent: Friday, June 13, 2014 9:31 AM To: user Subject: Re: Question about RDD cache, unpersist, materialization FYI: Here is a related discussion http://apache-spark-user-list.1001560.n3.nabble.com/Persist-and-unpersist-td6437.html about this.
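As a sketch of the workaround described above (the names rdd_a, rdd_s and rdd_t come from the message, but the computations are invented): force rdd_t with count() while rdd_a is still cached, then unpersist rdd_a so it does not have to be recomputed later.

import org.apache.spark.{SparkConf, SparkContext}

object ForceThenUnpersist {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("force-then-unpersist").setMaster("local[*]"))

    val rdd_a = sc.parallelize(1 to 1000000).map(_ * 3).cache() // expensive parent, cached

    val rdd_s = rdd_a.filter(_ % 2 == 0) // short-term child, needed early
    val rdd_t = rdd_a.map(_ + 1).cache() // long-term child; cache it if it will be reused later

    rdd_s.count()     // computing rdd_s also fills rdd_a's cache
    rdd_t.count()     // force rdd_t now, while rdd_a is still cached ...
    rdd_a.unpersist() // ... so rdd_a can be released without a later recomputation

    sc.stop()
  }
}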
RE: Question about RDD cache, unpersist, materialization
(I've clarified statement (1) of my previous mail. See below.) From: innowireless TaeYun Kim [mailto:taeyun@innowireless.co.kr] Sent: Friday, June 13, 2014 10:05 AM To: user@spark.apache.org Subject: RE: Question about RDD cache, unpersist, materialization Currently I use rdd.count() for forceful computation, as Nick Pentreath suggested. I think it would be nice to have a method that forcefully computes an RDD, so that RDDs which are no longer necessary can be safely unpersist()ed. Consider a case where rdd_a is a parent of both: (1) a short-term rdd_s that depends only on rdd_a (maybe rdd_s is a transformation of rdd_a), and (2) a long-term rdd_t that depends on RDDs that may be computed later (maybe for some aggregation). Usually rdd_s is computed early, and rdd_a is computed for it. But rdd_a has to remain in memory until rdd_t is eventually computed (or be recomputed when rdd_t is computed). It would be nice if rdd_t could be computed when rdd_s is computed, so that rdd_a could be unpersist()ed, since it will not be used anymore. (Currently I use rdd_t.count() for that.) sc.prune(), which was suggested in the related discussion you provided, is not helpful for this case, since rdd_a is 'referenced' by the 'lazy' rdd_t. (That is: get rdd_t computed together with rdd_s, while rdd_a is still here for rdd_s.)
RE: Question about RDD cache, unpersist, materialization
If you want to force materialization use .count(). Also, if you can, simply don't unpersist anything, unless you really need to free the memory. — Sent from Mailbox
Question about RDD cache, unpersist, materialization
Hi, What I (seem to) know about the RDD persisting API is as follows: - cache() and persist() are not actions. They only do a marking. - unpersist() is also not an action. It only removes a marking; but if the RDD is already in memory, it is unloaded. And there seems to be no API to forcefully materialize an RDD without requesting data through an action method, for example first(). So, I am faced with the following scenario.

{
  JavaRDD<T> rddUnion = sc.parallelize(new ArrayList<T>()); // create empty for merging
  for (int i = 0; i < 10; i++) {
    JavaRDD<T2> rdd = sc.textFile(inputFileNames[i]);
    rdd.cache(); // Since it will be used twice, cache.
    rdd.map(...).filter(...).saveAsTextFile(outputFileNames[i]); // Transform and save, rdd materializes
    rddUnion = rddUnion.union(rdd.map(...).filter(...)); // Do another transform to T and merge by union
    rdd.unpersist(); // Now it seems not needed. (But needed actually)
  }
  // Here, rddUnion actually materializes, and needs all 10 rdds that were already unpersisted.
  // So, rebuilding all 10 rdds will occur.
  rddUnion.saveAsTextFile(mergedFileName);
}

If rddUnion could be materialized and cache()d before the rdd.unpersist() line, the rdds in the loop would not be needed at rddUnion.saveAsTextFile(). Now what is the best strategy?
- Do not unpersist all 10 rdds in the loop.
- Materialize rddUnion in the loop by calling a 'light' action API, like first().
- Give up and just rebuild/reload all 10 rdds when saving rddUnion.
Is there some misunderstanding? Thanks.
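Not from the thread, but for concreteness, here is one Scala sketch of the first strategy (keep the loop RDDs cached until rddUnion has been materialized, then unpersist them). The file paths and the map/filter bodies are placeholders:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import scala.collection.mutable.ArrayBuffer

object UnionThenUnpersist {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("union-then-unpersist").setMaster("local[*]"))
    val inputFileNames  = (0 until 10).map(i => s"/tmp/in/part-$i")  // placeholder paths
    val outputFileNames = (0 until 10).map(i => s"/tmp/out/part-$i")
    val mergedFileName  = "/tmp/out/merged"

    val cached = ArrayBuffer[RDD[String]]()         // keep handles so we can unpersist later
    var rddUnion: RDD[String] = sc.emptyRDD[String] // empty RDD for merging

    for (i <- 0 until 10) {
      val rdd = sc.textFile(inputFileNames(i)).cache() // used twice, so cache
      cached += rdd
      rdd.map(_.trim).filter(_.nonEmpty).saveAsTextFile(outputFileNames(i)) // first use, fills the cache
      rddUnion = rddUnion.union(rdd.map(_.toUpperCase).filter(_.nonEmpty))  // second use, still lazy
    }

    rddUnion.saveAsTextFile(mergedFileName) // materializes the union while all loop RDDs are still cached
    cached.foreach(_.unpersist())           // now it is safe to release them
    sc.stop()
  }
}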
RE: Question about RDD cache, unpersist, materialization
BTW, it is possible that rdd.first() does not compute all of the partitions. So, first() cannot be used for the situation described in my original question.
Persist and unpersist
I keep bumping into a problem with persisting RDDs. Consider this (silly) example:

def everySecondFromBehind(input: RDD[Int]): RDD[Int] = {
  val count = input.count
  if (count % 2 == 0) {
    return input.filter(_ % 2 == 1)
  } else {
    return input.filter(_ % 2 == 0)
  }
}

The situation is that we want to do two things with an RDD (a count and a filter in the example). The input RDD may represent a very expensive calculation. So it would make sense to add an input.cache() line at the beginning. But where do we put input.unpersist()?

input.cache()
val count = input.count
val result = input.filter(...)
input.unpersist()
return result

input.filter() is lazy, so this does not work as expected. We only want to release input from the cache once nothing depends on it anymore. Maybe result was garbage collected. Maybe result itself has been cached. But there is no way to detect such conditions. Our current approach is to just leave the RDD cached, and it will get dumped at some point anyway. Is there a better solution? Thanks for any tips.
Re: Persist and unpersist
Daniel, Is SPARK-1103 https://issues.apache.org/jira/browse/SPARK-1103 related to your example? Automatic unpersist()-ing of unreferenced RDDs would be nice. Nick
Re: Persist and unpersist
I think what's desired here is for input to be unpersisted automatically as soon as result is materialized. I don't think there's currently a way to do this, but the usual workaround is to force result to be materialized immediately and then unpersist input:

input.cache()
val count = input.count
val result = input.filter(...)
result.cache().foreach(x => {}) // materialize result
input.unpersist()               // safe because `result` is materialized
                                // and is the only RDD that depends on `input`
return result

Ankur http://www.ankurdave.com/
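A self-contained Scala sketch of that workaround applied to the example above (the foreach with an empty body is just one way to force materialization; count() would work as well):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object EverySecondFromBehind {
  def everySecondFromBehind(input: RDD[Int]): RDD[Int] = {
    input.cache() // `input` is read twice below
    val count = input.count()
    val result =
      if (count % 2 == 0) input.filter(_ % 2 == 1)
      else input.filter(_ % 2 == 0)
    result.cache().foreach(_ => ()) // materialize `result` while `input` is still cached
    input.unpersist()               // safe: `result` no longer needs to recompute `input`
    result
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("every-second").setMaster("local[*]"))
    println(everySecondFromBehind(sc.parallelize(1 to 10)).collect().mkString(","))
    sc.stop()
  }
}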