Spark structured streaming with periodical persist and unpersist

2021-02-11 Thread act_coder
I am currently building a Spark Structured Streaming application in which I am
doing a batch-stream join, and the source for the batch data gets updated
periodically.

So I am planning to persist/unpersist that batch data periodically.

Below is the sample code I am using to persist and unpersist the batch
data.

Flow: read the batch data -> persist it -> every hour, unpersist it, re-read
the batch data, and persist it again.

But I am not seeing the batch data get refreshed every hour.

Code:

import java.time.{Duration, Instant}
import org.apache.spark.storage.StorageLevel

var batchDF = handler.readBatchDF(sparkSession)
batchDF.persist(StorageLevel.MEMORY_AND_DISK)
var refreshedTime: Instant = Instant.now()

if (Duration.between(refreshedTime, Instant.now()).getSeconds > refreshTime) {
  refreshedTime = Instant.now()
  batchDF.unpersist(false)
  batchDF = handler.readBatchDF(sparkSession)
    .persist(StorageLevel.MEMORY_AND_DISK)
}
Is there a better way to achieve this in Spark Structured Streaming jobs?
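As written, the refresh check in the snippet above appears to run only once,
when the driver program is set up, so the batch side is never refreshed. One
workaround that is sometimes suggested is to re-evaluate the condition inside
foreachBatch, so it runs on every micro-batch. A rough, untested sketch (it
reuses handler.readBatchDF, sparkSession and refreshTime from the snippet
above; streamDF, the join key "id" and the parquet sink are assumptions for
illustration only):

import java.time.{Duration, Instant}
import org.apache.spark.sql.DataFrame
import org.apache.spark.storage.StorageLevel

var batchDF: DataFrame =
  handler.readBatchDF(sparkSession).persist(StorageLevel.MEMORY_AND_DISK)
var refreshedTime: Instant = Instant.now()

streamDF.writeStream
  .foreachBatch { (microBatch: DataFrame, batchId: Long) =>
    // This closure runs on the driver for every micro-batch, so the refresh
    // condition is re-checked continuously instead of once at start-up.
    if (Duration.between(refreshedTime, Instant.now()).getSeconds > refreshTime) {
      batchDF.unpersist()
      batchDF = handler.readBatchDF(sparkSession).persist(StorageLevel.MEMORY_AND_DISK)
      refreshedTime = Instant.now()
    }
    microBatch.join(batchDF, "id").write.mode("append").parquet("/tmp/output")
  }
  .start()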





Broadcast variables: destroy/unpersist unexpected behaviour

2018-03-13 Thread Sunil
I experienced the two cases below when unpersisting or destroying broadcast
variables in PySpark, but the same works fine in the Spark Scala shell. Any clue
why this happens? Is it a bug in PySpark?

***Case 1:***
>>> b1 = sc.broadcast([1,2,3])
>>> b1.value
[1, 2, 3]
>>> b1.destroy()
>>> b1.value
[1, 2, 3]
I can still access the value in the driver.


***Case 2:***
>>> b = sc.broadcast([1,2,3])
>>> b.destroy()
>>> b.value
Traceback (most recent call last):
  File "", line 1, in 
  File
"/home/sdh/Downloads/spark-2.2.1-bin-hadoop2.7/python/pyspark/broadcast.py",
line 109, in value
self._value = self.load(self._path)
  File
"/home/sdh/Downloads/spark-2.2.1-bin-hadoop2.7/python/pyspark/broadcast.py",
line 95, in load
with open(path, 'rb', 1 << 20) as f:
IOError: [Errno 2] No such file or directory:
u'/tmp/spark-eef352c0-6470-4b89-999f-923493a27bc4/pyspark-17d3a9a3-b5c1-4331-b408-8447f078789e/tmpzq4kv0'

Rather, I should get a message similar to "Attempted to use
broadcast variable after it was destroyed".
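For comparison, a minimal sketch of the Scala-shell behaviour the poster refers
to (assuming a running SparkContext named sc; not tied to a specific version):

// Scala shell
val b = sc.broadcast(Seq(1, 2, 3))
b.value       // Seq(1, 2, 3)
b.destroy()
b.value       // throws org.apache.spark.SparkException:
              // "Attempted to use Broadcast(0) after it was destroyed"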









Unpersist all from memory in spark 2.2

2017-09-25 Thread Cesar
Is there a way to unpersist all DataFrames, Datasets, and/or RDDs in Spark
2.2 in a single call?


Thanks
-- 
Cesar Flores
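A minimal sketch of one way to approach this in Spark 2.x, assuming a
SparkSession named spark (clearCache covers the Dataset/DataFrame cache;
getPersistentRDDs covers RDDs that were persisted directly):

// Drop everything cached through the SQL/Dataset cache manager.
spark.catalog.clearCache()

// Unpersist any RDDs that were persisted explicitly.
spark.sparkContext.getPersistentRDDs.values
  .foreach(_.unpersist(blocking = false))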


Re: Unpersist RDD in Graphx

2016-02-01 Thread Takeshi Yamamuro
Hi,

Please call "Graph#unpersist" that releases two RDDs, vertex and edge ones.
"Graph#unpersist"  just invokes "Graph#unpersistVertices" and
"Graph#edges#unpersist";
"Graph#unpersistVertices" releases memory for vertices and
"Graph#edges#unpersist"
does memory for edges.
If blocking = true,  unpersist() waits until memory released from
BlockManager.
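A short sketch of the two options described above, assuming an existing
Graph[VD, ED] instance called graph:

// Release both the vertex and edge RDDs in one call ...
graph.unpersist(blocking = false)

// ... or release them separately, which is what Graph#unpersist does internally.
graph.unpersistVertices(blocking = false)
graph.edges.unpersist(blocking = false)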



On Mon, Feb 1, 2016 at 8:35 AM, Zhang, Jingyu <jingyu.zh...@news.com.au>
wrote:

> Hi, What is the best way to unpersist the RDD in GraphX to release memory?
> RDD.unpersist
> or
> RDD.unpersistVertices and RDD.edges.unpersist
>
> I studied the source code of Pregel.scala; both of the above are used between
> line 148 and line 150. Can anyone please tell me what the difference is? In
> addition, what is the difference between setting blocking = false and
> blocking = true?
>
> oldMessages.unpersist(blocking = false)
> prevG.unpersistVertices(blocking = false)
> prevG.edges.unpersist(blocking = false)
>
> Thanks,
>
> Jingyu
>




-- 
---
Takeshi Yamamuro


Unpersist RDD in Graphx

2016-01-31 Thread Zhang, Jingyu
Hi, What is the best way to unpersist the RDD in GraphX to release memory?
RDD.unpersist
or
RDD.unpersistVertices and RDD.edges.unpersist

I studied the source code of Pregel.scala; both of the above are used between
line 148 and line 150. Can anyone please tell me what the difference is? In
addition, what is the difference between setting blocking = false and
blocking = true?

oldMessages.unpersist(blocking = false)
prevG.unpersistVertices(blocking = false)
prevG.edges.unpersist(blocking = false)

Thanks,

Jingyu



Re: automatically unpersist RDDs which are not used for 24 hours?

2016-01-13 Thread Andrew Or
Hi Alex,

Yes, you can set `spark.cleaner.ttl`:
http://spark.apache.org/docs/1.6.0/configuration.html, but I would not
recommend it!

We are actually removing this property in Spark 2.0 because it has caused
problems for many users in the past. In particular, if you accidentally use a
variable that has been automatically cleaned, then you will run into problems
like shuffle fetch failures or "broadcast variable not found" errors, which may
fail your job.

Alternatively, Spark already automatically cleans up all variables that have
been garbage collected, including RDDs, shuffle dependencies, broadcast
variables and accumulators. This context-based cleaning has been enabled by
default for many versions now, so it should be reliable. The only caveat is
that it may not work very well in a shell environment, where some variables may
never go out of scope.

Please let me know if you have more questions,
-Andrew
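To make the trade-off concrete, a small sketch (the input path is
hypothetical): explicit unpersist is deterministic, while the context cleaner
only acts once the driver-side reference has been garbage-collected.

val cached = sc.textFile("/data/events").cache()
cached.count()

// Deterministic release:
cached.unpersist(blocking = false)

// Automatic release: simply drop all references to `cached`; the
// ContextCleaner removes the cached blocks after the RDD object is
// garbage-collected on the driver, which is why long-lived shell variables
// may never get cleaned up.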


2016-01-13 11:36 GMT-08:00 Alexander Pivovarov <apivova...@gmail.com>:

> Is it possible to automatically unpersist RDDs which are not used for 24
> hours?
>


automatically unpersist RDDs which are not used for 24 hours?

2016-01-13 Thread Alexander Pivovarov
Is it possible to automatically unpersist RDDs which are not used for 24
hours?


Re: How to unpersist RDDs generated by ALS/MatrixFactorizationModel

2015-12-08 Thread Ewan Higgs

Sean,

Thanks.
It's a developer API and doesn't appear to be exposed.

Ewan

On 07/12/15 15:06, Sean Owen wrote:

I'm not sure if this is available in Python but from 1.3 on you should
be able to call ALS.setFinalRDDStorageLevel with level "none" to ask
it to unpersist when it is done.

On Mon, Dec 7, 2015 at 1:42 PM, Ewan Higgs <ewan.hi...@ugent.be> wrote:

Jonathan,
Did you ever get to the bottom of this? I have some users working with Spark
in a classroom setting and our example notebooks run into problems where
there is so much spilled to disk that they run out of quota. A 1.5G input
set becomes >30G of spilled data on disk. I looked into how I could
unpersist the data so I could clean up the files, but I was unsuccessful.

We're using Spark 1.5.0

Yours,
Ewan

On 16/07/15 23:18, Stahlman, Jonathan wrote:

Hello all,

I am running the Spark recommendation algorithm in MLlib and I have been
studying its output with various model configurations.  Ideally I would like
to be able to run one job that trains the recommendation model with many
different configurations to try to optimize for performance.  A sample code
in python is copied below.

The issue I have is that each new model which is trained caches a set of
RDDs and eventually the executors run out of memory.  Is there any way in
Pyspark to unpersist() these RDDs after each iteration?  The names of the
RDDs which I gather from the UI is:

itemInBlocks
itemOutBlocks
Products
ratingBlocks
userInBlocks
userOutBlocks
users

I am using Spark 1.3.  Thank you for any help!

Regards,
Jonathan




   data_train, data_cv, data_test = data.randomSplit([99,1,1], 2)
   functions = [rating] #defined elsewhere
   ranks = [10,20]
   iterations = [10,20]
   lambdas = [0.01,0.1]
   alphas  = [1.0,50.0]

   results = []
   for ratingFunction, rank, numIterations, m_lambda, m_alpha in
itertools.product( functions, ranks, iterations, lambdas, alphas ):
 #train model
 ratings_train = data_train.map(lambda l: Rating( l.user, l.product,
ratingFunction(l) ) )
 model   = ALS.trainImplicit( ratings_train, rank, numIterations,
lambda_=float(m_lambda), alpha=float(m_alpha) )

 #test performance on CV data
 ratings_cv = data_cv.map(lambda l: Rating( l.user, l.product,
ratingFunction(l) ) )
 auc = areaUnderCurve( ratings_cv, model.predictAll )

 #save results
 result = ",".join(str(l) for l in
[ratingFunction.__name__,rank,numIterations,m_lambda,m_alpha,auc])
 results.append(result)











Re: How to unpersist RDDs generated by ALS/MatrixFactorizationModel

2015-12-07 Thread Ewan Higgs

Jonathan,
Did you ever get to the bottom of this? I have some users working with 
Spark in a classroom setting and our example notebooks run into problems 
where there is so much spilled to disk that they run out of quota. A 
1.5G input set becomes >30G of spilled data on disk. I looked into how I 
could unpersist the data so I could clean up the files, but I was 
unsuccessful.


We're using Spark 1.5.0

Yours,
Ewan

On 16/07/15 23:18, Stahlman, Jonathan wrote:

Hello all,

I am running the Spark recommendation algorithm in MLlib and I have 
been studying its output with various model configurations.  Ideally I 
would like to be able to run one job that trains the recommendation 
model with many different configurations to try to optimize for 
performance.  A sample code in python is copied below.


The issue I have is that each new model which is trained caches a set 
of RDDs and eventually the executors run out of memory.  Is there any 
way in Pyspark to unpersist() these RDDs after each iteration?  The 
names of the RDDs which I gather from the UI is:


itemInBlocks
itemOutBlocks
Products
ratingBlocks
userInBlocks
userOutBlocks
users

I am using Spark 1.3.  Thank you for any help!

Regards,
Jonathan




data_train, data_cv, data_test = data.randomSplit([99,1,1], 2)
  functions = [rating] #defined elsewhere
  ranks = [10,20]
  iterations = [10,20]
  lambdas = [0.01,0.1]
  alphas  = [1.0,50.0]

  results = []
  for ratingFunction, rank, numIterations, m_lambda, m_alpha in 
itertools.product( functions, ranks, iterations, lambdas, alphas ):

#train model
ratings_train = data_train.map(lambda l: Rating( l.user, 
l.product, ratingFunction(l) ) )
model   = ALS.trainImplicit( ratings_train, rank, numIterations, 
lambda_=float(m_lambda), alpha=float(m_alpha) )


#test performance on CV data
ratings_cv = data_cv.map(lambda l: Rating( l.user, l.product, 
ratingFunction(l) ) )

auc = areaUnderCurve( ratings_cv, model.predictAll )

#save results
result = ",".join(str(l) for l in 
[ratingFunction.__name__,rank,numIterations,m_lambda,m_alpha,auc])

results.append(result)









Re: How to unpersist RDDs generated by ALS/MatrixFactorizationModel

2015-12-07 Thread Sean Owen
I'm not sure if this is available in Python but from 1.3 on you should
be able to call ALS.setFinalRDDStorageLevel with level "none" to ask
it to unpersist when it is done.
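In Scala this looks roughly like the sketch below (mllib ALS builder API;
StorageLevel.NONE corresponds to the "none" level mentioned above; note that
later messages in these threads debate whether this actually causes the
intermediate block RDDs to be unpersisted):

import org.apache.spark.mllib.recommendation.{ALS, MatrixFactorizationModel, Rating}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

def trainWithoutCaching(ratings: RDD[Rating]): MatrixFactorizationModel =
  new ALS()
    .setRank(10)
    .setIterations(10)
    .setImplicitPrefs(true)
    .setFinalRDDStorageLevel(StorageLevel.NONE)  // don't keep the final factor RDDs cached
    .run(ratings)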

On Mon, Dec 7, 2015 at 1:42 PM, Ewan Higgs <ewan.hi...@ugent.be> wrote:
> Jonathan,
> Did you ever get to the bottom of this? I have some users working with Spark
> in a classroom setting and our example notebooks run into problems where
> there is so much spilled to disk that they run out of quota. A 1.5G input
> set becomes >30G of spilled data on disk. I looked into how I could
> unpersist the data so I could clean up the files, but I was unsuccessful.
>
> We're using Spark 1.5.0
>
> Yours,
> Ewan
>
> On 16/07/15 23:18, Stahlman, Jonathan wrote:
>
> Hello all,
>
> I am running the Spark recommendation algorithm in MLlib and I have been
> studying its output with various model configurations.  Ideally I would like
> to be able to run one job that trains the recommendation model with many
> different configurations to try to optimize for performance.  A sample code
> in python is copied below.
>
> The issue I have is that each new model which is trained caches a set of
> RDDs and eventually the executors run out of memory.  Is there any way in
> Pyspark to unpersist() these RDDs after each iteration?  The names of the
> RDDs which I gather from the UI is:
>
> itemInBlocks
> itemOutBlocks
> Products
> ratingBlocks
> userInBlocks
> userOutBlocks
> users
>
> I am using Spark 1.3.  Thank you for any help!
>
> Regards,
> Jonathan
>
>
>
>
>   data_train, data_cv, data_test = data.randomSplit([99,1,1], 2)
>   functions = [rating] #defined elsewhere
>   ranks = [10,20]
>   iterations = [10,20]
>   lambdas = [0.01,0.1]
>   alphas  = [1.0,50.0]
>
>   results = []
>   for ratingFunction, rank, numIterations, m_lambda, m_alpha in
> itertools.product( functions, ranks, iterations, lambdas, alphas ):
> #train model
> ratings_train = data_train.map(lambda l: Rating( l.user, l.product,
> ratingFunction(l) ) )
> model   = ALS.trainImplicit( ratings_train, rank, numIterations,
> lambda_=float(m_lambda), alpha=float(m_alpha) )
>
> #test performance on CV data
> ratings_cv = data_cv.map(lambda l: Rating( l.user, l.product,
> ratingFunction(l) ) )
> auc = areaUnderCurve( ratings_cv, model.predictAll )
>
> #save results
> result = ",".join(str(l) for l in
> [ratingFunction.__name__,rank,numIterations,m_lambda,m_alpha,auc])
> results.append(result)
>
> 
>
>
>




Re: How to unpersist a DStream in Spark Streaming

2015-11-06 Thread Adrian Tanase
Do we have any guarantees on the maximum duration?

I've seen RDDs kept around for 7-10 minutes on batches of 20 secs and
checkpoint of 100 secs. No windows, just updateStateByKey.

It's not a memory issue, but on checkpoint recovery it goes back to Kafka for
10 minutes of data. Any idea why?

-adrian

Sent from my iPhone

On 06 Nov 2015, at 09:45, Tathagata Das <t...@databricks.com> wrote:

Spark Streaming automatically takes care of unpersisting any RDDs generated by
a DStream. You can use StreamingContext.remember() to set the minimum
persistence duration; any persisted RDD older than that will be automatically
unpersisted.

On Thu, Nov 5, 2015 at 9:12 AM, swetha kasireddy <swethakasire...@gmail.com> wrote:
It's just that, within the same thread, I need to uncache a particular RDD
every 2 minutes to clear out the data that is present in a Map inside it.

On Wed, Nov 4, 2015 at 11:54 PM, Saisai Shao <sai.sai.s...@gmail.com> wrote:
Hi Swetha,

Would you mind elaborating your usage scenario of DStream unpersisting?

From my understanding:

1. Spark Streaming will automatically unpersist outdated data (you already 
mentioned about the configurations).
2. If streaming job is started, I think you may lose the control of the job, 
when do you call this unpersist, how to call this unpersist (from another 
thread)?

Thanks
Saisai


On Thu, Nov 5, 2015 at 3:13 PM, swetha kasireddy <swethakasire...@gmail.com> wrote:
Other than setting the following.


sparkConf.set("spark.streaming.unpersist", "true")
sparkConf.set("spark.cleaner.ttl", "7200s")

On Wed, Nov 4, 2015 at 5:03 PM, swetha <swethakasire...@gmail.com> wrote:
Hi,

How to unpersist a DStream in Spark Streaming? I know that we can persist
using dStream.persist() or dStream.cache. But, I don't see any method to
unPersist.

Thanks,
Swetha










Re: How to unpersist a DStream in Spark Streaming

2015-11-05 Thread swetha kasireddy
It's just that, within the same thread, I need to uncache a particular RDD
every 2 minutes to clear out the data that is present in a Map inside it.

On Wed, Nov 4, 2015 at 11:54 PM, Saisai Shao <sai.sai.s...@gmail.com> wrote:

> Hi Swetha,
>
> Would you mind elaborating your usage scenario of DStream unpersisting?
>
> From my understanding:
>
> 1. Spark Streaming will automatically unpersist outdated data (you already
> mentioned about the configurations).
> 2. If streaming job is started, I think you may lose the control of the
> job, when do you call this unpersist, how to call this unpersist (from
> another thread)?
>
> Thanks
> Saisai
>
>
> On Thu, Nov 5, 2015 at 3:13 PM, swetha kasireddy <
> swethakasire...@gmail.com> wrote:
>
>> Other than setting the following.
>>
>> sparkConf.set("spark.streaming.unpersist", "true")
>> sparkConf.set("spark.cleaner.ttl", "7200s")
>>
>>
>> On Wed, Nov 4, 2015 at 5:03 PM, swetha <swethakasire...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> How to unpersist a DStream in Spark Streaming? I know that we can persist
>>> using dStream.persist() or dStream.cache. But, I don't see any method to
>>> unPersist.
>>>
>>> Thanks,
>>> Swetha
>>>
>>>
>>>
>>>
>>
>


Re: How to unpersist a DStream in Spark Streaming

2015-11-05 Thread Tathagata Das
Spark Streaming automatically takes care of unpersisting any RDDs generated
by a DStream. You can use StreamingContext.remember() to set the minimum
persistence duration; any persisted RDD older than that will be
automatically unpersisted.
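A minimal sketch of the knobs mentioned in this thread (Scala; the batch
interval and retention duration are only placeholders):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("unpersist-example")
  .set("spark.streaming.unpersist", "true")   // ask Spark Streaming to drop old batch RDDs

val ssc = new StreamingContext(conf, Seconds(20))
// Keep generated RDDs around for at least 2 minutes before they are unpersisted.
ssc.remember(Minutes(2))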

On Thu, Nov 5, 2015 at 9:12 AM, swetha kasireddy <swethakasire...@gmail.com>
wrote:

> It's just that, within the same thread, I need to uncache a particular RDD
> every 2 minutes to clear out the data that is present in a Map inside it.
>
> On Wed, Nov 4, 2015 at 11:54 PM, Saisai Shao <sai.sai.s...@gmail.com>
> wrote:
>
>> Hi Swetha,
>>
>> Would you mind elaborating your usage scenario of DStream unpersisting?
>>
>> From my understanding:
>>
>> 1. Spark Streaming will automatically unpersist outdated data (you
>> already mentioned about the configurations).
>> 2. If streaming job is started, I think you may lose the control of the
>> job, when do you call this unpersist, how to call this unpersist (from
>> another thread)?
>>
>> Thanks
>> Saisai
>>
>>
>> On Thu, Nov 5, 2015 at 3:13 PM, swetha kasireddy <
>> swethakasire...@gmail.com> wrote:
>>
>>> Other than setting the following.
>>>
>>> sparkConf.set("spark.streaming.unpersist", "true")
>>> sparkConf.set("spark.cleaner.ttl", "7200s")
>>>
>>>
>>> On Wed, Nov 4, 2015 at 5:03 PM, swetha <swethakasire...@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> How to unpersist a DStream in Spark Streaming? I know that we can
>>>> persist
>>>> using dStream.persist() or dStream.cache. But, I don't see any method to
>>>> unPersist.
>>>>
>>>> Thanks,
>>>> Swetha
>>>>
>>>>
>>>>
>>>>
>>>
>>
>


How to unpersist a DStream in Spark Streaming

2015-11-04 Thread swetha
Hi,

How do I unpersist a DStream in Spark Streaming? I know that we can persist
using dStream.persist() or dStream.cache(), but I don't see any method to
unpersist.

Thanks,
Swetha






Re: How to unpersist a DStream in Spark Streaming

2015-11-04 Thread Saisai Shao
Hi Swetha,

Would you mind elaborating your usage scenario of DStream unpersisting?

From my understanding:

1. Spark Streaming will automatically unpersist outdated data (you already
mentioned about the configurations).
2. If streaming job is started, I think you may lose the control of the
job, when do you call this unpersist, how to call this unpersist (from
another thread)?

Thanks
Saisai


On Thu, Nov 5, 2015 at 3:13 PM, swetha kasireddy <swethakasire...@gmail.com>
wrote:

> Other than setting the following.
>
> sparkConf.set("spark.streaming.unpersist", "true")
> sparkConf.set("spark.cleaner.ttl", "7200s")
>
>
> On Wed, Nov 4, 2015 at 5:03 PM, swetha <swethakasire...@gmail.com> wrote:
>
>> Hi,
>>
>> How to unpersist a DStream in Spark Streaming? I know that we can persist
>> using dStream.persist() or dStream.cache. But, I don't see any method to
>> unPersist.
>>
>> Thanks,
>> Swetha
>>
>>
>>
>>
>


Re: How to unpersist a DStream in Spark Streaming

2015-11-04 Thread Yashwanth Kumar
Hi,

A DStream (Discretized Stream) is made up of multiple RDDs. You can unpersist
each RDD by accessing the individual RDDs with foreachRDD:

dstream.foreachRDD { rdd =>
  rdd.unpersist()
}






Re: How to unpersist a DStream in Spark Streaming

2015-11-04 Thread swetha kasireddy
Other than setting the following.

sparkConf.set("spark.streaming.unpersist", "true")
sparkConf.set("spark.cleaner.ttl", "7200s")


On Wed, Nov 4, 2015 at 5:03 PM, swetha <swethakasire...@gmail.com> wrote:

> Hi,
>
> How to unpersist a DStream in Spark Streaming? I know that we can persist
> using dStream.persist() or dStream.cache. But, I don't see any method to
> unPersist.
>
> Thanks,
> Swetha
>
>
>
>
>


unpersist RDD from another thread

2015-09-16 Thread Paul Weiss
Hi,

What is the behavior when calling rdd.unpersist() from a different thread
while another thread is using that rdd? Below is a simple case:

1) create rdd and load data
2) call rdd.cache() to bring data into memory
3) create another thread and pass rdd for a long computation
4) call rdd.unpersist while 3. is still running

Questions:

* Will the computation in 3) finish properly even if unpersist was called
on the rdd while running?
* What happens if a part of the computation fails and the rdd needs to
reconstruct based on DAG lineage, will this still work even though
unpersist has been called?

thanks,
-paul


Re: unpersist RDD from another thread

2015-09-16 Thread Paul Weiss
So in order to not incur any performance issues I should really wait for
all usage of the rdd to complete before calling unpersist, correct?

On Wed, Sep 16, 2015 at 4:08 PM, Tathagata Das <tathagata.das1...@gmail.com>
wrote:

> unpredictable. I think it will be safe (as in nothing should fail), but
> the performance will be unpredictable (some partition may use cache, some
> may not be able to use the cache).
>
> On Wed, Sep 16, 2015 at 1:06 PM, Paul Weiss <paulweiss@gmail.com>
> wrote:
>
>> Hi,
>>
>> What is the behavior when calling rdd.unpersist() from a different thread
>> while another thread is using that rdd.  Below is a simple case for this:
>>
>> 1) create rdd and load data
>> 2) call rdd.cache() to bring data into memory
>> 3) create another thread and pass rdd for a long computation
>> 4) call rdd.unpersist while 3. is still running
>>
>> Questions:
>>
>> * Will the computation in 3) finish properly even if unpersist was called
>> on the rdd while running?
>> * What happens if a part of the computation fails and the rdd needs to
>> reconstruct based on DAG lineage, will this still work even though
>> unpersist has been called?
>>
>> thanks,
>> -paul
>>
>
>


Re: unpersist RDD from another thread

2015-09-16 Thread Tathagata Das
Yes.
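A sketch of one way to honour that ordering when the long computation runs on a
separate thread (plain Scala Futures; the input path and the computation are
hypothetical):

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

val rdd = sc.textFile("/data/input").cache()

// Long-running work on another thread.
val work = Future { rdd.map(_.length).fold(0)(_ + _) }

// Unpersist only after every consumer of the cached data has finished.
val total = Await.result(work, Duration.Inf)
rdd.unpersist(blocking = false)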

On Wed, Sep 16, 2015 at 1:12 PM, Paul Weiss <paulweiss@gmail.com> wrote:

> So in order to not incur any performance issues I should really wait for
> all usage of the rdd to complete before calling unpersist, correct?
>
> On Wed, Sep 16, 2015 at 4:08 PM, Tathagata Das <
> tathagata.das1...@gmail.com> wrote:
>
>> unpredictable. I think it will be safe (as in nothing should fail), but
>> the performance will be unpredictable (some partition may use cache, some
>> may not be able to use the cache).
>>
>> On Wed, Sep 16, 2015 at 1:06 PM, Paul Weiss <paulweiss@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> What is the behavior when calling rdd.unpersist() from a different
>>> thread while another thread is using that rdd.  Below is a simple case for
>>> this:
>>>
>>> 1) create rdd and load data
>>> 2) call rdd.cache() to bring data into memory
>>> 3) create another thread and pass rdd for a long computation
>>> 4) call rdd.unpersist while 3. is still running
>>>
>>> Questions:
>>>
>>> * Will the computation in 3) finish properly even if unpersist was
>>> called on the rdd while running?
>>> * What happens if a part of the computation fails and the rdd needs to
>>> reconstruct based on DAG lineage, will this still work even though
>>> unpersist has been called?
>>>
>>> thanks,
>>> -paul
>>>
>>
>>
>


Re: How to unpersist RDDs generated by ALS/MatrixFactorizationModel

2015-07-28 Thread Xiangrui Meng
Hi Stahlman,

finalRDDStorageLevel is the storage level for the final user/item factors. It
is not common to set it to StorageLevel.NONE, unless you want to save the
factors directly to disk. So if it is NONE, we cannot unpersist the
intermediate RDDs (in/out blocks), because the final user/item factors that
are returned have not been materialized; otherwise we would have to recompute
from the very beginning (or the last checkpoint) when you materialize the
final user/item factors. If you want to have multiple runs, you can try
setting finalRDDStorageLevel to MEMORY_AND_DISK, or clean up previous runs so
the cached RDDs get garbage collected.

Best,
Xiangrui
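For the multi-model loop described earlier in the thread, a rough Scala sketch
of this advice (mllib ALS; the evaluation step is a placeholder for the
caller's own metric, e.g. areaUnderCurve):

import org.apache.spark.mllib.recommendation.{ALS, Rating}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

def trainOnce(ratings: RDD[Rating], rank: Int, iters: Int,
              lambda: Double, alpha: Double): Double = {
  val model = new ALS()
    .setRank(rank)
    .setIterations(iters)
    .setLambda(lambda)
    .setAlpha(alpha)
    .setImplicitPrefs(true)
    .setIntermediateRDDStorageLevel(StorageLevel.MEMORY_AND_DISK)
    .setFinalRDDStorageLevel(StorageLevel.MEMORY_AND_DISK)
    .run(ratings)

  // Placeholder for a real evaluation metric.
  val score = model.userFeatures.count().toDouble

  // Free the cached user/item factors before the next configuration is trained.
  model.userFeatures.unpersist(blocking = false)
  model.productFeatures.unpersist(blocking = false)
  score
}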

On Wed, Jul 22, 2015 at 11:35 AM, Ganelin, Ilya
ilya.gane...@capitalone.com wrote:
 To be Unpersisted the RDD must be persisted first. If it's set to None, then
 it's not persisted, and as such does not need to be freed. Does that make
 sense ?



 Thank you,
 Ilya Ganelin




 -Original Message-
 From: Stahlman, Jonathan [jonathan.stahl...@capitalone.com]
 Sent: Wednesday, July 22, 2015 01:42 PM Eastern Standard Time
 To: user@spark.apache.org
 Subject: Re: How to unpersist RDDs generated by ALS/MatrixFactorizationModel

 Hello again,

 In trying to understand the caching of intermediate RDDs by ALS, I looked
 into the source code and found what may be a bug.  Looking here:

 https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala#L230

 you see that ALS.train() is being called with finalRDDStorageLevel =
 StorageLevel.NONE, which I would understand to mean that the intermediate
 RDDs will not be persisted.  Looking here:

 https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala#L631

 unpersist() is only being called on the intermediate RDDs (all the *Blocks
 RDDs listed in my first post) if finalRDDStorageLevel != StorageLevel.NONE.

 This doesn’t make sense to me – I would expect the RDDs to be removed from
 the cache if finalRDDStorageLevel == StorageLevel.NONE, not the other way
 around.

 Jonathan


 From: Stahlman, Stahlman Jonathan jonathan.stahl...@capitalone.com
 Date: Thursday, July 16, 2015 at 2:18 PM
 To: user@spark.apache.org user@spark.apache.org
 Subject: How to unpersist RDDs generated by ALS/MatrixFactorizationModel

 Hello all,

 I am running the Spark recommendation algorithm in MLlib and I have been
 studying its output with various model configurations.  Ideally I would like
 to be able to run one job that trains the recommendation model with many
 different configurations to try to optimize for performance.  A sample code
 in python is copied below.

 The issue I have is that each new model which is trained caches a set of
 RDDs and eventually the executors run out of memory.  Is there any way in
 Pyspark to unpersist() these RDDs after each iteration?  The names of the
 RDDs which I gather from the UI is:

 itemInBlocks
 itemOutBlocks
 Products
 ratingBlocks
 userInBlocks
 userOutBlocks
 users

 I am using Spark 1.3.  Thank you for any help!

 Regards,
 Jonathan




   data_train, data_cv, data_test = data.randomSplit([99,1,1], 2)
   functions = [rating] #defined elsewhere
   ranks = [10,20]
   iterations = [10,20]
   lambdas = [0.01,0.1]
   alphas  = [1.0,50.0]

   results = []
   for ratingFunction, rank, numIterations, m_lambda, m_alpha in
 itertools.product( functions, ranks, iterations, lambdas, alphas ):
 #train model
 ratings_train = data_train.map(lambda l: Rating( l.user, l.product,
 ratingFunction(l) ) )
 model   = ALS.trainImplicit( ratings_train, rank, numIterations,
 lambda_=float(m_lambda), alpha=float(m_alpha) )

 #test performance on CV data
 ratings_cv = data_cv.map(lambda l: Rating( l.user, l.product,
 ratingFunction(l) ) )
 auc = areaUnderCurve( ratings_cv, model.predictAll )

 #save results
 result = ",".join(str(l) for l in
 [ratingFunction.__name__,rank,numIterations,m_lambda,m_alpha,auc])
 results.append(result)

 


Re: How to unpersist RDDs generated by ALS/MatrixFactorizationModel

2015-07-22 Thread Stahlman, Jonathan
Hi Burak,

Looking at the source code, the intermediate RDDs used in ALS.train() are
persisted during the computation using intermediateRDDStorageLevel (default
value is StorageLevel.MEMORY_AND_DISK) - see
https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala#L546,
https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala#L548,
and
https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala#L556.
At the end of the ALS calculation, these RDDs are no longer needed nor
returned, so I would assume the logical choice would be to unpersist() these
RDDs. The strategy in the code seems to be set by finalRDDStorageLevel, which
for some reason only calls unpersist() on the intermediate RDDs if
finalRDDStorageLevel != StorageLevel.NONE, which seems counter-intuitive to me.

Jonathan

From: Burak Yavuz <brk...@gmail.com>
Date: Wednesday, July 22, 2015 at 10:47 AM
To: Stahlman Jonathan <jonathan.stahl...@capitalone.com>
Cc: "user@spark.apache.org" <user@spark.apache.org>
Subject: Re: How to unpersist RDDs generated by ALS/MatrixFactorizationModel

Hi Jonathan,

I believe calling persist with StorageLevel.NONE doesn't do anything. That's 
why the unpersist has an if statement before it.
Could you give more information about your setup please? Number of cores, 
memory, number of partitions of ratings_train?

Thanks,
Burak

On Wed, Jul 22, 2015 at 10:38 AM, Stahlman, Jonathan <jonathan.stahl...@capitalone.com> wrote:
Hello again,

In trying to understand the caching of intermediate RDDs by ALS, I looked into 
the source code and found what may be a bug.  Looking here:

https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala#L230

you see that ALS.train() is being called with finalRDDStorageLevel = 
StorageLevel.NONE, which I would understand to mean that the intermediate RDDs 
will not be persisted.  Looking here:

https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala#L631

unpersist() is only being called on the intermediate RDDs (all the *Blocks RDDs 
listed in my first post) if finalRDDStorageLevel != StorageLevel.NONE.

This doesn’t make sense to me – I would expect the RDDs to be removed from the 
cache if finalRDDStorageLevel == StorageLevel.NONE, not the other way around.

Jonathan


From: Stahlman, Stahlman Jonathan <jonathan.stahl...@capitalone.com>
Date: Thursday, July 16, 2015 at 2:18 PM
To: "user@spark.apache.org" <user@spark.apache.org>
Subject: How to unpersist RDDs generated by ALS/MatrixFactorizationModel

Hello all,

I am running the Spark recommendation algorithm in MLlib and I have been 
studying its output with various model configurations.  Ideally I would like to 
be able to run one job that trains the recommendation model with many different 
configurations to try to optimize for performance.  A sample code in python is 
copied below.

The issue I have is that each new model which is trained caches a set of RDDs 
and eventually the executors run out of memory.  Is there any way in Pyspark to 
unpersist() these RDDs after each iteration?  The names of the RDDs which I 
gather from the UI is:

itemInBlocks
itemOutBlocks
Products
ratingBlocks
userInBlocks
userOutBlocks
users

I am using Spark 1.3.  Thank you for any help!

Regards,
Jonathan




  data_train, data_cv, data_test = data.randomSplit([99,1,1], 2)
  functions = [rating] #defined elsewhere
  ranks = [10,20]
  iterations = [10,20]
  lambdas = [0.01,0.1]
  alphas  = [1.0,50.0]

  results = []
  for ratingFunction, rank, numIterations, m_lambda, m_alpha in 
itertools.product( functions, ranks, iterations, lambdas, alphas ):
#train model
ratings_train = data_train.map(lambda l: Rating( l.user, l.product, 
ratingFunction(l) ) )
model   = ALS.trainImplicit( ratings_train, rank, numIterations, 
lambda_=float(m_lambda), alpha=float(m_alpha) )

#test performance on CV data
ratings_cv = data_cv.map(lambda l: Rating( l.user, l.product, 
ratingFunction(l) ) )
auc = areaUnderCurve( ratings_cv, model.predictAll )

#save results
result = ",".join(str(l) for l in 
[ratingFunction.__name__,rank,numIterations,m_lambda,m_alpha,auc])
results.append(result)




Re: How to unpersist RDDs generated by ALS/MatrixFactorizationModel

2015-07-22 Thread Burak Yavuz
Hi Jonathan,

I believe calling persist with StorageLevel.NONE doesn't do anything.
That's why the unpersist has an if statement before it.
Could you give more information about your setup please? Number of cores,
memory, number of partitions of ratings_train?

Thanks,
Burak

On Wed, Jul 22, 2015 at 10:38 AM, Stahlman, Jonathan 
jonathan.stahl...@capitalone.com wrote:

 Hello again,

 In trying to understand the caching of intermediate RDDs by ALS, I looked
 into the source code and found what may be a bug.  Looking here:


 https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala#L230

 you see that ALS.train() is being called with finalRDDStorageLevel =
 StorageLevel.NONE, which I would understand to mean that the intermediate
 RDDs will not be persisted.  Looking here:


 https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala#L631

 unpersist() is only being called on the intermediate RDDs (all the *Blocks
 RDDs listed in my first post) if finalRDDStorageLevel != StorageLevel.NONE.


 This doesn’t make sense to me – I would expect the RDDs to be removed from
 the cache if finalRDDStorageLevel == StorageLevel.NONE, not the other way
 around.

 Jonathan


 From: Stahlman, Stahlman Jonathan jonathan.stahl...@capitalone.com
 Date: Thursday, July 16, 2015 at 2:18 PM
 To: user@spark.apache.org user@spark.apache.org
 Subject: How to unpersist RDDs generated by ALS/MatrixFactorizationModel

 Hello all,

 I am running the Spark recommendation algorithm in MLlib and I have been
 studying its output with various model configurations.  Ideally I would
 like to be able to run one job that trains the recommendation model with
 many different configurations to try to optimize for performance.  A sample
 code in python is copied below.

 The issue I have is that each new model which is trained caches a set of
 RDDs and eventually the executors run out of memory.  Is there any way in
 Pyspark to unpersist() these RDDs after each iteration?  The names of the
 RDDs which I gather from the UI is:

 itemInBlocks
 itemOutBlocks
 Products
 ratingBlocks
 userInBlocks
 userOutBlocks
 users

 I am using Spark 1.3.  Thank you for any help!

 Regards,
 Jonathan




   data_train, data_cv, data_test = data.randomSplit([99,1,1], 2)
   functions = [rating] #defined elsewhere
   ranks = [10,20]
   iterations = [10,20]
   lambdas = [0.01,0.1]
   alphas  = [1.0,50.0]

   results = []
   for ratingFunction, rank, numIterations, m_lambda, m_alpha in
 itertools.product( functions, ranks, iterations, lambdas, alphas ):
 #train model
 ratings_train = data_train.map(lambda l: Rating( l.user, l.product,
 ratingFunction(l) ) )
 model   = ALS.trainImplicit( ratings_train, rank, numIterations,
 lambda_=float(m_lambda), alpha=float(m_alpha) )

 #test performance on CV data
 ratings_cv = data_cv.map(lambda l: Rating( l.user, l.product,
 ratingFunction(l) ) )
 auc = areaUnderCurve( ratings_cv, model.predictAll )

 #save results
 result = ",".join(str(l) for l in
 [ratingFunction.__name__,rank,numIterations,m_lambda,m_alpha,auc])
 results.append(result)




RE: How to unpersist RDDs generated by ALS/MatrixFactorizationModel

2015-07-22 Thread Ganelin, Ilya
To be unpersisted, the RDD must be persisted first. If it's set to None, then
it's not persisted, and as such does not need to be freed. Does that make
sense?



Thank you,
Ilya Ganelin



-Original Message-
From: Stahlman, Jonathan [jonathan.stahl...@capitalone.com]
Sent: Wednesday, July 22, 2015 01:42 PM Eastern Standard Time
To: user@spark.apache.org
Subject: Re: How to unpersist RDDs generated by ALS/MatrixFactorizationModel

Hello again,

In trying to understand the caching of intermediate RDDs by ALS, I looked into 
the source code and found what may be a bug.  Looking here:

https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala#L230

you see that ALS.train() is being called with finalRDDStorageLevel = 
StorageLevel.NONE, which I would understand to mean that the intermediate RDDs 
will not be persisted.  Looking here:

https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala#L631

unpersist() is only being called on the intermediate RDDs (all the *Blocks RDDs 
listed in my first post) if finalRDDStorageLevel != StorageLevel.NONE.

This doesn’t make sense to me – I would expect the RDDs to be removed from the 
cache if finalRDDStorageLevel == StorageLevel.NONE, not the other way around.

Jonathan


From: Stahlman, Stahlman Jonathan <jonathan.stahl...@capitalone.com>
Date: Thursday, July 16, 2015 at 2:18 PM
To: "user@spark.apache.org" <user@spark.apache.org>
Subject: How to unpersist RDDs generated by ALS/MatrixFactorizationModel

Hello all,

I am running the Spark recommendation algorithm in MLlib and I have been 
studying its output with various model configurations.  Ideally I would like to 
be able to run one job that trains the recommendation model with many different 
configurations to try to optimize for performance.  A sample code in python is 
copied below.

The issue I have is that each new model which is trained caches a set of RDDs 
and eventually the executors run out of memory.  Is there any way in Pyspark to 
unpersist() these RDDs after each iteration?  The names of the RDDs which I 
gather from the UI is:

itemInBlocks
itemOutBlocks
Products
ratingBlocks
userInBlocks
userOutBlocks
users

I am using Spark 1.3.  Thank you for any help!

Regards,
Jonathan




  data_train, data_cv, data_test = data.randomSplit([99,1,1], 2)
  functions = [rating] #defined elsewhere
  ranks = [10,20]
  iterations = [10,20]
  lambdas = [0.01,0.1]
  alphas  = [1.0,50.0]

  results = []
  for ratingFunction, rank, numIterations, m_lambda, m_alpha in 
itertools.product( functions, ranks, iterations, lambdas, alphas ):
#train model
ratings_train = data_train.map(lambda l: Rating( l.user, l.product, 
ratingFunction(l) ) )
model   = ALS.trainImplicit( ratings_train, rank, numIterations, 
lambda_=float(m_lambda), alpha=float(m_alpha) )

#test performance on CV data
ratings_cv = data_cv.map(lambda l: Rating( l.user, l.product, 
ratingFunction(l) ) )
auc = areaUnderCurve( ratings_cv, model.predictAll )

#save results
result = ",".join(str(l) for l in 
[ratingFunction.__name__,rank,numIterations,m_lambda,m_alpha,auc])
results.append(result)





Re: How to unpersist RDDs generated by ALS/MatrixFactorizationModel

2015-07-22 Thread Stahlman, Jonathan
Hello again,

In trying to understand the caching of intermediate RDDs by ALS, I looked into 
the source code and found what may be a bug.  Looking here:

https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala#L230

you see that ALS.train() is being called with finalRDDStorageLevel = 
StorageLevel.NONE, which I would understand to mean that the intermediate RDDs 
will not be persisted.  Looking here:

https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala#L631

unpersist() is only being called on the intermediate RDDs (all the *Blocks RDDs 
listed in my first post) if finalRDDStorageLevel != StorageLevel.NONE.

This doesn’t make sense to me – I would expect the RDDs to be removed from the 
cache if finalRDDStorageLevel == StorageLevel.NONE, not the other way around.

Jonathan


From: Stahlman, Stahlman Jonathan <jonathan.stahl...@capitalone.com>
Date: Thursday, July 16, 2015 at 2:18 PM
To: "user@spark.apache.org" <user@spark.apache.org>
Subject: How to unpersist RDDs generated by ALS/MatrixFactorizationModel

Hello all,

I am running the Spark recommendation algorithm in MLlib and I have been 
studying its output with various model configurations.  Ideally I would like to 
be able to run one job that trains the recommendation model with many different 
configurations to try to optimize for performance.  A sample code in python is 
copied below.

The issue I have is that each new model which is trained caches a set of RDDs 
and eventually the executors run out of memory.  Is there any way in Pyspark to 
unpersist() these RDDs after each iteration?  The names of the RDDs which I 
gather from the UI is:

itemInBlocks
itemOutBlocks
Products
ratingBlocks
userInBlocks
userOutBlocks
users

I am using Spark 1.3.  Thank you for any help!

Regards,
Jonathan




  data_train, data_cv, data_test = data.randomSplit([99,1,1], 2)
  functions = [rating] #defined elsewhere
  ranks = [10,20]
  iterations = [10,20]
  lambdas = [0.01,0.1]
  alphas  = [1.0,50.0]

  results = []
  for ratingFunction, rank, numIterations, m_lambda, m_alpha in 
itertools.product( functions, ranks, iterations, lambdas, alphas ):
#train model
ratings_train = data_train.map(lambda l: Rating( l.user, l.product, 
ratingFunction(l) ) )
model   = ALS.trainImplicit( ratings_train, rank, numIterations, 
lambda_=float(m_lambda), alpha=float(m_alpha) )

#test performance on CV data
ratings_cv = data_cv.map(lambda l: Rating( l.user, l.product, 
ratingFunction(l) ) )
auc = areaUnderCurve( ratings_cv, model.predictAll )

#save results
result = ",".join(str(l) for l in 
[ratingFunction.__name__,rank,numIterations,m_lambda,m_alpha,auc])
results.append(result)




How to unpersist RDDs generated by ALS/MatrixFactorizationModel

2015-07-16 Thread Stahlman, Jonathan
Hello all,

I am running the Spark recommendation algorithm in MLlib and I have been 
studying its output with various model configurations.  Ideally I would like to 
be able to run one job that trains the recommendation model with many different 
configurations to try to optimize for performance.  A sample code in python is 
copied below.

The issue I have is that each new model which is trained caches a set of RDDs,
and eventually the executors run out of memory. Is there any way in PySpark to
unpersist() these RDDs after each iteration? The names of the RDDs, which I
gathered from the UI, are:

itemInBlocks
itemOutBlocks
Products
ratingBlocks
userInBlocks
userOutBlocks
users

I am using Spark 1.3.  Thank you for any help!

Regards,
Jonathan




  data_train, data_cv, data_test = data.randomSplit([99,1,1], 2)
  functions = [rating] #defined elsewhere
  ranks = [10,20]
  iterations = [10,20]
  lambdas = [0.01,0.1]
  alphas  = [1.0,50.0]

  results = []
  for ratingFunction, rank, numIterations, m_lambda, m_alpha in 
itertools.product( functions, ranks, iterations, lambdas, alphas ):
#train model
ratings_train = data_train.map(lambda l: Rating( l.user, l.product, 
ratingFunction(l) ) )
model   = ALS.trainImplicit( ratings_train, rank, numIterations, 
lambda_=float(m_lambda), alpha=float(m_alpha) )

#test performance on CV data
ratings_cv = data_cv.map(lambda l: Rating( l.user, l.product, 
ratingFunction(l) ) )
auc = areaUnderCurve( ratings_cv, model.predictAll )

#save results
result = ",".join(str(l) for l in 
[ratingFunction.__name__,rank,numIterations,m_lambda,m_alpha,auc])
results.append(result)




Re: Making Unpersist Lazy

2015-07-02 Thread Akhil Das
RDDs which are no longer required will be removed from memory by Spark
itself (which you could consider lazy?).

Thanks
Best Regards

On Wed, Jul 1, 2015 at 7:48 PM, Jem Tucker jem.tuc...@gmail.com wrote:

 Hi,

 The current behavior of rdd.unpersist() appears to not be lazily executed
 and therefore must be placed after an action. Is there any way to emulate
 lazy execution of this function so it is added to the task queue?

 Thanks,

 Jem



Re: Making Unpersist Lazy

2015-07-02 Thread Jem Tucker
Hi,

After running some tests it appears the unpersist is called as soon as it
is reached, so any tasks using this rdd later on will have to recalculate
it. This is fine for simple programs, but when an rdd is created within a
function and its reference is then lost while children of it continue to be
used, the persist/unpersist does not work effectively.

Thanks

Jem
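A small made-up illustration of the situation described above: the parent is
cached inside a function, unpersisted before any action has run, and every
later action on the returned child recomputes the parent from its lineage.

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

def buildChild(sc: SparkContext): RDD[Int] = {
  val parent = sc.parallelize(1 to 1000000).cache()
  val child  = parent.map(_ * 2)   // lazily depends on the cached parent
  parent.unpersist()               // eagerly removed here, before child is ever computed
  child                            // later actions on child recompute parent from scratch
}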
On Thu, 2 Jul 2015 at 08:18, Akhil Das ak...@sigmoidanalytics.com wrote:

 rdd's which are no longer required will be removed from memory by spark
 itself (which you can consider as lazy?).

 Thanks
 Best Regards

 On Wed, Jul 1, 2015 at 7:48 PM, Jem Tucker jem.tuc...@gmail.com wrote:

 Hi,

 The current behavior of rdd.unpersist() appears to not be lazily executed
 and therefore must be placed after an action. Is there any way to emulate
 lazy execution of this function so it is added to the task queue?

 Thanks,

 Jem





RE: Making Unpersist Lazy

2015-07-02 Thread Ganelin, Ilya
You may pass an optional parameter (blocking = false) to make it lazy.



Thank you,
Ilya Ganelin



-Original Message-
From: Jem Tucker [jem.tuc...@gmail.com]
Sent: Thursday, July 02, 2015 04:06 AM Eastern Standard Time
To: Akhil Das
Cc: user
Subject: Re: Making Unpersist Lazy

Hi,

After running some tests it appears the unpersist is called as soon as it is
reached, so any tasks using this rdd later on will have to recalculate it.
This is fine for simple programs, but when an rdd is created within a function
and its reference is then lost while children of it continue to be used, the
persist/unpersist does not work effectively.

Thanks

Jem
On Thu, 2 Jul 2015 at 08:18, Akhil Das 
ak...@sigmoidanalytics.com wrote:
rdd's which are no longer required will be removed from memory by spark itself 
(which you can consider as lazy?).

Thanks
Best Regards

On Wed, Jul 1, 2015 at 7:48 PM, Jem Tucker 
jem.tuc...@gmail.com wrote:
Hi,

The current behavior of rdd.unpersist() appears to not be lazily executed and 
therefore must be placed after an action. Is there any way to emulate lazy 
execution of this function so it is added to the task queue?

Thanks,

Jem




Making Unpersist Lazy

2015-07-01 Thread Jem Tucker
Hi,

The current behavior of rdd.unpersist() appears to not be lazily executed
and therefore must be placed after an action. Is there any way to emulate
lazy execution of this function so it is added to the task queue?

Thanks,

Jem


Re: when cached RDD will unpersist its data

2015-06-23 Thread eric wong
In a case where memory cannot hold all the cached RDDs, the BlockManager
will evict some older blocks to make room for new RDD blocks.


Hope that will be helpful.
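
As a rough Scala sketch of the difference the storage level makes here (assuming a spark-shell session where sc is already defined; with MEMORY_ONLY an evicted partition is simply recomputed when needed again, while MEMORY_AND_DISK spills it to disk instead):

import org.apache.spark.storage.StorageLevel

val big = sc.parallelize(1 to 10000000).persist(StorageLevel.MEMORY_ONLY)
big.count()          // fills the cache; older blocks may be evicted to make room
// ... other cached RDDs compete for the same memory ...
big.unpersist()      // explicit release, rather than waiting for eviction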

2015-06-24 13:22 GMT+08:00 bit1...@163.com bit1...@163.com:

 I am kind of confused about when a cached RDD will unpersist its data. I
 know we can explicitly unpersist it with RDD.unpersist, but can it be
 unpersisted automatically by the Spark framework?
 Thanks.

 --
 bit1...@163.com




-- 
王海华


when cached RDD will unpersist its data

2015-06-23 Thread bit1...@163.com
I am kind of confused about when a cached RDD will unpersist its data. I know we
can explicitly unpersist it with RDD.unpersist, but can it be unpersisted
automatically by the Spark framework?
Thanks.



bit1...@163.com


Re: Futures timed out during unpersist

2015-01-17 Thread Akhil Das
What is the data size? Have you tried increasing the driver memory??

Thanks
Best Regards

On Sat, Jan 17, 2015 at 1:01 PM, Kevin (Sangwoo) Kim kevin...@apache.org
wrote:

 Hi experts,
 I got an error during unpersist RDD.
 Any ideas?

 java.util.concurrent.TimeoutException: Futures timed out after [30
 seconds] at
 scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) at
 scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at
 scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) at
 scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
 at scala.concurrent.Await$.result(package.scala:107) at
 org.apache.spark.storage.BlockManagerMaster.removeRdd(BlockManagerMaster.scala:103)
 at org.apache.spark.SparkContext.unpersistRDD(SparkContext.scala:951) at
 org.apache.spark.rdd.RDD.unpersist(RDD.scala:168)




Re: Futures timed out during unpersist

2015-01-17 Thread Kevin (Sangwoo) Kim
data size is about 300~400GB, I'm using 800GB cluster and set driver memory
to 50GB.
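
A hedged Scala sketch of two possible mitigations, assuming a Spark 1.x setup where the 30-second ask timeout was configurable through spark.akka.askTimeout (please check the configuration docs for your exact version; the app name is hypothetical):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("unpersist-timeout")
  .set("spark.akka.askTimeout", "120")   // seconds; raises the driver-side wait on block removal
val sc = new SparkContext(conf)

val rdd = sc.parallelize(1 to 1000000).cache()
rdd.count()
rdd.unpersist(blocking = false)          // or avoid waiting on the removal future altogether
sc.stop()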

On Sat Jan 17 2015 at 6:01:46 PM Akhil Das ak...@sigmoidanalytics.com
wrote:

 What is the data size? Have you tried increasing the driver memory??

 Thanks
 Best Regards

 On Sat, Jan 17, 2015 at 1:01 PM, Kevin (Sangwoo) Kim kevin...@apache.org
 wrote:

 Hi experts,
 I got an error during unpersist RDD.
 Any ideas?

 java.util.concurrent.TimeoutException: Futures timed out after [30
 seconds] at
 scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) at
 scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at
 scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) at
 scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
 at scala.concurrent.Await$.result(package.scala:107) at
 org.apache.spark.storage.BlockManagerMaster.removeRdd(BlockManagerMaster.scala:103)
 at org.apache.spark.SparkContext.unpersistRDD(SparkContext.scala:951) at
 org.apache.spark.rdd.RDD.unpersist(RDD.scala:168)





Futures timed out during unpersist

2015-01-16 Thread Kevin (Sangwoo) Kim
Hi experts,
I got an error during unpersist RDD.
Any ideas?

java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) at
scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at
scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) at
scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:107) at
org.apache.spark.storage.BlockManagerMaster.removeRdd(BlockManagerMaster.scala:103)
at org.apache.spark.SparkContext.unpersistRDD(SparkContext.scala:951) at
org.apache.spark.rdd.RDD.unpersist(RDD.scala:168)


Unpersist

2014-09-11 Thread Deep Pradhan
I want to create a temporary variable in my Spark code.
Can I do this?

for (i <- num)
{
val temp = ..
   {
   do something
   }
temp.unpersist()
}

Thank You


Re: Unpersist

2014-09-11 Thread Akhil Das
like this?

var temp = ...
for (i <- num)
{
 temp = ..
   {
   do something
   }
temp.unpersist()
}
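
As a concrete (hedged) Scala sketch of that pattern, assuming a spark-shell session where sc is already defined and with hypothetical input paths:

import org.apache.spark.storage.StorageLevel

for (i <- 0 until 3) {
  val temp = sc.textFile(s"hdfs:///input/part-$i").persist(StorageLevel.MEMORY_AND_DISK)
  println(s"part $i has ${temp.count()} lines")  // an action that materializes temp
  temp.unpersist(blocking = false)               // release it before the next iteration
}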

Thanks
Best Regards

On Thu, Sep 11, 2014 at 3:26 PM, Deep Pradhan pradhandeep1...@gmail.com
wrote:

 I want to create a temporary variables in a spark code.
 Can I do this?

 for (i <- num)
 {
 val temp = ..
{
do something
}
 temp.unpersist()
 }

 Thank You



Re: Unpersist

2014-09-11 Thread Deep Pradhan
After every loop I want the temp variable to cease to exist

On Thu, Sep 11, 2014 at 4:33 PM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 like this?

 var temp = ...
  for (i <- num)
 {
  temp = ..
{
do something
}
 temp.unpersist()
 }

 Thanks
 Best Regards

 On Thu, Sep 11, 2014 at 3:26 PM, Deep Pradhan pradhandeep1...@gmail.com
 wrote:

 I want to create a temporary variables in a spark code.
 Can I do this?

  for (i <- num)
 {
 val temp = ..
{
do something
}
 temp.unpersist()
 }

 Thank You





Regarding function unpersist on rdd

2014-09-02 Thread Zijing Guo
Hello,
Can someone enlighten me regarding whether calling unpersist on an RDD is 
expensive? What is the best solution to uncache a cached RDD?
Thanks
Edwin
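
A short hedged Scala sketch of the options, assuming a spark-shell session (sc already defined): the call itself is cheap on the driver, especially when non-blocking, and sc.getPersistentRDDs can be used to uncache everything still marked as persistent.

val rdd = sc.parallelize(1 to 1000000).cache()
rdd.count()

rdd.unpersist(blocking = false)   // non-blocking: returns immediately, blocks are dropped asynchronously

// uncache every RDD that is still marked as persistent
sc.getPersistentRDDs.values.foreach(_.unpersist(blocking = false))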


RE: Question about RDD cache, unpersist, materialization

2014-06-12 Thread innowireless TaeYun Kim
Maybe it would be nice if unpersist() ‘triggered’ the computations of other 
rdds that depend on it but are not yet computed.
The pseudo code can be as follows:

 

unpersist()
{
if (this rdd has not been persisted)
return;
for (all rdds that depends on this rdd but not yet computed)
compute_that_rdd;
do_actual_unpersist();
}
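
A user-level approximation of this idea as a hedged Scala sketch (the helper is hypothetical, not a Spark API; it simply forces the dependent RDDs with count() before releasing the parent):

import org.apache.spark.rdd.RDD

// Hypothetical helper: materialize the RDDs that still depend on `parent`, then release it.
def materializeAndRelease(parent: RDD[_], dependents: Seq[RDD[_]]): Unit = {
  dependents.foreach(_.count())      // compute them while `parent` is still cached
  parent.unpersist(blocking = false)
}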



 

From: Daniel Siegmann [mailto:daniel.siegm...@velos.io] 
Sent: Friday, June 13, 2014 5:38 AM
To: user@spark.apache.org
Subject: Re: Question about RDD cache, unpersist, materialization

 

I've run into this issue. The goal of caching / persist seems to be to avoid 
recomputing an RDD when its data will be needed multiple times. However, once 
the following RDDs are computed the cache is no longer needed. The currently 
design provides no obvious way to detect when the cache is no longer needed so 
it can be discarded.

In the case of cache in memory, it may be handled by partitions being dropped 
(in LRU order) when memory fills up. I need to do some more experimentation to 
see if this really works well, or if allowing memory to fill up causes 
performance issues or possibly OOM errors if data isn't correctly freed.

In the case of persisting to disk, I'm not sure if there's a way to limit the 
disk space used for caching. Does anyone know if there is such a configuration 
option? This is a pressing issue for me - I have had jobs fail because nodes 
ran out of disk space.

 

On Wed, Jun 11, 2014 at 2:26 AM, Nick Pentreath nick.pentre...@gmail.com 
wrote:

If you want to force materialization use .count()

 

Also if you can simply don't unpersist anything, unless you really need to free 
the memory 

—
Sent from Mailbox https://www.dropbox.com/mailbox  

 

On Wed, Jun 11, 2014 at 5:13 AM, innowireless TaeYun Kim 
taeyun@innowireless.co.kr wrote:

BTW, it is possible that rdd.first() does not compute the whole partitions. 
So, first() cannot be used for the situation below. 

-Original Message- 
From: innowireless TaeYun Kim [mailto:taeyun@innowireless.co.kr] 
Sent: Wednesday, June 11, 2014 11:40 AM 
To: user@spark.apache.org 
Subject: Question about RDD cache, unpersist, materialization 

Hi, 

What I (seems to) know about RDD persisting API is as follows: 
- cache() and persist() is not an action. It only does a marking. 
- unpersist() is also not an action. It only removes a marking. But if the 
rdd is already in memory, it is unloaded. 

And there seems no API to forcefully materialize the RDD without requiring a 
data by an action method, for example first(). 

So, I am faced with the following scenario. 

{ 
    JavaRDD<T> rddUnion = sc.parallelize(new ArrayList<T>()); // create empty for merging 
    for (int i = 0; i < 10; i++) 
    { 
        JavaRDD<T2> rdd = sc.textFile(inputFileNames[i]); 
        rdd.cache(); // Since it will be used twice, cache. 
        rdd.map(...).filter(...).saveAsTextFile(outputFileNames[i]); // Transform and save, rdd materializes 
        rddUnion = rddUnion.union(rdd.map(...).filter(...)); // Do another transform to T and merge by union 
        rdd.unpersist(); // Now it seems not needed. (But needed actually) 
    } 
    // Here, rddUnion actually materializes, and needs all 10 rdds that already unpersisted. 
    // So, rebuilding all 10 rdds will occur. 
    rddUnion.saveAsTextFile(mergedFileName); 
} 

If rddUnion can be materialized before the rdd.unpersist() line and 
cache()d, the rdds in the loop will not be needed on 
rddUnion.saveAsTextFile(). 

Now what is the best strategy? 
- Do not unpersist all 10 rdds in the loop. 
- Materialize rddUnion in the loop by calling 'light' action API, like 
first(). 
- Give up and just rebuild/reload all 10 rdds when saving rddUnion. 

Is there some misunderstanding? 

Thanks. 



 




-- 

Daniel Siegmann, Software Developer
Velos

Accelerating Machine Learning


440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
E: daniel.siegm...@velos.io W: www.velos.io



Re: Question about RDD cache, unpersist, materialization

2014-06-12 Thread Nicholas Chammas
FYI: Here is a related discussion
http://apache-spark-user-list.1001560.n3.nabble.com/Persist-and-unpersist-td6437.html
about this.


On Thu, Jun 12, 2014 at 8:10 PM, innowireless TaeYun Kim 
taeyun@innowireless.co.kr wrote:

 Maybe It would be nice that unpersist() ‘triggers’ the computations of
 other rdds that depends on it but not yet computed.
 The pseudo code can be as follows:



 unpersist()
 {
 if (this rdd has not been persisted)
 return;
 for (all rdds that depends on this rdd but not yet computed)
 compute_that_rdd;
 do_actual_unpersist();
 }



 *From:* Daniel Siegmann [mailto:daniel.siegm...@velos.io]
 *Sent:* Friday, June 13, 2014 5:38 AM
 *To:* user@spark.apache.org
 *Subject:* Re: Question about RDD cache, unpersist, materialization



 I've run into this issue. The goal of caching / persist seems to be to
 avoid recomputing an RDD when its data will be needed multiple times.
 However, once the following RDDs are computed the cache is no longer
 needed. The currently design provides no obvious way to detect when the
 cache is no longer needed so it can be discarded.

 In the case of cache in memory, it may be handled by partitions being
 dropped (in LRU order) when memory fills up. I need to do some more
 experimentation to see if this really works well, or if allowing memory to
 fill up causes performance issues or possibly OOM errors if data isn't
 correctly freed.

 In the case of persisting to disk, I'm not sure if there's a way to limit
 the disk space used for caching. Does anyone know if there is such a
 configuration option? This is a pressing issue for me - I have had jobs
 fail because nodes ran out of disk space.



 On Wed, Jun 11, 2014 at 2:26 AM, Nick Pentreath nick.pentre...@gmail.com
 wrote:

 If you want to force materialization use .count()



 Also if you can simply don't unpersist anything, unless you really need to
 free the memory

 —
 Sent from Mailbox https://www.dropbox.com/mailbox



 On Wed, Jun 11, 2014 at 5:13 AM, innowireless TaeYun Kim 
 taeyun@innowireless.co.kr wrote:

 BTW, it is possible that rdd.first() does not compute the whole
 partitions.
 So, first() cannot be used for the situation below.

 -Original Message-
 From: innowireless TaeYun Kim [mailto:taeyun@innowireless.co.kr]
 Sent: Wednesday, June 11, 2014 11:40 AM
 To: user@spark.apache.org
 Subject: Question about RDD cache, unpersist, materialization

 Hi,

 What I (seems to) know about RDD persisting API is as follows:
 - cache() and persist() is not an action. It only does a marking.
 - unpersist() is also not an action. It only removes a marking. But if the
 rdd is already in memory, it is unloaded.

 And there seems no API to forcefully materialize the RDD without requiring
 a
 data by an action method, for example first().

 So, I am faced with the following scenario.

 {
     JavaRDD<T> rddUnion = sc.parallelize(new ArrayList<T>()); // create empty for merging
     for (int i = 0; i < 10; i++)
     {
         JavaRDD<T2> rdd = sc.textFile(inputFileNames[i]);
         rdd.cache(); // Since it will be used twice, cache.
         rdd.map(...).filter(...).saveAsTextFile(outputFileNames[i]); // Transform and save, rdd materializes
         rddUnion = rddUnion.union(rdd.map(...).filter(...)); // Do another transform to T and merge by union
         rdd.unpersist(); // Now it seems not needed. (But needed actually)
     }
     // Here, rddUnion actually materializes, and needs all 10 rdds that already unpersisted.
     // So, rebuilding all 10 rdds will occur.
     rddUnion.saveAsTextFile(mergedFileName);
 }

 If rddUnion can be materialized before the rdd.unpersist() line and
 cache()d, the rdds in the loop will not be needed on
 rddUnion.saveAsTextFile().

 Now what is the best strategy?
 - Do not unpersist all 10 rdds in the loop.
 - Materialize rddUnion in the loop by calling 'light' action API, like
 first().
 - Give up and just rebuild/reload all 10 rdds when saving rddUnion.

 Is there some misunderstanding?

 Thanks.






 --

 Daniel Siegmann, Software Developer
 Velos

 Accelerating Machine Learning


 440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
 E: daniel.siegm...@velos.io W: www.velos.io



RE: Question about RDD cache, unpersist, materialization

2014-06-12 Thread innowireless TaeYun Kim
Currently I use rdd.count() for forceful computation, as Nick Pentreath 
suggested.

 

I think that it will be nice to have a method that forcefully computes a rdd, 
so that the unnecessary rdds are safely unpersist()ed.

 

Let’s think of a case where rdd_a is a parent of both:

(1) a short-term rdd_s that depends only on the rdd (transformation of rdd_a)

(2) and a long-term rdd_t that depends on the rdds that may be computed later. 
(may be for some aggregation)

Usually rdd_s is computed early, and rdd_a is computed for it. But it has to 
remain in memory until rdd_t is eventually computed. (Or recomputed when rdd_t 
is computed)

It would be nice if rdd_t could be computed when rdd_s is computed, so that 
rdd_a can be unpersist()ed, since it will not be used anymore.

(Currently I use rdd_t.count() for that)
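
A hedged Scala sketch of that scenario (spark-shell style, sc assumed; the paths and the map functions are stand-ins):

val rdd_a = sc.textFile("hdfs:///expensive/input").cache()
val rdd_s = rdd_a.map(_.toUpperCase)        // stand-in for the short-term transformation
val rdd_t = rdd_a.map(_.length)             // stand-in for the long-term one

rdd_s.saveAsTextFile("hdfs:///out/short")   // computes rdd_s (and therefore rdd_a)
rdd_t.cache().count()                       // force rdd_t now, while rdd_a is still cached
rdd_a.unpersist(blocking = false)           // nothing lazy depends on rdd_a any more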

 

sc.prune() which was suggested in the related discussion you provided is not 
helpful for this case, since rdd_a is ‘referenced’ by ‘lazy’ rdd_t.

(Get up rdd_t, and do it with rdd_s while rdd_a is here for rdd_s.)

 

 

From: Nicholas Chammas [mailto:nicholas.cham...@gmail.com] 
Sent: Friday, June 13, 2014 9:31 AM
To: user
Subject: Re: Question about RDD cache, unpersist, materialization

 

FYI: Here is a related discussion 
http://apache-spark-user-list.1001560.n3.nabble.com/Persist-and-unpersist-td6437.html
  about this.

 

On Thu, Jun 12, 2014 at 8:10 PM, innowireless TaeYun Kim 
taeyun@innowireless.co.kr wrote:

Maybe It would be nice that unpersist() ‘triggers’ the computations of other 
rdds that depends on it but not yet computed.
The pseudo code can be as follows:

 

unpersist()
{
if (this rdd has not been persisted)
return;
for (all rdds that depends on this rdd but not yet computed)
compute_that_rdd;
do_actual_unpersist();
}

 

From: Daniel Siegmann [mailto:daniel.siegm...@velos.io] 
Sent: Friday, June 13, 2014 5:38 AM
To: user@spark.apache.org
Subject: Re: Question about RDD cache, unpersist, materialization

 

I've run into this issue. The goal of caching / persist seems to be to avoid 
recomputing an RDD when its data will be needed multiple times. However, once 
the following RDDs are computed the cache is no longer needed. The currently 
design provides no obvious way to detect when the cache is no longer needed so 
it can be discarded.

In the case of cache in memory, it may be handled by partitions being dropped 
(in LRU order) when memory fills up. I need to do some more experimentation to 
see if this really works well, or if allowing memory to fill up causes 
performance issues or possibly OOM errors if data isn't correctly freed.

In the case of persisting to disk, I'm not sure if there's a way to limit the 
disk space used for caching. Does anyone know if there is such a configuration 
option? This is a pressing issue for me - I have had jobs fail because nodes 
ran out of disk space.

 

On Wed, Jun 11, 2014 at 2:26 AM, Nick Pentreath nick.pentre...@gmail.com 
wrote:

If you want to force materialization use .count()

 

Also if you can simply don't unpersist anything, unless you really need to free 
the memory 

—
Sent from Mailbox https://www.dropbox.com/mailbox  

 

On Wed, Jun 11, 2014 at 5:13 AM, innowireless TaeYun Kim 
taeyun@innowireless.co.kr wrote:

BTW, it is possible that rdd.first() does not compute the whole partitions. 
So, first() cannot be used for the situation below. 

-Original Message- 
From: innowireless TaeYun Kim [mailto:taeyun@innowireless.co.kr] 
Sent: Wednesday, June 11, 2014 11:40 AM 
To: user@spark.apache.org 
Subject: Question about RDD cache, unpersist, materialization 

Hi, 

What I (seems to) know about RDD persisting API is as follows: 
- cache() and persist() is not an action. It only does a marking. 
- unpersist() is also not an action. It only removes a marking. But if the 
rdd is already in memory, it is unloaded. 

And there seems no API to forcefully materialize the RDD without requiring a 
data by an action method, for example first(). 

So, I am faced with the following scenario. 

{ 
    JavaRDD<T> rddUnion = sc.parallelize(new ArrayList<T>()); // create empty for merging 
    for (int i = 0; i < 10; i++) 
    { 
        JavaRDD<T2> rdd = sc.textFile(inputFileNames[i]); 
        rdd.cache(); // Since it will be used twice, cache. 
        rdd.map(...).filter(...).saveAsTextFile(outputFileNames[i]); // Transform and save, rdd materializes 
        rddUnion = rddUnion.union(rdd.map(...).filter(...)); // Do another transform to T and merge by union 
        rdd.unpersist(); // Now it seems not needed. (But needed actually) 
    } 
    // Here, rddUnion actually materializes, and needs all 10 rdds that already unpersisted. 
    // So, rebuilding all 10 rdds will occur. 
    rddUnion.saveAsTextFile(mergedFileName); 
} 

If rddUnion can be materialized before the rdd.unpersist() line and 
cache()d, the rdds in the loop will not be needed on 
rddUnion.saveAsTextFile(). 

Now what is the best strategy? 
- Do not unpersist all 10 rdds in the loop

RE: Question about RDD cache, unpersist, materialization

2014-06-12 Thread innowireless TaeYun Kim
(I’ve clarified the statement (1) of my previous mail. See below.)

 

From: innowireless TaeYun Kim [mailto:taeyun@innowireless.co.kr] 
Sent: Friday, June 13, 2014 10:05 AM
To: user@spark.apache.org
Subject: RE: Question about RDD cache, unpersist, materialization

 

Currently I use rdd.count() for forceful computation, as Nick Pentreath 
suggested.

 

I think that it will be nice to have a method that forcefully computes a rdd, 
so that the unnecessary rdds are safely unpersist()ed.

 

Let’s think a case that a rdd_a is a parent of both:

(1) a short-term rdd_s that depends only on rdd_a (maybe rdd_s is a 
transformation of rdd_a)

(2) and a long-term rdd_t that depends on the rdds that may be computed later. 
(may be for some aggregation)

Usually rdd_s is computed early, and rdd_a is computed for it. But it has to 
remain in memory until rdd_t is eventually computed. (Or recomputed when rdd_t 
is computed)

It would be nice if rdd_t could be computed when rdd_s is computed, so that 
rdd_a can be unpersist()ed, since it will not be used anymore.

(Currently I use rdd_t.count() for that)

 

sc.prune() which was suggested in the related discussion you provided is not 
helpful for this case, since rdd_a is ‘referenced’ by ‘lazy’ rdd_t.

(Get up rdd_t, and do it with rdd_s while rdd_a is here for rdd_s.)

 

 

From: Nicholas Chammas [mailto:nicholas.cham...@gmail.com] 
Sent: Friday, June 13, 2014 9:31 AM
To: user
Subject: Re: Question about RDD cache, unpersist, materialization

 

FYI: Here is a related discussion 
http://apache-spark-user-list.1001560.n3.nabble.com/Persist-and-unpersist-td6437.html
  about this.

 

On Thu, Jun 12, 2014 at 8:10 PM, innowireless TaeYun Kim 
taeyun@innowireless.co.kr wrote:

Maybe It would be nice that unpersist() ‘triggers’ the computations of other 
rdds that depends on it but not yet computed.
The pseudo code can be as follows:

 

unpersist()
{
if (this rdd has not been persisted)
return;
for (all rdds that depends on this rdd but not yet computed)
compute_that_rdd;
do_actual_unpersist();
}

 

From: Daniel Siegmann [mailto:daniel.siegm...@velos.io] 
Sent: Friday, June 13, 2014 5:38 AM
To: user@spark.apache.org
Subject: Re: Question about RDD cache, unpersist, materialization

 

I've run into this issue. The goal of caching / persist seems to be to avoid 
recomputing an RDD when its data will be needed multiple times. However, once 
the following RDDs are computed the cache is no longer needed. The currently 
design provides no obvious way to detect when the cache is no longer needed so 
it can be discarded.

In the case of cache in memory, it may be handled by partitions being dropped 
(in LRU order) when memory fills up. I need to do some more experimentation to 
see if this really works well, or if allowing memory to fill up causes 
performance issues or possibly OOM errors if data isn't correctly freed.

In the case of persisting to disk, I'm not sure if there's a way to limit the 
disk space used for caching. Does anyone know if there is such a configuration 
option? This is a pressing issue for me - I have had jobs fail because nodes 
ran out of disk space.

 

On Wed, Jun 11, 2014 at 2:26 AM, Nick Pentreath nick.pentre...@gmail.com 
wrote:

If you want to force materialization use .count()

 

Also if you can simply don't unpersist anything, unless you really need to free 
the memory 

—
Sent from Mailbox https://www.dropbox.com/mailbox  

 

On Wed, Jun 11, 2014 at 5:13 AM, innowireless TaeYun Kim 
taeyun@innowireless.co.kr wrote:

BTW, it is possible that rdd.first() does not compute the whole partitions. 
So, first() cannot be used for the situation below. 

-Original Message- 
From: innowireless TaeYun Kim [mailto:taeyun@innowireless.co.kr] 
Sent: Wednesday, June 11, 2014 11:40 AM 
To: user@spark.apache.org 
Subject: Question about RDD cache, unpersist, materialization 

Hi, 

What I (seems to) know about RDD persisting API is as follows: 
- cache() and persist() is not an action. It only does a marking. 
- unpersist() is also not an action. It only removes a marking. But if the 
rdd is already in memory, it is unloaded. 

And there seems no API to forcefully materialize the RDD without requiring a 
data by an action method, for example first(). 

So, I am faced with the following scenario. 

{ 
    JavaRDD<T> rddUnion = sc.parallelize(new ArrayList<T>()); // create empty for merging 
    for (int i = 0; i < 10; i++) 
    { 
        JavaRDD<T2> rdd = sc.textFile(inputFileNames[i]); 
        rdd.cache(); // Since it will be used twice, cache. 
        rdd.map(...).filter(...).saveAsTextFile(outputFileNames[i]); // Transform and save, rdd materializes 
        rddUnion = rddUnion.union(rdd.map(...).filter(...)); // Do another transform to T and merge by union 
        rdd.unpersist(); // Now it seems not needed. (But needed actually) 
    } 
    // Here, rddUnion actually materializes, and needs all 10 rdds that already unpersisted. 
    // So, rebuilding all 10

RE: Question about RDD cache, unpersist, materialization

2014-06-11 Thread Nick Pentreath
If you want to force materialization use .count()


Also if you can simply don't unpersist anything, unless you really need to free 
the memory 
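
As a hedged Scala sketch of what that looks like for the loop in the original mail (assuming inputFileNames, outputFileNames and mergedFileName are defined as there, and with stand-in map/filter bodies):

val parts = (0 until 10).map { i =>
  val rdd = sc.textFile(inputFileNames(i)).cache()
  rdd.map(_.trim).filter(_.nonEmpty).saveAsTextFile(outputFileNames(i))  // first use
  val forUnion = rdd.map(_.length).filter(_ > 0).cache()                 // second use, kept for the union
  forUnion.count()                     // force materialization so rdd is no longer needed
  rdd.unpersist(blocking = false)
  forUnion
}
sc.union(parts).saveAsTextFile(mergedFileName)

Caching each part keeps memory in use until the final save, so this trades the cost of recomputing the ten inputs for holding the (hopefully smaller) union branches.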
—
Sent from Mailbox

On Wed, Jun 11, 2014 at 5:13 AM, innowireless TaeYun Kim
taeyun@innowireless.co.kr wrote:

 BTW, it is possible that rdd.first() does not compute the whole partitions.
 So, first() cannot be used for the situation below.
 -Original Message-
 From: innowireless TaeYun Kim [mailto:taeyun@innowireless.co.kr] 
 Sent: Wednesday, June 11, 2014 11:40 AM
 To: user@spark.apache.org
 Subject: Question about RDD cache, unpersist, materialization
 Hi,
 What I (seems to) know about RDD persisting API is as follows:
 - cache() and persist() is not an action. It only does a marking.
 - unpersist() is also not an action. It only removes a marking. But if the
 rdd is already in memory, it is unloaded.
 And there seems no API to forcefully materialize the RDD without requiring a
 data by an action method, for example first().
 So, I am faced with the following scenario.
 {
     JavaRDD<T> rddUnion = sc.parallelize(new ArrayList<T>());  // create empty for merging
     for (int i = 0; i < 10; i++)
     {
         JavaRDD<T2> rdd = sc.textFile(inputFileNames[i]);
         rdd.cache();  // Since it will be used twice, cache.
         rdd.map(...).filter(...).saveAsTextFile(outputFileNames[i]);  // Transform and save, rdd materializes
         rddUnion = rddUnion.union(rdd.map(...).filter(...));  // Do another transform to T and merge by union
         rdd.unpersist();  // Now it seems not needed. (But needed actually)
     }
     // Here, rddUnion actually materializes, and needs all 10 rdds that already unpersisted.
     // So, rebuilding all 10 rdds will occur.
     rddUnion.saveAsTextFile(mergedFileName);
 }
 If rddUnion can be materialized before the rdd.unpersist() line and
 cache()d, the rdds in the loop will not be needed on
 rddUnion.saveAsTextFile().
 Now what is the best strategy?
 - Do not unpersist all 10 rdds in the loop.
 - Materialize rddUnion in the loop by calling 'light' action API, like
 first().
 - Give up and just rebuild/reload all 10 rdds when saving rddUnion.
 Is there some misunderstanding?
 Thanks.

Question about RDD cache, unpersist, materialization

2014-06-10 Thread innowireless TaeYun Kim
Hi,

What I (seems to) know about RDD persisting API is as follows:
- cache() and persist() is not an action. It only does a marking.
- unpersist() is also not an action. It only removes a marking. But if the
rdd is already in memory, it is unloaded.

And there seems no API to forcefully materialize the RDD without requiring a
data by an action method, for example first().

So, I am faced with the following scenario.

{
    JavaRDD<T> rddUnion = sc.parallelize(new ArrayList<T>());  // create empty for merging
    for (int i = 0; i < 10; i++)
    {
        JavaRDD<T2> rdd = sc.textFile(inputFileNames[i]);
        rdd.cache();  // Since it will be used twice, cache.
        rdd.map(...).filter(...).saveAsTextFile(outputFileNames[i]);  // Transform and save, rdd materializes
        rddUnion = rddUnion.union(rdd.map(...).filter(...));  // Do another transform to T and merge by union
        rdd.unpersist();  // Now it seems not needed. (But needed actually)
    }
    // Here, rddUnion actually materializes, and needs all 10 rdds that already unpersisted.
    // So, rebuilding all 10 rdds will occur.
    rddUnion.saveAsTextFile(mergedFileName);
}

If rddUnion can be materialized before the rdd.unpersist() line and
cache()d, the rdds in the loop will not be needed on
rddUnion.saveAsTextFile().

Now what is the best strategy?
- Do not unpersist all 10 rdds in the loop.
- Materialize rddUnion in the loop by calling 'light' action API, like
first().
- Give up and just rebuild/reload all 10 rdds when saving rddUnion.

Is there some misunderstanding?

Thanks.




RE: Question about RDD cache, unpersist, materialization

2014-06-10 Thread innowireless TaeYun Kim
BTW, it is possible that rdd.first() does not compute the whole partitions.
So, first() cannot be used for the situation below.

-Original Message-
From: innowireless TaeYun Kim [mailto:taeyun@innowireless.co.kr] 
Sent: Wednesday, June 11, 2014 11:40 AM
To: user@spark.apache.org
Subject: Question about RDD cache, unpersist, materialization

Hi,

What I (seems to) know about RDD persisting API is as follows:
- cache() and persist() is not an action. It only does a marking.
- unpersist() is also not an action. It only removes a marking. But if the
rdd is already in memory, it is unloaded.

And there seems no API to forcefully materialize the RDD without requiring a
data by an action method, for example first().

So, I am faced with the following scenario.

{
    JavaRDD<T> rddUnion = sc.parallelize(new ArrayList<T>());  // create empty for merging
    for (int i = 0; i < 10; i++)
    {
        JavaRDD<T2> rdd = sc.textFile(inputFileNames[i]);
        rdd.cache();  // Since it will be used twice, cache.
        rdd.map(...).filter(...).saveAsTextFile(outputFileNames[i]);  // Transform and save, rdd materializes
        rddUnion = rddUnion.union(rdd.map(...).filter(...));  // Do another transform to T and merge by union
        rdd.unpersist();  // Now it seems not needed. (But needed actually)
    }
    // Here, rddUnion actually materializes, and needs all 10 rdds that already unpersisted.
    // So, rebuilding all 10 rdds will occur.
    rddUnion.saveAsTextFile(mergedFileName);
}

If rddUnion can be materialized before the rdd.unpersist() line and
cache()d, the rdds in the loop will not be needed on
rddUnion.saveAsTextFile().

Now what is the best strategy?
- Do not unpersist all 10 rdds in the loop.
- Materialize rddUnion in the loop by calling 'light' action API, like
first().
- Give up and just rebuild/reload all 10 rdds when saving rddUnion.

Is there some misunderstanding?

Thanks.




Persist and unpersist

2014-05-27 Thread Daniel Darabos
I keep bumping into a problem with persisting RDDs. Consider this (silly)
example:

def everySecondFromBehind(input: RDD[Int]): RDD[Int] = {
  val count = input.count
  if (count % 2 == 0) {
return input.filter(_ % 2 == 1)
  } else {
return input.filter(_ % 2 == 0)
  }
}


The situation is that we want to do two things with an RDD (a count and a
filter in the example). The input RDD may represent a very expensive
calculation. So it would make sense to add an input.cache() line at the
beginning. But where do we put input.unpersist()?

input.cache()
val count = input.count
val result = input.filter(...)
input.unpersist()
return result


input.filter() is lazy, so this does not work as expected. We only want
to release input from the cache once nothing depends on it anymore. Maybe
result was garbage collected. Maybe result itself has been cached. But
there is no way to detect such conditions.

Our current approach is to just leave the RDD cached, and it will get
dumped at some point anyway. Is there a better solution? Thanks for any
tips.


Re: Persist and unpersist

2014-05-27 Thread Nicholas Chammas
Daniel,

Is SPARK-1103 https://issues.apache.org/jira/browse/SPARK-1103 related to
your example? Automatic unpersist()-ing of unreferenced RDDs would be nice.

Nick
​


On Tue, May 27, 2014 at 12:28 PM, Daniel Darabos 
daniel.dara...@lynxanalytics.com wrote:

 I keep bumping into a problem with persisting RDDs. Consider this (silly)
 example:

 def everySecondFromBehind(input: RDD[Int]): RDD[Int] = {
   val count = input.count
   if (count % 2 == 0) {
 return input.filter(_ % 2 == 1)
   } else {
 return input.filter(_ % 2 == 0)
   }
 }


 The situation is that we want to do two things with an RDD (a count and
 a filter in the example). The input RDD may represent a very expensive
 calculation. So it would make sense to add an input.cache() line at the
 beginning. But where do we put input.unpersist()?

 input.cache()
 val count = input.count
 val result = input.filter(...)
 input.unpersist()
 return result


 input.filter() is lazy, so this does not work as expected. We only want
 to release input from the cache once nothing depends on it anymore. Maybe
 result was garbage collected. Maybe result itself has been cached. But
 there is no way to detect such conditions.

 Our current approach is to just leave the RDD cached, and it will get
 dumped at some point anyway. Is there a better solution? Thanks for any
 tips.



Re: Persist and unpersist

2014-05-27 Thread Ankur Dave
I think what's desired here is for input to be unpersisted automatically as
soon as result is materialized. I don't think there's currently a way to do
this, but the usual workaround is to force result to be materialized
immediately and then unpersist input:

input.cache()
val count = input.count
val result = input.filter(...)
result.cache().foreach(x => {}) // materialize result
input.unpersist()               // safe because `result` is materialized
                                // and is the only RDD that depends on `input`
return result


Ankur http://www.ankurdave.com/
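
Putting that workaround back into the example from the start of this thread, a hedged Scala sketch (the parity trick is just a compact way to pick the same filter as the original):

import org.apache.spark.rdd.RDD

def everySecondFromBehind(input: RDD[Int]): RDD[Int] = {
  input.cache()
  val count = input.count
  val parity = if (count % 2 == 0) 1 else 0
  val result = input.filter(_ % 2 == parity).cache()
  result.foreach(_ => ())            // materialize result while input is still cached
  input.unpersist(blocking = false)  // safe: result no longer needs input
  result
}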