Re: Is RDD thread safe?
Thank you, Imran. I will check whether there is memory waste or not.

On Tue, Nov 26, 2019 at 1:30 AM, Imran Rashid wrote:
> I think Chang is right, but I also think this only comes up in limited
> scenarios. [...]
Re: Is RDD thread safe?
Very well put, Imran. This is a variant of executor failure after an RDD has been computed (including caching). In general, non-determinism in Spark is going to lead to inconsistency. The only reasonable solution for us, at that time, was to make pseudo-randomness repeatable and to checkpoint afterwards, so that recomputation becomes deterministic.

Regards,
Mridul

On Mon, Nov 25, 2019 at 9:30 AM Imran Rashid wrote:
> I think Chang is right, but I also think this only comes up in limited
> scenarios. [...]
Re: Is RDD thread safe?
I think Chang is right, but I also think this only comes up in limited scenarios. I initially thought it wasn't a bug, but after some more thought I have some concerns in light of the issues we've had with nondeterministic RDDs, e.g. repartition().

Say I have code like this:

    val cachedRDD = sc.textFile(...).cache()
    (0 until 200).par.foreach { idx => cachedRDD.doSomeAction(idx) }

That is, my cached RDD is referenced by many threads concurrently before the RDD has been cached.

When one of those tasks gets to cachedRDD.getOrCompute(), there are a few possible scenarios:

1) The partition has never been referenced before. BlockManager.getOrCompute() will say the block doesn't exist, so it will get recomputed (https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L360).

2) The partition has been fully materialized by another task, and the BlockManagerMaster on the driver already knows about it, so BlockManager.getOrCompute() will return a pointer to the cached block (perhaps on another node).

3) The partition is actively being computed by another task on the same executor. Then BlockManager.getOrCompute() will not know about that other version of the task (it only knows about blocks that are fully materialized, IIUC). But eventually, when the tasks try to actually write the data, they'll try to get a write lock for the block (https://github.com/apache/spark/blob/f09c1a36c4b0ca1fb450e274b22294dca590d8f8/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L1218). One task will get the write lock first; the other task will block on it, then realize the block exists and just return those values.

4) The partition is actively being computed by another task on a *different* executor. IIUC, Spark doesn't try to do anything to prevent both tasks from computing the block themselves in this case. (To do so would require extra coordination in the driver before writing every single block.) The locks in BlockManager and BlockInfoManager don't stop this case, because this is happening in entirely independent JVMs.

There normally won't be any problem here: if the RDD is totally deterministic, then you'll just end up with an extra copy of the data. In a way this is good; the cached RDD is in high demand, so having an extra copy isn't so bad. OTOH, if the RDD is non-deterministic, you've now got two copies with different values. Then again, the RDD cache is not resilient in general, so you've always got to be able to handle an RDD getting recomputed if it's evicted from the cache. So this should be pretty similar.

On Mon, Nov 25, 2019 at 2:29 AM Weichen Xu wrote:
> emmm, I haven't check code, but I think if an RDD is referenced in
> several places, the correct behavior should be: when this RDD data is
> needed, it will be computed and then cached only once [...]
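The same-executor case (scenario 3) can be illustrated with a minimal JVM-level sketch. This is not Spark's BlockManager, just an analogue: here `ConcurrentHashMap.computeIfAbsent`'s per-key locking stands in for the block write lock, so a second thread asking for the same block waits for the first writer and then simply reads the cached value.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical analogue of getOrElseUpdate on a single executor:
// computeIfAbsent holds a per-key lock while the value is computed, so
// concurrent callers for the same block compute it exactly once.
object MiniBlockStore {
  private val blocks = new ConcurrentHashMap[Int, Seq[Int]]()
  val computeCount = new AtomicInteger(0)

  def getOrElseUpdate(blockId: Int)(compute: => Seq[Int]): Seq[Int] =
    // The losing threads block on this key's lock, then return the
    // value the winner wrote; `compute` never runs a second time.
    blocks.computeIfAbsent(blockId, _ => { computeCount.incrementAndGet(); compute })
}

val threads = (1 to 8).map { _ =>
  new Thread(() => MiniBlockStore.getOrElseUpdate(0) { Thread.sleep(20); Seq(1, 2, 3) })
}
threads.foreach(_.start())
threads.foreach(_.join())
```

After all threads join, the block was computed once. Across separate JVMs (scenario 4) no such shared lock exists, which is why two executors can each build their own copy.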
Re: Is RDD thread safe?
Emmm, I haven't checked the code, but I think if an RDD is referenced in several places, the correct behavior should be: when this RDD's data is needed, it will be computed and then cached only once; otherwise it should be treated as a bug. If you suspect there's a race condition, you could create a JIRA ticket.

On Mon, Nov 25, 2019 at 12:21 PM Chang Chen wrote:
> Sorry I did't describe clearly, RDD id itself is thread-safe, how about
> cached data? [...]
Re: Is RDD thread safe?
Sorry, I didn't describe it clearly. The RDD id itself is thread-safe, but how about the cached data?

See this code from BlockManager:

    def getOrElseUpdate(...) = {
      get[T](blockId)(classTag) match {
        case ...
        case _ =>
          // 1. no data is cached; need to compute the block
      }
      // Initially we hold no locks on this block
      doPutIterator(...) match { .. }
    }

Considering two DAGs (containing the same cached RDD) running simultaneously: if both return None when they get the same block from the BlockManager (i.e. #1 above), then I guess the same data would be cached twice.

If the later cache just overrides the previous data and no memory is wasted, then this is OK.

Thanks,
Chang

On Mon, Nov 25, 2019 at 11:52 AM Weichen Xu wrote:
> Rdd id is immutable and when rdd object created, the rdd id is
> generated. So why there is race condition in "rdd id"? [...]
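The concern here, both lookups returning "not cached" before either write, is a classic check-then-act race. A minimal sketch (names hypothetical, not Spark code) shows why a `putIfAbsent`-style write still keeps only one copy reachable: the losing thread's duplicate is dropped and garbage-collected, so memory is briefly, not permanently, doubled.

```scala
import java.util.concurrent.{ConcurrentHashMap, CountDownLatch}
import java.util.concurrent.atomic.AtomicInteger

// Check-then-act: both threads can observe a miss before either writes.
val cache = new ConcurrentHashMap[String, Vector[Byte]]()
val computations = new AtomicInteger(0)
val start = new CountDownLatch(1)

def racyGetOrCompute(key: String): Vector[Byte] = {
  val hit = cache.get(key)
  if (hit != null) hit
  else {
    start.await()                  // widen the race window for the demo
    computations.incrementAndGet() // may run twice: this is the race
    val data = Vector.fill(4)(1.toByte)
    // putIfAbsent keeps the first writer's copy; the loser's `data` is
    // unreferenced afterwards and simply garbage-collected.
    val prev = cache.putIfAbsent(key, data)
    if (prev != null) prev else data
  }
}

val t1 = new Thread(() => racyGetOrCompute("block-0"))
val t2 = new Thread(() => racyGetOrCompute("block-0"))
t1.start(); t2.start()
start.countDown()
t1.join(); t2.join()
```

Depending on timing, the value may be computed once or twice, but the cache ends up holding exactly one copy either way.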
Re: Is RDD thread safe?
The RDD id is immutable: it is generated when the RDD object is created. So why would there be a race condition on the "rdd id"?

On Mon, Nov 25, 2019 at 11:31 AM Chang Chen wrote:
> I am wonder the concurrent semantics for reason about the correctness.
> If the two query simultaneously run the DAGs which use the same cached
> DF\RDD, but before cache data actually happen, what will happen? [...]
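Eager id assignment like this is easy to make thread-safe; IIRC SparkContext hands out RDD ids from an AtomicInteger. A sketch of the idea (illustrative names, not Spark's actual code):

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.concurrent.TrieMap

// Eager, thread-safe id assignment: each object grabs a unique id at
// construction time, so the id itself can never race afterwards.
object IdGen {
  private val nextId = new AtomicInteger(0)
  def newRddId(): Int = nextId.getAndIncrement()
}

class MiniRDD {
  val id: Int = IdGen.newRddId() // fixed for the object's lifetime
}

// Constructing from many threads still yields all-distinct ids.
val seen = TrieMap.empty[Int, Unit]
val threads = (1 to 8).map { _ =>
  new Thread(() => (1 to 100).foreach(_ => seen.put(new MiniRDD().id, ())))
}
threads.foreach(_.start())
threads.foreach(_.join())
```

All 800 constructed objects receive distinct ids, because getAndIncrement never returns the same value twice.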
Re: Is RDD thread safe?
I am wondering about the concurrent semantics in order to reason about correctness. If two queries simultaneously run DAGs which use the same cached DF/RDD, but before the data has actually been cached, what will happen?

By looking into the code a little, I suspect they would have different BlockIds for the same Dataset, which is unexpected behavior, but there is no race condition.

However, the RDD id is not lazy, so there is a race condition.

Thanks,
Chang

On Tue, Nov 12, 2019 at 1:22 PM, Weichen Xu wrote:
> Hi Chang,
>
> RDD/Dataframe is immutable and lazy computed. They are thread safe. [...]
Re: Is RDD thread safe?
Hi Chang,

RDD/DataFrame is immutable and lazily computed. They are thread safe.

Thanks!

On Tue, Nov 12, 2019 at 12:31 PM Chang Chen wrote:
> Hi all
>
> I meet a case where I need cache a source RDD, and then create
> different DataFrame from it in different threads to accelerate query. [...]
Is RDD thread safe?
Hi all,

I have a case where I need to cache a source RDD, and then create different DataFrames from it in different threads to accelerate queries.

I know that SparkSession is thread safe (https://issues.apache.org/jira/browse/SPARK-15135), but I am not sure whether RDDs are thread safe or not.

Thanks
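The shape of this use case, one shared source computed once and then read from many query threads, can be seen in miniature with a plain Scala lazy val, whose initializer runs under a lock. This is a JVM-level sketch of the desired behavior, not Spark code:

```scala
import java.util.concurrent.atomic.AtomicInteger

// One shared, lazily computed, immutable value read by many threads.
// Scala guards lazy-val initialization, so concurrent first readers
// trigger exactly one computation -- the behavior one would hope for
// from a cached RDD shared across query threads.
class SharedSource {
  val computeCount = new AtomicInteger(0)
  lazy val data: Vector[Int] = {
    computeCount.incrementAndGet() // runs at most once
    Vector.tabulate(1000)(identity)
  }
}

val src = new SharedSource
val threads = (1 to 8).map { _ =>
  new Thread(() => assert(src.data.length == 1000))
}
threads.foreach(_.start())
threads.foreach(_.join())
```

Every thread sees the same immutable Vector, and the initializer ran exactly once despite the concurrent first access.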