Re: How can i remove the need for calling cache

2017-08-02 Thread jeff saremi
Thanks, Vadim. Yes, this is a good option for us. Thanks.


From: Vadim Semenov <vadim.seme...@datadoghq.com>
Sent: Wednesday, August 2, 2017 6:24:40 PM
To: Suzen, Mehmet
Cc: jeff saremi; user@spark.apache.org
Subject: Re: How can i remove the need for calling cache




Re: How can i remove the need for calling cache

2017-08-02 Thread Vadim Semenov
So if you just save an RDD to HDFS via `saveAsSequenceFile`, you would have
to create a new RDD that reads that data back. This way you avoid recomputing
the RDD, but you may lose time on saving/loading.

Exactly the same thing happens with `checkpoint`; `checkpoint` is basically
just a convenience method that gives you the same RDD back.

However, if your job fails, there's no way for a new job to reuse data that
was already checkpointed by a previous failed run. That's where having a
custom checkpointer helps.

Another note: you cannot delete checkpointed data from within the same job;
you need to delete it some other way.

BTW, have you tried `.persist(StorageLevel.DISK_ONLY)`? It caches data to
local disk, freeing up space in the JVM and letting you avoid HDFS.
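
To make that concrete, a minimal sketch of the save-and-reload pattern
(using `saveAsObjectFile`/`objectFile` for simplicity; the path and the toy
lineage are illustrative assumptions, not from this thread):
```
import org.apache.spark.storage.StorageLevel

val myrdd = sc.parallelize(1 to 100).map(x => x * x) // stand-in for an expensive lineage

val path = "hdfs:///tmp/myrdd-materialized"  // illustrative path
myrdd.saveAsObjectFile(path)                 // action: computes the lineage once, writes to HDFS
val reloaded = sc.objectFile[Int](path)      // fresh RDD whose lineage starts at the saved files
reloaded.map(_ + 1).count()
reloaded.map(_ * 2).count()                  // neither action recomputes myrdd's lineage

// Or keep the blocks on local disk instead of the JVM heap, avoiding HDFS:
myrdd.persist(StorageLevel.DISK_ONLY)
```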



Re: How can i remove the need for calling cache

2017-08-02 Thread Suzen, Mehmet
On 3 August 2017 at 03:00, Vadim Semenov  wrote:
> `saveAsObjectFile` doesn't save the DAG, it acts as a typical action, so it
> just saves data to some destination.

Yes, that's what I thought, so the statement "...otherwise saving it on a
file will require recomputation." from the book is not entirely true.




Re: How can i remove the need for calling cache

2017-08-02 Thread Vadim Semenov
`saveAsObjectFile` doesn't save the DAG; it acts as a typical action, so it
just saves the data to some destination.

`cache`/`persist` let you cache data while keeping the DAG, so if an executor
that holds data goes down, Spark is still able to recompute the missing
partitions.

`localCheckpoint` lets you sacrifice fault tolerance and truncate the DAG: if
an executor goes down, the job will fail, because the lineage has already
been forgotten.
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L1551-L1610

And `checkpoint` lets you save the data to some shared storage and truncate
the DAG, so if an executor goes down, the job can take the missing partitions
from the place where the RDD was saved.
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L1533-L1549
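
A minimal spark-shell sketch contrasting the two (the checkpoint directory
and toy RDDs are illustrative assumptions):
```
sc.setCheckpointDir("hdfs:///tmp/checkpoints")

val rdd1 = sc.parallelize(1 to 100).map(_ * 2)
rdd1.checkpoint()      // on the next action, the data is written to the
rdd1.count()           // checkpoint dir and the DAG is truncated; lost
                       // partitions can be re-read from HDFS later

val rdd2 = sc.parallelize(1 to 100).map(_ * 2)
rdd2.localCheckpoint() // persisted on the executors only, DAG truncated
rdd2.count()           // faster, but if an executor dies its partitions
                       // are gone for good
```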



Re: How can i remove the need for calling cache

2017-08-02 Thread Vadim Semenov
Also check the `RDD.checkpoint()` method

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L1533-L1550



Re: How can i remove the need for calling cache

2017-08-02 Thread Vadim Semenov
I'm not sure that "checkpointed" means the same thing in that sentence.

You can run a simple test using `spark-shell`:

sc.setCheckpointDir("/tmp/checkpoint")
val rdd = sc.parallelize(1 to 10).map(x => {
  Thread.sleep(1000)
  x
})
rdd.checkpoint()
rdd.foreach(println) // Will take 10 seconds
rdd.foreach(println) // Will be instant, because the RDD is checkpointed



Re: How can i remove the need for calling cache

2017-08-02 Thread Suzen, Mehmet
On 3 August 2017 at 01:05, jeff saremi  wrote:
> Vadim:
>
> This is from the Mastering Spark book:
>
> "It is strongly recommended that a checkpointed RDD is persisted in memory,
> otherwise saving it on a file will require recomputation."

Is this really true? I had the impression that the DAG would not be carried
along once an RDD is serialized to an external file; does `saveAsObjectFile`
save the DAG as well?




Re: How can i remove the need for calling cache

2017-08-02 Thread jeff saremi
Vadim:

This is from the Mastering Spark book:

"It is strongly recommended that a checkpointed RDD is persisted in memory, 
otherwise saving it on a file will require recomputation."


To me that means checkpoint will not prevent the recomputation that I was
hoping for.
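
One reading of that sentence: the checkpoint write itself re-runs the
lineage unless the RDD is also persisted, so the pattern the book seems to
recommend would look roughly like this (a sketch, reusing `myrdd` from the
earlier examples):
```
import org.apache.spark.storage.StorageLevel

sc.setCheckpointDir("hdfs:///tmp/checkpoints")
myrdd.persist(StorageLevel.MEMORY_AND_DISK) // keep the computed blocks around
myrdd.checkpoint()
myrdd.count() // computes the lineage once; the checkpointing job then reads
              // the persisted blocks instead of recomputing from scratch
```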


From: Vadim Semenov <vadim.seme...@datadoghq.com>
Sent: Tuesday, August 1, 2017 12:05:17 PM
To: jeff saremi
Cc: user@spark.apache.org
Subject: Re: How can i remove the need for calling cache




Re: How can i remove the need for calling cache

2017-08-01 Thread jeff saremi
Thanks, Mark. I'll examine the status more carefully to observe this.


From: Mark Hamstra <m...@clearstorydata.com>
Sent: Tuesday, August 1, 2017 11:25:46 AM
To: user@spark.apache.org
Subject: Re: How can i remove the need for calling cache




Re: How can i remove the need for calling cache

2017-08-01 Thread jeff saremi
Thanks, Vadim. I'll try that.


From: Vadim Semenov <vadim.seme...@datadoghq.com>
Sent: Tuesday, August 1, 2017 12:05:17 PM
To: jeff saremi
Cc: user@spark.apache.org
Subject: Re: How can i remove the need for calling cache




Re: How can i remove the need for calling cache

2017-08-01 Thread Vadim Semenov
You can use `.checkpoint()`:
```
val sc: SparkContext
sc.setCheckpointDir("hdfs:///tmp/checkpointDirectory")
myrdd.checkpoint()
val result1 = myrdd.map(op1(_))
result1.count() // Will save `myrdd` to HDFS and do map(op1…
val result2 = myrdd.map(op2(_))
result2.count() // Will load `myrdd` from HDFS and do map(op2…
```



Re: How can i remove the need for calling cache

2017-08-01 Thread jeff saremi
Here are the threads that describe the problems we're experiencing. These
problems are exacerbated when we use cache/persist:

https://www.mail-archive.com/user@spark.apache.org/msg64987.html
https://www.mail-archive.com/user@spark.apache.org/msg64986.html

So I am looking for a way to reproduce the same effect as in my sample code
without the use of cache().

If I use myrdd.count(), would that be a good alternative?
thanks
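
(For reference, a sketch of what a bare count() does and doesn't buy; the
toy lineage is an illustrative assumption:)
```
val myrdd = sc.parallelize(1 to 100).map { x => Thread.sleep(100); x } // no shuffle

myrdd.count()            // computes the lineage but retains nothing
myrdd.map(_ + 1).count() // recomputes the whole lineage again

// count() only avoids recomputation indirectly, when the lineage contains a
// shuffle whose files survive between jobs (see Mark Hamstra's reply below).
```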


From: lucas.g...@gmail.com <lucas.g...@gmail.com>
Sent: Tuesday, August 1, 2017 11:23:04 AM
To: jeff saremi
Cc: user@spark.apache.org
Subject: Re: How can i remove the need for calling cache




Re: How can i remove the need for calling cache

2017-08-01 Thread Mark Hamstra
Very likely, much of the potential duplication is already being avoided
even without calling cache/persist. When running the above code without
`myrdd.cache`, have you looked at the Spark web UI for the Jobs? For at
least one of them you will likely see that many Stages are marked as
"skipped", which means that prior shuffle files that cover the results of
those Stages were still available, so Spark did not recompute those
results. Spark will eventually clean up those shuffle files (unless you
hold onto a reference to them), but if your Jobs using myrdd run fairly
close together in time, then duplication is already minimized even without
an explicit cache call.
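
A minimal sketch of that effect (an illustrative job with a single shuffle):
```
val shuffled = sc.parallelize(1 to 1000)
  .map(x => (x % 10, x))
  .reduceByKey(_ + _) // stage boundary: shuffle files are written here

shuffled.map { case (_, v) => v + 1 }.count() // job 1: runs both stages
shuffled.map { case (_, v) => v * 2 }.count() // job 2: the pre-shuffle stage
                                              // shows as "skipped" in the web UI
                                              // because the shuffle files are reused
```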



Re: How can i remove the need for calling cache

2017-08-01 Thread lucas.g...@gmail.com
Hi Jeff, that looks sane to me. Do you have additional details?



How can i remove the need for calling cache

2017-08-01 Thread jeff saremi
Calling cache/persist fails all our jobs (I have posted 2 threads on this).

And we're giving up hope of finding a solution.
So I'd like to find a workaround for that:

If I save an RDD to HDFS and read it back, can I use it in more than one
operation?

Example: (using cache)
// do a whole bunch of transformations on an RDD

myrdd.cache()

val result1 = myrdd.map(op1(_))

val result2 = myrdd.map(op2(_))

// in the above I am assuming that a call to cache will prevent all previous
// transformations from being computed twice


I'd like to somehow get result1 and result2 without duplicating work. How can I 
do that?

thanks

Jeff