[jira] [Assigned] (SPARK-43987) Separate finalizeShuffleMerge Processing to Dedicated Thread Pools

2023-08-11 Thread Mridul Muralidharan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-43987?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan reassigned SPARK-43987:
---

Assignee: SHU WANG

> Separate finalizeShuffleMerge Processing to Dedicated Thread Pools
> --
>
> Key: SPARK-43987
> URL: https://issues.apache.org/jira/browse/SPARK-43987
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle
>Affects Versions: 3.2.0, 3.4.0
>Reporter: SHU WANG
>Assignee: SHU WANG
>Priority: Critical
>
> In our production environment, _finalizeShuffleMerge_ processing takes much 
> longer (p90 is around 20s) than other RPC requests. This is because 
> _finalizeShuffleMerge_ performs IO operations such as truncating and 
> opening/closing files.
> More importantly, processing _finalizeShuffleMerge_ can block other critical, 
> lightweight messages such as authentication requests, which can cause 
> authentication timeouts as well as fetch failures. Those timeouts and fetch 
> failures affect the stability of Spark job execution.
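> A minimal sketch of the idea (illustrative only, not the actual patch; class 
> and method names are hypothetical): hand finalize requests off to their own 
> pool so the shared RPC handler threads stay free for lightweight messages.
> {code:scala}
> import java.util.concurrent.{ExecutorService, Executors}
>
> // Hypothetical wrapper: the heavy finalizeShuffleMerge IO (truncate, file
> // open/close) runs on a dedicated pool instead of the shared RPC threads.
> class MergedShuffleFinalizerPool(numThreads: Int) {
>   private val finalizePool: ExecutorService = Executors.newFixedThreadPool(numThreads)
>
>   def handleFinalize(doFinalize: () => Unit)(sendReply: Either[Throwable, Unit] => Unit): Unit = {
>     finalizePool.submit(new Runnable {
>       override def run(): Unit = {
>         try {
>           doFinalize()            // truncate and open/close merged shuffle files
>           sendReply(Right(()))
>         } catch {
>           case t: Throwable => sendReply(Left(t))
>         }
>       }
>     })
>   }
> }
> {code}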



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-44272) Path Inconsistency when Operating statCache within Yarn Client

2023-07-19 Thread Mridul Muralidharan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44272?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan updated SPARK-44272:

Affects Version/s: (was: 0.9.1)
   (was: 2.3.0)

> Path Inconsistency when Operating statCache within Yarn Client
> --
>
> Key: SPARK-44272
> URL: https://issues.apache.org/jira/browse/SPARK-44272
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Submit
>Affects Versions: 3.4.0, 3.5.0
>Reporter: SHU WANG
>Assignee: SHU WANG
>Priority: Critical
> Fix For: 3.5.0, 4.0.0
>
>
> *addResource* in *ClientDistributedCacheManager* can add a *FileStatus* to the 
> *statCache* when it is not yet cached. In addition, there is a subtle bug in 
> *isPublic*, which is called from the *getVisibility* method: *uri.getPath()* 
> does not retain URI information such as the scheme and host, so the *uri* 
> passed to checkPermissionOfOther differs from the original {*}uri{*}.
> For example, if the uri is "file:/foo.invalid.com:8080/tmp/testing", then 
> {code:java}
> uri.getPath -> /foo.invalid.com:8080/tmp/testing
> uri.toString -> file:/foo.invalid.com:8080/tmp/testing{code}
> The consequence of this bug is that we *double the RPC calls* when the 
> resources are remote, which is unnecessary. We see nontrivial overhead when 
> checking those resources on our HDFS, especially when HDFS is overloaded.
>  
> Ref: related code within *ClientDistributedCacheManager*
> {code:java}
> def addResource(...) {
>   val destStatus = statCache.getOrElse(destPath.toUri(), fs.getFileStatus(destPath))
>   val visibility = getVisibility(conf, destPath.toUri(), statCache)
> }
>
> private[yarn] def getVisibility() {
>   isPublic(conf, uri, statCache)
> }
>
> private def isPublic(conf: Configuration, uri: URI, statCache: Map[URI, FileStatus]): Boolean = {
>   val current = new Path(uri.getPath()) // Should not use getPath
>   checkPermissionOfOther(fs, uri, FsAction.READ, statCache)
> }
> {code}
>  
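> A minimal sketch of one possible fix direction (illustrative only, not 
> necessarily the merged patch): derive the FileSystem from the full URI so the 
> FileStatus cached under that URI is reused and the extra getFileStatus RPC is 
> avoided.
> {code:scala}
> import java.net.URI
> import org.apache.hadoop.conf.Configuration
> import org.apache.hadoop.fs.{FileStatus, Path}
> import org.apache.hadoop.fs.permission.FsAction
>
> // Build the Path from the full URI instead of uri.getPath(), so scheme/host are
> // kept and the statCache lookup sees the same URI the FileStatus was cached under.
> private def isPublic(conf: Configuration, uri: URI, statCache: Map[URI, FileStatus]): Boolean = {
>   val fs = new Path(uri).getFileSystem(conf)  // not new Path(uri.getPath())
>   checkPermissionOfOther(fs, uri, FsAction.READ, statCache)
> }
> {code}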



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-44272) Path Inconsistency when Operating statCache within Yarn Client

2023-07-19 Thread Mridul Muralidharan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44272?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan resolved SPARK-44272.
-
Fix Version/s: 3.5.0
   4.0.0
   Resolution: Fixed

Issue resolved by pull request 41821
[https://github.com/apache/spark/pull/41821]

> Path Inconsistency when Operating statCache within Yarn Client
> --
>
> Key: SPARK-44272
> URL: https://issues.apache.org/jira/browse/SPARK-44272
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Submit
>Affects Versions: 0.9.1, 2.3.0, 3.4.0, 3.5.0
>Reporter: SHU WANG
>Assignee: SHU WANG
>Priority: Critical
> Fix For: 3.5.0, 4.0.0
>
>
> *addResource* in *ClientDistributedCacheManager* can add a *FileStatus* to the 
> *statCache* when it is not yet cached. In addition, there is a subtle bug in 
> *isPublic*, which is called from the *getVisibility* method: *uri.getPath()* 
> does not retain URI information such as the scheme and host, so the *uri* 
> passed to checkPermissionOfOther differs from the original {*}uri{*}.
> For example, if the uri is "file:/foo.invalid.com:8080/tmp/testing", then 
> {code:java}
> uri.getPath -> /foo.invalid.com:8080/tmp/testing
> uri.toString -> file:/foo.invalid.com:8080/tmp/testing{code}
> The consequence of this bug is that we *double the RPC calls* when the 
> resources are remote, which is unnecessary. We see nontrivial overhead when 
> checking those resources on our HDFS, especially when HDFS is overloaded.
>  
> Ref: related code within *ClientDistributedCacheManager*
> {code:java}
> def addResource(...) {
>   val destStatus = statCache.getOrElse(destPath.toUri(), fs.getFileStatus(destPath))
>   val visibility = getVisibility(conf, destPath.toUri(), statCache)
> }
>
> private[yarn] def getVisibility() {
>   isPublic(conf, uri, statCache)
> }
>
> private def isPublic(conf: Configuration, uri: URI, statCache: Map[URI, FileStatus]): Boolean = {
>   val current = new Path(uri.getPath()) // Should not use getPath
>   checkPermissionOfOther(fs, uri, FsAction.READ, statCache)
> }
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-44272) Path Inconsistency when Operating statCache within Yarn Client

2023-07-19 Thread Mridul Muralidharan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44272?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan reassigned SPARK-44272:
---

Assignee: SHU WANG

> Path Inconsistency when Operating statCache within Yarn Client
> --
>
> Key: SPARK-44272
> URL: https://issues.apache.org/jira/browse/SPARK-44272
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Submit
>Affects Versions: 0.9.1, 2.3.0, 3.4.0, 3.5.0
>Reporter: SHU WANG
>Assignee: SHU WANG
>Priority: Critical
>
> *addResource* in *ClientDistributedCacheManager* can add a *FileStatus* to the 
> *statCache* when it is not yet cached. In addition, there is a subtle bug in 
> *isPublic*, which is called from the *getVisibility* method: *uri.getPath()* 
> does not retain URI information such as the scheme and host, so the *uri* 
> passed to checkPermissionOfOther differs from the original {*}uri{*}.
> For example, if the uri is "file:/foo.invalid.com:8080/tmp/testing", then 
> {code:java}
> uri.getPath -> /foo.invalid.com:8080/tmp/testing
> uri.toString -> file:/foo.invalid.com:8080/tmp/testing{code}
> The consequence of this bug is that we *double the RPC calls* when the 
> resources are remote, which is unnecessary. We see nontrivial overhead when 
> checking those resources on our HDFS, especially when HDFS is overloaded.
>  
> Ref: related code within *ClientDistributedCacheManager*
> {code:java}
> def addResource(...) {
>   val destStatus = statCache.getOrElse(destPath.toUri(), fs.getFileStatus(destPath))
>   val visibility = getVisibility(conf, destPath.toUri(), statCache)
> }
>
> private[yarn] def getVisibility() {
>   isPublic(conf, uri, statCache)
> }
>
> private def isPublic(conf: Configuration, uri: URI, statCache: Map[URI, FileStatus]): Boolean = {
>   val current = new Path(uri.getPath()) // Should not use getPath
>   checkPermissionOfOther(fs, uri, FsAction.READ, statCache)
> }
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-44215) Client receives zero number of chunks in merge meta response which doesn't trigger fallback to unmerged blocks

2023-07-05 Thread Mridul Muralidharan (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-44215?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17740368#comment-17740368
 ] 

Mridul Muralidharan commented on SPARK-44215:
-

Issue resolved by pull request 41762
https://github.com/apache/spark/pull/41762

> Client receives zero number of chunks in merge meta response which doesn't 
> trigger fallback to unmerged blocks
> --
>
> Key: SPARK-44215
> URL: https://issues.apache.org/jira/browse/SPARK-44215
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle
>Affects Versions: 3.2.0
>Reporter: Chandni Singh
>Assignee: Chandni Singh
>Priority: Major
> Fix For: 3.3.3, 3.5.0, 3.4.2
>
>
> We still see instances of the server returning 0 {{numChunks}} in 
> {{mergedMetaResponse}} which causes the executor to fail with 
> {{ArithmeticException}}. 
> {code}
> java.lang.ArithmeticException: / by zero
>   at 
> org.apache.spark.storage.PushBasedFetchHelper.createChunkBlockInfosFromMetaResponse(PushBasedFetchHelper.scala:128)
>   at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:1047)
>   at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:90)
>   at 
> org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:29)
>   at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
>   at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
>   at 
> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
>   at 
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
>   at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
> {code}
> Here the executor doesn't fall back to fetching unmerged blocks, and this also 
> doesn't result in a {{FetchFailure}}, so the application fails.
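> A minimal sketch of the intended guard (illustrative only; method names are 
> approximate, not the merged patch):
> {code:scala}
> // A zero-chunk meta response is treated like a failed merged-block fetch, so the
> // iterator falls back to the original unmerged blocks instead of later dividing
> // by numChunks and hitting the ArithmeticException above.
> def onMergedMetaReceived(blockId: String, numChunks: Int)(
>     fallbackToOriginalBlocks: String => Unit,
>     processMergedChunks: Int => Unit): Unit = {
>   if (numChunks <= 0) {
>     fallbackToOriginalBlocks(blockId)  // same path as a merged-block fetch failure
>   } else {
>     processMergedChunks(numChunks)     // safe to divide by numChunks now
>   }
> }
> {code}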



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-44215) Client receives zero number of chunks in merge meta response which doesn't trigger fallback to unmerged blocks

2023-07-05 Thread Mridul Muralidharan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44215?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan updated SPARK-44215:

Fix Version/s: 3.3.3

> Client receives zero number of chunks in merge meta response which doesn't 
> trigger fallback to unmerged blocks
> --
>
> Key: SPARK-44215
> URL: https://issues.apache.org/jira/browse/SPARK-44215
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle
>Affects Versions: 3.2.0
>Reporter: Chandni Singh
>Assignee: Chandni Singh
>Priority: Major
> Fix For: 3.3.3, 3.5.0, 3.4.2
>
>
> We still see instances of the server returning 0 {{numChunks}} in 
> {{mergedMetaResponse}} which causes the executor to fail with 
> {{ArithmeticException}}. 
> {code}
> java.lang.ArithmeticException: / by zero
>   at 
> org.apache.spark.storage.PushBasedFetchHelper.createChunkBlockInfosFromMetaResponse(PushBasedFetchHelper.scala:128)
>   at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:1047)
>   at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:90)
>   at 
> org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:29)
>   at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
>   at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
>   at 
> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
>   at 
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
>   at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
> {code}
> Here the executor doesn't fall back to fetching unmerged blocks, and this also 
> doesn't result in a {{FetchFailure}}, so the application fails.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-44215) Client receives zero number of chunks in merge meta response which doesn't trigger fallback to unmerged blocks

2023-07-04 Thread Mridul Muralidharan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44215?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan resolved SPARK-44215.
-
  Assignee: Chandni Singh
Resolution: Fixed

> Client receives zero number of chunks in merge meta response which doesn't 
> trigger fallback to unmerged blocks
> --
>
> Key: SPARK-44215
> URL: https://issues.apache.org/jira/browse/SPARK-44215
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle
>Affects Versions: 3.2.0
>Reporter: Chandni Singh
>Assignee: Chandni Singh
>Priority: Major
> Fix For: 3.5.0, 3.4.2
>
>
> We still see instances of the server returning 0 {{numChunks}} in 
> {{mergedMetaResponse}} which causes the executor to fail with 
> {{ArithmeticException}}. 
> {code}
> java.lang.ArithmeticException: / by zero
>   at 
> org.apache.spark.storage.PushBasedFetchHelper.createChunkBlockInfosFromMetaResponse(PushBasedFetchHelper.scala:128)
>   at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:1047)
>   at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:90)
>   at 
> org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:29)
>   at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
>   at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
>   at 
> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
>   at 
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
>   at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
> {code}
> Here the executor doesn't fall back to fetching unmerged blocks, and this also 
> doesn't result in a {{FetchFailure}}, so the application fails.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-44215) Client receives zero number of chunks in merge meta response which doesn't trigger fallback to unmerged blocks

2023-07-04 Thread Mridul Muralidharan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44215?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan updated SPARK-44215:

Fix Version/s: 3.5.0
   3.4.2

> Client receives zero number of chunks in merge meta response which doesn't 
> trigger fallback to unmerged blocks
> --
>
> Key: SPARK-44215
> URL: https://issues.apache.org/jira/browse/SPARK-44215
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle
>Affects Versions: 3.2.0
>Reporter: Chandni Singh
>Priority: Major
> Fix For: 3.5.0, 3.4.2
>
>
> We still see instances of the server returning 0 {{numChunks}} in 
> {{mergedMetaResponse}} which causes the executor to fail with 
> {{ArithmeticException}}. 
> {code}
> java.lang.ArithmeticException: / by zero
>   at 
> org.apache.spark.storage.PushBasedFetchHelper.createChunkBlockInfosFromMetaResponse(PushBasedFetchHelper.scala:128)
>   at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:1047)
>   at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:90)
>   at 
> org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:29)
>   at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
>   at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
>   at 
> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
>   at 
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
>   at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
> {code}
> Here the executor doesn't fall back to fetching unmerged blocks, and this also 
> doesn't result in a {{FetchFailure}}, so the application fails.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-42784) Fix the problem of incomplete creation of subdirectories in push merged localDir

2023-06-30 Thread Mridul Muralidharan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-42784?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan resolved SPARK-42784.
-
Resolution: Fixed

> Fix the problem of incomplete creation of subdirectories in push merged 
> localDir
> 
>
> Key: SPARK-42784
> URL: https://issues.apache.org/jira/browse/SPARK-42784
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle, Spark Core
>Affects Versions: 3.3.2
>Reporter: Fencheng Mei
>Assignee: Fencheng Mei
>Priority: Major
> Fix For: 3.3.3, 3.5.0, 3.4.2
>
>
> After we broadly enabled push-based shuffle in our production environment, we 
> found warning messages in the server-side logs, such as:
> ShuffleBlockPusher: Pushing block shufflePush_3_0_5352_935 to 
> BlockManagerId(shuffle-push-merger, zw06-data-hdp-dn08251.mt, 7337, None) 
> failed.
> java.lang.RuntimeException: java.lang.RuntimeException: Cannot initialize 
> merged shuffle partition for appId application_1671244879475_44020960 
> shuffleId 3 shuffleMergeId 0 reduceId 935.
> After investigation, we identified the triggering mechanism of the bug.
> The driver requested two different containers on the same physical machine. 
> While the first container (container_1) was creating the 'push-merged' 
> directory, the mergeDir was created first and then the sub-directories were 
> created based on the value of the "spark.diskStore.subDirectories" parameter. 
> However, container_1's resources were preempted while the sub-directories were 
> being created, so only some of them were created. Because the mergeDir still 
> existed, the second container (container_2) did not create the remaining 
> sub-directories (it assumed that all directories had already been created).
>  
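> A minimal sketch of the fix direction (illustrative only, not the merged 
> patch): make sub-directory creation idempotent, so a later container fills in 
> whatever a preempted container left behind even though the mergeDir already 
> exists.
> {code:scala}
> import java.io.File
>
> def ensureMergeDirWithSubDirs(mergeDir: File, subDirsPerLocalDir: Int): Unit = {
>   if (!mergeDir.exists()) {
>     mergeDir.mkdirs()
>   }
>   // Check every expected sub-directory instead of assuming they all exist
>   // just because the merge directory itself is present.
>   (0 until subDirsPerLocalDir).foreach { i =>
>     val subDir = new File(mergeDir, "%02x".format(i))
>     if (!subDir.exists()) {
>       subDir.mkdirs()
>     }
>   }
> }
> {code}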



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-42784) Fix the problem of incomplete creation of subdirectories in push merged localDir

2023-06-30 Thread Mridul Muralidharan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-42784?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan updated SPARK-42784:

Fix Version/s: 3.3.3
   3.5.0
   3.4.2

> Fix the problem of incomplete creation of subdirectories in push merged 
> localDir
> 
>
> Key: SPARK-42784
> URL: https://issues.apache.org/jira/browse/SPARK-42784
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle, Spark Core
>Affects Versions: 3.3.2
>Reporter: Fencheng Mei
>Assignee: Fencheng Mei
>Priority: Major
> Fix For: 3.3.3, 3.5.0, 3.4.2
>
>
> After we broadly enabled push-based shuffle in our production environment, we 
> found warning messages in the server-side logs, such as:
> ShuffleBlockPusher: Pushing block shufflePush_3_0_5352_935 to 
> BlockManagerId(shuffle-push-merger, zw06-data-hdp-dn08251.mt, 7337, None) 
> failed.
> java.lang.RuntimeException: java.lang.RuntimeException: Cannot initialize 
> merged shuffle partition for appId application_1671244879475_44020960 
> shuffleId 3 shuffleMergeId 0 reduceId 935.
> After investigation, we identified the triggering mechanism of the bug.
> The driver requested two different containers on the same physical machine. 
> While the first container (container_1) was creating the 'push-merged' 
> directory, the mergeDir was created first and then the sub-directories were 
> created based on the value of the "spark.diskStore.subDirectories" parameter. 
> However, container_1's resources were preempted while the sub-directories were 
> being created, so only some of them were created. Because the mergeDir still 
> existed, the second container (container_2) did not create the remaining 
> sub-directories (it assumed that all directories had already been created).
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-42784) Fix the problem of incomplete creation of subdirectories in push merged localDir

2023-06-30 Thread Mridul Muralidharan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-42784?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan reassigned SPARK-42784:
---

Assignee: Fencheng Mei

> Fix the problem of incomplete creation of subdirectories in push merged 
> localDir
> 
>
> Key: SPARK-42784
> URL: https://issues.apache.org/jira/browse/SPARK-42784
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle, Spark Core
>Affects Versions: 3.3.2
>Reporter: Fencheng Mei
>Assignee: Fencheng Mei
>Priority: Major
>
> After we broadly enabled push-based shuffle in our production environment, we 
> found warning messages in the server-side logs, such as:
> ShuffleBlockPusher: Pushing block shufflePush_3_0_5352_935 to 
> BlockManagerId(shuffle-push-merger, zw06-data-hdp-dn08251.mt, 7337, None) 
> failed.
> java.lang.RuntimeException: java.lang.RuntimeException: Cannot initialize 
> merged shuffle partition for appId application_1671244879475_44020960 
> shuffleId 3 shuffleMergeId 0 reduceId 935.
> After investigation, we identified the triggering mechanism of the bug.
> The driver requested two different containers on the same physical machine. 
> While the first container (container_1) was creating the 'push-merged' 
> directory, the mergeDir was created first and then the sub-directories were 
> created based on the value of the "spark.diskStore.subDirectories" parameter. 
> However, container_1's resources were preempted while the sub-directories were 
> being created, so only some of them were created. Because the mergeDir still 
> existed, the second container (container_2) did not create the remaining 
> sub-directories (it assumed that all directories had already been created).
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-43237) Handle null exception message in event log

2023-04-28 Thread Mridul Muralidharan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-43237?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan resolved SPARK-43237.
-
Fix Version/s: 3.5.0
   Resolution: Fixed

Issue resolved by pull request 40911
[https://github.com/apache/spark/pull/40911]

> Handle null exception message in event log
> --
>
> Key: SPARK-43237
> URL: https://issues.apache.org/jira/browse/SPARK-43237
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.4.0
>Reporter: Zhongwei Zhu
>Assignee: Zhongwei Zhu
>Priority: Minor
> Fix For: 3.5.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-43237) Handle null exception message in event log

2023-04-28 Thread Mridul Muralidharan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-43237?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan reassigned SPARK-43237:
---

Assignee: Zhongwei Zhu

> Handle null exception message in event log
> --
>
> Key: SPARK-43237
> URL: https://issues.apache.org/jira/browse/SPARK-43237
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.4.0
>Reporter: Zhongwei Zhu
>Assignee: Zhongwei Zhu
>Priority: Minor
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-43052) Handle stacktrace with null file name in event log

2023-04-26 Thread Mridul Muralidharan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-43052?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan resolved SPARK-43052.
-
Fix Version/s: 3.5.0
   Resolution: Fixed

Issue resolved by pull request 40687
[https://github.com/apache/spark/pull/40687]

> Handle stacktrace with null file name in event log
> --
>
> Key: SPARK-43052
> URL: https://issues.apache.org/jira/browse/SPARK-43052
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.3.2
>Reporter: Zhongwei Zhu
>Assignee: Zhongwei Zhu
>Priority: Minor
> Fix For: 3.5.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-43052) Handle stacktrace with null file name in event log

2023-04-26 Thread Mridul Muralidharan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-43052?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan reassigned SPARK-43052:
---

Assignee: Zhongwei Zhu

> Handle stacktrace with null file name in event log
> --
>
> Key: SPARK-43052
> URL: https://issues.apache.org/jira/browse/SPARK-43052
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.3.2
>Reporter: Zhongwei Zhu
>Assignee: Zhongwei Zhu
>Priority: Minor
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-43179) Add option for applications to control saving of metadata in the External Shuffle Service LevelDB

2023-04-21 Thread Mridul Muralidharan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-43179?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan reassigned SPARK-43179:
---

Assignee: Chandni Singh

> Add option for applications to control saving of metadata in the External 
> Shuffle Service LevelDB
> -
>
> Key: SPARK-43179
> URL: https://issues.apache.org/jira/browse/SPARK-43179
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle
>Affects Versions: 3.4.0
>Reporter: Chandni Singh
>Assignee: Chandni Singh
>Priority: Major
>
> Currently, the External Shuffle Service stores application metadata in 
> LevelDB. This is necessary to enable the shuffle server to resume serving 
> shuffle data for an application whose executors registered before the 
> NodeManager restarts. However, the metadata includes the application secret, 
> which is stored in LevelDB without encryption. This is a potential security 
> risk, particularly for applications with high security requirements. While 
> filesystem access control lists (ACLs) can help protect keys and 
> certificates, they may not be sufficient for some use cases. In response, we 
> have decided not to store metadata for these high-security applications in 
> LevelDB. As a result, these applications may experience more failures in the 
> event of a node restart, but we believe this trade-off is acceptable given 
> the increased security risk.
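> A minimal sketch of how an application could opt out (illustrative only; the 
> config key below is an assumption, check the released documentation for the 
> exact name):
> {code:scala}
> import org.apache.spark.SparkConf
>
> // High-security application: ask the External Shuffle Service not to persist
> // this app's metadata (including its secret) in its local DB, trading some
> // resilience to NodeManager restarts for not writing the secret to disk.
> val conf = new SparkConf()
>   .setAppName("high-security-job")
>   .set("spark.shuffle.server.recovery.disabled", "true")  // assumed key, see this ticket's PR
> {code}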



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-43179) Add option for applications to control saving of metadata in the External Shuffle Service LevelDB

2023-04-21 Thread Mridul Muralidharan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-43179?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan resolved SPARK-43179.
-
Fix Version/s: 3.5.0
   Resolution: Fixed

Issue resolved by pull request 40843
[https://github.com/apache/spark/pull/40843]

> Add option for applications to control saving of metadata in the External 
> Shuffle Service LevelDB
> -
>
> Key: SPARK-43179
> URL: https://issues.apache.org/jira/browse/SPARK-43179
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle
>Affects Versions: 3.4.0
>Reporter: Chandni Singh
>Assignee: Chandni Singh
>Priority: Major
> Fix For: 3.5.0
>
>
> Currently, the External Shuffle Service stores application metadata in 
> LevelDB. This is necessary to enable the shuffle server to resume serving 
> shuffle data for an application whose executors registered before the 
> NodeManager restarts. However, the metadata includes the application secret, 
> which is stored in LevelDB without encryption. This is a potential security 
> risk, particularly for applications with high security requirements. While 
> filesystem access control lists (ACLs) can help protect keys and 
> certificates, they may not be sufficient for some use cases. In response, we 
> have decided not to store metadata for these high-security applications in 
> LevelDB. As a result, these applications may experience more failures in the 
> event of a node restart, but we believe this trade-off is acceptable given 
> the increased security risk.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-42922) Use SecureRandom, instead of Random in security sensitive contexts

2023-03-25 Thread Mridul Muralidharan (Jira)
Mridul Muralidharan created SPARK-42922:
---

 Summary: Use SecureRandom, instead of Random in security sensitive 
contexts
 Key: SPARK-42922
 URL: https://issues.apache.org/jira/browse/SPARK-42922
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 3.3.2, 3.2.3, 3.4.0, 3.5.0
Reporter: Mridul Muralidharan


Most uses of Random in Spark are either in test cases or in places where we need 
a repeatable pseudo-random number.
The following are usages where moving from Random to SecureRandom would be 
useful:

a) HttpAuthUtils.createCookieToken
b) ThriftHttpServlet.RAN
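A minimal sketch of the substitution (illustrative; the real signatures in 
HttpAuthUtils/ThriftHttpServlet differ):
{code:scala}
import java.security.SecureRandom

// SecureRandom instead of java.util.Random for security-sensitive tokens.
object SecureTokens {
  private val rng = new SecureRandom()

  def newCookieToken(numBytes: Int = 32): String = {
    val buf = new Array[Byte](numBytes)
    rng.nextBytes(buf)
    buf.map("%02x".format(_)).mkString  // hex-encode the random bytes
  }
}
{code}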



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-40082) DAGScheduler may not schedule new stages when push-based shuffle is enabled

2023-03-22 Thread Mridul Muralidharan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-40082?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan resolved SPARK-40082.
-
Fix Version/s: 3.5.0
 Assignee: Fencheng Mei
   Resolution: Fixed

> DAGScheduler may not schedule new stages when push-based shuffle is 
> enabled
> --
>
> Key: SPARK-40082
> URL: https://issues.apache.org/jira/browse/SPARK-40082
> Project: Spark
>  Issue Type: Bug
>  Components: Scheduler
>Affects Versions: 3.1.1
>Reporter: Penglei Shi
>Assignee: Fencheng Mei
>Priority: Major
> Fix For: 3.5.0
>
> Attachments: missParentStages.png, shuffleMergeFinalized.png, 
> submitMissingTasks.png
>
>
> When push-based shuffle is enabled and speculative tasks exist, a 
> ShuffleMapStage is resubmitted once a fetch failure occurs; its parent stages 
> are then resubmitted first, which takes some time to compute. Before the 
> ShuffleMapStage is resubmitted, all of its speculative tasks succeed and 
> register their map output, but those speculative-task-success events cannot 
> trigger shuffleMergeFinalized because the stage has already been removed from 
> runningStages.
> When the stage is then resubmitted, the speculative tasks have already 
> registered their map output and there are no missing tasks to compute, so the 
> resubmission also does not trigger shuffleMergeFinalized. Eventually the 
> stage's _shuffleMergedFinalized stays false.
> AQE then submits the next stages, which depend on the ShuffleMapStage that hit 
> the fetch failure. In getMissingParentStages, this stage is marked as missing 
> and resubmitted, but the next stages are only added to waitingStages after 
> this stage finishes, so they are not submitted even after the resubmission 
> completes.
> I have only hit this a few times in my production environment and it is 
> difficult to reproduce.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-42577) A large stage could run indefinitely due to executor loss

2023-03-13 Thread Mridul Muralidharan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-42577?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan resolved SPARK-42577.
-
Fix Version/s: 3.5.0
   Resolution: Fixed

Issue resolved by pull request 40286
[https://github.com/apache/spark/pull/40286]

> A large stage could run indefinitely due to executor loss
> -
>
> Key: SPARK-42577
> URL: https://issues.apache.org/jira/browse/SPARK-42577
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 3.0.3, 3.1.3, 3.2.3, 3.3.2
>Reporter: wuyi
>Assignee: Tengfei Huang
>Priority: Major
> Fix For: 3.5.0
>
>
> When a stage is extremely large and Spark runs on spot instances or on 
> problematic clusters with frequent worker/executor loss, the stage can run 
> indefinitely because executor loss keeps forcing tasks to rerun. This happens 
> when the external shuffle service is on and the large stage takes hours to 
> complete: when Spark tries to submit a child stage, it finds that the parent 
> stage (the large one) is missing some partitions, so the large stage has to 
> rerun. When it completes again, it finds new missing partitions for the same 
> reason.
> We should add an attempt limit for this kind of scenario.
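> A conceptual sketch of such a limit (illustrative only; not the DAGScheduler 
> change itself, and the limit value would come from a config):
> {code:scala}
> import scala.collection.mutable
>
> // Count how many times each stage has been (re)submitted and give up once a
> // bound is hit, instead of letting executor loss trigger endless reruns.
> class StageAttemptGuard(maxAttempts: Int = 16) {
>   private val attempts = mutable.Map.empty[Int, Int].withDefaultValue(0)
>
>   def onResubmit(stageId: Int)(abort: String => Unit): Unit = {
>     attempts(stageId) += 1
>     if (attempts(stageId) > maxAttempts) {
>       abort(s"Stage $stageId resubmitted ${attempts(stageId)} times; aborting the job")
>     }
>   }
> }
> {code}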



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-42577) A large stage could run indefinitely due to executor loss

2023-03-13 Thread Mridul Muralidharan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-42577?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan reassigned SPARK-42577:
---

Assignee: Tengfei Huang

> A large stage could run indefinitely due to executor loss
> -
>
> Key: SPARK-42577
> URL: https://issues.apache.org/jira/browse/SPARK-42577
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 3.0.3, 3.1.3, 3.2.3, 3.3.2
>Reporter: wuyi
>Assignee: Tengfei Huang
>Priority: Major
>
> When a stage is extremely large and Spark runs on spot instances or on 
> problematic clusters with frequent worker/executor loss, the stage can run 
> indefinitely because executor loss keeps forcing tasks to rerun. This happens 
> when the external shuffle service is on and the large stage takes hours to 
> complete: when Spark tries to submit a child stage, it finds that the parent 
> stage (the large one) is missing some partitions, so the large stage has to 
> rerun. When it completes again, it finds new missing partitions for the same 
> reason.
> We should add an attempt limit for this kind of scenario.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-42719) `MapOutputTracker#getMapLocation` should respect `spark.shuffle.reduceLocality.enabled`

2023-03-09 Thread Mridul Muralidharan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-42719?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan resolved SPARK-42719.
-
Fix Version/s: 3.5.0
   Resolution: Fixed

Issue resolved by pull request 40339
[https://github.com/apache/spark/pull/40339]

> `MapOutputTracker#getMapLocation` should respect  
> `spark.shuffle.reduceLocality.enabled`
> 
>
> Key: SPARK-42719
> URL: https://issues.apache.org/jira/browse/SPARK-42719
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 3.5.0
>Reporter: He Qi
>Assignee: He Qi
>Priority: Major
> Fix For: 3.5.0
>
>
> Discussed at [https://github.com/apache/spark/pull/40307]
> Conceptually, {{getPreferredLocations}} in {{ShuffledRowRDD}} should return 
> {{Nil}} right away when {{spark.shuffle.reduceLocality.enabled = false}}.
> This logic is pushed into MapOutputTracker though - and 
> {{getPreferredLocationsForShuffle}} honors 
> {{spark.shuffle.reduceLocality.enabled}} - but {{getMapLocation}} does not.
> So the fix is to make {{getMapLocation}} honor the parameter.
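> A minimal sketch of the shape of the fix (illustrative only):
> {code:scala}
> import org.apache.spark.SparkConf
>
> // The guard getMapLocation is conceptually missing: return no locality
> // preference at all when reduce-side locality is disabled, mirroring what
> // getPreferredLocationsForShuffle already does.
> def mapLocationPreferences(conf: SparkConf)(lookup: () => Seq[String]): Seq[String] = {
>   if (!conf.getBoolean("spark.shuffle.reduceLocality.enabled", defaultValue = true)) {
>     Nil
>   } else {
>     lookup()  // the existing MapOutputTracker location lookup
>   }
> }
> {code}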



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-42719) `MapOutputTracker#getMapLocation` should respect `spark.shuffle.reduceLocality.enabled`

2023-03-09 Thread Mridul Muralidharan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-42719?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan reassigned SPARK-42719:
---

Assignee: He Qi

> `MapOutputTracker#getMapLocation` should respect  
> `spark.shuffle.reduceLocality.enabled`
> 
>
> Key: SPARK-42719
> URL: https://issues.apache.org/jira/browse/SPARK-42719
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 3.5.0
>Reporter: He Qi
>Assignee: He Qi
>Priority: Major
>
> Discussed at [https://github.com/apache/spark/pull/40307]
> Conceptually, {{getPreferredLocations}} in {{ShuffledRowRDD}} should return 
> {{Nil}} right away when {{spark.shuffle.reduceLocality.enabled = false}}.
> This logic is pushed into MapOutputTracker though - and 
> {{getPreferredLocationsForShuffle}} honors 
> {{spark.shuffle.reduceLocality.enabled}} - but {{getMapLocation}} does not.
> So the fix is to make {{getMapLocation}} honor the parameter.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-42689) Allow ShuffleDriverComponent to declare if shuffle data is reliably stored

2023-03-08 Thread Mridul Muralidharan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-42689?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan reassigned SPARK-42689:
---

Assignee: Mridul Muralidharan

> Allow ShuffleDriverComponent to declare if shuffle data is reliably stored
> --
>
> Key: SPARK-42689
> URL: https://issues.apache.org/jira/browse/SPARK-42689
> Project: Spark
>  Issue Type: Sub-task
>  Components: Spark Core
>Affects Versions: 3.5.0
>Reporter: Mridul Muralidharan
>Assignee: Mridul Muralidharan
>Priority: Major
>
> Currently, if there is an executor node loss, we assume the shuffle data on 
> that node is also lost. This is not necessarily the case if there is a 
> shuffle component managing the shuffle data and reliably maintaining it (for 
> example, in distributed filesystem or in a disaggregated shuffle cluster).
> Downstream projects have patches to Apache Spark in order to work around this 
> issue; for example, Apache Celeborn has 
> [this|https://github.com/apache/incubator-celeborn/blob/main/assets/spark-patch/RSS_RDA_spark3.patch].
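> A minimal sketch of what a plugin backed by reliable storage could declare 
> (illustrative only; the exact method name comes from the linked PR and may 
> differ):
> {code:scala}
> import java.util.{Collections, Map => JMap}
> import org.apache.spark.shuffle.api.ShuffleDriverComponents
>
> // A driver component whose shuffle data lives in reliable external storage can
> // tell the scheduler that losing an executor does not imply losing its output.
> class ReliableShuffleDriverComponents extends ShuffleDriverComponents {
>   override def initializeApplication(): JMap[String, String] = Collections.emptyMap()
>   override def cleanupApplication(): Unit = {}
>   override def supportsReliableStorage(): Boolean = true  // assumed new API from this ticket
> }
> {code}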



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-42689) Allow ShuffleDriverComponent to declare if shuffle data is reliably stored

2023-03-08 Thread Mridul Muralidharan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-42689?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan resolved SPARK-42689.
-
Fix Version/s: 3.5.0
   Resolution: Fixed

Issue resolved by pull request 40307
[https://github.com/apache/spark/pull/40307]

> Allow ShuffleDriverComponent to declare if shuffle data is reliably stored
> --
>
> Key: SPARK-42689
> URL: https://issues.apache.org/jira/browse/SPARK-42689
> Project: Spark
>  Issue Type: Sub-task
>  Components: Spark Core
>Affects Versions: 3.5.0
>Reporter: Mridul Muralidharan
>Assignee: Mridul Muralidharan
>Priority: Major
> Fix For: 3.5.0
>
>
> Currently, if there is an executor node loss, we assume the shuffle data on 
> that node is also lost. This is not necessarily the case if there is a 
> shuffle component managing the shuffle data and reliably maintaining it (for 
> example, in distributed filesystem or in a disaggregated shuffle cluster).
> Downstream projects have patches to Apache Spark in order to work around this 
> issue; for example, Apache Celeborn has 
> [this|https://github.com/apache/incubator-celeborn/blob/main/assets/spark-patch/RSS_RDA_spark3.patch].



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-42689) Allow ShuffleDriverComponent to declare if shuffle data is reliably stored

2023-03-06 Thread Mridul Muralidharan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-42689?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan updated SPARK-42689:

Description: 
Currently, if there is an executor node loss, we assume the shuffle data on 
that node is also lost. This is not necessarily the case if there is a shuffle 
component managing the shuffle data and reliably maintaining it (for example, 
in distributed filesystem or in a disaggregated shuffle cluster).

Downstream projects have patches to Apache Spark in order to work around this 
issue; for example, Apache Celeborn has 
[this|https://github.com/apache/incubator-celeborn/blob/main/assets/spark-patch/RSS_RDA_spark3.patch].

  was:
Currently, if there is an executor node loss, we assume the shuffle data on 
that node is also lost. This does not necessarily the case if there is a 
shuffle component managing the shuffle data and reliably maintaining it (for 
example, in distributed filesystem or in a disaggregated shuffle cluster).

Downstream projects have patches to Apache Spark in order to workaround this 
issue, for example Apache Celeborn has 
[this|https://github.com/apache/incubator-celeborn/blob/main/assets/spark-patch/RSS_RDA_spark3.patch].


> Allow ShuffleDriverComponent to declare if shuffle data is reliably stored
> --
>
> Key: SPARK-42689
> URL: https://issues.apache.org/jira/browse/SPARK-42689
> Project: Spark
>  Issue Type: Sub-task
>  Components: Spark Core
>Affects Versions: 3.1.0, 3.2.0, 3.3.0, 3.4.0
>Reporter: Mridul Muralidharan
>Priority: Major
>
> Currently, if there is an executor node loss, we assume the shuffle data on 
> that node is also lost. This is not necessarily the case if there is a 
> shuffle component managing the shuffle data and reliably maintaining it (for 
> example, in distributed filesystem or in a disaggregated shuffle cluster).
> Downstream projects have patches to Apache Spark in order to work around this 
> issue; for example, Apache Celeborn has 
> [this|https://github.com/apache/incubator-celeborn/blob/main/assets/spark-patch/RSS_RDA_spark3.patch].



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-42689) Allow ShuffleDriverComponent to declare if shuffle data is reliably stored

2023-03-06 Thread Mridul Muralidharan (Jira)
Mridul Muralidharan created SPARK-42689:
---

 Summary: Allow ShuffleDriverComponent to declare if shuffle data 
is reliably stored
 Key: SPARK-42689
 URL: https://issues.apache.org/jira/browse/SPARK-42689
 Project: Spark
  Issue Type: Sub-task
  Components: Spark Core
Affects Versions: 3.3.0, 3.2.0, 3.1.0, 3.4.0
Reporter: Mridul Muralidharan


Currently, if there is an executor node loss, we assume the shuffle data on 
that node is also lost. This does not necessarily the case if there is a 
shuffle component managing the shuffle data and reliably maintaining it (for 
example, in distributed filesystem or in a disaggregated shuffle cluster).

Downstream projects have patches to Apache Spark in order to workaround this 
issue, for example Apache Celeborn has 
[this|https://github.com/apache/incubator-celeborn/blob/main/assets/spark-patch/RSS_RDA_spark3.patch].



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-41497) Accumulator undercounting in the case of retry task with rdd cache

2023-03-02 Thread Mridul Muralidharan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-41497?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan resolved SPARK-41497.
-
Fix Version/s: 3.5.0
   Resolution: Fixed

Issue resolved by pull request 39459
[https://github.com/apache/spark/pull/39459]

> Accumulator undercounting in the case of retry task with rdd cache
> --
>
> Key: SPARK-41497
> URL: https://issues.apache.org/jira/browse/SPARK-41497
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.4.8, 3.0.3, 3.1.3, 3.2.2, 3.3.1
>Reporter: wuyi
>Assignee: Tengfei Huang
>Priority: Major
> Fix For: 3.5.0
>
>
> Accumulators can be undercounted when a retried task has an RDD cache. See 
> the example below; you can also find the complete and reproducible 
> example at 
> [https://github.com/apache/spark/compare/master...Ngone51:spark:fix-acc]
>   
> {code:scala}
> test("SPARK-XXX") {
>   // Set up a cluster with 2 executors
>   val conf = new SparkConf()
> .setMaster("local-cluster[2, 1, 
> 1024]").setAppName("TaskSchedulerImplSuite")
>   sc = new SparkContext(conf)
>   // Set up a custom task scheduler. The scheduler will fail the first task 
> attempt of the job
>   // submitted below. In particular, the failed first attempt would succeed in 
> its computation
>   // (accumulator accounting, result caching) but fail to report its success 
> status due
>   // to the concurrent executor loss. The second task attempt would succeed.
>   taskScheduler = setupSchedulerWithCustomStatusUpdate(sc)
>   val myAcc = sc.longAccumulator("myAcc")
>   // Initialize an RDD with only one partition so there's only one task, and 
> specify the storage level
>   // as MEMORY_ONLY_2 so that the RDD result will be cached on both 
> executors.
>   val rdd = sc.parallelize(0 until 10, 1).mapPartitions { iter =>
> myAcc.add(100)
> iter.map(x => x + 1)
>   }.persist(StorageLevel.MEMORY_ONLY_2)
>   // This will pass since the second task attempt will succeed
>   assert(rdd.count() === 10)
>   // This will fail because `myAcc.add(100)` won't be executed during the 
> second task attempt's
>   // execution: the second task attempt loads the rdd cache 
> directly instead of
>   // executing the task function, so `myAcc.add(100)` is skipped.
>   assert(myAcc.value === 100)
> } {code}
>  
> We could also hit this issue with decommissioning even if the rdd only has one 
> copy. For example, decommissioning could migrate the rdd cache block to another 
> executor (the result is effectively the same as having 2 copies) and the 
> decommissioned executor could be lost before the task reports its success 
> status to the driver.
>  
> The issue is a bit more complicated to fix than expected. I have tried 
> several fixes, but none of them is ideal:
> Option 1: Clean up any rdd cache related to the failed task: in practice, 
> this option can already fix the issue in most cases. However, theoretically, 
> rdd cache could be reported to the driver right after the driver cleans up 
> the failed task's caches due to asynchronous communication. So this option 
> can’t resolve the issue thoroughly;
> Option 2: Disallow rdd cache reuse across the task attempts for the same 
> task: this option can 100% fix the issue. The problem is this way can also 
> affect the case where rdd cache can be reused across the attempts (e.g., when 
> there is no accumulator operation in the task), which can have perf 
> regression;
> Option 3: Introduce accumulator cache: first, this requires a new framework 
> for supporting accumulator cache; second, the driver should improve its logic 
> to distinguish whether the accumulator cache value should be reported to the 
> user to avoid overcounting. For example, in the case of task retry, the value 
> should be reported. However, in the case of rdd cache reuse, the value 
> shouldn’t be reported (should it?);
> Option 4: Do task success validation when a task tries to load the rdd 
> cache: this defines an rdd cache as only valid/accessible if the task has 
> succeeded. This could be either overkill or a bit complex (because 
> currently Spark cleans up the task state once it’s finished, so we would need 
> to maintain a structure to know whether the task ever succeeded or not).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-41497) Accumulator undercounting in the case of retry task with rdd cache

2023-03-02 Thread Mridul Muralidharan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-41497?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan reassigned SPARK-41497:
---

Assignee: Tengfei Huang

> Accumulator undercounting in the case of retry task with rdd cache
> --
>
> Key: SPARK-41497
> URL: https://issues.apache.org/jira/browse/SPARK-41497
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.4.8, 3.0.3, 3.1.3, 3.2.2, 3.3.1
>Reporter: wuyi
>Assignee: Tengfei Huang
>Priority: Major
>
> Accumulators can be undercounted when a retried task has an RDD cache. See 
> the example below; you can also find the complete and reproducible 
> example at 
> [https://github.com/apache/spark/compare/master...Ngone51:spark:fix-acc]
>   
> {code:scala}
> test("SPARK-XXX") {
>   // Set up a cluster with 2 executors
>   val conf = new SparkConf()
> .setMaster("local-cluster[2, 1, 
> 1024]").setAppName("TaskSchedulerImplSuite")
>   sc = new SparkContext(conf)
>   // Set up a custom task scheduler. The scheduler will fail the first task 
> attempt of the job
>   // submitted below. In particular, the failed first attempt would succeed in 
> its computation
>   // (accumulator accounting, result caching) but fail to report its success 
> status due
>   // to the concurrent executor loss. The second task attempt would succeed.
>   taskScheduler = setupSchedulerWithCustomStatusUpdate(sc)
>   val myAcc = sc.longAccumulator("myAcc")
>   // Initialize an RDD with only one partition so there's only one task, and 
> specify the storage level
>   // as MEMORY_ONLY_2 so that the RDD result will be cached on both 
> executors.
>   val rdd = sc.parallelize(0 until 10, 1).mapPartitions { iter =>
> myAcc.add(100)
> iter.map(x => x + 1)
>   }.persist(StorageLevel.MEMORY_ONLY_2)
>   // This will pass since the second task attempt will succeed
>   assert(rdd.count() === 10)
>   // This will fail because `myAcc.add(100)` won't be executed during the 
> second task attempt's
>   // execution: the second task attempt loads the rdd cache 
> directly instead of
>   // executing the task function, so `myAcc.add(100)` is skipped.
>   assert(myAcc.value === 100)
> } {code}
>  
> We could also hit this issue with decommissioning even if the rdd only has one 
> copy. For example, decommissioning could migrate the rdd cache block to another 
> executor (the result is effectively the same as having 2 copies) and the 
> decommissioned executor could be lost before the task reports its success 
> status to the driver.
>  
> The issue is a bit more complicated to fix than expected. I have tried 
> several fixes, but none of them is ideal:
> Option 1: Clean up any rdd cache related to the failed task: in practice, 
> this option can already fix the issue in most cases. However, theoretically, 
> rdd cache could be reported to the driver right after the driver cleans up 
> the failed task's caches due to asynchronous communication. So this option 
> can’t resolve the issue thoroughly;
> Option 2: Disallow rdd cache reuse across the task attempts for the same 
> task: this option can 100% fix the issue. The problem is this way can also 
> affect the case where rdd cache can be reused across the attempts (e.g., when 
> there is no accumulator operation in the task), which can have perf 
> regression;
> Option 3: Introduce accumulator cache: first, this requires a new framework 
> for supporting accumulator cache; second, the driver should improve its logic 
> to distinguish whether the accumulator cache value should be reported to the 
> user to avoid overcounting. For example, in the case of task retry, the value 
> should be reported. However, in the case of rdd cache reuse, the value 
> shouldn’t be reported (should it?);
> Option 4: Do task success validation when a task tries to load the rdd 
> cache: this defines an rdd cache as only valid/accessible if the task has 
> succeeded. This could be either overkill or a bit complex (because 
> currently Spark cleans up the task state once it’s finished, so we would need 
> to maintain a structure to know whether the task ever succeeded or not).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-42366) Log shuffle data corruption diagnose cause

2023-02-09 Thread Mridul Muralidharan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-42366?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan resolved SPARK-42366.
-
Fix Version/s: 3.5.0
   Resolution: Fixed

Issue resolved by pull request 39918
[https://github.com/apache/spark/pull/39918]

> Log shuffle data corruption diagnose cause
> --
>
> Key: SPARK-42366
> URL: https://issues.apache.org/jira/browse/SPARK-42366
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle
>Affects Versions: 3.2.0
>Reporter: dzcxzl
>Assignee: dzcxzl
>Priority: Minor
> Fix For: 3.5.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-42366) Log shuffle data corruption diagnose cause

2023-02-09 Thread Mridul Muralidharan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-42366?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan reassigned SPARK-42366:
---

Assignee: dzcxzl

> Log shuffle data corruption diagnose cause
> --
>
> Key: SPARK-42366
> URL: https://issues.apache.org/jira/browse/SPARK-42366
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle
>Affects Versions: 3.2.0
>Reporter: dzcxzl
>Assignee: dzcxzl
>Priority: Minor
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-42090) Introduce sasl retry count in RetryingBlockTransferor

2023-01-24 Thread Mridul Muralidharan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-42090?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan updated SPARK-42090:

Fix Version/s: 3.2.4
   3.3.2

> Introduce sasl retry count in RetryingBlockTransferor
> -
>
> Key: SPARK-42090
> URL: https://issues.apache.org/jira/browse/SPARK-42090
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.4.0
>Reporter: Ted Yu
>Assignee: Ted Yu
>Priority: Major
> Fix For: 3.2.4, 3.3.2, 3.4.0
>
>
> Previously a boolean variable, saslTimeoutSeen, was used in 
> RetryingBlockTransferor. However, the boolean variable wouldn't cover the 
> following scenario:
> 1. SaslTimeoutException
> 2. IOException
> 3. SaslTimeoutException
> 4. IOException
> Even though IOException at #2 is retried (resulting in increment of 
> retryCount), the retryCount would be cleared at step #4.
> Since the intention of saslTimeoutSeen is to undo the increment due to 
> retrying SaslTimeoutException, we should keep a counter for 
> SaslTimeoutException retries and subtract the value of this counter from 
> retryCount.
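
A simplified sketch of the counter-based accounting described above (in Scala 
rather than the Java of RetryingBlockTransferor, with invented names; it is not 
the actual implementation):

{code:scala}
// Keep a separate counter for SASL-timeout retries so that undoing their
// increments does not also wipe out retries caused by real IOExceptions.
class RetryAccounting(maxRetries: Int) {
  private var retryCount = 0      // all retries performed so far
  private var saslRetryCount = 0  // retries caused by SaslTimeoutException

  // Returns true if one more retry is allowed after a SASL timeout.
  def retryAfterSaslTimeout(): Boolean = {
    retryCount += 1
    saslRetryCount += 1
    retryCount - saslRetryCount < maxRetries  // SASL retries don't consume the budget
  }

  // Returns true if one more retry is allowed after an IOException.
  def retryAfterIOException(): Boolean = {
    // Undo only the increments contributed by SASL-timeout retries,
    // instead of clearing retryCount entirely as the old boolean flag did.
    retryCount -= saslRetryCount
    saslRetryCount = 0
    retryCount += 1
    retryCount <= maxRetries
  }
}
{code}

In the scenario above (SASL timeout, IOException, SASL timeout, IOException), 
the IOException at step #2 keeps counting towards maxRetries at step #4, which 
the single boolean flag could not guarantee.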



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-42149) Remove the env `SPARK_USE_CONC_INCR_GC` used to enable CMS GC for Yarn AM

2023-01-21 Thread Mridul Muralidharan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-42149?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan resolved SPARK-42149.
-
Fix Version/s: 3.4.0
   Resolution: Fixed

Issue resolved by pull request 39674
[https://github.com/apache/spark/pull/39674]

> Remove the env `SPARK_USE_CONC_INCR_GC` used to enable CMS GC for Yarn AM 
> --
>
> Key: SPARK-42149
> URL: https://issues.apache.org/jira/browse/SPARK-42149
> Project: Spark
>  Issue Type: Improvement
>  Components: YARN
>Affects Versions: 3.4.0
>Reporter: Yang Jie
>Assignee: Yang Jie
>Priority: Minor
> Fix For: 3.4.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-42149) Remove the env `SPARK_USE_CONC_INCR_GC` used to enable CMS GC for Yarn AM

2023-01-21 Thread Mridul Muralidharan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-42149?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan reassigned SPARK-42149:
---

Assignee: Yang Jie

> Remove the env `SPARK_USE_CONC_INCR_GC` used to enable CMS GC for Yarn AM 
> --
>
> Key: SPARK-42149
> URL: https://issues.apache.org/jira/browse/SPARK-42149
> Project: Spark
>  Issue Type: Improvement
>  Components: YARN
>Affects Versions: 3.4.0
>Reporter: Yang Jie
>Assignee: Yang Jie
>Priority: Minor
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-41415) SASL Request Retries

2023-01-20 Thread Mridul Muralidharan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-41415?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan updated SPARK-41415:

Fix Version/s: 3.2.4
   3.3.2

> SASL Request Retries
> 
>
> Key: SPARK-41415
> URL: https://issues.apache.org/jira/browse/SPARK-41415
> Project: Spark
>  Issue Type: Task
>  Components: Shuffle
>Affects Versions: 3.2.4
>Reporter: Aravind Patnam
>Assignee: Aravind Patnam
>Priority: Major
> Fix For: 3.2.4, 3.3.2, 3.4.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-42090) Introduce sasl retry count in RetryingBlockTransferor

2023-01-16 Thread Mridul Muralidharan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-42090?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan resolved SPARK-42090.
-
Fix Version/s: 3.4.0
   Resolution: Fixed

Issue resolved by pull request 39611
[https://github.com/apache/spark/pull/39611]

> Introduce sasl retry count in RetryingBlockTransferor
> -
>
> Key: SPARK-42090
> URL: https://issues.apache.org/jira/browse/SPARK-42090
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.4.0
>Reporter: Ted Yu
>Assignee: Ted Yu
>Priority: Major
> Fix For: 3.4.0
>
>
> Previously a boolean variable, saslTimeoutSeen, was used in 
> RetryingBlockTransferor. However, the boolean variable wouldn't cover the 
> following scenario:
> 1. SaslTimeoutException
> 2. IOException
> 3. SaslTimeoutException
> 4. IOException
> Even though IOException at #2 is retried (resulting in increment of 
> retryCount), the retryCount would be cleared at step #4.
> Since the intention of saslTimeoutSeen is to undo the increment due to 
> retrying SaslTimeoutException, we should keep a counter for 
> SaslTimeoutException retries and subtract the value of this counter from 
> retryCount.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-42090) Introduce sasl retry count in RetryingBlockTransferor

2023-01-16 Thread Mridul Muralidharan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-42090?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan reassigned SPARK-42090:
---

Assignee: Ted Yu

> Introduce sasl retry count in RetryingBlockTransferor
> -
>
> Key: SPARK-42090
> URL: https://issues.apache.org/jira/browse/SPARK-42090
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.4.0
>Reporter: Ted Yu
>Assignee: Ted Yu
>Priority: Major
>
> Previously a boolean variable, saslTimeoutSeen, was used in 
> RetryingBlockTransferor. However, the boolean variable wouldn't cover the 
> following scenario:
> 1. SaslTimeoutException
> 2. IOException
> 3. SaslTimeoutException
> 4. IOException
> Even though IOException at #2 is retried (resulting in increment of 
> retryCount), the retryCount would be cleared at step #4.
> Since the intention of saslTimeoutSeen is to undo the increment due to 
> retrying SaslTimeoutException, we should keep a counter for 
> SaslTimeoutException retries and subtract the value of this counter from 
> retryCount.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-41415) SASL Request Retries

2023-01-14 Thread Mridul Muralidharan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-41415?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan reassigned SPARK-41415:
---

Assignee: Aravind Patnam  (was: Aravind Patnam)

> SASL Request Retries
> 
>
> Key: SPARK-41415
> URL: https://issues.apache.org/jira/browse/SPARK-41415
> Project: Spark
>  Issue Type: Task
>  Components: Shuffle
>Affects Versions: 3.2.4
>Reporter: Aravind Patnam
>Assignee: Aravind Patnam
>Priority: Major
> Fix For: 3.4.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-41415) SASL Request Retries

2023-01-14 Thread Mridul Muralidharan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-41415?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan resolved SPARK-41415.
-
Fix Version/s: 3.4.0
   Resolution: Fixed

Issue resolved by pull request 38959
[https://github.com/apache/spark/pull/38959]

> SASL Request Retries
> 
>
> Key: SPARK-41415
> URL: https://issues.apache.org/jira/browse/SPARK-41415
> Project: Spark
>  Issue Type: Task
>  Components: Shuffle
>Affects Versions: 3.2.4
>Reporter: Aravind Patnam
>Assignee: Aravind Patnam
>Priority: Major
> Fix For: 3.4.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-41415) SASL Request Retries

2023-01-14 Thread Mridul Muralidharan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-41415?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan reassigned SPARK-41415:
---

Assignee: Aravind Patnam

> SASL Request Retries
> 
>
> Key: SPARK-41415
> URL: https://issues.apache.org/jira/browse/SPARK-41415
> Project: Spark
>  Issue Type: Task
>  Components: Shuffle
>Affects Versions: 3.2.4
>Reporter: Aravind Patnam
>Assignee: Aravind Patnam
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-40480) Remove push-based shuffle data after query finished

2023-01-14 Thread Mridul Muralidharan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-40480?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan reassigned SPARK-40480:
---

Assignee: Wan Kun

> Remove push-based shuffle data after query finished
> ---
>
> Key: SPARK-40480
> URL: https://issues.apache.org/jira/browse/SPARK-40480
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle
>Affects Versions: 3.3.0
>Reporter: Wan Kun
>Assignee: Wan Kun
>Priority: Major
>
> Currently, Spark only cleans up regular shuffle data files, not push-based 
> shuffle merge files.
> In our production cluster, the push-based shuffle service creates too many 
> shuffle merge data files because there are several Spark Thrift Servers.
> Could we clean up the merged data files after the query finishes?
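
As a rough illustration of the cleanup being asked for, a sketch that removes 
the merged files of one shuffle once its query has finished; the directory 
layout and the "merged_shuffle_" file-name prefix are invented placeholders, 
not the shuffle service's real naming scheme:

{code:scala}
import java.io.File

// Hypothetical sketch only: delete merged data/index/meta files for a finished
// shuffle. The prefix below is a placeholder, not the actual on-disk naming.
def cleanupMergedShuffleFiles(mergeDir: File, appId: String, shuffleId: Int): Unit = {
  val prefix = s"merged_shuffle_${appId}_${shuffleId}_"
  Option(mergeDir.listFiles()).getOrElse(Array.empty[File])
    .filter(f => f.isFile && f.getName.startsWith(prefix))
    .foreach(f => if (!f.delete()) f.deleteOnExit())
}
{code}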



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-40480) Remove push-based shuffle data after query finished

2023-01-14 Thread Mridul Muralidharan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-40480?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan resolved SPARK-40480.
-
Fix Version/s: 3.4.0
   Resolution: Fixed

Issue resolved by pull request 37922
[https://github.com/apache/spark/pull/37922]

> Remove push-based shuffle data after query finished
> ---
>
> Key: SPARK-40480
> URL: https://issues.apache.org/jira/browse/SPARK-40480
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle
>Affects Versions: 3.3.0
>Reporter: Wan Kun
>Assignee: Wan Kun
>Priority: Major
> Fix For: 3.4.0
>
>
> Currently, Spark only cleans up regular shuffle data files, not push-based 
> shuffle merge files.
> In our production cluster, the push-based shuffle service creates too many 
> shuffle merge data files because there are several Spark Thrift Servers.
> Could we clean up the merged data files after the query finishes?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-33573) Server side metrics related to push-based shuffle

2023-01-11 Thread Mridul Muralidharan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-33573?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan resolved SPARK-33573.
-
Fix Version/s: 3.4.0
   Resolution: Fixed

Issue resolved by pull request 37638
[https://github.com/apache/spark/pull/37638]

> Server side metrics related to push-based shuffle
> -
>
> Key: SPARK-33573
> URL: https://issues.apache.org/jira/browse/SPARK-33573
> Project: Spark
>  Issue Type: Sub-task
>  Components: Shuffle, Spark Core
>Affects Versions: 3.1.0
>Reporter: Min Shen
>Assignee: Minchu Yang
>Priority: Major
> Fix For: 3.4.0
>
>
> Shuffle Server side metrics for push based shuffle.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-33573) Server side metrics related to push-based shuffle

2023-01-11 Thread Mridul Muralidharan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-33573?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan reassigned SPARK-33573:
---

Assignee: Minchu Yang

> Server side metrics related to push-based shuffle
> -
>
> Key: SPARK-33573
> URL: https://issues.apache.org/jira/browse/SPARK-33573
> Project: Spark
>  Issue Type: Sub-task
>  Components: Shuffle, Spark Core
>Affects Versions: 3.1.0
>Reporter: Min Shen
>Assignee: Minchu Yang
>Priority: Major
>
> Shuffle Server side metrics for push based shuffle.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-41953) Shuffle output location refetch during shuffle migration in decommission

2023-01-09 Thread Mridul Muralidharan (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-41953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17656480#comment-17656480
 ] 

Mridul Muralidharan edited comment on SPARK-41953 at 1/10/23 7:48 AM:
--

A few things:

* Looking at SPARK-27637, we should revisit it - the {{IsExecutorAlive}} 
request does not make sense in case of dynamic resource allocation (DRA) when 
an external shuffle service (ESS) is enabled : we should not be making that 
call. Thoughts [~Ngone51] ? +CC [~csingh]
This also means that relying on ExecutorDeadException when DRA is enabled with 
ESS configured won't be useful.

For the rest of the proposal ...

For (2), I am not sure about 'Make MapOutputTracker support fetch latest 
output without epoch provided.' - this could have nontrivial interaction with 
other things, and I will need to think through it. I am not sure whether we can 
model node decommission - where a block moves from host A to host B without any 
other change - as not requiring an epoch update (or rather, flagging the epochs 
as 'compatible' when there are no interleaving updates); this requires analysis.

Assuming we sort out how to get updated state, (3) looks like a reasonable 
approach.





was (Author: mridulm80):

A few things:

* Looking at SPARK-27637, we should revisit it - the {{IsExecutorAlive}} 
request does not make sense in case of dynamic resource allocation (DRA) when 
an external shuffle service (ESS) is enabled : we should not be making that 
call. Thoughts [~Ngone51] ? +CC [~csingh]
This also means, relying on ExecutorDeadException when DRA is enabled with ESS 
is configured wont be useful.

For rest of the proposal ...

For (2),  I am not sure about 'Make MapOutputTracker support fetch latest 
output without epoch provided.' - this could have nontrivial interaction with 
other things, and I will need to think through it. Not sure if we can model 
node decommission - where we have block moved from host A to host B without any 
other change - as not requiring an epoch update, requires analysis 

Assuming we sort out how to get updated state, (3) looks like a reasonable 
approach.




> Shuffle output location refetch during shuffle migration in decommission
> 
>
> Key: SPARK-41953
> URL: https://issues.apache.org/jira/browse/SPARK-41953
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 3.3.1
>Reporter: Zhongwei Zhu
>Priority: Major
>
> When shuffle migration is enabled during Spark decommission, shuffle data is 
> migrated to live executors and the latest locations are then updated in 
> MapOutputTracker. This has some issues:
>  # Executors only fetch map output locations at the beginning of the reduce 
> stage, so any shuffle output location change in the middle of the reduce will 
> cause FetchFailed as reducers fetch from the old location. Even though stage 
> retries can work around this, they still waste lots of resources, as all the 
> shuffle read and compute that happened before the FetchFailed partition is 
> wasted.
>  # During stage retries, fewer running tasks cause more executors to be 
> decommissioned, and shuffle data locations keep changing. In the worst case, a 
> stage could need many retries, further breaking the SLA.
> So I propose to support refetching map output locations during the reduce 
> phase if shuffle migration is enabled and the FetchFailed is caused by a 
> decommissioned dead executor. The detailed steps are as below:
>  # When `BlockTransferService` fails to fetch blocks from a decommissioned 
> dead executor, ExecutorDeadException(isDecommission as true) will be thrown.
>  # Make MapOutputTracker support fetching the latest output without an epoch 
> provided.
>  # `ShuffleBlockFetcherIterator` will refetch the latest output from 
> MapOutputTrackerMaster. For all the shuffle blocks on this decommissioned 
> executor, there should be a new location on another executor. If not, throw an 
> exception as today. If yes, create new local and remote requests to fetch 
> these migrated shuffle blocks. The flow will be similar to the fallback fetch 
> when a push-merged fetch fails.
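
A minimal, hypothetical sketch of the proposed fallback flow; BlockLocation, 
refetchLatestLocations and enqueueFetchRequest are invented placeholders for 
illustration, not Spark APIs:

{code:scala}
final case class BlockLocation(executorId: String, host: String)

// Placeholder for step 2: ask the driver-side tracker for the latest location
// of a shuffle block, without requiring an epoch bump.
def refetchLatestLocations(blockId: String): Option[BlockLocation] =
  Some(BlockLocation("exec-2", "hostB"))

// Placeholder for step 3: queue a new local/remote fetch request.
def enqueueFetchRequest(loc: BlockLocation, blockId: String): Unit =
  println(s"refetching $blockId from ${loc.executorId} on ${loc.host}")

def handleFetchFailure(blockId: String, failedExecutor: String,
    executorDecommissioned: Boolean): Unit = {
  if (executorDecommissioned) {
    refetchLatestLocations(blockId) match {
      case Some(loc) if loc.executorId != failedExecutor =>
        enqueueFetchRequest(loc, blockId)           // block migrated, retry the fetch
      case _ =>
        sys.error(s"no migrated copy of $blockId")  // surface the failure as today
    }
  } else {
    sys.error(s"fetch failed for $blockId on $failedExecutor")
  }
}
{code}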



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-41953) Shuffle output location refetch during shuffle migration in decommission

2023-01-09 Thread Mridul Muralidharan (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-41953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17656480#comment-17656480
 ] 

Mridul Muralidharan commented on SPARK-41953:
-


A few things:

* Looking at SPARK-27637, we should revisit it - the {{IsExecutorAlive}} 
request does not make sense in case of dynamic resource allocation (DRA) when 
an external shuffle service (ESS) is enabled : we should not be making that 
call. Thoughts [~Ngone51] ? +CC [~csingh]
This also means that relying on ExecutorDeadException when DRA is enabled with 
ESS configured won't be useful.

For the rest of the proposal ...

For (2), I am not sure about 'Make MapOutputTracker support fetch latest 
output without epoch provided.' - this could have nontrivial interaction with 
other things, and I will need to think through it. I am not sure whether we can 
model node decommission - where a block moves from host A to host B without any 
other change - as not requiring an epoch update; this requires analysis.

Assuming we sort out how to get updated state, (3) looks like a reasonable 
approach.




> Shuffle output location refetch during shuffle migration in decommission
> 
>
> Key: SPARK-41953
> URL: https://issues.apache.org/jira/browse/SPARK-41953
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 3.3.1
>Reporter: Zhongwei Zhu
>Priority: Major
>
> When shuffle migration is enabled during Spark decommission, shuffle data is 
> migrated to live executors and the latest locations are then updated in 
> MapOutputTracker. This has some issues:
>  # Executors only fetch map output locations at the beginning of the reduce 
> stage, so any shuffle output location change in the middle of the reduce will 
> cause FetchFailed as reducers fetch from the old location. Even though stage 
> retries can work around this, they still waste lots of resources, as all the 
> shuffle read and compute that happened before the FetchFailed partition is 
> wasted.
>  # During stage retries, fewer running tasks cause more executors to be 
> decommissioned, and shuffle data locations keep changing. In the worst case, a 
> stage could need many retries, further breaking the SLA.
> So I propose to support refetching map output locations during the reduce 
> phase if shuffle migration is enabled and the FetchFailed is caused by a 
> decommissioned dead executor. The detailed steps are as below:
>  # When `BlockTransferService` fails to fetch blocks from a decommissioned 
> dead executor, ExecutorDeadException(isDecommission as true) will be thrown.
>  # Make MapOutputTracker support fetching the latest output without an epoch 
> provided.
>  # `ShuffleBlockFetcherIterator` will refetch the latest output from 
> MapOutputTrackerMaster. For all the shuffle blocks on this decommissioned 
> executor, there should be a new location on another executor. If not, throw an 
> exception as today. If yes, create new local and remote requests to fetch 
> these migrated shuffle blocks. The flow will be similar to the fallback fetch 
> when a push-merged fetch fails.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-36620) Client side related push-based shuffle metrics

2023-01-06 Thread Mridul Muralidharan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-36620?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan reassigned SPARK-36620:
---

Assignee: Thejdeep Gudivada

> Client side related push-based shuffle metrics
> --
>
> Key: SPARK-36620
> URL: https://issues.apache.org/jira/browse/SPARK-36620
> Project: Spark
>  Issue Type: Sub-task
>  Components: Shuffle
>Affects Versions: 3.1.0
>Reporter: Thejdeep Gudivada
>Assignee: Thejdeep Gudivada
>Priority: Major
>
> Need to add client side related metrics to push-based shuffle.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-36620) Client side related push-based shuffle metrics

2023-01-06 Thread Mridul Muralidharan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-36620?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan resolved SPARK-36620.
-
Fix Version/s: 3.4.0
   Resolution: Fixed

Issue resolved by pull request 36165
[https://github.com/apache/spark/pull/36165]

> Client side related push-based shuffle metrics
> --
>
> Key: SPARK-36620
> URL: https://issues.apache.org/jira/browse/SPARK-36620
> Project: Spark
>  Issue Type: Sub-task
>  Components: Shuffle
>Affects Versions: 3.1.0
>Reporter: Thejdeep Gudivada
>Assignee: Thejdeep Gudivada
>Priority: Major
> Fix For: 3.4.0
>
>
> Need to add client side related metrics to push-based shuffle.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-41497) Accumulator undercounting in the case of retry task with rdd cache

2023-01-04 Thread Mridul Muralidharan (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-41497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17654590#comment-17654590
 ] 

Mridul Muralidharan commented on SPARK-41497:
-

Sounds good [~Ngone51], thanks !

> Accumulator undercounting in the case of retry task with rdd cache
> --
>
> Key: SPARK-41497
> URL: https://issues.apache.org/jira/browse/SPARK-41497
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.4.8, 3.0.3, 3.1.3, 3.2.2, 3.3.1
>Reporter: wuyi
>Priority: Major
>
> Accumulator could be undercounted when the retried task has rdd cache.  See 
> the example below and you could also find the completed and reproducible 
> example at 
> [https://github.com/apache/spark/compare/master...Ngone51:spark:fix-acc]
>   
> {code:scala}
> test("SPARK-XXX") {
>   // Set up a cluster with 2 executors
>   val conf = new SparkConf()
> .setMaster("local-cluster[2, 1, 
> 1024]").setAppName("TaskSchedulerImplSuite")
>   sc = new SparkContext(conf)
>   // Set up a custom task scheduler. The scheduler will fail the first task 
> attempt of the job
>   // submitted below. In particular, the failed first attempt task would 
> succeed on computation
>   // (accumulator accounting, result caching) but only fail to report its 
> success status due
>   // to the concurrent executor loss. The second task attempt would succeed.
>   taskScheduler = setupSchedulerWithCustomStatusUpdate(sc)
>   val myAcc = sc.longAccumulator("myAcc")
>   // Initiate a rdd with only one partition so there's only one task and 
> specify the storage level
>   // with MEMORY_ONLY_2 so that the rdd result will be cached on both two 
> executors.
>   val rdd = sc.parallelize(0 until 10, 1).mapPartitions { iter =>
> myAcc.add(100)
> iter.map(x => x + 1)
>   }.persist(StorageLevel.MEMORY_ONLY_2)
>   // This will pass since the second task attempt will succeed
>   assert(rdd.count() === 10)
>   // This will fail due to `myAcc.add(100)` won't be executed during the 
> second task attempt's
>   // execution. Because the second task attempt will load the rdd cache 
> directly instead of
>   // executing the task function so `myAcc.add(100)` is skipped.
>   assert(myAcc.value === 100)
> } {code}
>  
> We could also hit this issue with decommission even if the rdd only has one 
> copy. For example, decommission could migrate the rdd cache block to another 
> executor (the result is actually the same with 2 copies) and the 
> decommissioned executor lost before the task reports its success status to 
> the driver. 
>  
> And the issue is a bit more complicated than expected to fix. I have tried to 
> give some fixes but all of them are not ideal:
> Option 1: Clean up any rdd cache related to the failed task: in practice, 
> this option can already fix the issue in most cases. However, theoretically, 
> rdd cache could be reported to the driver right after the driver cleans up 
> the failed task's caches due to asynchronous communication. So this option 
> can’t resolve the issue thoroughly;
> Option 2: Disallow rdd cache reuse across the task attempts for the same 
> task: this option can 100% fix the issue. The problem is this way can also 
> affect the case where rdd cache can be reused across the attempts (e.g., when 
> there is no accumulator operation in the task), which can have perf 
> regression;
> Option 3: Introduce accumulator cache: first, this requires a new framework 
> for supporting accumulator cache; second, the driver should improve its logic 
> to distinguish whether the accumulator cache value should be reported to the 
> user to avoid overcounting. For example, in the case of task retry, the value 
> should be reported. However, in the case of rdd cache reuse, the value 
> shouldn’t be reported (should it?);
> Option 4: Do task success validation when a task trying to load the rdd 
> cache: this way defines a rdd cache is only valid/accessible if the task has 
> succeeded. This way could be either overkill or a bit complex (because 
> currently Spark would clean up the task state once it’s finished. So we need 
> to maintain a structure to know if task once succeeded or not. )



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-41792) Shuffle merge finalization removes the wrong finalization state from the DB

2023-01-01 Thread Mridul Muralidharan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-41792?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan resolved SPARK-41792.
-
Fix Version/s: 3.4.0
 Assignee: Mridul Muralidharan
   Resolution: Fixed

> Shuffle merge finalization removes the wrong finalization state from the DB
> ---
>
> Key: SPARK-41792
> URL: https://issues.apache.org/jira/browse/SPARK-41792
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle
>Affects Versions: 3.4.0
>Reporter: Mridul Muralidharan
>Assignee: Mridul Muralidharan
>Priority: Minor
> Fix For: 3.4.0
>
>
> During `finalizeShuffleMerge` in external shuffle service, if the 
> finalization request is for a newer shuffle merge id, then we cleanup the 
> existing (older) shuffle details and add the newer entry (for which we got no 
> pushed blocks) to the DB.
> Unfortunately, when cleaning up from the DB, we are using the incorrect 
> AppAttemptShuffleMergeId - we remove the latest shuffle merge id instead of 
> the existing entry.
> Proposed Fix:
> {code}
> diff --git 
> a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
>  
> b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
> index 816d1082850..551104d0eba 100644
> --- 
> a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
> +++ 
> b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
> @@ -653,9 +653,11 @@ public class RemoteBlockPushResolver implements 
> MergedShuffleFileManager {
>  } else if (msg.shuffleMergeId > mergePartitionsInfo.shuffleMergeId) {
>// If no blocks pushed for the finalizeShuffleMerge shuffleMergeId 
> then return
>// empty MergeStatuses but cleanup the older shuffleMergeId files.
> +  AppAttemptShuffleMergeId currentAppAttemptShuffleMergeId = new 
> AppAttemptShuffleMergeId(
> +  msg.appId, msg.appAttemptId, msg.shuffleId, 
> mergePartitionsInfo.shuffleMergeId);
>submitCleanupTask(() ->
>closeAndDeleteOutdatedPartitions(
> -  appAttemptShuffleMergeId, 
> mergePartitionsInfo.shuffleMergePartitions));
> +  currentAppAttemptShuffleMergeId, 
> mergePartitionsInfo.shuffleMergePartitions));
>  } else {
>// This block covers:
>//  1. finalization of determinate stage
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-41792) Shuffle merge finalization removes the wrong finalization state from the DB

2023-01-01 Thread Mridul Muralidharan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-41792?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan updated SPARK-41792:

Affects Version/s: (was: 3.3.0)

> Shuffle merge finalization removes the wrong finalization state from the DB
> ---
>
> Key: SPARK-41792
> URL: https://issues.apache.org/jira/browse/SPARK-41792
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle
>Affects Versions: 3.4.0
>Reporter: Mridul Muralidharan
>Priority: Minor
>
> During `finalizeShuffleMerge` in external shuffle service, if the 
> finalization request is for a newer shuffle merge id, then we cleanup the 
> existing (older) shuffle details and add the newer entry (for which we got no 
> pushed blocks) to the DB.
> Unfortunately, when cleaning up from the DB, we are using the incorrect 
> AppAttemptShuffleMergeId - we remove the latest shuffle merge id instead of 
> the existing entry.
> Proposed Fix:
> {code}
> diff --git 
> a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
>  
> b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
> index 816d1082850..551104d0eba 100644
> --- 
> a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
> +++ 
> b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
> @@ -653,9 +653,11 @@ public class RemoteBlockPushResolver implements 
> MergedShuffleFileManager {
>  } else if (msg.shuffleMergeId > mergePartitionsInfo.shuffleMergeId) {
>// If no blocks pushed for the finalizeShuffleMerge shuffleMergeId 
> then return
>// empty MergeStatuses but cleanup the older shuffleMergeId files.
> +  AppAttemptShuffleMergeId currentAppAttemptShuffleMergeId = new 
> AppAttemptShuffleMergeId(
> +  msg.appId, msg.appAttemptId, msg.shuffleId, 
> mergePartitionsInfo.shuffleMergeId);
>submitCleanupTask(() ->
>closeAndDeleteOutdatedPartitions(
> -  appAttemptShuffleMergeId, 
> mergePartitionsInfo.shuffleMergePartitions));
> +  currentAppAttemptShuffleMergeId, 
> mergePartitionsInfo.shuffleMergePartitions));
>  } else {
>// This block covers:
>//  1. finalization of determinate stage
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-41792) Shuffle merge finalization removes the wrong finalization state from the DB

2022-12-30 Thread Mridul Muralidharan (Jira)
Mridul Muralidharan created SPARK-41792:
---

 Summary: Shuffle merge finalization removes the wrong finalization 
state from the DB
 Key: SPARK-41792
 URL: https://issues.apache.org/jira/browse/SPARK-41792
 Project: Spark
  Issue Type: Bug
  Components: Shuffle
Affects Versions: 3.3.0, 3.4.0
Reporter: Mridul Muralidharan


During `finalizeShuffleMerge` in external shuffle service, if the finalization 
request is for a newer shuffle merge id, then we cleanup the existing (older) 
shuffle details and add the newer entry (for which we got no pushed blocks) to 
the DB.

Unfortunately, when cleaning up from the DB, we are using the incorrect 
AppAttemptShuffleMergeId - we remove the latest shuffle merge id instead of the 
existing entry.

Proposed Fix:

{code}
diff --git 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
index 816d1082850..551104d0eba 100644
--- 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
+++ 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
@@ -653,9 +653,11 @@ public class RemoteBlockPushResolver implements 
MergedShuffleFileManager {
 } else if (msg.shuffleMergeId > mergePartitionsInfo.shuffleMergeId) {
   // If no blocks pushed for the finalizeShuffleMerge shuffleMergeId 
then return
   // empty MergeStatuses but cleanup the older shuffleMergeId files.
+  AppAttemptShuffleMergeId currentAppAttemptShuffleMergeId = new 
AppAttemptShuffleMergeId(
+  msg.appId, msg.appAttemptId, msg.shuffleId, 
mergePartitionsInfo.shuffleMergeId);
   submitCleanupTask(() ->
   closeAndDeleteOutdatedPartitions(
-  appAttemptShuffleMergeId, 
mergePartitionsInfo.shuffleMergePartitions));
+  currentAppAttemptShuffleMergeId, 
mergePartitionsInfo.shuffleMergePartitions));
 } else {
   // This block covers:
   //  1. finalization of determinate stage
{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-41469) Task rerun on decommissioned executor can be avoided if shuffle data has migrated

2022-12-27 Thread Mridul Muralidharan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-41469?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan resolved SPARK-41469.
-
Fix Version/s: 3.4.0
   Resolution: Fixed

Issue resolved by pull request 39011
[https://github.com/apache/spark/pull/39011]

> Task rerun on decommissioned executor can be avoided if shuffle data has 
> migrated
> -
>
> Key: SPARK-41469
> URL: https://issues.apache.org/jira/browse/SPARK-41469
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 3.1.3, 3.2.2, 3.3.1
>Reporter: wuyi
>Assignee: wuyi
>Priority: Major
> Fix For: 3.4.0
>
>
> Currently, we always rerun a finished shuffle map task if it once ran on the 
> lost executor. However, when the executor loss is caused by decommission, the 
> shuffle data might have been migrated, so the task doesn't need to be rerun.
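
A small sketch of the decision this change introduces; the parameter names are 
invented for illustration and do not correspond to actual DAGScheduler fields:

{code:scala}
// Only resubmit a finished shuffle map task on executor loss if its map output
// was actually lost; if the executor was decommissioned and the output is still
// registered (i.e. it was migrated), the task does not need to rerun.
def shouldRerun(
    taskRanOnExecutor: String,
    lostExecutor: String,
    executorWasDecommissioned: Boolean,
    mapOutputStillRegistered: Boolean): Boolean = {
  if (taskRanOnExecutor != lostExecutor) false
  else if (executorWasDecommissioned && mapOutputStillRegistered) false // data migrated
  else true                                                             // output lost, rerun
}
{code}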



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-41469) Task rerun on decommissioned executor can be avoided if shuffle data has migrated

2022-12-27 Thread Mridul Muralidharan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-41469?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan reassigned SPARK-41469:
---

Assignee: wuyi

> Task rerun on decommissioned executor can be avoided if shuffle data has 
> migrated
> -
>
> Key: SPARK-41469
> URL: https://issues.apache.org/jira/browse/SPARK-41469
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 3.1.3, 3.2.2, 3.3.1
>Reporter: wuyi
>Assignee: wuyi
>Priority: Major
>
> Currently, we always rerun a finished shuffle map task if it once ran on the 
> lost executor. However, when the executor loss is caused by decommission, the 
> shuffle data might have been migrated, so the task doesn't need to be rerun.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-41192) Task finished before speculative task scheduled leads to holding idle executors

2022-12-20 Thread Mridul Muralidharan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-41192?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan reassigned SPARK-41192:
---

Assignee: Yazhi Wang

> Task finished before speculative task scheduled leads to holding idle 
> executors
> ---
>
> Key: SPARK-41192
> URL: https://issues.apache.org/jira/browse/SPARK-41192
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.2.2, 3.3.1
>Reporter: Yazhi Wang
>Assignee: Yazhi Wang
>Priority: Minor
>  Labels: dynamic_allocation
> Attachments: dynamic-executors, dynamic-log
>
>
> When a task finishes before its speculative task has been scheduled by the 
> DAGScheduler, the speculative task is still considered pending and counts 
> towards the calculation of the number of needed executors, which leads to 
> requesting more executors than needed.
> h2. Background & Reproduce
> In one of our production jobs, we found that ExecutorAllocationManager was 
> holding more executors than needed. 
> We found it's difficult to reproduce in the test environment. In order to 
> stably reproduce and debug, we temporarily disabled the scheduling of 
> speculative tasks in TaskSetManager:363 to ensure that the task completes 
> before the speculative task is scheduled.
> {code:java}
> // Original code
> private def dequeueTask(
>     execId: String,
>     host: String,
>     maxLocality: TaskLocality.Value): Option[(Int, TaskLocality.Value, 
> Boolean)] = {
>   // Tries to schedule a regular task first; if it returns None, then 
> schedules
>   // a speculative task
>   dequeueTaskHelper(execId, host, maxLocality, false).orElse(
>     dequeueTaskHelper(execId, host, maxLocality, true))
> } 
> // Speculative task will never be scheduled
> private def dequeueTask(
>     execId: String,
>     host: String,
>     maxLocality: TaskLocality.Value): Option[(Int, TaskLocality.Value, 
> Boolean)] = {
>   // Tries to schedule a regular task first; if it returns None, then 
> schedules
>   // a speculative task
>   dequeueTaskHelper(execId, host, maxLocality, false)
> }  {code}
> Referring to the examples in SPARK-30511:
> You will see that when running the last tasks, we hold 38 executors (see 
> attachment), which is exactly ceil((149 + 1) / 4) = 38. But there are actually 
> only 2 tasks running, which requires only max(20, ceil(2 / 4)) = 20 executors.
> {code:java}
> ./bin/spark-shell --master yarn --conf spark.speculation=true --conf 
> spark.executor.cores=4 --conf spark.dynamicAllocation.enabled=true --conf 
> spark.dynamicAllocation.minExecutors=20 --conf 
> spark.dynamicAllocation.maxExecutors=1000 {code}
> {code:java}
> val n = 4000
> val someRDD = sc.parallelize(1 to n, n)
> someRDD.mapPartitionsWithIndex( (index: Int, it: Iterator[Int]) => {
> if (index > 3998) {
>     Thread.sleep(1000 * 1000)
> } else if (index > 3850) {
>     Thread.sleep(50 * 1000) // Fake running tasks
> } else {
>     Thread.sleep(100)
> }
> Array.fill[Int](1)(1).iterator{code}
>  
> I will have a PR ready to fix this issue
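
A rough sketch of the executor-target arithmetic involved (not the actual 
ExecutorAllocationManager code; names and signature are invented):

{code:scala}
// Speculative tasks that are still counted as pending inflate the target.
def targetExecutors(pendingTasks: Int, runningTasks: Int,
    tasksPerExecutor: Int, minExecutors: Int, maxExecutors: Int): Int = {
  val needed = math.ceil((pendingTasks + runningTasks).toDouble / tasksPerExecutor).toInt
  math.min(math.max(needed, minExecutors), maxExecutors)
}

// 149 stale "pending" speculative tasks plus 1 running task at 4 cores/executor:
targetExecutors(149, 1, 4, 20, 1000)  // 38 executors held
// Counting only the 2 tasks that are really running:
targetExecutors(0, 2, 4, 20, 1000)    // 20 executors (the configured minimum)
{code}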



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-41192) Task finished before speculative task scheduled leads to holding idle executors

2022-12-20 Thread Mridul Muralidharan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-41192?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan resolved SPARK-41192.
-
Fix Version/s: 3.4.0
   Resolution: Fixed

Issue resolved by pull request 38711
[https://github.com/apache/spark/pull/38711]

> Task finished before speculative task scheduled leads to holding idle 
> executors
> ---
>
> Key: SPARK-41192
> URL: https://issues.apache.org/jira/browse/SPARK-41192
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.2.2, 3.3.1
>Reporter: Yazhi Wang
>Assignee: Yazhi Wang
>Priority: Minor
>  Labels: dynamic_allocation
> Fix For: 3.4.0
>
> Attachments: dynamic-executors, dynamic-log
>
>
> When a task finishes before its speculative task has been scheduled by the 
> DAGScheduler, the speculative task is still considered pending and counts 
> towards the calculation of the number of needed executors, which leads to 
> requesting more executors than needed.
> h2. Background & Reproduce
> In one of our production jobs, we found that ExecutorAllocationManager was 
> holding more executors than needed. 
> We found it's difficult to reproduce in the test environment. In order to 
> stably reproduce and debug, we temporarily disabled the scheduling of 
> speculative tasks in TaskSetManager:363 to ensure that the task completes 
> before the speculative task is scheduled.
> {code:java}
> // Original code
> private def dequeueTask(
>     execId: String,
>     host: String,
>     maxLocality: TaskLocality.Value): Option[(Int, TaskLocality.Value, 
> Boolean)] = {
>   // Tries to schedule a regular task first; if it returns None, then 
> schedules
>   // a speculative task
>   dequeueTaskHelper(execId, host, maxLocality, false).orElse(
>     dequeueTaskHelper(execId, host, maxLocality, true))
> } 
> // Speculative task will never be scheduled
> private def dequeueTask(
>     execId: String,
>     host: String,
>     maxLocality: TaskLocality.Value): Option[(Int, TaskLocality.Value, 
> Boolean)] = {
>   // Tries to schedule a regular task first; if it returns None, then 
> schedules
>   // a speculative task
>   dequeueTaskHelper(execId, host, maxLocality, false)
> }  {code}
> Referring to the examples in SPARK-30511:
> You will see that when running the last tasks, we hold 38 executors (see 
> attachment), which is exactly ceil((149 + 1) / 4) = 38. But there are actually 
> only 2 tasks running, which requires only max(20, ceil(2 / 4)) = 20 executors.
> {code:java}
> ./bin/spark-shell --master yarn --conf spark.speculation=true --conf 
> spark.executor.cores=4 --conf spark.dynamicAllocation.enabled=true --conf 
> spark.dynamicAllocation.minExecutors=20 --conf 
> spark.dynamicAllocation.maxExecutors=1000 {code}
> {code:java}
> val n = 4000
> val someRDD = sc.parallelize(1 to n, n)
> someRDD.mapPartitionsWithIndex( (index: Int, it: Iterator[Int]) => {
> if (index > 3998) {
>     Thread.sleep(1000 * 1000)
> } else if (index > 3850) {
>     Thread.sleep(50 * 1000) // Fake running tasks
> } else {
>     Thread.sleep(100)
> }
> Array.fill[Int](1)(1).iterator{code}
>  
> I will have a PR ready to fix this issue



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-41497) Accumulator undercounting in the case of retry task with rdd cache

2022-12-14 Thread Mridul Muralidharan (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-41497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646972#comment-17646972
 ] 

Mridul Muralidharan edited comment on SPARK-41497 at 12/14/22 8:03 AM:
---

[~Ngone51] Agree, that is what I was not sure of (whether we can detect this 
scenario around the use of accumulators which might be updated subsequently). 
Note that updates to the same accumulator can happen both before and after a 
cache in user code - so we might only be able to catch the scenario where there 
are no accumulators at all.
If I am not wrong, SQL makes very heavy use of accumulators, so most stages 
will end up having them anyway - right ?

I would expect this scenario (even without accumulator) to be fairly low 
frequency enough that the cost of extra recomputation might be fine.


was (Author: mridulm80):
[~Ngone51] Agree, that is what I was not sure of.
I would expect this scenario (even without accumulator) to be fairly low 
frequency enough that the cost of extra recomputation might be fine.

> Accumulator undercounting in the case of retry task with rdd cache
> --
>
> Key: SPARK-41497
> URL: https://issues.apache.org/jira/browse/SPARK-41497
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.4.8, 3.0.3, 3.1.3, 3.2.2, 3.3.1
>Reporter: wuyi
>Priority: Major
>
> Accumulator could be undercounted when the retried task has rdd cache.  See 
> the example below and you could also find the completed and reproducible 
> example at 
> [https://github.com/apache/spark/compare/master...Ngone51:spark:fix-acc]
>   
> {code:scala}
> test("SPARK-XXX") {
>   // Set up a cluster with 2 executors
>   val conf = new SparkConf()
> .setMaster("local-cluster[2, 1, 
> 1024]").setAppName("TaskSchedulerImplSuite")
>   sc = new SparkContext(conf)
>   // Set up a custom task scheduler. The scheduler will fail the first task 
> attempt of the job
>   // submitted below. In particular, the failed first attempt task would 
> succeed on computation
>   // (accumulator accounting, result caching) but only fail to report its 
> success status due
>   // to the concurrent executor loss. The second task attempt would succeed.
>   taskScheduler = setupSchedulerWithCustomStatusUpdate(sc)
>   val myAcc = sc.longAccumulator("myAcc")
>   // Initiate a rdd with only one partition so there's only one task and 
> specify the storage level
>   // with MEMORY_ONLY_2 so that the rdd result will be cached on both two 
> executors.
>   val rdd = sc.parallelize(0 until 10, 1).mapPartitions { iter =>
> myAcc.add(100)
> iter.map(x => x + 1)
>   }.persist(StorageLevel.MEMORY_ONLY_2)
>   // This will pass since the second task attempt will succeed
>   assert(rdd.count() === 10)
>   // This will fail due to `myAcc.add(100)` won't be executed during the 
> second task attempt's
>   // execution. Because the second task attempt will load the rdd cache 
> directly instead of
>   // executing the task function so `myAcc.add(100)` is skipped.
>   assert(myAcc.value === 100)
> } {code}
>  
> We could also hit this issue with decommission even if the rdd only has one 
> copy. For example, decommission could migrate the rdd cache block to another 
> executor (the result is actually the same with 2 copies) and the 
> decommissioned executor lost before the task reports its success status to 
> the driver. 
>  
> And the issue is a bit more complicated than expected to fix. I have tried to 
> give some fixes but all of them are not ideal:
> Option 1: Clean up any rdd cache related to the failed task: in practice, 
> this option can already fix the issue in most cases. However, theoretically, 
> rdd cache could be reported to the driver right after the driver cleans up 
> the failed task's caches due to asynchronous communication. So this option 
> can’t resolve the issue thoroughly;
> Option 2: Disallow rdd cache reuse across the task attempts for the same 
> task: this option can 100% fix the issue. The problem is this way can also 
> affect the case where rdd cache can be reused across the attempts (e.g., when 
> there is no accumulator operation in the task), which can have perf 
> regression;
> Option 3: Introduce accumulator cache: first, this requires a new framework 
> for supporting accumulator cache; second, the driver should improve its logic 
> to distinguish whether the accumulator cache value should be reported to the 
> user to avoid overcounting. For example, in the case of task retry, the value 
> should be reported. However, in the case of rdd cache reuse, the value 
> shouldn’t be reported (should it?);
> Option 4: Do task success validation when a task trying to load the rdd 
> cache: this way defines a rdd cache is only valid/accessible if the task has 

[jira] [Commented] (SPARK-41497) Accumulator undercounting in the case of retry task with rdd cache

2022-12-13 Thread Mridul Muralidharan (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-41497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646972#comment-17646972
 ] 

Mridul Muralidharan commented on SPARK-41497:
-

[~Ngone51] Agree, that is what I was not sure of.
I would expect this scenario (even without accumulator) to be fairly low 
frequency enough that the cost of extra recomputation might be fine.

> Accumulator undercounting in the case of retry task with rdd cache
> --
>
> Key: SPARK-41497
> URL: https://issues.apache.org/jira/browse/SPARK-41497
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.4.8, 3.0.3, 3.1.3, 3.2.2, 3.3.1
>Reporter: wuyi
>Priority: Major
>
> Accumulator could be undercounted when the retried task has rdd cache.  See 
> the example below and you could also find the completed and reproducible 
> example at 
> [https://github.com/apache/spark/compare/master...Ngone51:spark:fix-acc]
>   
> {code:scala}
> test("SPARK-XXX") {
>   // Set up a cluster with 2 executors
>   val conf = new SparkConf()
> .setMaster("local-cluster[2, 1, 
> 1024]").setAppName("TaskSchedulerImplSuite")
>   sc = new SparkContext(conf)
>   // Set up a custom task scheduler. The scheduler will fail the first task 
> attempt of the job
>   // submitted below. In particular, the failed first attempt task would 
> succeed on computation
>   // (accumulator accounting, result caching) but only fail to report its 
> success status due
>   // to the concurrent executor loss. The second task attempt would succeed.
>   taskScheduler = setupSchedulerWithCustomStatusUpdate(sc)
>   val myAcc = sc.longAccumulator("myAcc")
>   // Initiate a rdd with only one partition so there's only one task and 
> specify the storage level
>   // with MEMORY_ONLY_2 so that the rdd result will be cached on both two 
> executors.
>   val rdd = sc.parallelize(0 until 10, 1).mapPartitions { iter =>
> myAcc.add(100)
> iter.map(x => x + 1)
>   }.persist(StorageLevel.MEMORY_ONLY_2)
>   // This will pass since the second task attempt will succeed
>   assert(rdd.count() === 10)
>   // This will fail due to `myAcc.add(100)` won't be executed during the 
> second task attempt's
>   // execution. Because the second task attempt will load the rdd cache 
> directly instead of
>   // executing the task function so `myAcc.add(100)` is skipped.
>   assert(myAcc.value === 100)
> } {code}
>  
> We could also hit this issue with decommission even if the rdd only has one 
> copy. For example, decommission could migrate the rdd cache block to another 
> executor (the result is actually the same with 2 copies) and the 
> decommissioned executor lost before the task reports its success status to 
> the driver. 
>  
> And the issue is a bit more complicated than expected to fix. I have tried to 
> give some fixes but all of them are not ideal:
> Option 1: Clean up any rdd cache related to the failed task: in practice, 
> this option can already fix the issue in most cases. However, theoretically, 
> rdd cache could be reported to the driver right after the driver cleans up 
> the failed task's caches due to asynchronous communication. So this option 
> can’t resolve the issue thoroughly;
> Option 2: Disallow rdd cache reuse across the task attempts for the same 
> task: this option can 100% fix the issue. The problem is this way can also 
> affect the case where rdd cache can be reused across the attempts (e.g., when 
> there is no accumulator operation in the task), which can have perf 
> regression;
> Option 3: Introduce accumulator cache: first, this requires a new framework 
> for supporting accumulator cache; second, the driver should improve its logic 
> to distinguish whether the accumulator cache value should be reported to the 
> user to avoid overcounting. For example, in the case of task retry, the value 
> should be reported. However, in the case of rdd cache reuse, the value 
> shouldn’t be reported (should it?);
> Option 4: Do task success validation when a task trying to load the rdd 
> cache: this way defines a rdd cache is only valid/accessible if the task has 
> succeeded. This way could be either overkill or a bit complex (because 
> currently Spark would clean up the task state once it’s finished. So we need 
> to maintain a structure to know if task once succeeded or not. )



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-41497) Accumulator undercounting in the case of retry task with rdd cache

2022-12-13 Thread Mridul Muralidharan (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-41497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646962#comment-17646962
 ] 

Mridul Muralidharan commented on SPARK-41497:
-

Agree, if we can determine that - do we have a way to do that ?

> Accumulator undercounting in the case of retry task with rdd cache
> --
>
> Key: SPARK-41497
> URL: https://issues.apache.org/jira/browse/SPARK-41497
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.4.8, 3.0.3, 3.1.3, 3.2.2, 3.3.1
>Reporter: wuyi
>Priority: Major
>
> Accumulator could be undercounted when the retried task has rdd cache.  See 
> the example below and you could also find the completed and reproducible 
> example at 
> [https://github.com/apache/spark/compare/master...Ngone51:spark:fix-acc]
>   
> {code:scala}
> test("SPARK-XXX") {
>   // Set up a cluster with 2 executors
>   val conf = new SparkConf()
> .setMaster("local-cluster[2, 1, 
> 1024]").setAppName("TaskSchedulerImplSuite")
>   sc = new SparkContext(conf)
>   // Set up a custom task scheduler. The scheduler will fail the first task 
> attempt of the job
>   // submitted below. In particular, the failed first attempt task would 
> succeed on computation
>   // (accumulator accounting, result caching) but only fail to report its 
> success status due
>   // to the concurrent executor loss. The second task attempt would succeed.
>   taskScheduler = setupSchedulerWithCustomStatusUpdate(sc)
>   val myAcc = sc.longAccumulator("myAcc")
>   // Initiate a rdd with only one partition so there's only one task and 
> specify the storage level
>   // with MEMORY_ONLY_2 so that the rdd result will be cached on both two 
> executors.
>   val rdd = sc.parallelize(0 until 10, 1).mapPartitions { iter =>
> myAcc.add(100)
> iter.map(x => x + 1)
>   }.persist(StorageLevel.MEMORY_ONLY_2)
>   // This will pass since the second task attempt will succeed
>   assert(rdd.count() === 10)
>   // This will fail due to `myAcc.add(100)` won't be executed during the 
> second task attempt's
>   // execution. Because the second task attempt will load the rdd cache 
> directly instead of
>   // executing the task function so `myAcc.add(100)` is skipped.
>   assert(myAcc.value === 100)
> } {code}
>  
> We could also hit this issue with decommission even if the rdd has only one 
> copy. For example, decommission could migrate the rdd cache block to another 
> executor (the result is effectively the same as having 2 copies) and the 
> decommissioned executor is lost before the task reports its success status to 
> the driver. 
>  
> And the issue is a bit more complicated to fix than expected. I have tried 
> several fixes, but none of them is ideal:
> Option 1: Clean up any rdd cache related to the failed task: in practice, 
> this option already fixes the issue in most cases. However, theoretically, 
> an rdd cache could be reported to the driver right after the driver cleans up 
> the failed task's caches, due to asynchronous communication. So this option 
> can't resolve the issue thoroughly;
> Option 2: Disallow rdd cache reuse across task attempts for the same task: 
> this option fixes the issue completely. The problem is that it also penalizes 
> the cases where the rdd cache could safely be reused across attempts (e.g., 
> when there is no accumulator operation in the task), which can cause a perf 
> regression;
> Option 3: Introduce an accumulator cache: first, this requires a new 
> framework for supporting the accumulator cache; second, the driver needs 
> extra logic to decide whether a cached accumulator value should be reported 
> to the user in order to avoid overcounting. For example, in the case of a 
> task retry the value should be reported, but in the case of rdd cache reuse 
> it shouldn't be (should it?);
> Option 4: Validate task success when a task tries to load the rdd cache: 
> this defines an rdd cache as valid/accessible only if the task that produced 
> it succeeded. This could be either overkill or a bit complex (currently Spark 
> cleans up task state once the task finishes, so we would need to maintain a 
> structure recording whether a task ever succeeded.)
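
To make the failure mode concrete outside of Spark, here is a minimal, self-contained Scala sketch (hypothetical names only, not Spark APIs) of why a cache hit on a retried task skips the side-effecting accumulator update that recomputation would have performed:

{code:scala}
// Minimal, Spark-free model of the problem: a side-effecting "accumulator"
// update only happens when the partition is actually recomputed.
import scala.collection.mutable

object AccumulatorUndercount {
  var accumulator: Long = 0L                          // plays the role of myAcc
  val blockCache = mutable.Map.empty[Int, Seq[Int]]   // plays the role of the rdd cache

  // Computing a partition updates the accumulator as a side effect,
  // just like myAcc.add(100) inside mapPartitions.
  def compute(partition: Int): Seq[Int] = {
    accumulator += 100
    (0 until 10).map(_ + 1)
  }

  // A task attempt first looks in the cache; on a hit the side effect is skipped.
  def runTaskAttempt(partition: Int): Seq[Int] =
    blockCache.getOrElseUpdate(partition, compute(partition))

  def main(args: Array[String]): Unit = {
    runTaskAttempt(0)              // attempt 1: computes, caches, accumulator = 100
    // Pretend attempt 1 failed to *report* success, so the scheduler retries:
    runTaskAttempt(0)              // attempt 2: cache hit, accumulator update skipped
    assert(accumulator == 100)     // the retry contributed nothing => undercount
    println(s"accumulator = $accumulator (a non-cached retry would have made it 200)")
  }
}
{code}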



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-41497) Accumulator undercounting in the case of retry task with rdd cache

2022-12-13 Thread Mridul Muralidharan (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-41497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646825#comment-17646825
 ] 

Mridul Muralidharan edited comment on SPARK-41497 at 12/13/22 9:20 PM:
---

> For example, a task is constructed by `rdd1.cache().rdd2`. So if the task 
> fails due to rdd2's computation, I think rdd1's cache should still be 
> reusable.

We have three cases here.
For some initial task T1:

a) Computation of rdd2 within T1 should have no issues, since the initial 
computation would result in caching it locally - and rdd2's computation uses 
the result of that local read iterator [1].
b) T1 fails and T2 now executes for the same partition - as proposed, T2 
will not use the result of T1's cache, since it is not marked as usable (even 
though the block might still be marked as cached [2]).
c) Replication and/or decommission, and T2 runs - same case as (b).


[1] We should special-case and allow reads from the same task which cached the 
block - even if it has not yet been marked usable.
[2] When T1 fails, we would drop the blocks which got cached due to it. But 
even in the case of a race, the flag prevents the use of the cached block.


was (Author: mridulm80):
> For example, a task is constructed by `rdd1.cache().rdd2`. So if the task 
> fails due to rdd2's computation, I think rdd1's cache should still be able to 
> reuse.

We have three cases here.
For some initial task T1:

a) Computation of rdd2 within T1 should have no issues, since initial 
computation would result in caching it locally - and rdd2 computation is using 
the result of that iterator [1].
b) Failure of T1 and now T2 executes for the same partition - as proposed, T2 
will not use result of T1's cache, since it not marked as usable (even though 
the block might be still marked cached [2]).
c) Replication and/or decom and T2 runs - same case as (b).


[1] We should special case and allow reads from the same task which cached the 
block - even if it has not yet been marked usable.
[2] When T1 failed, we would drop the blocks which got cached due to it. But 
even in the case of a race, the flag prevents the use of the cached block.

> Accumulator undercounting in the case of retry task with rdd cache
> --
>
> Key: SPARK-41497
> URL: https://issues.apache.org/jira/browse/SPARK-41497
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.4.8, 3.0.3, 3.1.3, 3.2.2, 3.3.1
>Reporter: wuyi
>Priority: Major
>
> An accumulator could be undercounted when a retried task has an rdd cache. See 
> the example below; you can also find a complete and reproducible 
> example at 
> [https://github.com/apache/spark/compare/master...Ngone51:spark:fix-acc]
>   
> {code:scala}
> test("SPARK-XXX") {
>   // Set up a cluster with 2 executors
>   val conf = new SparkConf()
> .setMaster("local-cluster[2, 1, 
> 1024]").setAppName("TaskSchedulerImplSuite")
>   sc = new SparkContext(conf)
>   // Set up a custom task scheduler. The scheduler will fail the first task 
> attempt of the job
>   // submitted below. In particular, the failed first attempt succeeds in its 
> computation
>   // (accumulator accounting, result caching) but fails to report its 
> success status due
>   // to the concurrent executor loss. The second task attempt will succeed.
>   taskScheduler = setupSchedulerWithCustomStatusUpdate(sc)
>   val myAcc = sc.longAccumulator("myAcc")
>   // Create an rdd with only one partition so there's only one task, and 
> specify the storage level
>   // with MEMORY_ONLY_2 so that the rdd result will be cached on both 
> executors.
>   val rdd = sc.parallelize(0 until 10, 1).mapPartitions { iter =>
> myAcc.add(100)
> iter.map(x => x + 1)
>   }.persist(StorageLevel.MEMORY_ONLY_2)
>   // This will pass since the second task attempt will succeed
>   assert(rdd.count() === 10)
>   // This will fail because `myAcc.add(100)` won't be executed during the 
> second task attempt's
>   // execution: the second task attempt loads the rdd cache 
> directly instead of
>   // executing the task function, so `myAcc.add(100)` is skipped.
>   assert(myAcc.value === 100)
> } {code}
>  
> We could also hit this issue with decommission even if the rdd has only one 
> copy. For example, decommission could migrate the rdd cache block to another 
> executor (the result is effectively the same as having 2 copies) and the 
> decommissioned executor is lost before the task reports its success status to 
> the driver. 
>  
> And the issue is a bit more complicated than expected to fix. I have tried to 
> give some fixes but all of them are not ideal:
> Option 1: Clean up any rdd cache related to the failed task: in practice, 
> this option can already fix the issue in 

[jira] [Comment Edited] (SPARK-41497) Accumulator undercounting in the case of retry task with rdd cache

2022-12-13 Thread Mridul Muralidharan (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-41497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646825#comment-17646825
 ] 

Mridul Muralidharan edited comment on SPARK-41497 at 12/13/22 9:20 PM:
---

> For example, a task is constructed by `rdd1.cache().rdd2`. So if the task 
> fails due to rdd2's computation, I think rdd1's cache should still be 
> reusable.

We have three cases here.
For some initial task T1:

a) Computation of rdd2 within T1 should have no issues, since the initial 
computation would result in caching it locally - and rdd2's computation uses 
the result of that local read iterator [1].
b) T1 fails and T2 now executes for the same partition - as proposed, T2 
will not use the result of T1's cache, since it is not marked as usable (even 
though the block might still be marked as cached [2]).
c) Replication and/or decommission, and T2 runs - same case as (b).

Probably 'usable' is the wrong term - 'visible' might be better? That is, is 
this block visible to others (outside of the generating task)?

[1] We should special-case and allow reads from the same task which cached the 
block - even if it has not yet been marked usable.
[2] When T1 fails, we would drop the blocks which got cached due to it. But 
even in the case of a race, the flag prevents the use of the cached block.


was (Author: mridulm80):
> For example, a task is constructed by `rdd1.cache().rdd2`. So if the task 
> fails due to rdd2's computation, I think rdd1's cache should still be able to 
> reuse.

We have three cases here.
For some initial task T1:

a) Computation of rdd2 within T1 should have no issues, since initial 
computation would result in caching it locally - and rdd2 computation is using 
the result of that local read iterator [1].
b) Failure of T1 and now T2 executes for the same partition - as proposed, T2 
will not use result of T1's cache, since it not marked as usable (even though 
the block might be still marked cached [2]).
c) Replication and/or decom and T2 runs - same case as (b).


[1] We should special case and allow reads from the same task which cached the 
block - even if it has not yet been marked usable.
[2] When T1 failed, we would drop the blocks which got cached due to it. But 
even in the case of a race, the flag prevents the use of the cached block.

> Accumulator undercounting in the case of retry task with rdd cache
> --
>
> Key: SPARK-41497
> URL: https://issues.apache.org/jira/browse/SPARK-41497
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.4.8, 3.0.3, 3.1.3, 3.2.2, 3.3.1
>Reporter: wuyi
>Priority: Major
>
> An accumulator could be undercounted when a retried task has an rdd cache. See 
> the example below; you can also find a complete and reproducible 
> example at 
> [https://github.com/apache/spark/compare/master...Ngone51:spark:fix-acc]
>   
> {code:scala}
> test("SPARK-XXX") {
>   // Set up a cluster with 2 executors
>   val conf = new SparkConf()
> .setMaster("local-cluster[2, 1, 
> 1024]").setAppName("TaskSchedulerImplSuite")
>   sc = new SparkContext(conf)
>   // Set up a custom task scheduler. The scheduler will fail the first task 
> attempt of the job
>   // submitted below. In particular, the failed first attempt succeeds in its 
> computation
>   // (accumulator accounting, result caching) but fails to report its 
> success status due
>   // to the concurrent executor loss. The second task attempt will succeed.
>   taskScheduler = setupSchedulerWithCustomStatusUpdate(sc)
>   val myAcc = sc.longAccumulator("myAcc")
>   // Create an rdd with only one partition so there's only one task, and 
> specify the storage level
>   // with MEMORY_ONLY_2 so that the rdd result will be cached on both 
> executors.
>   val rdd = sc.parallelize(0 until 10, 1).mapPartitions { iter =>
> myAcc.add(100)
> iter.map(x => x + 1)
>   }.persist(StorageLevel.MEMORY_ONLY_2)
>   // This will pass since the second task attempt will succeed
>   assert(rdd.count() === 10)
>   // This will fail because `myAcc.add(100)` won't be executed during the 
> second task attempt's
>   // execution: the second task attempt loads the rdd cache 
> directly instead of
>   // executing the task function, so `myAcc.add(100)` is skipped.
>   assert(myAcc.value === 100)
> } {code}
>  
> We could also hit this issue with decommission even if the rdd has only one 
> copy. For example, decommission could migrate the rdd cache block to another 
> executor (the result is effectively the same as having 2 copies) and the 
> decommissioned executor is lost before the task reports its success status to 
> the driver. 
>  
> And the issue is a bit more complicated than expected to fix. I have tried to 
> give some fixes 

[jira] [Commented] (SPARK-41497) Accumulator undercounting in the case of retry task with rdd cache

2022-12-13 Thread Mridul Muralidharan (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-41497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646825#comment-17646825
 ] 

Mridul Muralidharan commented on SPARK-41497:
-

> For example, a task is constructed by `rdd1.cache().rdd2`. So if the task 
> fails due to rdd2's computation, I think rdd1's cache should still be 
> reusable.

We have three cases here.
For some initial task T1:

a) Computation of rdd2 within T1 should have no issues, since the initial 
computation would result in caching it locally - and rdd2's computation uses 
the result of that iterator [1].
b) T1 fails and T2 now executes for the same partition - as proposed, T2 
will not use the result of T1's cache, since it is not marked as usable (even 
though the block might still be marked as cached [2]).
c) Replication and/or decommission, and T2 runs - same case as (b).


[1] We should special-case and allow reads from the same task which cached the 
block - even if it has not yet been marked usable.
[2] When T1 fails, we would drop the blocks which got cached due to it. But 
even in the case of a race, the flag prevents the use of the cached block.
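
A minimal sketch of this visibility idea, using hypothetical names (BlockVisibility, readCached) that are not Spark's real classes. It shows case (a), where the task that cached a block may still read it, and cases (b)/(c), where other tasks cannot see the block until it is marked visible:

{code:scala}
import scala.collection.mutable

final case class CachedBlock(data: Seq[Int], cachedByTask: Long, var visible: Boolean)

class BlockVisibility {
  private val blocks = mutable.Map.empty[String, CachedBlock]

  // Put path: the block is stored but stays invisible until its task succeeds.
  def put(blockId: String, taskId: Long, data: Seq[Int]): Unit =
    blocks(blockId) = CachedBlock(data, cachedByTask = taskId, visible = false)

  // Read path: [1] the task that cached the block may always read it back;
  // every other task only sees it once the flag has been flipped.
  def readCached(blockId: String, readerTaskId: Long): Option[Seq[Int]] =
    blocks.get(blockId).collect {
      case b if b.visible || b.cachedByTask == readerTaskId => b.data
    }

  // Task end: success flips the flag (case a), failure drops the blocks (cases b/c).
  def onTaskEnd(taskId: Long, succeeded: Boolean): Unit =
    if (succeeded) {
      blocks.values.filter(_.cachedByTask == taskId).foreach(_.visible = true)
    } else {
      val toDrop = blocks.collect { case (id, b) if b.cachedByTask == taskId => id }
      toDrop.foreach(blocks.remove)
    }
}
{code}

With this gating, a retry attempt T2 gets None from readCached for T1's not-yet-visible block and recomputes the partition (updating the accumulator), while T1 can still consume what it just cached.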

> Accumulator undercounting in the case of retry task with rdd cache
> --
>
> Key: SPARK-41497
> URL: https://issues.apache.org/jira/browse/SPARK-41497
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.4.8, 3.0.3, 3.1.3, 3.2.2, 3.3.1
>Reporter: wuyi
>Priority: Major
>
> An accumulator could be undercounted when a retried task has an rdd cache. See 
> the example below; you can also find a complete and reproducible 
> example at 
> [https://github.com/apache/spark/compare/master...Ngone51:spark:fix-acc]
>   
> {code:scala}
> test("SPARK-XXX") {
>   // Set up a cluster with 2 executors
>   val conf = new SparkConf()
> .setMaster("local-cluster[2, 1, 
> 1024]").setAppName("TaskSchedulerImplSuite")
>   sc = new SparkContext(conf)
>   // Set up a custom task scheduler. The scheduler will fail the first task 
> attempt of the job
>   // submitted below. In particular, the failed first attempt succeeds in its 
> computation
>   // (accumulator accounting, result caching) but fails to report its 
> success status due
>   // to the concurrent executor loss. The second task attempt will succeed.
>   taskScheduler = setupSchedulerWithCustomStatusUpdate(sc)
>   val myAcc = sc.longAccumulator("myAcc")
>   // Create an rdd with only one partition so there's only one task, and 
> specify the storage level
>   // with MEMORY_ONLY_2 so that the rdd result will be cached on both 
> executors.
>   val rdd = sc.parallelize(0 until 10, 1).mapPartitions { iter =>
> myAcc.add(100)
> iter.map(x => x + 1)
>   }.persist(StorageLevel.MEMORY_ONLY_2)
>   // This will pass since the second task attempt will succeed
>   assert(rdd.count() === 10)
>   // This will fail because `myAcc.add(100)` won't be executed during the 
> second task attempt's
>   // execution: the second task attempt loads the rdd cache 
> directly instead of
>   // executing the task function, so `myAcc.add(100)` is skipped.
>   assert(myAcc.value === 100)
> } {code}
>  
> We could also hit this issue with decommission even if the rdd has only one 
> copy. For example, decommission could migrate the rdd cache block to another 
> executor (the result is effectively the same as having 2 copies) and the 
> decommissioned executor is lost before the task reports its success status to 
> the driver. 
>  
> And the issue is a bit more complicated to fix than expected. I have tried 
> several fixes, but none of them is ideal:
> Option 1: Clean up any rdd cache related to the failed task: in practice, 
> this option already fixes the issue in most cases. However, theoretically, 
> an rdd cache could be reported to the driver right after the driver cleans up 
> the failed task's caches, due to asynchronous communication. So this option 
> can't resolve the issue thoroughly;
> Option 2: Disallow rdd cache reuse across task attempts for the same task: 
> this option fixes the issue completely. The problem is that it also penalizes 
> the cases where the rdd cache could safely be reused across attempts (e.g., 
> when there is no accumulator operation in the task), which can cause a perf 
> regression;
> Option 3: Introduce an accumulator cache: first, this requires a new 
> framework for supporting the accumulator cache; second, the driver needs 
> extra logic to decide whether a cached accumulator value should be reported 
> to the user in order to avoid overcounting. For example, in the case of a 
> task retry the value should be reported, but in the case of rdd cache reuse 
> it shouldn't be (should it?);
> Option 4: Do task success validation when a task trying to 

[jira] [Comment Edited] (SPARK-41497) Accumulator undercounting in the case of retry task with rdd cache

2022-12-12 Thread Mridul Muralidharan (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-41497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646477#comment-17646477
 ] 

Mridul Muralidharan edited comment on SPARK-41497 at 12/13/22 7:41 AM:
---

Agree - there appear to be a bunch of scenarios where this can be triggered.
Essentially, whenever the block save succeeds and the task itself fails, we can 
end up with this scenario.
As detailed, this could be a storage level with replication > 1, block 
decommissioning, etc - where the executor or driver has replicated the data.
I think this can also happen when the task itself fails - but after persisting 
the data (even without replication or decommission) - for example, due to some 
local/transient shuffle write issue, a CommitDenied, etc.

For the options listed:
Agree, option 1 does not solve the issue.
I am also not inclined towards option 2 due to the potential perf impact - 
though I would expect this to be a rare scenario.
Option 3 looks like a very involved approach, and I am not sure we can cover 
all the corner cases.

I am wondering if we can modify option 4 such that it helps.
There are multiple approaches perhaps - one strawman proposal:
a) Add a bit to BlockStatus indicating whether the block can be used or not. 
This bit gets flipped when the task which computed the block successfully 
completes.
b) Maintain a taskId -> BlockStatus* mapping - which is cleaned up whenever a 
task completes (if successful, flip the bit - else remove its blocks, and 
replicas if any).
c) Propagate the taskId in reportBlockStatus from doPutIterator, etc - wherever 
a new block is getting created in the system.

Thoughts ?


was (Author: mridulm80):
Agree - there appears to be a bunch of scenarios where this can be triggered.
Essentially, whenever block save succeeds and task itself fails, we can end up 
with this scenario.
As detailed, this could be storage level with replication > 1, block 
decomissioning, etc - where executor or driver has replicated the data.
I think this can also happen when task itself fails - but after persisting the 
data (even without replication or decomm) - for example, due to some 
local/transient shuffle write issues (for example).

For the options listed:
Agree, option 1 does not solve the issue.
I am also not inclined towards option 2 due to the potential perf impact - 
though I would expect this to be a rare scenario.
Option 3 looks like a very involved approach, and I am not sure if we can cover 
all the corner cases.

I am wondering if we can modify option 4 such that it helps.
There are multiple approaches perhaps - one strawman proposal:
a) Add a bit to BlockStatus indicating whether block can be used or not. And 
currently this bit gets flipped when the task which computed it successfully 
completes.
b) Maintain a taskId -> BlockStatus* mapping - which is cleaned up whenever a 
task completes (if successful, then flip bit - else remove its blocks - and 
replicas (if any)).
c) Propagate taskId in reportBlockStatus from doPutIterator, etc - where new 
block is getting created in the system.

Thoughts ?

> Accumulator undercounting in the case of retry task with rdd cache
> --
>
> Key: SPARK-41497
> URL: https://issues.apache.org/jira/browse/SPARK-41497
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.4.8, 3.0.3, 3.1.3, 3.2.2, 3.3.1
>Reporter: wuyi
>Priority: Major
>
> An accumulator could be undercounted when a retried task has an rdd cache. See 
> the example below; you can also find a complete and reproducible 
> example at 
> [https://github.com/apache/spark/compare/master...Ngone51:spark:fix-acc]
>   
> {code:scala}
> test("SPARK-XXX") {
>   // Set up a cluster with 2 executors
>   val conf = new SparkConf()
> .setMaster("local-cluster[2, 1, 
> 1024]").setAppName("TaskSchedulerImplSuite")
>   sc = new SparkContext(conf)
>   // Set up a custom task scheduler. The scheduler will fail the first task 
> attempt of the job
>   // submitted below. In particular, the failed first attempt succeeds in its 
> computation
>   // (accumulator accounting, result caching) but fails to report its 
> success status due
>   // to the concurrent executor loss. The second task attempt will succeed.
>   taskScheduler = setupSchedulerWithCustomStatusUpdate(sc)
>   val myAcc = sc.longAccumulator("myAcc")
>   // Create an rdd with only one partition so there's only one task, and 
> specify the storage level
>   // with MEMORY_ONLY_2 so that the rdd result will be cached on both 
> executors.
>   val rdd = sc.parallelize(0 until 10, 1).mapPartitions { iter =>
> myAcc.add(100)
> iter.map(x => x + 1)
>   }.persist(StorageLevel.MEMORY_ONLY_2)
>   // This will pass since the second task 

[jira] [Commented] (SPARK-41497) Accumulator undercounting in the case of retry task with rdd cache

2022-12-12 Thread Mridul Muralidharan (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-41497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17646477#comment-17646477
 ] 

Mridul Muralidharan commented on SPARK-41497:
-

Agree - there appear to be a bunch of scenarios where this can be triggered.
Essentially, whenever the block save succeeds and the task itself fails, we can 
end up with this scenario.
As detailed, this could be a storage level with replication > 1, block 
decommissioning, etc - where the executor or driver has replicated the data.
I think this can also happen when the task itself fails - but after persisting 
the data (even without replication or decommission) - for example, due to some 
local/transient shuffle write issue.

For the options listed:
Agree, option 1 does not solve the issue.
I am also not inclined towards option 2 due to the potential perf impact - 
though I would expect this to be a rare scenario.
Option 3 looks like a very involved approach, and I am not sure we can cover 
all the corner cases.

I am wondering if we can modify option 4 such that it helps.
There are multiple approaches perhaps - one strawman proposal:
a) Add a bit to BlockStatus indicating whether the block can be used or not. 
This bit gets flipped when the task which computed the block successfully 
completes.
b) Maintain a taskId -> BlockStatus* mapping - which is cleaned up whenever a 
task completes (if successful, flip the bit - else remove its blocks, and 
replicas if any).
c) Propagate the taskId in reportBlockStatus from doPutIterator, etc - wherever 
a new block is getting created in the system.

Thoughts ?
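
A rough, driver-side sketch of this strawman, using hypothetical names (PendingBlockTracker, reportBlockStatus with a taskId parameter) rather than Spark's actual classes, to show how (a) the usable bit, (b) the taskId -> blocks mapping, and (c) the taskId-carrying block reports fit together:

{code:scala}
import scala.collection.mutable

final class BlockStatus {
  val locations = mutable.Set.empty[String]  // executor ids holding this block (incl. replicas)
  var usable: Boolean = false                // (a) flipped once the producing task succeeds
}

class PendingBlockTracker {
  private val blockStatus   = mutable.Map.empty[String, BlockStatus]          // blockId -> status
  private val pendingByTask = mutable.Map.empty[Long, mutable.Set[String]]    // (b) taskId -> blockIds

  // (c) block reports carry the id of the task that created the block.
  def reportBlockStatus(taskId: Long, blockId: String, location: String): Unit = {
    val status = blockStatus.getOrElseUpdate(blockId, new BlockStatus)
    status.locations += location
    pendingByTask.getOrElseUpdate(taskId, mutable.Set.empty[String]) += blockId
  }

  // On task completion either flip the usable bit or drop the blocks
  // (and all replica locations) that the failed task produced.
  def onTaskCompletion(taskId: Long, succeeded: Boolean): Unit =
    pendingByTask.remove(taskId).getOrElse(mutable.Set.empty[String]).foreach { blockId =>
      if (succeeded) blockStatus.get(blockId).foreach(_.usable = true)
      else blockStatus.remove(blockId)  // removing the entry drops every replica location too
    }

  def isUsable(blockId: String): Boolean = blockStatus.get(blockId).exists(_.usable)
}
{code}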

> Accumulator undercounting in the case of retry task with rdd cache
> --
>
> Key: SPARK-41497
> URL: https://issues.apache.org/jira/browse/SPARK-41497
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.4.8, 3.0.3, 3.1.3, 3.2.2, 3.3.1
>Reporter: wuyi
>Priority: Major
>
> An accumulator could be undercounted when a retried task has an rdd cache. See 
> the example below; you can also find a complete and reproducible 
> example at 
> [https://github.com/apache/spark/compare/master...Ngone51:spark:fix-acc]
>   
> {code:scala}
> test("SPARK-XXX") {
>   // Set up a cluster with 2 executors
>   val conf = new SparkConf()
> .setMaster("local-cluster[2, 1, 
> 1024]").setAppName("TaskSchedulerImplSuite")
>   sc = new SparkContext(conf)
>   // Set up a custom task scheduler. The scheduler will fail the first task 
> attempt of the job
>   // submitted below. In particular, the failed first attempt succeeds in its 
> computation
>   // (accumulator accounting, result caching) but fails to report its 
> success status due
>   // to the concurrent executor loss. The second task attempt will succeed.
>   taskScheduler = setupSchedulerWithCustomStatusUpdate(sc)
>   val myAcc = sc.longAccumulator("myAcc")
>   // Create an rdd with only one partition so there's only one task, and 
> specify the storage level
>   // with MEMORY_ONLY_2 so that the rdd result will be cached on both 
> executors.
>   val rdd = sc.parallelize(0 until 10, 1).mapPartitions { iter =>
> myAcc.add(100)
> iter.map(x => x + 1)
>   }.persist(StorageLevel.MEMORY_ONLY_2)
>   // This will pass since the second task attempt will succeed
>   assert(rdd.count() === 10)
>   // This will fail because `myAcc.add(100)` won't be executed during the 
> second task attempt's
>   // execution: the second task attempt loads the rdd cache 
> directly instead of
>   // executing the task function, so `myAcc.add(100)` is skipped.
>   assert(myAcc.value === 100)
> } {code}
>  
> We could also hit this issue with decommission even if the rdd has only one 
> copy. For example, decommission could migrate the rdd cache block to another 
> executor (the result is effectively the same as having 2 copies) and the 
> decommissioned executor is lost before the task reports its success status to 
> the driver. 
>  
> And the issue is a bit more complicated to fix than expected. I have tried 
> several fixes, but none of them is ideal:
> Option 1: Clean up any rdd cache related to the failed task: in practice, 
> this option already fixes the issue in most cases. However, theoretically, 
> an rdd cache could be reported to the driver right after the driver cleans up 
> the failed task's caches, due to asynchronous communication. So this option 
> can't resolve the issue thoroughly;
> Option 2: Disallow rdd cache reuse across task attempts for the same task: 
> this option fixes the issue completely. The problem is that it also penalizes 
> the cases where the rdd cache could safely be reused across attempts (e.g., 
> when there is no accumulator operation in the task), which can cause a perf 
> regression;
> Option 3: Introduce 

[jira] [Resolved] (SPARK-41360) Avoid BlockManager re-registration if the executor has been lost

2022-12-12 Thread Mridul Muralidharan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-41360?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan resolved SPARK-41360.
-
Fix Version/s: 3.2.4
   3.3.2
   3.4.0
   Resolution: Fixed

> Avoid BlockManager re-registration if the executor has been lost
> 
>
> Key: SPARK-41360
> URL: https://issues.apache.org/jira/browse/SPARK-41360
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.4.8, 3.0.3, 3.1.3, 3.2.2, 3.3.1
>Reporter: wuyi
>Assignee: wuyi
>Priority: Major
> Fix For: 3.2.4, 3.3.2, 3.4.0
>
>
> We should avoid block manager re-registration if the executor has been lost 
> as it's meaningless and harmful, e.g., SPARK-35011



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-41360) Avoid BlockManager re-registration if the executor has been lost

2022-12-12 Thread Mridul Muralidharan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-41360?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan reassigned SPARK-41360:
---

Assignee: wuyi

> Avoid BlockManager re-registration if the executor has been lost
> 
>
> Key: SPARK-41360
> URL: https://issues.apache.org/jira/browse/SPARK-41360
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.4.8, 3.0.3, 3.1.3, 3.2.2, 3.3.1
>Reporter: wuyi
>Assignee: wuyi
>Priority: Major
>
> We should avoid block manager re-registration if the executor has been lost 
> as it's meaningless and harmful, e.g., SPARK-35011



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-41187) [Core] LiveExecutor MemoryLeak in AppStatusListener when ExecutorLost happen

2022-12-11 Thread Mridul Muralidharan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-41187?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan resolved SPARK-41187.
-
Fix Version/s: 3.3.2
   3.4.0
   Resolution: Fixed

> [Core] LiveExecutor MemoryLeak in AppStatusListener when ExecutorLost happen
> 
>
> Key: SPARK-41187
> URL: https://issues.apache.org/jira/browse/SPARK-41187
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.1.2, 3.2.0, 3.1.3, 3.3.1
>Reporter: wineternity
>Assignee: wineternity
>Priority: Major
> Fix For: 3.3.2, 3.4.0
>
> Attachments: image-2022-11-18-10-57-49-230.png, 
> image-2022-11-18-11-01-57-435.png, image-2022-11-18-11-09-34-760.png, 
> image-20221113232214179.png, image-20221113232233952.png
>
>
> We have a long-running thriftserver in which we found a memory leak. One 
> of the leaks is shown below.  !image-2022-11-18-10-57-49-230.png!
> The event queue size in our prod env is set very large to avoid message 
> drops, but we still see messages dropped in the log, the event processing 
> time is very long, and events accumulate in the queue.
> In the heap dump we found that the number of LiveExecutor instances had also 
> become very large. After checking the heap dump, we finally found the reason. 
> !image-2022-11-18-11-01-57-435.png!
> The reason is:
> For shuffle map stage tasks, if an executor loss happens, the finished tasks 
> will be resubmitted, and TaskSetManager.scala sends out a taskEnd message 
> with reason "Resubmitted"; this causes the activeTasks counter in 
> AppStatusListener's liveStage to become negative
> {code:java}
> override def executorLost(execId: String, host: String, reason: 
> ExecutorLossReason): Unit = {
>   // Re-enqueue any tasks that ran on the failed executor if this is a 
> shuffle map stage,
>   // and we are not using an external shuffle server which could serve the 
> shuffle outputs.
>   // The reason is the next stage wouldn't be able to fetch the data from 
> this dead executor
>   // so we would need to rerun these tasks on other executors.
>   if (isShuffleMapTasks && !env.blockManager.externalShuffleServiceEnabled && 
> !isZombie) {
> for ((tid, info) <- taskInfos if info.executorId == execId) {
>   val index = info.index
>   // We may have a running task whose partition has been marked as 
> successful,
>   // this partition has another task completed in another stage attempt.
>   // We treat it as a running task and will call handleFailedTask later.
>   if (successful(index) && !info.running && 
> !killedByOtherAttempt.contains(tid)) {
> successful(index) = false
> copiesRunning(index) -= 1
> tasksSuccessful -= 1
> addPendingTask(index)
> // Tell the DAGScheduler that this task was resubmitted so that it 
> doesn't think our
> // stage finishes when a total of tasks.size tasks finish.
> sched.dagScheduler.taskEnded(
>   tasks(index), Resubmitted, null, Seq.empty, Array.empty, info)
>   }
> }
>   }{code}
>  
> !image-2022-11-18-11-09-34-760.png!
> If a liveStage's activeTasks count is negative, the stage will never be 
> removed, and thus an executor moved to deadExecutors will never be removed 
> either, because removal requires that no live stage was submitted before the 
> executor's remove time. 
> {code:java}
> /** Was the specified executor active for any currently live stages? */
> private def isExecutorActiveForLiveStages(exec: LiveExecutor): Boolean = {
>   liveStages.values.asScala.exists { stage =>
> stage.info.submissionTime.getOrElse(0L) < exec.removeTime.getTime
>   }
> } 
> override def onStageCompleted(event: SparkListenerStageCompleted): Unit = {
>   .
>   // remove any dead executors that were not running for any currently active 
> stages
>   deadExecutors.retain((execId, exec) => isExecutorActiveForLiveStages(exec))
> }{code}
>  
> The corresponding logs from our prod env are attached. The number of 
> resubmitted tasks equals the activeTasks count in the heap dump for that stage.
> !image-20221113232214179.png!
> !image-20221113232233952.png!
> Hope this describes it clearly. I will create a pull request later; we simply 
> ignore the resubmitted message in AppStatusListener to fix it.
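
A simplified sketch of the bookkeeping fix described above: a listener that counts active tasks per stage but ignores the synthetic "Resubmitted" task-end events, so the counter cannot go negative. The counting class is hypothetical and is not the actual AppStatusListener change; SparkListener, SparkListenerTaskStart/End and Resubmitted are real Spark developer APIs.

{code:scala}
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger

import org.apache.spark.Resubmitted
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd, SparkListenerTaskStart}

class ActiveTaskCountingListener extends SparkListener {
  private val activeTasksByStage = new ConcurrentHashMap[Int, AtomicInteger]()

  override def onTaskStart(taskStart: SparkListenerTaskStart): Unit =
    activeTasksByStage
      .computeIfAbsent(taskStart.stageId, _ => new AtomicInteger(0))
      .incrementAndGet()

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = taskEnd.reason match {
    // A "Resubmitted" end is sent for a task that already ended once (its
    // executor was lost); decrementing again is what drives the count negative.
    case Resubmitted => // ignore
    case _ =>
      Option(activeTasksByStage.get(taskEnd.stageId)).foreach(_.decrementAndGet())
  }
}
{code}

A listener like this can be registered for experimentation with sc.addSparkListener(new ActiveTaskCountingListener).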



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-41187) [Core] LiveExecutor MemoryLeak in AppStatusListener when ExecutorLost happen

2022-12-11 Thread Mridul Muralidharan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-41187?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan reassigned SPARK-41187:
---

Assignee: wineternity

> [Core] LiveExecutor MemoryLeak in AppStatusListener when ExecutorLost happen
> 
>
> Key: SPARK-41187
> URL: https://issues.apache.org/jira/browse/SPARK-41187
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.1.2, 3.2.0, 3.1.3, 3.3.1
>Reporter: wineternity
>Assignee: wineternity
>Priority: Major
> Attachments: image-2022-11-18-10-57-49-230.png, 
> image-2022-11-18-11-01-57-435.png, image-2022-11-18-11-09-34-760.png, 
> image-20221113232214179.png, image-20221113232233952.png
>
>
> We have a long-running thriftserver in which we found a memory leak. One 
> of the leaks is shown below.  !image-2022-11-18-10-57-49-230.png!
> The event queue size in our prod env is set very large to avoid message 
> drops, but we still see messages dropped in the log, the event processing 
> time is very long, and events accumulate in the queue.
> In the heap dump we found that the number of LiveExecutor instances had also 
> become very large. After checking the heap dump, we finally found the reason. 
> !image-2022-11-18-11-01-57-435.png!
> The reason is:
> For shuffle map stage tasks, if an executor loss happens, the finished tasks 
> will be resubmitted, and TaskSetManager.scala sends out a taskEnd message 
> with reason "Resubmitted"; this causes the activeTasks counter in 
> AppStatusListener's liveStage to become negative
> {code:java}
> override def executorLost(execId: String, host: String, reason: 
> ExecutorLossReason): Unit = {
>   // Re-enqueue any tasks that ran on the failed executor if this is a 
> shuffle map stage,
>   // and we are not using an external shuffle server which could serve the 
> shuffle outputs.
>   // The reason is the next stage wouldn't be able to fetch the data from 
> this dead executor
>   // so we would need to rerun these tasks on other executors.
>   if (isShuffleMapTasks && !env.blockManager.externalShuffleServiceEnabled && 
> !isZombie) {
> for ((tid, info) <- taskInfos if info.executorId == execId) {
>   val index = info.index
>   // We may have a running task whose partition has been marked as 
> successful,
>   // this partition has another task completed in another stage attempt.
>   // We treat it as a running task and will call handleFailedTask later.
>   if (successful(index) && !info.running && 
> !killedByOtherAttempt.contains(tid)) {
> successful(index) = false
> copiesRunning(index) -= 1
> tasksSuccessful -= 1
> addPendingTask(index)
> // Tell the DAGScheduler that this task was resubmitted so that it 
> doesn't think our
> // stage finishes when a total of tasks.size tasks finish.
> sched.dagScheduler.taskEnded(
>   tasks(index), Resubmitted, null, Seq.empty, Array.empty, info)
>   }
> }
>   }{code}
>  
> !image-2022-11-18-11-09-34-760.png!
> If a liveStage's activeTasks count is negative, the stage will never be 
> removed, and thus an executor moved to deadExecutors will never be removed 
> either, because removal requires that no live stage was submitted before the 
> executor's remove time. 
> {code:java}
> /** Was the specified executor active for any currently live stages? */
> private def isExecutorActiveForLiveStages(exec: LiveExecutor): Boolean = {
>   liveStages.values.asScala.exists { stage =>
> stage.info.submissionTime.getOrElse(0L) < exec.removeTime.getTime
>   }
> } 
> override def onStageCompleted(event: SparkListenerStageCompleted): Unit = {
>   .
>   // remove any dead executors that were not running for any currently active 
> stages
>   deadExecutors.retain((execId, exec) => isExecutorActiveForLiveStages(exec))
> }{code}
>  
> The corresponding logs from our prod env are attached. The number of 
> resubmitted tasks equals the activeTasks count in the heap dump for that stage.
> !image-20221113232214179.png!
> !image-20221113232233952.png!
> Hope this describes it clearly. I will create a pull request later; we simply 
> ignore the resubmitted message in AppStatusListener to fix it.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-40987) Avoid creating a directory when deleting a block, causing DAGScheduler to not work

2022-11-29 Thread Mridul Muralidharan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-40987?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan reassigned SPARK-40987:
---

Assignee: dzcxzl

> Avoid creating a directory when deleting a block, causing DAGScheduler to not 
> work
> --
>
> Key: SPARK-40987
> URL: https://issues.apache.org/jira/browse/SPARK-40987
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.2.2, 3.3.1
>Reporter: dzcxzl
>Assignee: dzcxzl
>Priority: Minor
>
> When the driver submits a job, DAGScheduler calls 
> sc.broadcast(taskBinaryBytes).
> TorrentBroadcast#writeBlocks may fail due to disk problems during 
> blockManager#putBytes.
> BlockManager#doPut calls BlockManager#removeBlockInternal to clean up the 
> block.
> BlockManager#removeBlockInternal calls DiskStore#remove to clean up blocks on 
> disk.
> DiskStore#remove tries to create the parent directory because it does 
> not exist, and an exception is thrown at this point.
> The block info and its lock in BlockInfoManager#blockInfoWrappers are not removed.
> The catch block in TorrentBroadcast#writeBlocks then calls 
> blockManager.removeBroadcast to clean up the broadcast.
> Because the block lock in BlockInfoManager#blockInfoWrappers is never released, 
> the dag-scheduler-event-loop thread of DAGScheduler waits forever.
>  
>  
> {code:java}
> 22/11/01 18:27:48 WARN BlockManager: Putting block broadcast_0_piece0 failed 
> due to exception java.io.IOException: X.
> 22/11/01 18:27:48 ERROR TorrentBroadcast: Store broadcast broadcast_0 fail, 
> remove all pieces of the broadcast {code}
>  
>  
>  
> {code:java}
> "dag-scheduler-event-loop" #54 daemon prio=5 os_prio=31 
> tid=0x7fc98e3fa800 nid=0x7203 waiting on condition [0x78c1e000]
>    java.lang.Thread.State: WAITING (parking)
>     at sun.misc.Unsafe.park(Native Method)
>     - parking to wait for  <0x0007add3d8c8> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>     at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
>     at 
> org.apache.spark.storage.BlockInfoManager.$anonfun$acquireLock$1(BlockInfoManager.scala:221)
>     at 
> org.apache.spark.storage.BlockInfoManager.$anonfun$acquireLock$1$adapted(BlockInfoManager.scala:214)
>     at 
> org.apache.spark.storage.BlockInfoManager$$Lambda$3038/1307533457.apply(Unknown
>  Source)
>     at 
> org.apache.spark.storage.BlockInfoWrapper.withLock(BlockInfoManager.scala:105)
>     at 
> org.apache.spark.storage.BlockInfoManager.acquireLock(BlockInfoManager.scala:214)
>     at 
> org.apache.spark.storage.BlockInfoManager.lockForWriting(BlockInfoManager.scala:293)
>     at 
> org.apache.spark.storage.BlockManager.removeBlock(BlockManager.scala:1979)
>     at 
> org.apache.spark.storage.BlockManager.$anonfun$removeBroadcast$3(BlockManager.scala:1970)
>     at 
> org.apache.spark.storage.BlockManager.$anonfun$removeBroadcast$3$adapted(BlockManager.scala:1970)
>     at 
> org.apache.spark.storage.BlockManager$$Lambda$3092/1241801156.apply(Unknown 
> Source)
>     at scala.collection.Iterator.foreach(Iterator.scala:943)
>     at scala.collection.Iterator.foreach$(Iterator.scala:943)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
>     at 
> org.apache.spark.storage.BlockManager.removeBroadcast(BlockManager.scala:1970)
>     at 
> org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:179)
>     at 
> org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:99)
>     at 
> org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:38)
>     at 
> org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:78)
>     at 
> org.apache.spark.SparkContext.broadcastInternal(SparkContext.scala:1538)
>     at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1520)
>     at 
> org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1539)
>     at 
> org.apache.spark.scheduler.DAGScheduler.submitStage(DAGScheduler.scala:1355)
>     at 
> org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:1297)
>     at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2929)
>     at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2921)
>     at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2910)
>     at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49) {code}
>  
>  
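
A simplified, self-contained model of the problematic pattern and the direction of the fix (avoid directory creation on the delete path). The names localDir/fileForWrite/removeBlockFile are hypothetical and this is not the real DiskStore code:

{code:scala}
import java.io.File
import java.nio.file.Files

object RemoveWithoutMkdirs {
  // The write path may legitimately create its sub-directory lazily ...
  def fileForWrite(localDir: File, blockId: String): File = {
    val sub = new File(localDir, (blockId.hashCode & 0xff).toString)
    if (!sub.exists() && !sub.mkdirs()) {
      throw new java.io.IOException(s"Failed to create directory $sub")
    }
    new File(sub, blockId)
  }

  // ... but the remove path should never need to: if the directory (or file)
  // does not exist there is simply nothing to delete. Throwing here is what
  // left the block lock held in the scenario above.
  def removeBlockFile(localDir: File, blockId: String): Boolean = {
    val target = new File(new File(localDir, (blockId.hashCode & 0xff).toString), blockId)
    Files.deleteIfExists(target.toPath)  // returns false, rather than failing, when absent
  }
}
{code}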



--
This message was sent by Atlassian Jira

[jira] [Resolved] (SPARK-40987) Avoid creating a directory when deleting a block, causing DAGScheduler to not work

2022-11-29 Thread Mridul Muralidharan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-40987?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan resolved SPARK-40987.
-
Fix Version/s: 3.3.2
   3.2.3
   3.4.0
   Resolution: Fixed

Issue resolved by pull request 38467
[https://github.com/apache/spark/pull/38467]

> Avoid creating a directory when deleting a block, causing DAGScheduler to not 
> work
> --
>
> Key: SPARK-40987
> URL: https://issues.apache.org/jira/browse/SPARK-40987
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.2.2, 3.3.1
>Reporter: dzcxzl
>Assignee: dzcxzl
>Priority: Minor
> Fix For: 3.3.2, 3.2.3, 3.4.0
>
>
> When the driver submits a job, DAGScheduler calls 
> sc.broadcast(taskBinaryBytes).
> TorrentBroadcast#writeBlocks may fail due to disk problems during 
> blockManager#putBytes.
> BlockManager#doPut calls BlockManager#removeBlockInternal to clean up the 
> block.
> BlockManager#removeBlockInternal calls DiskStore#remove to clean up blocks on 
> disk.
> DiskStore#remove tries to create the parent directory because it does 
> not exist, and an exception is thrown at this point.
> The block info and its lock in BlockInfoManager#blockInfoWrappers are not removed.
> The catch block in TorrentBroadcast#writeBlocks then calls 
> blockManager.removeBroadcast to clean up the broadcast.
> Because the block lock in BlockInfoManager#blockInfoWrappers is never released, 
> the dag-scheduler-event-loop thread of DAGScheduler waits forever.
>  
>  
> {code:java}
> 22/11/01 18:27:48 WARN BlockManager: Putting block broadcast_0_piece0 failed 
> due to exception java.io.IOException: X.
> 22/11/01 18:27:48 ERROR TorrentBroadcast: Store broadcast broadcast_0 fail, 
> remove all pieces of the broadcast {code}
>  
>  
>  
> {code:java}
> "dag-scheduler-event-loop" #54 daemon prio=5 os_prio=31 
> tid=0x7fc98e3fa800 nid=0x7203 waiting on condition [0x78c1e000]
>    java.lang.Thread.State: WAITING (parking)
>     at sun.misc.Unsafe.park(Native Method)
>     - parking to wait for  <0x0007add3d8c8> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>     at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
>     at 
> org.apache.spark.storage.BlockInfoManager.$anonfun$acquireLock$1(BlockInfoManager.scala:221)
>     at 
> org.apache.spark.storage.BlockInfoManager.$anonfun$acquireLock$1$adapted(BlockInfoManager.scala:214)
>     at 
> org.apache.spark.storage.BlockInfoManager$$Lambda$3038/1307533457.apply(Unknown
>  Source)
>     at 
> org.apache.spark.storage.BlockInfoWrapper.withLock(BlockInfoManager.scala:105)
>     at 
> org.apache.spark.storage.BlockInfoManager.acquireLock(BlockInfoManager.scala:214)
>     at 
> org.apache.spark.storage.BlockInfoManager.lockForWriting(BlockInfoManager.scala:293)
>     at 
> org.apache.spark.storage.BlockManager.removeBlock(BlockManager.scala:1979)
>     at 
> org.apache.spark.storage.BlockManager.$anonfun$removeBroadcast$3(BlockManager.scala:1970)
>     at 
> org.apache.spark.storage.BlockManager.$anonfun$removeBroadcast$3$adapted(BlockManager.scala:1970)
>     at 
> org.apache.spark.storage.BlockManager$$Lambda$3092/1241801156.apply(Unknown 
> Source)
>     at scala.collection.Iterator.foreach(Iterator.scala:943)
>     at scala.collection.Iterator.foreach$(Iterator.scala:943)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
>     at 
> org.apache.spark.storage.BlockManager.removeBroadcast(BlockManager.scala:1970)
>     at 
> org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:179)
>     at 
> org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:99)
>     at 
> org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:38)
>     at 
> org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:78)
>     at 
> org.apache.spark.SparkContext.broadcastInternal(SparkContext.scala:1538)
>     at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1520)
>     at 
> org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1539)
>     at 
> org.apache.spark.scheduler.DAGScheduler.submitStage(DAGScheduler.scala:1355)
>     at 
> org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:1297)
>     at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2929)
>     at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2921)
>     at 
> 

[jira] [Resolved] (SPARK-40872) Fallback to original shuffle block when a push-merged shuffle chunk is zero-size

2022-11-20 Thread Mridul Muralidharan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-40872?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan resolved SPARK-40872.
-
Fix Version/s: 3.4.0
   Resolution: Fixed

Issue resolved by pull request 38333
[https://github.com/apache/spark/pull/38333]

> Fallback to original shuffle block when a push-merged shuffle chunk is 
> zero-size
> 
>
> Key: SPARK-40872
> URL: https://issues.apache.org/jira/browse/SPARK-40872
> Project: Spark
>  Issue Type: Sub-task
>  Components: Shuffle
>Affects Versions: 3.3.0, 3.2.2
>Reporter: gaoyajun02
>Assignee: gaoyajun02
>Priority: Major
> Fix For: 3.4.0
>
>
> A large number of shuffle tests in our cluster show that, on bad nodes where 
> chunk corruption appears, there is a chance of fetching zero-size 
> shuffleChunks. In this case, we can fall back to the original shuffle blocks.
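
A small sketch of the fallback decision, with hypothetical names (FetchedChunk, shouldFallBack); the real logic lives in Spark's push-based shuffle fetch path:

{code:scala}
final case class FetchedChunk(shuffleId: Int, chunkId: Int, bytes: Array[Byte])

object PushMergedChunkCheck {
  // Treat a zero-size push-merged chunk like a corrupt one: do not try to
  // decode it, fall back to fetching the original (un-merged) shuffle blocks.
  def shouldFallBack(chunk: FetchedChunk): Boolean = chunk.bytes.isEmpty

  def process(
      chunk: FetchedChunk,
      fetchOriginalBlocks: () => Iterator[Array[Byte]],
      decode: Array[Byte] => Iterator[Array[Byte]]): Iterator[Array[Byte]] =
    if (shouldFallBack(chunk)) fetchOriginalBlocks() else decode(chunk.bytes)
}
{code}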



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-40872) Fallback to original shuffle block when a push-merged shuffle chunk is zero-size

2022-11-20 Thread Mridul Muralidharan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-40872?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan reassigned SPARK-40872:
---

Assignee: gaoyajun02

> Fallback to original shuffle block when a push-merged shuffle chunk is 
> zero-size
> 
>
> Key: SPARK-40872
> URL: https://issues.apache.org/jira/browse/SPARK-40872
> Project: Spark
>  Issue Type: Sub-task
>  Components: Shuffle
>Affects Versions: 3.3.0, 3.2.2
>Reporter: gaoyajun02
>Assignee: gaoyajun02
>Priority: Major
>
> A large number of shuffle tests in our cluster show that, on bad nodes where 
> chunk corruption appears, there is a chance of fetching zero-size 
> shuffleChunks. In this case, we can fall back to the original shuffle blocks.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-40979) Keep removed executor info in decommission state

2022-11-16 Thread Mridul Muralidharan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-40979?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan resolved SPARK-40979.
-
Fix Version/s: 3.4.0
   Resolution: Fixed

Issue resolved by pull request 38441
[https://github.com/apache/spark/pull/38441]

> Keep removed executor info in decommission state
> 
>
> Key: SPARK-40979
> URL: https://issues.apache.org/jira/browse/SPARK-40979
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 3.4.0
>Reporter: Zhongwei Zhu
>Assignee: Zhongwei Zhu
>Priority: Major
> Fix For: 3.4.0
>
>
> Executors removed due to decommission should be kept in a separate set. To 
> avoid OOM, the set size will be limited to 1K or 10K.
> FetchFailed caused by a decommissioned executor can be divided into 2 categories:
>  # When the FetchFailed reaches the DAGScheduler, the executor is still alive, 
> or it is lost but the loss info hasn't reached TaskSchedulerImpl yet. This is 
> already handled in SPARK-40979
>  # The FetchFailed is caused by the decommissioned executor's loss, so the 
> decommission info has already been removed from TaskSchedulerImpl. Keeping 
> such info for a short period is good enough. Even if we limit the size of 
> removed executors to 10K, it would use at most about 10MB of memory. In real 
> cases, it's rare to have a cluster of over 10K executors, and the chance that 
> all of them are decommissioned and lost at the same time would be small.
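
A minimal sketch of such a size-bounded record of removed decommissioned executors, assuming the 10K cap discussed above; the class and method names are hypothetical, not Spark's:

{code:scala}
import java.util.Collections

class RemovedDecommissionedExecutors(capacity: Int = 10000) {
  // Insertion-ordered map that evicts its oldest entry once the cap is hit,
  // bounding memory regardless of how many executors churn over time.
  private val removed = Collections.synchronizedMap(
    new java.util.LinkedHashMap[String, java.lang.Long](64, 0.75f, false) {
      override def removeEldestEntry(
          eldest: java.util.Map.Entry[String, java.lang.Long]): Boolean = size() > capacity
    })

  def markRemoved(execId: String): Unit =
    removed.put(execId, System.currentTimeMillis())

  // Consulted when a FetchFailed arrives: was this executor recently decommissioned?
  def wasDecommissioned(execId: String): Boolean = removed.containsKey(execId)
}
{code}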



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-40979) Keep removed executor info in decommission state

2022-11-16 Thread Mridul Muralidharan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-40979?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan reassigned SPARK-40979:
---

Assignee: Zhongwei Zhu

> Keep removed executor info in decommission state
> 
>
> Key: SPARK-40979
> URL: https://issues.apache.org/jira/browse/SPARK-40979
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 3.4.0
>Reporter: Zhongwei Zhu
>Assignee: Zhongwei Zhu
>Priority: Major
>
> Executors removed due to decommission should be kept in a separate set. To 
> avoid OOM, the set size will be limited to 1K or 10K.
> FetchFailed caused by a decommissioned executor can be divided into 2 categories:
>  # When the FetchFailed reaches the DAGScheduler, the executor is still alive, 
> or it is lost but the loss info hasn't reached TaskSchedulerImpl yet. This is 
> already handled in SPARK-40979
>  # The FetchFailed is caused by the decommissioned executor's loss, so the 
> decommission info has already been removed from TaskSchedulerImpl. Keeping 
> such info for a short period is good enough. Even if we limit the size of 
> removed executors to 10K, it would use at most about 10MB of memory. In real 
> cases, it's rare to have a cluster of over 10K executors, and the chance that 
> all of them are decommissioned and lost at the same time would be small.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-40622) Result of a single task in collect() must fit in 2GB

2022-11-15 Thread Mridul Muralidharan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-40622?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan resolved SPARK-40622.
-
Fix Version/s: 3.4.0
   Resolution: Fixed

> Result of a single task in collect() must fit in 2GB
> 
>
> Key: SPARK-40622
> URL: https://issues.apache.org/jira/browse/SPARK-40622
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core, SQL
>Affects Versions: 3.3.0
>Reporter: Ziqi Liu
>Assignee: Ziqi Liu
>Priority: Major
> Fix For: 3.4.0
>
>
> When collecting results, data from a single partition/task is serialized 
> through a byte array or a ByteBuffer (which is backed by a byte array as 
> well), so it is subject to the Java array max size limit (for a byte array, 
> that is 2GB).
>  
> Constructing a single partition larger than 2GB and collecting it easily 
> reproduces the issue
> {code:java}
> // create data of size ~3GB in single partition, which exceeds the byte array 
> limit
> // random gen to make sure it's poorly compressed
> val df = spark.range(0, 3000, 1, 1).selectExpr("id", s"genData(id, 100) 
> as data")
> withSQLConf("spark.databricks.driver.localMaxResultSize" -> "4g") {
>   withSQLConf("spark.sql.useChunkedBuffer" -> "true") {
> df.queryExecution.executedPlan.executeCollect()
>   }
> } {code}
>  will get an OOM error from 
> [https://github.com/AdoptOpenJDK/openjdk-jdk11/blob/master/src/java.base/share/classes/java/io/ByteArrayOutputStream.java#L125]
>  
> Consider using ChunkedByteBuffer to replace the byte array in order to bypass 
> this limit
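
A minimal sketch of the chunking idea: accumulate output in many modest byte arrays instead of one ever-growing array, so no single allocation approaches the ~2GB Java array limit. This is a hypothetical class, not Spark's ChunkedByteBuffer/ChunkedByteBufferOutputStream themselves:

{code:scala}
import java.io.OutputStream
import scala.collection.mutable.ArrayBuffer

class ChunkedOutputStream(chunkSize: Int = 4 * 1024 * 1024) extends OutputStream {
  private val chunks = ArrayBuffer[Array[Byte]](new Array[Byte](chunkSize))
  private var posInChunk = 0

  override def write(b: Int): Unit = {
    if (posInChunk == chunkSize) {          // current chunk full: start a new one
      chunks += new Array[Byte](chunkSize)
      posInChunk = 0
    }
    chunks.last(posInChunk) = b.toByte
    posInChunk += 1
  }

  // Total size is a Long, so it can exceed Integer.MAX_VALUE even though every
  // individual chunk stays well under the array size limit.
  def size: Long = (chunks.length - 1).toLong * chunkSize + posInChunk

  def toChunks: Seq[Array[Byte]] =
    chunks.init.toSeq :+ chunks.last.take(posInChunk)
}
{code}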



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-40622) Result of a single task in collect() must fit in 2GB

2022-11-15 Thread Mridul Muralidharan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-40622?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan reassigned SPARK-40622:
---

Assignee: Ziqi Liu

> Result of a single task in collect() must fit in 2GB
> 
>
> Key: SPARK-40622
> URL: https://issues.apache.org/jira/browse/SPARK-40622
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core, SQL
>Affects Versions: 3.3.0
>Reporter: Ziqi Liu
>Assignee: Ziqi Liu
>Priority: Major
>
> When collecting results, data from a single partition/task is serialized 
> through a byte array or a ByteBuffer (which is backed by a byte array as 
> well), so it is subject to the Java array max size limit (for a byte array, 
> that is 2GB).
>  
> Constructing a single partition larger than 2GB and collecting it easily 
> reproduces the issue
> {code:java}
> // create data of size ~3GB in single partition, which exceeds the byte array 
> limit
> // random gen to make sure it's poorly compressed
> val df = spark.range(0, 3000, 1, 1).selectExpr("id", s"genData(id, 100) 
> as data")
> withSQLConf("spark.databricks.driver.localMaxResultSize" -> "4g") {
>   withSQLConf("spark.sql.useChunkedBuffer" -> "true") {
> df.queryExecution.executedPlan.executeCollect()
>   }
> } {code}
>  will get an OOM error from 
> [https://github.com/AdoptOpenJDK/openjdk-jdk11/blob/master/src/java.base/share/classes/java/io/ByteArrayOutputStream.java#L125]
>  
> Consider using ChunkedByteBuffer to replace the byte array in order to bypass 
> this limit



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-40901) Unable to store Spark Driver logs with Absolute Hadoop based URI FS Path

2022-11-10 Thread Mridul Muralidharan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-40901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan resolved SPARK-40901.
-
Fix Version/s: 3.4.0
   Resolution: Fixed

Issue resolved by pull request 38377
[https://github.com/apache/spark/pull/38377]

> Unable to store Spark Driver logs with Absolute Hadoop based URI FS Path
> 
>
> Key: SPARK-40901
> URL: https://issues.apache.org/jira/browse/SPARK-40901
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.3.0, 3.2.2
>Reporter: Swaminathan Balachandran
>Assignee: Swaminathan Balachandran
>Priority: Major
> Fix For: 3.4.0
>
>
> The Spark config spark.driver.log.dfsDir doesn't support an absolute 
> Hadoop-based URI path. It currently only supports a URI path, writes only to 
> fs.defaultFS, and does not write logs to any other configured filesystem.
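
For illustration, a minimal sketch of the intended usage once absolute URIs are
honored; the namenode address below is a placeholder:

{code:scala}
import org.apache.spark.SparkConf

// Persist driver logs to an explicitly named HDFS location instead of relying
// on fs.defaultFS resolution.
val conf = new SparkConf()
  .set("spark.driver.log.persistToDfs.enabled", "true")
  .set("spark.driver.log.dfsDir", "hdfs://namenode:8020/user/spark/driverLogs")
{code}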



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-40901) Unable to store Spark Driver logs with Absolute Hadoop based URI FS Path

2022-11-10 Thread Mridul Muralidharan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-40901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan reassigned SPARK-40901:
---

Assignee: Swaminathan Balachandran

> Unable to store Spark Driver logs with Absolute Hadoop based URI FS Path
> 
>
> Key: SPARK-40901
> URL: https://issues.apache.org/jira/browse/SPARK-40901
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.3.0, 3.2.2
>Reporter: Swaminathan Balachandran
>Assignee: Swaminathan Balachandran
>Priority: Major
>
> The Spark config spark.driver.log.dfsDir doesn't support an absolute 
> Hadoop-based URI path. It currently only supports a URI path, writes only to 
> fs.defaultFS, and does not write logs to any other configured filesystem.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-41051) Optimize ProcfsMetrics file acquisition

2022-11-08 Thread Mridul Muralidharan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-41051?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan resolved SPARK-41051.
-
Fix Version/s: 3.4.0
   (was: 3.3.2)
   Resolution: Fixed

Issue resolved by pull request 38563
[https://github.com/apache/spark/pull/38563]

> Optimize ProcfsMetrics file acquisition
> ---
>
> Key: SPARK-41051
> URL: https://issues.apache.org/jira/browse/SPARK-41051
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 3.3.1
> Environment: spark-master
>Reporter: sur
>Priority: Minor
> Fix For: 3.4.0
>
>   Original Estimate: 0.5h
>  Remaining Estimate: 0.5h
>
> When obtaining the procfs files, variables are created but not used, and 
> there is duplicate code. We should reduce such situations.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-40950) isRemoteAddressMaxedOut performance overhead on scala 2.13

2022-11-04 Thread Mridul Muralidharan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-40950?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan reassigned SPARK-40950:
---

Assignee: Emil Ejbyfeldt

> isRemoteAddressMaxedOut performance overhead on scala 2.13
> --
>
> Key: SPARK-40950
> URL: https://issues.apache.org/jira/browse/SPARK-40950
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 3.2.2, 3.3.1
>Reporter: Emil Ejbyfeldt
>Assignee: Emil Ejbyfeldt
>Priority: Major
>
> On Scala 2.13 the blocks in FetchRequest are sometimes backed by a `List`, 
> while in 2.12 they would be an ArrayBuffer. This means that calculating the 
> length of the blocks, which is done in isRemoteAddressMaxedOut and other 
> places, is now much more expensive. This is because in 2.13 a `Seq` can no 
> longer be backed by a mutable collection.
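
A simplified sketch of the idea behind the fix: compute the block count once when
the request is built instead of calling Seq#size on a List-backed Seq in a hot
path. The case class and field names here are illustrative, not Spark's internals:

{code:scala}
final case class FetchRequest(address: String, blocks: Seq[String]) {
  // List#size is O(n) on Scala 2.13, so pay that cost once at construction.
  val blockCount: Int = blocks.size
}

def isRemoteAddressMaxedOut(blocksInFlight: Int, req: FetchRequest, maxPerAddress: Int): Boolean =
  blocksInFlight + req.blockCount > maxPerAddress
{code}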



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-40950) isRemoteAddressMaxedOut performance overhead on scala 2.13

2022-11-04 Thread Mridul Muralidharan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-40950?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan resolved SPARK-40950.
-
Fix Version/s: 3.4.0
   Resolution: Fixed

Issue resolved by pull request 38427
[https://github.com/apache/spark/pull/38427]

> isRemoteAddressMaxedOut performance overhead on scala 2.13
> --
>
> Key: SPARK-40950
> URL: https://issues.apache.org/jira/browse/SPARK-40950
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 3.2.2, 3.3.1
>Reporter: Emil Ejbyfeldt
>Assignee: Emil Ejbyfeldt
>Priority: Major
> Fix For: 3.4.0
>
>
> On Scala 2.13 the blocks in FetchRequest are sometimes backed by a `List`, 
> while in 2.12 they would be an ArrayBuffer. This means that calculating the 
> length of the blocks, which is done in isRemoteAddressMaxedOut and other 
> places, is now much more expensive. This is because in 2.13 a `Seq` can no 
> longer be backed by a mutable collection.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-40968) Fix some wrong/misleading comments in DAGSchedulerSuite

2022-11-02 Thread Mridul Muralidharan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-40968?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan reassigned SPARK-40968:
---

Assignee: Jiexing Li

> Fix some wrong/misleading comments in DAGSchedulerSuite
> ---
>
> Key: SPARK-40968
> URL: https://issues.apache.org/jira/browse/SPARK-40968
> Project: Spark
>  Issue Type: Test
>  Components: Scheduler
>Affects Versions: 3.4.0
>Reporter: Jiexing Li
>Assignee: Jiexing Li
>Priority: Minor
>
> A few comments in 
> test("SPARK-25341: continuous indeterminate stage roll back")
> test("SPARK-23207: cannot rollback a result stage")
> are wrong or misleading. We need to fix them for better readability.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-40968) Fix some wrong/misleading comments in DAGSchedulerSuite

2022-11-02 Thread Mridul Muralidharan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-40968?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan resolved SPARK-40968.
-
Fix Version/s: 3.4.0
   Resolution: Fixed

Issue resolved by pull request 38371
[https://github.com/apache/spark/pull/38371]

> Fix some wrong/misleading comments in DAGSchedulerSuite
> ---
>
> Key: SPARK-40968
> URL: https://issues.apache.org/jira/browse/SPARK-40968
> Project: Spark
>  Issue Type: Test
>  Components: Scheduler
>Affects Versions: 3.4.0
>Reporter: Jiexing Li
>Assignee: Jiexing Li
>Priority: Minor
> Fix For: 3.4.0
>
>
> A few comments in 
> test("SPARK-25341: continuous indeterminate stage roll back")
> test("SPARK-23207: cannot rollback a result stage")
> are wrong or misleading. We need to fix them for better readability.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-40320) When the Executor plugin fails to initialize, the Executor shows active but does not accept tasks forever, just like being hung

2022-10-26 Thread Mridul Muralidharan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-40320?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan reassigned SPARK-40320:
---

Assignee: miracle

> When the Executor plugin fails to initialize, the Executor shows active but 
> does not accept tasks forever, just like being hung
> ---
>
> Key: SPARK-40320
> URL: https://issues.apache.org/jira/browse/SPARK-40320
> Project: Spark
>  Issue Type: Bug
>  Components: Scheduler
>Affects Versions: 3.0.0
>Reporter: Mars
>Assignee: miracle
>Priority: Major
> Fix For: 3.4.0
>
>
> *Reproduce step:*
> Set `spark.plugins=ErrorSparkPlugin`.
> The `ErrorSparkPlugin` and `ErrorExecutorPlugin` classes are as below (I 
> abbreviate the code to make it clearer):
> {code:java}
> class ErrorSparkPlugin extends SparkPlugin {
>   override def driverPlugin(): DriverPlugin = new ErrorDriverPlugin()
>   override def executorPlugin(): ExecutorPlugin = new ErrorExecutorPlugin()
> }{code}
> {code:java}
> class ErrorExecutorPlugin extends ExecutorPlugin {
>   private val checkingInterval: Long = 1
>   override def init(_ctx: PluginContext, extraConf: util.Map[String, String]): Unit = {
>     if (checkingInterval == 1) {
>       throw new UnsatisfiedLinkError("My Exception error")
>     }
>   }
> } {code}
> The executor shows as active when we check the Spark UI; however, it is 
> broken and doesn't receive any tasks.
> *Root Cause:*
> I checked the code and found that `org.apache.spark.rpc.netty.Inbox#safelyCall` 
> will throw the fatal error (`UnsatisfiedLinkError` is a fatal error) in the 
> method `dealWithFatalError`. Actually the `CoarseGrainedExecutorBackend` JVM 
> process is still alive, but the communication thread is no longer working 
> (see `MessageLoop#receiveLoopRunnable`: `receiveLoop()` was broken, so the 
> executor doesn't receive any messages).
> Some ideas:
> I think it is very hard to know what happened here unless we check the code. 
> The executor is active but it can't do anything, and we will wonder whether 
> the driver is broken or it is an executor problem. I think at least the 
> executor status shouldn't be active here, or the executor could call 
> exitExecutor (kill itself).
>  
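
A hedged sketch of one of the ideas above: if a plugin throws during initialization
(including a fatal error such as UnsatisfiedLinkError), terminate the executor
instead of leaving a hung-but-"active" process. The trait and helper below are
illustrative, not Spark's actual plugin API:

{code:scala}
import java.util.{Map => JMap}

trait SimpleExecutorPlugin {
  def init(extraConf: JMap[String, String]): Unit
}

def initPluginsOrExit(plugins: Seq[SimpleExecutorPlugin], conf: JMap[String, String]): Unit =
  plugins.foreach { p =>
    try p.init(conf)
    catch {
      // NonFatal(_) would skip UnsatisfiedLinkError, so catch Throwable here
      // and exit: a dead executor is easier to diagnose than a hung one.
      case t: Throwable =>
        System.err.println(s"Plugin ${p.getClass.getName} failed to initialize: $t")
        sys.exit(1)
    }
  }
{code}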



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-40320) When the Executor plugin fails to initialize, the Executor shows active but does not accept tasks forever, just like being hung

2022-10-26 Thread Mridul Muralidharan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-40320?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan updated SPARK-40320:

Fix Version/s: 3.4.0

> When the Executor plugin fails to initialize, the Executor shows active but 
> does not accept tasks forever, just like being hung
> ---
>
> Key: SPARK-40320
> URL: https://issues.apache.org/jira/browse/SPARK-40320
> Project: Spark
>  Issue Type: Bug
>  Components: Scheduler
>Affects Versions: 3.0.0
>Reporter: Mars
>Priority: Major
> Fix For: 3.4.0
>
>
> *Reproduce step:*
> Set `spark.plugins=ErrorSparkPlugin`.
> The `ErrorSparkPlugin` and `ErrorExecutorPlugin` classes are as below (I 
> abbreviate the code to make it clearer):
> {code:java}
> class ErrorSparkPlugin extends SparkPlugin {
>   override def driverPlugin(): DriverPlugin = new ErrorDriverPlugin()
>   override def executorPlugin(): ExecutorPlugin = new ErrorExecutorPlugin()
> }{code}
> {code:java}
> class ErrorExecutorPlugin extends ExecutorPlugin {
>   private val checkingInterval: Long = 1
>   override def init(_ctx: PluginContext, extraConf: util.Map[String, String]): Unit = {
>     if (checkingInterval == 1) {
>       throw new UnsatisfiedLinkError("My Exception error")
>     }
>   }
> } {code}
> The executor shows as active when we check the Spark UI; however, it is 
> broken and doesn't receive any tasks.
> *Root Cause:*
> I checked the code and found that `org.apache.spark.rpc.netty.Inbox#safelyCall` 
> will throw the fatal error (`UnsatisfiedLinkError` is a fatal error) in the 
> method `dealWithFatalError`. Actually the `CoarseGrainedExecutorBackend` JVM 
> process is still alive, but the communication thread is no longer working 
> (see `MessageLoop#receiveLoopRunnable`: `receiveLoop()` was broken, so the 
> executor doesn't receive any messages).
> Some ideas:
> I think it is very hard to know what happened here unless we check the code. 
> The executor is active but it can't do anything, and we will wonder whether 
> the driver is broken or it is an executor problem. I think at least the 
> executor status shouldn't be active here, or the executor could call 
> exitExecutor (kill itself).
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-40320) When the Executor plugin fails to initialize, the Executor shows active but does not accept tasks forever, just like being hung

2022-10-26 Thread Mridul Muralidharan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-40320?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan resolved SPARK-40320.
-
Resolution: Fixed

> When the Executor plugin fails to initialize, the Executor shows active but 
> does not accept tasks forever, just like being hung
> ---
>
> Key: SPARK-40320
> URL: https://issues.apache.org/jira/browse/SPARK-40320
> Project: Spark
>  Issue Type: Bug
>  Components: Scheduler
>Affects Versions: 3.0.0
>Reporter: Mars
>Priority: Major
> Fix For: 3.4.0
>
>
> *Reproduce step:*
> Set `spark.plugins=ErrorSparkPlugin`.
> The `ErrorSparkPlugin` and `ErrorExecutorPlugin` classes are as below (I 
> abbreviate the code to make it clearer):
> {code:java}
> class ErrorSparkPlugin extends SparkPlugin {
>   override def driverPlugin(): DriverPlugin = new ErrorDriverPlugin()
>   override def executorPlugin(): ExecutorPlugin = new ErrorExecutorPlugin()
> }{code}
> {code:java}
> class ErrorExecutorPlugin extends ExecutorPlugin {
>   private val checkingInterval: Long = 1
>   override def init(_ctx: PluginContext, extraConf: util.Map[String, String]): Unit = {
>     if (checkingInterval == 1) {
>       throw new UnsatisfiedLinkError("My Exception error")
>     }
>   }
> } {code}
> The executor shows as active when we check the Spark UI; however, it is 
> broken and doesn't receive any tasks.
> *Root Cause:*
> I checked the code and found that `org.apache.spark.rpc.netty.Inbox#safelyCall` 
> will throw the fatal error (`UnsatisfiedLinkError` is a fatal error) in the 
> method `dealWithFatalError`. Actually the `CoarseGrainedExecutorBackend` JVM 
> process is still alive, but the communication thread is no longer working 
> (see `MessageLoop#receiveLoopRunnable`: `receiveLoop()` was broken, so the 
> executor doesn't receive any messages).
> Some ideas:
> I think it is very hard to know what happened here unless we check the code. 
> The executor is active but it can't do anything, and we will wonder whether 
> the driver is broken or it is an executor problem. I think at least the 
> executor status shouldn't be active here, or the executor could call 
> exitExecutor (kill itself).
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-40902) Quick submission of drivers in tests to mesos scheduler results in dropping drivers

2022-10-24 Thread Mridul Muralidharan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-40902?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan updated SPARK-40902:

Priority: Minor  (was: Major)

> Quick submission of drivers in tests to mesos scheduler results in dropping 
> drivers
> ---
>
> Key: SPARK-40902
> URL: https://issues.apache.org/jira/browse/SPARK-40902
> Project: Spark
>  Issue Type: Bug
>  Components: Mesos
>Affects Versions: 2.4.8, 3.0.3, 3.4.0
>Reporter: Mridul Muralidharan
>Priority: Minor
>
> Queued drivers in MesosClusterScheduler are ordered based on 
> MesosDriverDescription - and the default ordering checks priority, followed 
> by submission time. For two driver submissions with the same priority made 
> in quick succession (such that the submission time is the same due to the 
> millisecond granularity of Date), this results in dropping the second 
> MesosDriverDescription from queuedDrivers - as driverOrdering returns 0 
> when comparing the descriptions. This jira fixes the more immediate issue 
> with tests, but we do need to relook at this for the mesos scheduler in 
> general later.
> Currently, this affects tests - for example, in the latest VOTE for 3.3.1 [1] 
> - and is not consistently reproducible unless on a fast machine.
> [1] https://lists.apache.org/thread/jof098qxp0s6qqmt9qwv52f9665b1pjg
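
A minimal sketch of a deterministic ordering with a final tie-breaker, so two
submissions with equal priority and the same millisecond timestamp compare as
distinct and both stay queued. The field names are assumptions, not the actual
MesosDriverDescription fields:

{code:scala}
import java.util.Date

final case class DriverDesc(priority: Int, submissionDate: Date, submissionId: String)

// submissionId is unique per submission, so the comparison never returns 0
// for two different descriptions.
val driverOrdering: Ordering[DriverDesc] =
  Ordering.by((d: DriverDesc) => (d.priority, d.submissionDate.getTime, d.submissionId))
{code}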



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-40902) Quick submission of drivers in tests to mesos scheduler results in dropping drivers

2022-10-24 Thread Mridul Muralidharan (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-40902?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17623270#comment-17623270
 ] 

Mridul Muralidharan commented on SPARK-40902:
-

+CC [~dongjoon], [~yumwang]

> Quick submission of drivers in tests to mesos scheduler results in dropping 
> drivers
> ---
>
> Key: SPARK-40902
> URL: https://issues.apache.org/jira/browse/SPARK-40902
> Project: Spark
>  Issue Type: Bug
>  Components: Mesos
>Affects Versions: 2.4.8, 3.0.3, 3.4.0
>Reporter: Mridul Muralidharan
>Priority: Major
>
> Queued drivers in MesosClusterScheduler are ordered based on 
> MesosDriverDescription - and the default ordering checks priority, followed 
> by submission time. For two driver submissions with the same priority made 
> in quick succession (such that the submission time is the same due to the 
> millisecond granularity of Date), this results in dropping the second 
> MesosDriverDescription from queuedDrivers - as driverOrdering returns 0 
> when comparing the descriptions. This jira fixes the more immediate issue 
> with tests, but we do need to relook at this for the mesos scheduler in 
> general later.
> Currently, this affects tests - for example, in the latest VOTE for 3.3.1 [1] 
> - and is not consistently reproducible unless on a fast machine.
> [1] https://lists.apache.org/thread/jof098qxp0s6qqmt9qwv52f9665b1pjg



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-40902) Quick submission of drivers in tests to mesos scheduler results in dropping drivers

2022-10-24 Thread Mridul Muralidharan (Jira)
Mridul Muralidharan created SPARK-40902:
---

 Summary: Quick submission of drivers in tests to mesos scheduler 
results in dropping drivers
 Key: SPARK-40902
 URL: https://issues.apache.org/jira/browse/SPARK-40902
 Project: Spark
  Issue Type: Bug
  Components: Mesos
Affects Versions: 3.0.3, 2.4.8, 3.4.0
Reporter: Mridul Muralidharan


Queued drivers in MesosClusterScheduler are ordered based on 
MesosDriverDescription - and the default ordering checks priority, followed by 
submission time. For two driver submissions with the same priority made in quick 
succession (such that the submission time is the same due to the millisecond 
granularity of Date), this results in dropping the second 
MesosDriverDescription from queuedDrivers - as driverOrdering returns 0 
when comparing the descriptions. This jira fixes the more immediate issue with 
tests, but we do need to relook at this for the mesos scheduler in general later.

Currently, this affects tests - for example, in the latest VOTE for 3.3.1 [1] - 
and is not consistently reproducible unless on a fast machine.



[1] https://lists.apache.org/thread/jof098qxp0s6qqmt9qwv52f9665b1pjg



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-40096) Finalize shuffle merge slow due to connection creation fails

2022-09-22 Thread Mridul Muralidharan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-40096?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan reassigned SPARK-40096:
---

Assignee: Wan Kun

> Finalize shuffle merge slow due to connection creation fails
> 
>
> Key: SPARK-40096
> URL: https://issues.apache.org/jira/browse/SPARK-40096
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.3.0
>Reporter: Wan Kun
>Assignee: Wan Kun
>Priority: Major
>
> *How to reproduce this issue*
>  * Enable push based shuffle
>  * Remove some merger nodes before sending finalize RPCs
>  * The driver tries to connect to those merger shuffle services and sends 
> the finalize RPCs one by one; each connection creation will time out after 
> SPARK_NETWORK_IO_CONNECTIONCREATIONTIMEOUT_KEY (120s by default)
>  
> We can send these RPCs in the *shuffleMergeFinalizeScheduler* thread pool and 
> handle the connection creation exception.
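
A sketch, under assumptions, of sending the finalize RPCs from a thread pool so a
single unreachable merger cannot stall the others for the full connection timeout;
sendFinalize and the pool size are placeholders, not Spark's actual code:

{code:scala}
import java.util.concurrent.Executors
import scala.util.control.NonFatal

val finalizePool = Executors.newFixedThreadPool(8)

def finalizeAll(mergers: Seq[String], sendFinalize: String => Unit): Unit =
  mergers.foreach { host =>
    finalizePool.execute(() =>
      try sendFinalize(host)
      catch {
        // A removed merger may never accept the connection; log and move on
        // instead of blocking the remaining finalize RPCs.
        case NonFatal(e) => System.err.println(s"finalizeShuffleMerge to $host failed: $e")
      })
  }
{code}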



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-40096) Finalize shuffle merge slow due to connection creation fails

2022-09-22 Thread Mridul Muralidharan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-40096?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan resolved SPARK-40096.
-
Fix Version/s: 3.4.0
   Resolution: Fixed

Issue resolved by pull request 37533
[https://github.com/apache/spark/pull/37533]

> Finalize shuffle merge slow due to connection creation fails
> 
>
> Key: SPARK-40096
> URL: https://issues.apache.org/jira/browse/SPARK-40096
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.3.0
>Reporter: Wan Kun
>Assignee: Wan Kun
>Priority: Major
> Fix For: 3.4.0
>
>
> *How to reproduce this issue*
>  * Enable push based shuffle
>  * Remove some merger nodes before sending finalize RPCs
>  * The driver tries to connect to those merger shuffle services and sends 
> the finalize RPCs one by one; each connection creation will time out after 
> SPARK_NETWORK_IO_CONNECTIONCREATIONTIMEOUT_KEY (120s by default)
>  
> We can send these RPCs in the *shuffleMergeFinalizeScheduler* thread pool and 
> handle the connection creation exception.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-38888) Add `RocksDBProvider` similar to `LevelDBProvider`

2022-09-08 Thread Mridul Muralidharan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-38888?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan resolved SPARK-38888.
-
Fix Version/s: 3.4.0
   Resolution: Fixed

Issue resolved by pull request 37610
[https://github.com/apache/spark/pull/37610]

> Add `RocksDBProvider` similar to `LevelDBProvider`
> --
>
> Key: SPARK-38888
> URL: https://issues.apache.org/jira/browse/SPARK-38888
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core, YARN
>Affects Versions: 3.4.0
>Reporter: Yang Jie
>Assignee: Yang Jie
>Priority: Minor
> Fix For: 3.4.0
>
>
> `LevelDBProvider` is used by `ExternalShuffleBlockResolver` and 
> `YarnShuffleService`; a corresponding `RocksDB` implementation should be added.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-38888) Add `RocksDBProvider` similar to `LevelDBProvider`

2022-09-08 Thread Mridul Muralidharan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-38888?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan reassigned SPARK-38888:
---

Assignee: Yang Jie

> Add `RocksDBProvider` similar to `LevelDBProvider`
> --
>
> Key: SPARK-38888
> URL: https://issues.apache.org/jira/browse/SPARK-38888
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core, YARN
>Affects Versions: 3.4.0
>Reporter: Yang Jie
>Assignee: Yang Jie
>Priority: Minor
>
> `LevelDBProvider` is used by `ExternalShuffleBlockResolver` and 
> `YarnShuffleService`; a corresponding `RocksDB` implementation should be added.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-40186) mergedShuffleCleaner should have been shutdown before db closed

2022-09-07 Thread Mridul Muralidharan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-40186?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan resolved SPARK-40186.
-
Target Version/s: 3.4.0
Assignee: Yang Jie
  Resolution: Fixed

> mergedShuffleCleaner should have been shutdown before db closed
> ---
>
> Key: SPARK-40186
> URL: https://issues.apache.org/jira/browse/SPARK-40186
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.4.0
>Reporter: Yang Jie
>Assignee: Yang Jie
>Priority: Major
>
> We should ensure `RemoteBlockPushResolver#mergedShuffleCleaner` has been 
> shut down before `RemoteBlockPushResolver#db` is closed; otherwise, 
> `RemoteBlockPushResolver#applicationRemoved` may perform delete operations on 
> a closed db.
>  
> https://github.com/apache/spark/pull/37610#discussion_r951185256
>  
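
A minimal sketch of the shutdown ordering described above: drain the cleaner first,
then close the DB, so no delete can run against a closed handle. The parameter
types are simplified stand-ins for the actual fields:

{code:scala}
import java.util.concurrent.{ExecutorService, TimeUnit}

def closeResolver(mergedShuffleCleaner: ExecutorService, db: AutoCloseable): Unit = {
  mergedShuffleCleaner.shutdown()
  if (!mergedShuffleCleaner.awaitTermination(10, TimeUnit.SECONDS)) {
    mergedShuffleCleaner.shutdownNow() // stop any straggling cleanup tasks
  }
  db.close() // safe: no cleaner task can touch the DB now
}
{code}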



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-40094) Send TaskEnd event when task failed with NotSerializableException or TaskOutputFileAlreadyExistException to release executors for dynamic allocation

2022-08-24 Thread Mridul Muralidharan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-40094?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan resolved SPARK-40094.
-
Fix Version/s: 3.4.0
   Resolution: Fixed

Issue resolved by pull request 37528
[https://github.com/apache/spark/pull/37528]

>  Send TaskEnd event when task failed with NotSerializableException or 
> TaskOutputFileAlreadyExistException to release executors for dynamic 
> allocation 
> --
>
> Key: SPARK-40094
> URL: https://issues.apache.org/jira/browse/SPARK-40094
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.3.0
>Reporter: wangshengjie
>Assignee: wangshengjie
>Priority: Major
> Fix For: 3.4.0
>
>
> We found that if a task fails with NotSerializableException or 
> TaskOutputFileAlreadyExistException, Spark won't send the TaskEnd event, and 
> this will cause dynamic allocation to not release executors normally.
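
An illustrative sketch (not the actual DAGScheduler code) of the intent of the fix:
every failure path posts the task-end event, so the allocation manager stops
counting the task as running and can release the executor:

{code:scala}
sealed trait TaskEndReason
case object Succeeded extends TaskEndReason
final case class Failed(error: Throwable) extends TaskEndReason

trait ListenerBus {
  def postTaskEnd(taskId: Long, reason: TaskEndReason): Unit
}

def completeTask(taskId: Long, result: Either[Throwable, Unit], bus: ListenerBus): Unit =
  result match {
    case Right(_) => bus.postTaskEnd(taskId, Succeeded)
    // Previously some failure kinds skipped the event, so executors stayed "busy".
    case Left(error) => bus.postTaskEnd(taskId, Failed(error))
  }
{code}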



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-40094) Send TaskEnd event when task failed with NotSerializableException or TaskOutputFileAlreadyExistException to release executors for dynamic allocation

2022-08-24 Thread Mridul Muralidharan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-40094?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan reassigned SPARK-40094:
---

Assignee: wangshengjie

>  Send TaskEnd event when task failed with NotSerializableException or 
> TaskOutputFileAlreadyExistException to release executors for dynamic 
> allocation 
> --
>
> Key: SPARK-40094
> URL: https://issues.apache.org/jira/browse/SPARK-40094
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.3.0
>Reporter: wangshengjie
>Assignee: wangshengjie
>Priority: Major
>
> We found that if a task fails with NotSerializableException or 
> TaskOutputFileAlreadyExistException, Spark won't send the TaskEnd event, and 
> this will cause dynamic allocation to not release executors normally.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-38909) Encapsulate LevelDB used by ExternalShuffleBlockResolver and YarnShuffleService as LocalDB

2022-08-18 Thread Mridul Muralidharan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-38909?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan reassigned SPARK-38909:
---

Assignee: Yang Jie

> Encapsulate LevelDB used by ExternalShuffleBlockResolver and 
> YarnShuffleService as LocalDB
> --
>
> Key: SPARK-38909
> URL: https://issues.apache.org/jira/browse/SPARK-38909
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core, YARN
>Affects Versions: 3.4.0
>Reporter: Yang Jie
>Assignee: Yang Jie
>Priority: Minor
>
> {{ExternalShuffleBlockResolver}} and {{YarnShuffleService}} use {{LevelDB}} 
> directly; this is not conducive to extending the use of {{RocksDB}} in this 
> scenario. This PR encapsulates it for extensibility. It will be the pre-work 
> for SPARK-38888.
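
A hedged sketch of the encapsulation idea: callers depend on a small local-DB
interface rather than on LevelDB types directly, so a RocksDB backend can be
dropped in later. The trait and method names here are assumptions, not the actual
API:

{code:scala}
trait LocalDB extends AutoCloseable {
  def get(key: Array[Byte]): Array[Byte]
  def put(key: Array[Byte], value: Array[Byte]): Unit
  def delete(key: Array[Byte]): Unit
}

// Resolvers/services are constructed against the interface, not a concrete DB.
final class ShuffleStateStore(db: LocalDB) {
  def remove(appId: String): Unit = db.delete(appId.getBytes("UTF-8"))
}
{code}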



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-38909) Encapsulate LevelDB used by ExternalShuffleBlockResolver and YarnShuffleService as LocalDB

2022-08-18 Thread Mridul Muralidharan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-38909?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan resolved SPARK-38909.
-
Fix Version/s: 3.4.0
   Resolution: Fixed

Issue resolved by pull request 36200
[https://github.com/apache/spark/pull/36200]

> Encapsulate LevelDB used by ExternalShuffleBlockResolver and 
> YarnShuffleService as LocalDB
> --
>
> Key: SPARK-38909
> URL: https://issues.apache.org/jira/browse/SPARK-38909
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core, YARN
>Affects Versions: 3.4.0
>Reporter: Yang Jie
>Assignee: Yang Jie
>Priority: Minor
> Fix For: 3.4.0
>
>
> {{ExternalShuffleBlockResolver}} and {{YarnShuffleService}} use {{LevelDB}} 
> directly; this is not conducive to extending the use of {{RocksDB}} in this 
> scenario. This PR encapsulates it for extensibility. It will be the pre-work 
> for SPARK-38888.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-39955) Improve LaunchTask process to avoid Stage failures caused by fail-to-send LaunchTask messages

2022-08-11 Thread Mridul Muralidharan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-39955?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan reassigned SPARK-39955:
---

Assignee: Kai-Hsun Chen  (was: Kai-Hsun Chen)

> Improve LaunchTask process to avoid Stage failures caused by fail-to-send 
> LaunchTask messages
> -
>
> Key: SPARK-39955
> URL: https://issues.apache.org/jira/browse/SPARK-39955
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 3.4.0
>Reporter: Kai-Hsun Chen
>Assignee: Kai-Hsun Chen
>Priority: Major
> Fix For: 3.4.0
>
>
> There are two possible causes of RPC failures: Network Failure and Task 
> Failure.
> (1) Task Failure: The network is good, but the task causes the executor's JVM 
> to crash. Hence, the RPC fails.
> (2) Network Failure: The executor works well, but the network between the 
> Driver and the Executor is broken. Hence, the RPC fails.
> We should handle these two kinds of failure in different ways. First, if the 
> failure is a Task Failure, we should increment the variable {{numFailures}}. 
> If the value of {{numFailures}} is larger than a threshold, Spark will label 
> the job as failed. Second, if the failure is a Network Failure, we will not 
> increment {{numFailures}}; we will just assign the task to a new executor. 
> Hence, the job will not be recognized as failed due to a Network Failure.
> However, currently, Spark recognizes every RPC failure as a Task Failure. 
> Hence, it will cause extra Spark job failures.
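
A minimal sketch of the proposed accounting: only a task failure counts toward
numFailures, while a network failure just reschedules the task elsewhere. The
types and threshold handling are illustrative, not Spark's TaskSetManager code:

{code:scala}
sealed trait LaunchFailure
case object TaskFailure extends LaunchFailure
case object NetworkFailure extends LaunchFailure

final class AttemptTracker(maxFailures: Int) {
  private var numFailures = 0

  /** Returns true when the job should be marked as failed. */
  def onFailure(kind: LaunchFailure): Boolean = kind match {
    case TaskFailure =>
      numFailures += 1
      numFailures >= maxFailures
    case NetworkFailure =>
      false // resubmit to another executor; do not count it
  }
}
{code}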



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-39955) Improve LaunchTask process to avoid Stage failures caused by fail-to-send LaunchTask messages

2022-08-11 Thread Mridul Muralidharan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-39955?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan reassigned SPARK-39955:
---

Assignee: Kai-Hsun Chen  (was: Mridul Muralidharan)

> Improve LaunchTask process to avoid Stage failures caused by fail-to-send 
> LaunchTask messages
> -
>
> Key: SPARK-39955
> URL: https://issues.apache.org/jira/browse/SPARK-39955
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 3.4.0
>Reporter: Kai-Hsun Chen
>Assignee: Kai-Hsun Chen
>Priority: Major
> Fix For: 3.4.0
>
>
> There are two possible causes of RPC failures: Network Failure and Task 
> Failure.
> (1) Task Failure: The network is good, but the task causes the executor's JVM 
> to crash. Hence, the RPC fails.
> (2) Network Failure: The executor works well, but the network between the 
> Driver and the Executor is broken. Hence, the RPC fails.
> We should handle these two kinds of failure in different ways. First, if the 
> failure is a Task Failure, we should increment the variable {{numFailures}}. 
> If the value of {{numFailures}} is larger than a threshold, Spark will label 
> the job as failed. Second, if the failure is a Network Failure, we will not 
> increment {{numFailures}}; we will just assign the task to a new executor. 
> Hence, the job will not be recognized as failed due to a Network Failure.
> However, currently, Spark recognizes every RPC failure as a Task Failure. 
> Hence, it will cause extra Spark job failures.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org


