Re: Is RDD thread safe?

2019-11-25 Thread Chang Chen
Thank you Imran

I will check whether memory is wasted or not

Imran Rashid wrote on Tue, Nov 26, 2019 at 1:30 AM:


Re: Is RDD thread safe?

2019-11-25 Thread Mridul Muralidharan
Very well put, Imran. This is a variant of executor failure after an RDD has
been computed (including caching). In general, non-determinism in Spark is
going to lead to inconsistency.
The only reasonable solution for us, at the time, was to make
pseudo-randomness repeatable, and to checkpoint afterwards so that
recomputation becomes deterministic.
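
That approach can be sketched outside Spark. The following is a plain-Python analogy (the function name and seeding scheme are hypothetical, not Spark code): derive each partition's RNG seed from a fixed job-level seed plus the partition index, so any recomputation of a lost partition regenerates identical data.

```python
import random

def compute_partition(job_seed: int, partition_index: int, n: int) -> list:
    # Derive a per-partition seed from the fixed job seed, so a lost
    # partition recomputes to exactly the same "random" values.
    rng = random.Random(job_seed * 1_000_003 + partition_index)
    return [rng.random() for _ in range(n)]

first = [compute_partition(42, p, 5) for p in range(4)]
rerun = [compute_partition(42, p, 5) for p in range(4)]  # simulated recomputation
assert first == rerun          # recomputation is deterministic
assert first[0] != first[1]    # partitions still differ from each other
```

Checkpointing after this deterministic computation then pins the result, so downstream stages never observe two different versions of the same partition.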


Regards,
Mridul

On Mon, Nov 25, 2019 at 9:30 AM Imran Rashid 
wrote:


Re: Is RDD thread safe?

2019-11-25 Thread Imran Rashid
I think Chang is right, but I also think this only comes up in limited
scenarios.  I initially thought it wasn't a bug, but after some more
thought I have some concerns in light of the issues we've had with
nondeterministic RDDs, e.g. repartition().

Say I have code like this:

val cachedRDD = sc.textFile(...).cache()
(0 until 200).par.foreach { idx => cachedRDD.doSomeAction(idx) }

that is, my cached rdd is referenced by many threads concurrently before
the RDD has been cached.

When one of those tasks gets to cachedRDD.getOrCompute(), there are a few
possible scenarios:

1) the partition has never been referenced before.
BlockManager.getOrCompute() will say the block doesn't exist, so it will
get recomputed (
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L360
)

2) The partition has been fully materialized by another task, and the
BlockManagerMaster on the driver already knows about it, so
BlockManager.getOrCompute() will return a pointer to the cached block
(perhaps on another node)

3) The partition is actively being computed by another task on the same
executor.  BlockManager.getOrCompute() will not know about that other
in-flight computation (it only knows about blocks that are fully
materialized, IIUC).  But eventually, when the tasks try to actually write
the data, they'll try to take a write lock on the block:
https://github.com/apache/spark/blob/f09c1a36c4b0ca1fb450e274b22294dca590d8f8/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L1218
One task will get the write lock first; the other task will block on it,
then realize the block exists, and just return those values.

4) The partition is actively being computed by another task on a *different*
executor.  IIUC, Spark doesn't try to do anything to prevent both tasks
from computing the block themselves in this case.  (To do so would require
extra coordination in the driver before writing every single block.)  Those
locks in BlockManager and BlockInfoManager don't stop this case, because
this is happening in entirely independent JVMs.
There normally won't be any problem here -- if the RDD is totally
deterministic, then you'll just end up with an extra copy of the data.  In
a way, this is good: the cached RDD is in high demand, so having an extra
copy isn't so bad.
OTOH, if the RDD is non-deterministic, you've now got two copies with
different values.  Then again, the RDD cache is not resilient in general, so
you've always got to be able to handle an RDD getting recomputed if it's
evicted from the cache.  So this should be pretty similar.
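
The same-executor case (scenario 3) can be sketched as a toy analogy in plain Python; all names here are hypothetical, and this is not the BlockManager's actual code. Both tasks may compute the block, but a per-block lock means only one writer stores its result, and the other observes the existing block and returns it:

```python
import threading

class TinyBlockStore:
    """Toy analogue of one executor's block store (not Spark code)."""
    def __init__(self):
        self._blocks = {}
        self._lock = threading.Lock()   # stands in for the per-block write lock
        self.stored = 0

    def get_or_compute(self, block_id, compute):
        value = compute()               # both tasks may compute (scenario 3)
        with self._lock:                # one task wins the write lock first
            if block_id not in self._blocks:
                self._blocks[block_id] = value
                self.stored += 1        # count stored copies, not attempts
            # the loser sees the block already exists and reuses it
            return self._blocks[block_id]

store = TinyBlockStore()
results = []
threads = [
    threading.Thread(
        target=lambda: results.append(
            store.get_or_compute("rdd_0_0", lambda: [1, 2, 3])))
    for _ in range(2)
]
for t in threads: t.start()
for t in threads: t.join()
assert results[0] is results[1]   # both tasks see the same stored block
assert store.stored == 1          # only one copy was kept
```

Scenario 4 is exactly this sketch with two independent `TinyBlockStore` instances (two JVMs): there is no shared lock, so both copies get stored, which is only a correctness problem when the compute function is non-deterministic.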

On Mon, Nov 25, 2019 at 2:29 AM Weichen Xu 
wrote:


Re: Is RDD thread safe?

2019-11-25 Thread Weichen Xu
Hmm, I haven't checked the code, but I think if an RDD is referenced in several
places, the correct behavior should be: when the RDD's data is needed, it
is computed and then cached only once; otherwise it should be treated
as a bug. If you suspect there's a race condition, you could create
a JIRA ticket.

On Mon, Nov 25, 2019 at 12:21 PM Chang Chen  wrote:



Re: Is RDD thread safe?

2019-11-24 Thread Chang Chen
Sorry, I didn't describe it clearly. The RDD id itself is thread-safe, but
what about the cached data?

See this code (abridged) from BlockManager:

def getOrElseUpdate(...) = {
  get[T](blockId)(classTag) match {
    case ...
    case _ =>  // 1. no data is cached;
               //    need to compute the block
  }
  // Initially we hold no locks on this block
  doPutIterator(...) match { ... }
}

Consider two DAGs (containing the same cached RDD) running simultaneously.
If both get back nothing when they request the same block from the
BlockManager (i.e. #1 above), then I guess the same data would be cached
twice.

If the later cache overwrites the previous data, and no memory is
wasted, then this is OK.
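
The suspected race is a classic check-then-act pattern. A minimal plain-Python sketch (hypothetical names, not BlockManager code) makes it deterministic with a barrier that forces both threads past the cache-miss check before either writes, showing that a naive get-or-else-update computes the block twice:

```python
import threading

cache = {}
computed = []
barrier = threading.Barrier(2)   # forces both threads past the miss check

def naive_get_or_else_update(block_id):
    if block_id not in cache:    # 1. both threads see "no data is cached" ...
        barrier.wait()
        value = [1, 2, 3]        # ... so both compute the block
        computed.append(value)
        cache[block_id] = value  # the later write overwrites the earlier one
    return cache[block_id]

ts = [threading.Thread(target=naive_get_or_else_update, args=("rdd_0_0",))
      for _ in range(2)]
for t in ts: t.start()
for t in ts: t.join()
assert len(computed) == 2        # the block was computed twice
assert cache["rdd_0_0"] in computed
```

Whether memory is actually wasted then depends on whether the overwritten copy is dropped; the real getOrElseUpdate avoids the duplicate store within one executor by taking a per-block write lock before writing.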

Thanks
Chang


Weichen Xu wrote on Mon, Nov 25, 2019 at 11:52 AM:



Re: Is RDD thread safe?

2019-11-24 Thread Weichen Xu
The RDD id is immutable, and it is generated when the RDD object is created.
So why would there be a race condition on the "rdd id"?
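
Spark assigns RDD ids eagerly at construction time from an atomic counter on the SparkContext. A plain-Python sketch of that pattern (the class and method names here are hypothetical analogues, not Spark's API): a lock-guarded counter plays the role of Scala's AtomicInteger, so concurrent RDD creation still yields unique ids.

```python
import threading

class TinyContext:
    """Toy analogue of SparkContext's id counter (not Spark code)."""
    def __init__(self):
        self._lock = threading.Lock()
        self._next_id = 0

    def new_rdd_id(self):
        with self._lock:          # atomic fetch-and-increment
            rdd_id = self._next_id
            self._next_id += 1
            return rdd_id

class TinyRDD:
    def __init__(self, ctx):
        self.id = ctx.new_rdd_id()  # id assigned eagerly at construction

ctx = TinyContext()
ids = []
ts = [threading.Thread(target=lambda: ids.append(TinyRDD(ctx).id))
      for _ in range(50)]
for t in ts: t.start()
for t in ts: t.join()
assert sorted(ids) == list(range(50))  # all ids unique, no race
```

Eager assignment through an atomic counter is what makes "RDD id is not lazy" harmless: the ids may be handed out in a nondeterministic order across threads, but they are never duplicated.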

On Mon, Nov 25, 2019 at 11:31 AM Chang Chen  wrote:



Re: Is RDD thread safe?

2019-11-24 Thread Chang Chen
I am wondering about the concurrency semantics in order to reason about
correctness. If two queries simultaneously run DAGs that use the same cached
DF/RDD, but before the cached data actually materializes, what will happen?

By looking into the code a little, I suspect they have different BlockIds for
the same Dataset, which is unexpected behavior, but there is no race condition.

However, the RDD id is not lazy, so there is a race condition.

Thanks
Chang


Weichen Xu wrote on Tue, Nov 12, 2019 at 1:22 PM:



Re: Is RDD thread safe?

2019-11-11 Thread Weichen Xu
Hi Chang,

RDDs/DataFrames are immutable and lazily computed. They are thread safe.

Thanks!

On Tue, Nov 12, 2019 at 12:31 PM Chang Chen  wrote:



Is RDD thread safe?

2019-11-11 Thread Chang Chen
Hi all

I have a case where I need to cache a source RDD, and then create different
DataFrames from it in different threads to accelerate queries.

I know that SparkSession is thread safe (
https://issues.apache.org/jira/browse/SPARK-15135), but I am not sure
whether RDD is thread safe or not.

Thanks