[jira] [Commented] (SPARK-44215) Client receives zero number of chunks in merge meta response which doesn't trigger fallback to unmerged blocks

2023-07-04 Thread Chandni Singh (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-44215?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17740023#comment-17740023
 ] 

Chandni Singh commented on SPARK-44215:
---

PR to backport the change to 3.3:
https://github.com/apache/spark/pull/41859

> Client receives zero number of chunks in merge meta response which doesn't 
> trigger fallback to unmerged blocks
> --
>
> Key: SPARK-44215
> URL: https://issues.apache.org/jira/browse/SPARK-44215
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle
>Affects Versions: 3.2.0
>Reporter: Chandni Singh
>Assignee: Chandni Singh
>Priority: Major
> Fix For: 3.5.0, 3.4.2
>
>
> We still see instances of the server returning 0 {{numChunks}} in 
> {{mergedMetaResponse}}, which causes the executor to fail with an 
> {{ArithmeticException}}. 
> {code}
> java.lang.ArithmeticException: / by zero
>   at 
> org.apache.spark.storage.PushBasedFetchHelper.createChunkBlockInfosFromMetaResponse(PushBasedFetchHelper.scala:128)
>   at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:1047)
>   at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:90)
>   at 
> org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:29)
>   at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
>   at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
>   at 
> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
>   at 
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
>   at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
> {code}
> Here the executor doesn't fall back to fetching unmerged blocks, and this 
> also doesn't result in a {{FetchFailure}}, so the application fails.






[jira] [Created] (SPARK-44215) Client receives zero number of chunks in merge meta response which doesn't trigger fallback to unmerged blocks

2023-06-27 Thread Chandni Singh (Jira)
Chandni Singh created SPARK-44215:
-

 Summary: Client receives zero number of chunks in merge meta 
response which doesn't trigger fallback to unmerged blocks
 Key: SPARK-44215
 URL: https://issues.apache.org/jira/browse/SPARK-44215
 Project: Spark
  Issue Type: Bug
  Components: Shuffle
Affects Versions: 3.2.0
Reporter: Chandni Singh


We still see instances of the server returning 0 {{numChunks}} in 
{{mergedMetaResponse}}, which causes the executor to fail with an 
{{ArithmeticException}}. 
{code}
java.lang.ArithmeticException: / by zero
at 
org.apache.spark.storage.PushBasedFetchHelper.createChunkBlockInfosFromMetaResponse(PushBasedFetchHelper.scala:128)
at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:1047)
at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:90)
at 
org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:29)
at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at 
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
at 
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
{code}
Here the executor doesn't fall back to fetching unmerged blocks, and this also 
doesn't result in a {{FetchFailure}}, so the application fails.
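
To make the intended fallback concrete, here is a minimal, self-contained 
sketch (illustrative names only, not Spark's actual code): when the meta 
response carries no chunk bitmaps, the client should fall back to fetching the 
original unmerged blocks instead of computing {{blockSize / bitmaps.length}}.
{code}
object MetaResponseGuard {
  // Illustrative stand-ins for Spark's internal types.
  case class Bitmap(cardinality: Int)
  sealed trait FetchPlan
  case class FetchMergedChunks(approxChunkSize: Long, numChunks: Int) extends FetchPlan
  case object FallBackToUnmergedBlocks extends FetchPlan

  // Guard against a zero-chunk meta response: dividing by bitmaps.length
  // throws ArithmeticException when the server returns no chunks, so an
  // empty array must route to the unmerged-block fallback instead.
  def planFetch(blockSize: Long, bitmaps: Array[Bitmap]): FetchPlan =
    if (bitmaps.isEmpty) FallBackToUnmergedBlocks
    else FetchMergedChunks(blockSize / bitmaps.length, bitmaps.length)

  def main(args: Array[String]): Unit = {
    println(planFetch(1024L, Array(Bitmap(3), Bitmap(5)))) // FetchMergedChunks(512,2)
    println(planFetch(1024L, Array.empty[Bitmap]))         // FallBackToUnmergedBlocks
  }
}
{code}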






[jira] [Comment Edited] (SPARK-43179) Add option for applications to control saving of metadata in the External Shuffle Service LevelDB

2023-06-07 Thread Chandni Singh (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-43179?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17730307#comment-17730307
 ] 

Chandni Singh edited comment on SPARK-43179 at 6/7/23 11:30 PM:


Found a bug, so re-opening the issue. 

The change introduced a bug that impacts recovery of applications:
{code:java}
ERROR org.apache.spark.network.server.TransportRequestHandler: Error while invoking RpcHandler#receive() on RPC id 5764589675121231159
java.lang.RuntimeException: javax.security.sasl.SaslException: DIGEST-MD5: digest response format violation. Mismatched response.
	at org.sparkproject.guava.base.Throwables.propagate(Throwables.java:160)
	at org.apache.spark.network.sasl.SparkSaslServer.response(SparkSaslServer.java:121)
	at org.apache.spark.network.sasl.SaslRpcHandler.doAuthChallenge(SaslRpcHandler.java:94)
	at org.apache.spark.network.crypto.AuthRpcHandler.doAuthChallenge(AuthRpcHandler.java:81)
	at org.apache.spark.network.server.AbstractAuthRpcHandler.receive(AbstractAuthRpcHandler.java:59)
	at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:163)
	at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:109)
	at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:140)
	at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:53)
	at org.sparkproject.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
	at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at org.sparkproject.io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
	at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at org.sparkproject.io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
	at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:102)
	at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at org.sparkproject.io.netty.channel.DefaultChannelPipeline
{code}


was (Author: csingh):
Found a bug so re-opening the issue. 

> Add option for applications to control saving of metadata in the External 
> Shuffle Service LevelDB
> -
>
> Key: SPARK-43179
> URL: https://issues.apache.org/jira/browse/SPARK-43179
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle
>Affects Versions: 3.4.0
>Reporter: Chandni Singh
>Assignee: Chandni Singh
>Priority: Major
> Fix For: 3.5.0
>
>
> Currently, the External Shuffle Service stores application metadata in 
> LevelDB. This is necessary to enable the shuffle server to resume serving 
> shuffle data for an application whose executors registered before the 
> NodeManager restarts. However, the metadata includes the application secret, 
> which is stored in LevelDB without encryption. This is a potential security 
> risk, particularly for applications with high security requirements. While 
> filesystem access control lists (ACLs) can help protect keys and 
> certificates, they may not be sufficient for some use cases. In response, we 
> have decided not to store metadata for these high-security applications in 
> LevelDB. As a result, these applications may experience more failures in the 
> event of a node restart, but we believe this trade-off is acceptable given 
> the increased security risk.

[jira] [Reopened] (SPARK-43179) Add option for applications to control saving of metadata in the External Shuffle Service LevelDB

2023-06-07 Thread Chandni Singh (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-43179?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chandni Singh reopened SPARK-43179:
---

Found a bug so re-opening the issue. 

> Add option for applications to control saving of metadata in the External 
> Shuffle Service LevelDB
> -
>
> Key: SPARK-43179
> URL: https://issues.apache.org/jira/browse/SPARK-43179
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle
>Affects Versions: 3.4.0
>Reporter: Chandni Singh
>Assignee: Chandni Singh
>Priority: Major
> Fix For: 3.5.0
>
>
> Currently, the External Shuffle Service stores application metadata in 
> LevelDB. This is necessary to enable the shuffle server to resume serving 
> shuffle data for an application whose executors registered before the 
> NodeManager restarts. However, the metadata includes the application secret, 
> which is stored in LevelDB without encryption. This is a potential security 
> risk, particularly for applications with high security requirements. While 
> filesystem access control lists (ACLs) can help protect keys and 
> certificates, they may not be sufficient for some use cases. In response, we 
> have decided not to store metadata for these high-security applications in 
> LevelDB. As a result, these applications may experience more failures in the 
> event of a node restart, but we believe this trade-off is acceptable given 
> the increased security risk.






[jira] [Created] (SPARK-43583) When encryption is enabled on the External Shuffle Service, then processing of push meta requests throws NPE

2023-05-18 Thread Chandni Singh (Jira)
Chandni Singh created SPARK-43583:
-

 Summary: When encryption is enabled on the External Shuffle 
Service, then processing of push meta requests throws NPE
 Key: SPARK-43583
 URL: https://issues.apache.org/jira/browse/SPARK-43583
 Project: Spark
  Issue Type: Improvement
  Components: Shuffle
Affects Versions: 3.2.0
Reporter: Chandni Singh


After enabling support for over-the-wire encryption for the Spark shuffle 
services, meta requests for push-merged blocks fail with this error:
{code:java}
java.lang.RuntimeException: java.lang.NullPointerException
at 
org.apache.spark.network.server.AbstractAuthRpcHandler.getMergedBlockMetaReqHandler(AbstractAuthRpcHandler.java:110)
at 
org.apache.spark.network.crypto.AuthRpcHandler.getMergedBlockMetaReqHandler(AuthRpcHandler.java:144)
at 
org.apache.spark.network.server.TransportRequestHandler.processMergedBlockMetaRequest(TransportRequestHandler.java:275)
at 
org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:117)
at 
org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:140)
at 
org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:53)
at 
org.sparkproject.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
at 
org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at 
org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at 
org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at 
org.sparkproject.io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
 
{code}
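
For context, a minimal sketch of the delegation involved (illustrative classes, 
not Spark's real network API): the auth wrapper has to forward the 
merged-block-meta handler lookup to the handler it wraps. If that lookup 
instead returns a field that was never initialized, the request path above 
dereferences null, which surfaces as the RuntimeException-wrapped NPE.
{code}
// Illustrative stand-ins for the RPC handler hierarchy; names are made up.
trait MergedBlockMetaReqHandler {
  def receiveMergeBlockMetaReq(appId: String, shuffleId: Int, reduceId: Int): Unit
}

abstract class RpcHandlerSketch {
  def getMergedBlockMetaReqHandler: MergedBlockMetaReqHandler
}

class AuthRpcHandlerSketch(delegate: RpcHandlerSketch) extends RpcHandlerSketch {
  // Forward to the wrapped handler rather than a local, possibly null,
  // field; missing delegation is the kind of gap that produces the NPE.
  override def getMergedBlockMetaReqHandler: MergedBlockMetaReqHandler =
    delegate.getMergedBlockMetaReqHandler
}
{code}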






[jira] [Updated] (SPARK-43179) Add option for applications to control saving of metadata in the External Shuffle Service LevelDB

2023-04-18 Thread Chandni Singh (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-43179?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chandni Singh updated SPARK-43179:
--
Summary: Add option for applications to control saving of metadata in the 
External Shuffle Service LevelDB  (was: Add option for applications to control 
saving of metadata in External Shuffle Service LevelDB)

> Add option for applications to control saving of metadata in the External 
> Shuffle Service LevelDB
> -
>
> Key: SPARK-43179
> URL: https://issues.apache.org/jira/browse/SPARK-43179
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle
>Affects Versions: 3.4.0
>Reporter: Chandni Singh
>Priority: Major
>
> Currently, the External Shuffle Service stores application metadata in 
> LevelDB. This is necessary to enable the shuffle server to resume serving 
> shuffle data for an application whose executors registered before the 
> NodeManager restarts. However, the metadata includes the application secret, 
> which is stored in LevelDB without encryption. This is a potential security 
> risk, particularly for applications with high security requirements. While 
> filesystem access control lists (ACLs) can help protect keys and 
> certificates, they may not be sufficient for some use cases. In response, we 
> have decided not to store metadata for these high-security applications in 
> LevelDB. As a result, these applications may experience more failures in the 
> event of a node restart, but we believe this trade-off is acceptable given 
> the increased security risk.






[jira] [Updated] (SPARK-43179) Add option for applications to control saving of metadata in External Shuffle Service LevelDB

2023-04-18 Thread Chandni Singh (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-43179?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chandni Singh updated SPARK-43179:
--
Summary: Add option for applications to control saving of metadata in 
External Shuffle Service LevelDB  (was: Allow applications to control whether 
their metadata gets saved by the shuffle server in the db)

> Add option for applications to control saving of metadata in External Shuffle 
> Service LevelDB
> -
>
> Key: SPARK-43179
> URL: https://issues.apache.org/jira/browse/SPARK-43179
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle
>Affects Versions: 3.4.0
>Reporter: Chandni Singh
>Priority: Major
>
> Currently, the External Shuffle Service stores application metadata in 
> LevelDB. This is necessary to enable the shuffle server to resume serving 
> shuffle data for an application whose executors registered before the 
> NodeManager restarts. However, the metadata includes the application secret, 
> which is stored in LevelDB without encryption. This is a potential security 
> risk, particularly for applications with high security requirements. While 
> filesystem access control lists (ACLs) can help protect keys and 
> certificates, they may not be sufficient for some use cases. In response, we 
> have decided not to store metadata for these high-security applications in 
> LevelDB. As a result, these applications may experience more failures in the 
> event of a node restart, but we believe this trade-off is acceptable given 
> the increased security risk.






[jira] [Created] (SPARK-43179) Allow applications to control whether their metadata gets saved by the shuffle server in the db

2023-04-18 Thread Chandni Singh (Jira)
Chandni Singh created SPARK-43179:
-

 Summary: Allow applications to control whether their metadata gets 
saved by the shuffle server in the db
 Key: SPARK-43179
 URL: https://issues.apache.org/jira/browse/SPARK-43179
 Project: Spark
  Issue Type: Improvement
  Components: Shuffle
Affects Versions: 3.4.0
Reporter: Chandni Singh


Currently, the External Shuffle Service stores application metadata in LevelDB. 
This is necessary to enable the shuffle server to resume serving shuffle data 
for an application whose executors registered before the NodeManager restarts. 
However, the metadata includes the application secret, which is stored in 
LevelDB without encryption. This is a potential security risk, particularly for 
applications with high security requirements. While filesystem access control 
lists (ACLs) can help protect keys and certificates, they may not be sufficient 
for some use cases. In response, we have decided not to store metadata for 
these high-security applications in LevelDB. As a result, these applications 
may experience more failures in the event of a node restart, but we believe 
this trade-off is acceptable given the increased security risk.
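
As a rough sketch of the shape of the change (the flag and classes below are 
hypothetical, not the final Spark API): the shuffle server would consult a 
per-application opt-out before persisting registration metadata, including the 
secret, to LevelDB.
{code}
// Hypothetical sketch, not the final API: apps that opt out of db-backed
// recovery never have their secret written to LevelDB.
case class AppShuffleInfoSketch(appId: String, secret: String, saveToDb: Boolean)

class MetadataStoreSketch(db: scala.collection.mutable.Map[String, String]) {
  def registerApp(info: AppShuffleInfoSketch): Unit = {
    if (info.saveToDb) {
      // High-trust apps persist metadata so the server can resume serving
      // shuffle data after a NodeManager restart...
      db.put(info.appId, info.secret)
    }
    // ...while opted-out apps skip persistence, trading recovery for security.
  }
}
{code}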






[jira] [Commented] (SPARK-42834) Divided by zero occurs in PushBasedFetchHelper.createChunkBlockInfosFromMetaResponse

2023-03-17 Thread Chandni Singh (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-42834?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17701908#comment-17701908
 ] 

Chandni Singh commented on SPARK-42834:
---

We don't expect the `numChunks` to be zero or `bitmaps` to be empty. There was 
a bug in 3.2.0 that was fixed by 
https://issues.apache.org/jira/browse/SPARK-37675
Can you please check whether you have this fix?

> Divided by zero occurs in 
> PushBasedFetchHelper.createChunkBlockInfosFromMetaResponse
> 
>
> Key: SPARK-42834
> URL: https://issues.apache.org/jira/browse/SPARK-42834
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle
>Affects Versions: 3.2.0
>Reporter: Li Ying
>Priority: Major
>
> Sometimes, when running a SQL job with push-based shuffle, an exception 
> occurs as below. It seems that there is no element in the bitmaps which 
> store the merged chunk metadata.
> Is this a bug where we should not call createChunkBlockInfos when bitmaps 
> is empty, or should the bitmaps never be empty here?
>  
> {code:java}
> Caused by: java.lang.ArithmeticException: / by zero
> at 
> org.apache.spark.storage.PushBasedFetchHelper.createChunkBlockInfosFromMetaResponse(PushBasedFetchHelper.scala:117)
> at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:980)
> at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:84)
>  {code}
> related code:
> {code:java}
> def createChunkBlockInfosFromMetaResponse(
> shuffleId: Int,
> shuffleMergeId: Int,
> reduceId: Int,
> blockSize: Long,
> bitmaps: Array[RoaringBitmap]): ArrayBuffer[(BlockId, Long, Int)] = {
>   val approxChunkSize = blockSize / bitmaps.length
>   val blocksToFetch = new ArrayBuffer[(BlockId, Long, Int)]()
>   for (i <- bitmaps.indices) {
> val blockChunkId = ShuffleBlockChunkId(shuffleId, shuffleMergeId, 
> reduceId, i)
> chunksMetaMap.put(blockChunkId, bitmaps(i))
> logDebug(s"adding block chunk $blockChunkId of size $approxChunkSize")
> blocksToFetch += ((blockChunkId, approxChunkSize, SHUFFLE_PUSH_MAP_ID))
>   }
>   blocksToFetch
> } {code}
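
For what it's worth, a tiny illustrative guard of the kind that would avoid 
the division (the fix for the underlying cause landed server-side in 
SPARK-37675, so an empty {{bitmaps}} array should never reach this point):
{code}
object ChunkSizeGuard {
  // Illustrative only: compute the approximate chunk size only when the
  // meta response actually carries chunk bitmaps; an empty array means the
  // merged block is unusable and the caller should fall back.
  def approxChunkSizeOrNone(blockSize: Long, numBitmaps: Int): Option[Long] =
    if (numBitmaps > 0) Some(blockSize / numBitmaps) else None

  def main(args: Array[String]): Unit = {
    assert(approxChunkSizeOrNone(1000L, 4).contains(250L))
    assert(approxChunkSizeOrNone(1000L, 0).isEmpty)
  }
}
{code}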






[jira] [Commented] (SPARK-42817) Spark driver logs are filled with Initializing service data for shuffle service using name

2023-03-15 Thread Chandni Singh (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-42817?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17700803#comment-17700803
 ] 

Chandni Singh commented on SPARK-42817:
---

Created PR https://github.com/apache/spark/pull/40448

> Spark driver logs are filled with Initializing service data for shuffle 
> service using name
> --
>
> Key: SPARK-42817
> URL: https://issues.apache.org/jira/browse/SPARK-42817
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.2.0
>Reporter: Chandni Singh
>Priority: Major
>
> With SPARK-34828, we added the ability to make the shuffle service name 
> configurable, and we added a log 
> [here|https://github.com/apache/spark/blob/8860f69455e5a722626194c4797b4b42cccd4510/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala#L118]
>  that logs the shuffle service name. However, this log is printed in the 
> driver logs whenever a new executor is launched, which pollutes the log. 
> {code}
> 22/08/03 20:42:07 INFO ExecutorRunnable: Initializing service data for 
> shuffle service using name 'spark_shuffle_311'
> 22/08/03 20:42:07 INFO ExecutorRunnable: Initializing service data for 
> shuffle service using name 'spark_shuffle_311'
> 22/08/03 20:42:07 INFO ExecutorRunnable: Initializing service data for 
> shuffle service using name 'spark_shuffle_311'
> 22/08/03 20:42:07 INFO ExecutorRunnable: Initializing service data for 
> shuffle service using name 'spark_shuffle_311'
> 22/08/03 20:42:07 INFO ExecutorRunnable: Initializing service data for 
> shuffle service using name 'spark_shuffle_311'
> 22/08/03 20:42:07 INFO ExecutorRunnable: Initializing service data for 
> shuffle service using name 'spark_shuffle_311'
> 22/08/03 20:42:07 INFO ExecutorRunnable: Initializing service data for 
> shuffle service using name 'spark_shuffle_311'
> 22/08/03 20:42:07 INFO ExecutorRunnable: Initializing service data for 
> shuffle service using name 'spark_shuffle_311'
> 22/08/03 20:42:07 INFO ExecutorRunnable: Initializing service data for 
> shuffle service using name 'spark_shuffle_311'
> 22/08/03 20:42:07 INFO ExecutorRunnable: Initializing service data for 
> shuffle service using name 'spark_shuffle_311'
> 22/08/03 20:42:07 INFO ExecutorRunnable: Initializing service data for 
> shuffle service using name 'spark_shuffle_311'
> 22/08/03 20:42:07 INFO ExecutorRunnable: Initializing service data for 
> shuffle service using name 'spark_shuffle_311'
> 22/08/03 20:42:07 INFO ExecutorRunnable: Initializing service data for 
> shuffle service using name 'spark_shuffle_311'
> 22/08/03 20:42:07 INFO ExecutorRunnable: Initializing service data for 
> shuffle service using name 'spark_shuffle_311'
> 22/08/03 20:42:07 INFO ExecutorRunnable: Initializing service data for 
> shuffle service using name 'spark_shuffle_311'
> 22/08/03 20:42:07 INFO ExecutorRunnable: Initializing service data for 
> shuffle service using name 'spark_shuffle_311'
> 22/08/03 20:42:07 INFO ExecutorRunnable: Initializing service data for 
> shuffle service using name 'spark_shuffle_311'
> 22/08/03 20:42:07 INFO ExecutorRunnable: Initializing service data for 
> shuffle service using name 'spark_shuffle_311'
> 22/08/03 20:42:07 INFO ExecutorRunnable: Initializing service data for 
> shuffle service using name 'spark_shuffle_311'
> 22/08/03 20:42:07 INFO ExecutorRunnable: Initializing service data for 
> shuffle service using name 'spark_shuffle_311'
> {code}
> We can just log this once in the driver.






[jira] [Updated] (SPARK-42817) Spark driver logs are filled with Initializing service data for shuffle service using name

2023-03-15 Thread Chandni Singh (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-42817?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chandni Singh updated SPARK-42817:
--
Description: 
With SPARK-34828, we added the ability to make the shuffle service name 
configurable, and we added a log 
[here|https://github.com/apache/spark/blob/8860f69455e5a722626194c4797b4b42cccd4510/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala#L118]
 that logs the shuffle service name. However, this log is printed in the 
driver logs whenever a new executor is launched, which pollutes the log. 
{code}
22/08/03 20:42:07 INFO ExecutorRunnable: Initializing service data for shuffle 
service using name 'spark_shuffle_311'
22/08/03 20:42:07 INFO ExecutorRunnable: Initializing service data for shuffle 
service using name 'spark_shuffle_311'
22/08/03 20:42:07 INFO ExecutorRunnable: Initializing service data for shuffle 
service using name 'spark_shuffle_311'
22/08/03 20:42:07 INFO ExecutorRunnable: Initializing service data for shuffle 
service using name 'spark_shuffle_311'
22/08/03 20:42:07 INFO ExecutorRunnable: Initializing service data for shuffle 
service using name 'spark_shuffle_311'
22/08/03 20:42:07 INFO ExecutorRunnable: Initializing service data for shuffle 
service using name 'spark_shuffle_311'
22/08/03 20:42:07 INFO ExecutorRunnable: Initializing service data for shuffle 
service using name 'spark_shuffle_311'
22/08/03 20:42:07 INFO ExecutorRunnable: Initializing service data for shuffle 
service using name 'spark_shuffle_311'
22/08/03 20:42:07 INFO ExecutorRunnable: Initializing service data for shuffle 
service using name 'spark_shuffle_311'
22/08/03 20:42:07 INFO ExecutorRunnable: Initializing service data for shuffle 
service using name 'spark_shuffle_311'
22/08/03 20:42:07 INFO ExecutorRunnable: Initializing service data for shuffle 
service using name 'spark_shuffle_311'
22/08/03 20:42:07 INFO ExecutorRunnable: Initializing service data for shuffle 
service using name 'spark_shuffle_311'
22/08/03 20:42:07 INFO ExecutorRunnable: Initializing service data for shuffle 
service using name 'spark_shuffle_311'
22/08/03 20:42:07 INFO ExecutorRunnable: Initializing service data for shuffle 
service using name 'spark_shuffle_311'
22/08/03 20:42:07 INFO ExecutorRunnable: Initializing service data for shuffle 
service using name 'spark_shuffle_311'
22/08/03 20:42:07 INFO ExecutorRunnable: Initializing service data for shuffle 
service using name 'spark_shuffle_311'
22/08/03 20:42:07 INFO ExecutorRunnable: Initializing service data for shuffle 
service using name 'spark_shuffle_311'
22/08/03 20:42:07 INFO ExecutorRunnable: Initializing service data for shuffle 
service using name 'spark_shuffle_311'
22/08/03 20:42:07 INFO ExecutorRunnable: Initializing service data for shuffle 
service using name 'spark_shuffle_311'
22/08/03 20:42:07 INFO ExecutorRunnable: Initializing service data for shuffle 
service using name 'spark_shuffle_311'
{code}
We can just log this once in the driver.
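
A minimal sketch of the log-once pattern (illustrative; not necessarily how 
the PR implements it):
{code}
import java.util.concurrent.atomic.AtomicBoolean

// Illustrative log-once guard: emit the shuffle-service-name line on the
// first executor launch and stay silent afterwards.
object ShuffleServiceNameLogger {
  private val logged = new AtomicBoolean(false)

  def logOnce(serviceName: String): Unit =
    if (logged.compareAndSet(false, true)) {
      println(s"Initializing service data for shuffle service using name '$serviceName'")
    }
}
{code}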

> Spark driver logs are filled with Initializing service data for shuffle 
> service using name
> --
>
> Key: SPARK-42817
> URL: https://issues.apache.org/jira/browse/SPARK-42817
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.2.0
>Reporter: Chandni Singh
>Priority: Major
>
> With SPARK-34828, we added the ability to make the shuffle service name 
> configurable, and we added a log 
> [here|https://github.com/apache/spark/blob/8860f69455e5a722626194c4797b4b42cccd4510/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala#L118]
>  that logs the shuffle service name. However, this log is printed in the 
> driver logs whenever a new executor is launched, which pollutes the log. 
> {code}
> 22/08/03 20:42:07 INFO ExecutorRunnable: Initializing service data for 
> shuffle service using name 'spark_shuffle_311'
> 22/08/03 20:42:07 INFO ExecutorRunnable: Initializing service data for 
> shuffle service using name 'spark_shuffle_311'
> 22/08/03 20:42:07 INFO ExecutorRunnable: Initializing service data for 
> shuffle service using name 'spark_shuffle_311'
> 22/08/03 20:42:07 INFO ExecutorRunnable: Initializing service data for 
> shuffle service using name 'spark_shuffle_311'
> 22/08/03 20:42:07 INFO ExecutorRunnable: Initializing service data for 
> shuffle service using name 'spark_shuffle_311'
> 22/08/03 20:42:07 INFO ExecutorRunnable: Initializing service data for 
> shuffle service using name 'spark_shuffle_311'
> 22/08/03 20:42:07 INFO ExecutorRunnable: Initializing service data for 
> shuffle service using name 'spark_shuffle_311'
> 22/08/03 20:42:07 INFO ExecutorRunnable: Initializing service data for 
> shuffle service using name 'spark_shuffle_311'
> {code}
> We can just log this once in the driver.

[jira] [Created] (SPARK-42817) Spark driver logs are filled with Initializing service data for shuffle service using name

2023-03-15 Thread Chandni Singh (Jira)
Chandni Singh created SPARK-42817:
-

 Summary: Spark driver logs are filled with Initializing service 
data for shuffle service using name
 Key: SPARK-42817
 URL: https://issues.apache.org/jira/browse/SPARK-42817
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 3.2.0
Reporter: Chandni Singh









[jira] [Updated] (SPARK-39647) Block push fails with java.lang.IllegalArgumentException: Active local dirs list has not been updated by any executor registration even when the NodeManager hasn't been

2022-06-30 Thread Chandni Singh (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-39647?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chandni Singh updated SPARK-39647:
--
Summary: Block push fails with java.lang.IllegalArgumentException: Active 
local dirs list has not been updated by any executor registration even when the 
NodeManager hasn't been restarted  (was: Block push fails with 
java.lang.IllegalArgumentException: Active local dirs list has not been updated 
by any executor registration even when NodeManager hasn't been restarted)

> Block push fails with java.lang.IllegalArgumentException: Active local dirs 
> list has not been updated by any executor registration even when the 
> NodeManager hasn't been restarted
> --
>
> Key: SPARK-39647
> URL: https://issues.apache.org/jira/browse/SPARK-39647
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle
>Affects Versions: 3.2.0
>Reporter: Chandni Singh
>Priority: Major
>
> We saw these exceptions during block push:
> {code:java}
> 22/06/24 13:29:14 ERROR RetryingBlockFetcher: Failed to fetch block 
> shuffle_170_568_174, and will not retry (0 retries)
> org.apache.spark.network.shuffle.BlockPushException: 
> !application_1653753500486_3193550shuffle_170_568_174java.lang.IllegalArgumentException:
>  Active local dirs list has not been updated by any executor registration
>   at 
> org.spark_project.guava.base.Preconditions.checkArgument(Preconditions.java:92)
>   at 
> org.apache.spark.network.shuffle.RemoteBlockPushResolver.getActiveLocalDirs(RemoteBlockPushResolver.java:300)
>   at 
> org.apache.spark.network.shuffle.RemoteBlockPushResolver.getFile(RemoteBlockPushResolver.java:290)
>   at 
> org.apache.spark.network.shuffle.RemoteBlockPushResolver.getMergedShuffleFile(RemoteBlockPushResolver.java:312)
>   at 
> org.apache.spark.network.shuffle.RemoteBlockPushResolver.lambda$getOrCreateAppShufflePartitionInfo$1(RemoteBlockPushResolver.java:168)
> 22/06/24 13:29:14 WARN UnsafeShuffleWriter: Pushing block shuffle_170_568_174 
> to BlockManagerId(, node-x, 7337, None) failed.
> {code}
> Note: The NodeManager on node-x (node against which this exception was seen) 
> was not restarted.
> The reason this happened is that the executor registers the block manager 
> with {{BlockManagerMaster}} before it registers with the ESS. In push-based 
> shuffle, a block manager is selected by the driver as a merger for the 
> shuffle push. However, the ESS on that node can successfully merge blocks 
> only if it has received the metadata about merged directories from the local 
> executor (sent when the local executor registers with the ESS). If this 
> local executor registration is delayed but the ESS host is picked as a 
> merger, it will fail to merge the blocks pushed to it, which is what 
> happened here.
> The local executor on node-x is executor 754 and the block manager 
> registration happened at 13:28:11
> {code:java}
> 22/06/24 13:28:11 INFO ExecutorAllocationManager: New executor 754 has 
> registered (new total is 1200)
> 22/06/24 13:28:11 INFO BlockManagerMasterEndpoint: Registering block manager 
> node-x:16747 with 2004.6 MB RAM, BlockManagerId(754, node-x, 16747, None)
> {code}
> The application got registered with shuffle server at node-x at 13:29:40
> {code:java}
> 2022-06-24 13:29:40,343 INFO 
> org.apache.spark.network.shuffle.RemoteBlockPushResolver: Updated the active 
> local dirs [/grid/i/tmp/yarn/, /grid/g/tmp/yarn/, /grid/b/tmp/yarn/, 
> /grid/e/tmp/yarn/, /grid/h/tmp/yarn/, /grid/f/tmp/yarn/, /grid/d/tmp/yarn/, 
> /grid/c/tmp/yarn/] for application application_1653753500486_3193550
>  {code}
> node-x was selected as a merger by the driver after 13:28:11, and when the 
> executors started pushing to it, all those pushes failed until 13:29:40.
> We can fix this by having the executor register with the ESS before it 
> registers the block manager with the {{BlockManagerMaster}}.






[jira] [Created] (SPARK-39647) Block push fails with java.lang.IllegalArgumentException: Active local dirs list has not been updated by any executor registration even when NodeManager hasn't been rest

2022-06-30 Thread Chandni Singh (Jira)
Chandni Singh created SPARK-39647:
-

 Summary: Block push fails with java.lang.IllegalArgumentException: 
Active local dirs list has not been updated by any executor registration even 
when NodeManager hasn't been restarted
 Key: SPARK-39647
 URL: https://issues.apache.org/jira/browse/SPARK-39647
 Project: Spark
  Issue Type: Bug
  Components: Shuffle
Affects Versions: 3.2.0
Reporter: Chandni Singh


We saw these exceptions during block push:
{code:java}
22/06/24 13:29:14 ERROR RetryingBlockFetcher: Failed to fetch block 
shuffle_170_568_174, and will not retry (0 retries)
org.apache.spark.network.shuffle.BlockPushException: 
!application_1653753500486_3193550shuffle_170_568_174java.lang.IllegalArgumentException:
 Active local dirs list has not been updated by any executor registration
at 
org.spark_project.guava.base.Preconditions.checkArgument(Preconditions.java:92)
at 
org.apache.spark.network.shuffle.RemoteBlockPushResolver.getActiveLocalDirs(RemoteBlockPushResolver.java:300)
at 
org.apache.spark.network.shuffle.RemoteBlockPushResolver.getFile(RemoteBlockPushResolver.java:290)
at 
org.apache.spark.network.shuffle.RemoteBlockPushResolver.getMergedShuffleFile(RemoteBlockPushResolver.java:312)
at 
org.apache.spark.network.shuffle.RemoteBlockPushResolver.lambda$getOrCreateAppShufflePartitionInfo$1(RemoteBlockPushResolver.java:168)

22/06/24 13:29:14 WARN UnsafeShuffleWriter: Pushing block shuffle_170_568_174 
to BlockManagerId(, node-x, 7337, None) failed.
{code}
Note: The NodeManager on node-x (node against which this exception was seen) 
was not restarted.

The reason this happened is that the executor registers the block manager 
with {{BlockManagerMaster}} before it registers with the ESS. In push-based 
shuffle, a block manager is selected by the driver as a merger for the shuffle 
push. However, the ESS on that node can successfully merge blocks only if it 
has received the metadata about merged directories from the local executor 
(sent when the local executor registers with the ESS). If this local executor 
registration is delayed but the ESS host is picked as a merger, it will fail 
to merge the blocks pushed to it, which is what happened here.

The local executor on node-x is executor 754 and the block manager registration 
happened at 13:28:11
{code:java}
22/06/24 13:28:11 INFO ExecutorAllocationManager: New executor 754 has 
registered (new total is 1200)

22/06/24 13:28:11 INFO BlockManagerMasterEndpoint: Registering block manager 
node-x:16747 with 2004.6 MB RAM, BlockManagerId(754, node-x, 16747, None)
{code}
The application got registered with shuffle server at node-x at 13:29:40
{code:java}
2022-06-24 13:29:40,343 INFO 
org.apache.spark.network.shuffle.RemoteBlockPushResolver: Updated the active 
local dirs [/grid/i/tmp/yarn/, /grid/g/tmp/yarn/, /grid/b/tmp/yarn/, 
/grid/e/tmp/yarn/, /grid/h/tmp/yarn/, /grid/f/tmp/yarn/, /grid/d/tmp/yarn/, 
/grid/c/tmp/yarn/] for application application_1653753500486_3193550
 {code}

node-x was selected as a merger by the driver after 13:28:11, and when the 
executors started pushing to it, all those pushes failed until 13:29:40.

We can fix this by having the executor register with the ESS before it 
registers the block manager with the {{BlockManagerMaster}}.
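
A minimal sketch of the proposed ordering (illustrative names, not the actual 
executor startup code): register with the local ESS first, so the host only 
becomes eligible as a merger once its merged-directory metadata is known.
{code}
// Illustrative executor startup ordering; the traits are made up.
trait ExternalShuffleClientSketch { def registerWithEss(appId: String): Unit }
trait BlockManagerMasterRefSketch { def registerBlockManager(execId: String): Unit }

class ExecutorStartupSketch(
    ess: ExternalShuffleClientSketch,
    master: BlockManagerMasterRefSketch) {
  def start(appId: String, execId: String): Unit = {
    // 1. Register with the local ESS first, so it learns the merged-dir
    //    metadata before this host can be picked as a merger...
    ess.registerWithEss(appId)
    // 2. ...and only then advertise the block manager to the driver.
    master.registerBlockManager(execId)
  }
}
{code}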






[jira] [Resolved] (SPARK-39471) ExternalShuffleService port is extracted from the confs everytime a block manager gets registered

2022-06-14 Thread Chandni Singh (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-39471?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chandni Singh resolved SPARK-39471.
---
Resolution: Not A Problem

Created by mistake. This is not a problem on the master branch.

> ExternalShuffleService port is extracted from the confs everytime a block 
> manager gets registered
> -
>
> Key: SPARK-39471
> URL: https://issues.apache.org/jira/browse/SPARK-39471
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.2.0
>Reporter: Chandni Singh
>Priority: Major
>
> We are extracting the shuffle service port from the Spark and Hadoop 
> configuration with every block manager registration. This could be avoided 
> by using a cached external shuffle service port. 
> Also, we should avoid calling addMerger when push-based shuffle is disabled. 






[jira] [Created] (SPARK-39471) ExternalShuffleService port is extracted from the confs everytime a block manager gets registered

2022-06-14 Thread Chandni Singh (Jira)
Chandni Singh created SPARK-39471:
-

 Summary: ExternalShuffleService port is extracted from the confs 
everytime a block manager gets registered
 Key: SPARK-39471
 URL: https://issues.apache.org/jira/browse/SPARK-39471
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 3.2.0
Reporter: Chandni Singh


We are extracting the shuffle service port from the Spark and Hadoop 
configuration with every block manager registration. This could be avoided by 
using a cached external shuffle service port. 

Also, we should avoid calling addMerger when push-based shuffle is disabled. 
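
A sketch of the caching (illustrative): resolve the port once, lazily, and 
reuse it for subsequent block manager registrations. The 
{{spark.shuffle.service.port}} key and its 7337 default are real, but the 
surrounding class is made up.
{code}
class ShuffleServicePortSketch(conf: Map[String, String]) {
  // Parse the port from configuration once instead of re-reading the
  // Spark/Hadoop confs on every block manager registration.
  lazy val externalShuffleServicePort: Int =
    conf.getOrElse("spark.shuffle.service.port", "7337").toInt
}
{code}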






[jira] [Updated] (SPARK-38973) When push-based shuffle is enabled, a stage may not complete when retried

2022-04-20 Thread Chandni Singh (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-38973?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chandni Singh updated SPARK-38973:
--
Description: 
With push-based shuffle enabled and adaptive merge finalization, there are 
scenarios where a re-attempt of ShuffleMapStage may not complete. 

With Adaptive Merge Finalization, a stage may be triggered for finalization 
when it is in the below state:
 # The stage is *not* running ({*}not{*} in the _running_ set of the 
DAGScheduler) - had failed or canceled or waiting, and
 # The stage has no pending partitions (all the tasks completed at-least once).

For such a stage when the finalization completes, the stage will still not be 
marked as {_}mergeFinalized{_}. 

The state of the stage will be: 
 * _stage.shuffleDependency.mergeFinalized = false_
 * _stage.shuffleDependency.getFinalizeTask = finalizeTask_
 * Merged statuses of the stage are unregistered

 

When the stage is resubmitted, the newer attempt of the stage will never 
complete even though its tasks may be completed. This is because the newer 
attempt of the stage will have {_}shuffleMergeEnabled = true{_}, since with the 
previous attempt the stage was never marked as {_}mergedFinalized{_}, and the 
_finalizeTask_ is present (from finalization attempt for previous stage 
attempt).

 

So, when all the tasks of the newer attempt complete, then these conditions 
will be true:
 * stage will be running
 * There will be no pending partitions since all the tasks completed

 * _stage.shuffleDependency.shuffleMergeEnabled = true_

 * _stage.shuffleDependency.shuffleMergeFinalized = false_

 * _stage.shuffleDependency.getFinalizeTask_ is not empty

This leads the DAGScheduler to try scheduling finalization instead of 
triggering the completion of the stage. However, because of the last 
condition, it never even schedules the finalization, so the stage never 
completes.

  was:
With push-based shuffle enabled and adaptive merge finalization, there are 
scenarios where a re-attempt of ShuffleMapStage may not complete. 

With Adaptive Merge Finalization, a stage may be triggered for finalization 
when it is in the below state:
 # The stage is *not* running ({*}not{*} in the _running_ set of the 
DAGScheduler) - had failed or canceled or waiting, and
 # The stage has no pending partitions - completed.

For such a stage when the finalization completes, the stage will still not be 
marked as {_}mergeFinalized{_}. 

The state of the stage will be: 
 * _stage.shuffleDependency.mergeFinalized = false_
 * _stage.shuffleDependency.getFinalizeTask = finalizeTask_
 * Merged statuses of the stage are unregistered

 

When the stage is resubmitted, the newer attempt of the stage will never 
complete even though its tasks may be completed. This is because the newer 
attempt of the stage will have {_}shuffleMergeEnabled = true{_}, since with the 
previous attempt the stage was never marked as {_}mergedFinalized{_}, and the 
_finalizeTask_ is present (from finalization attempt for previous stage 
attempt).

 

So, when all the tasks of the newer attempt complete, then these conditions 
will be true:
 * stage will be running
 * There will be no pending partitions since all the tasks completed

 * _stage.shuffleDependency.shuffleMergeEnabled = true_

 * _stage.shuffleDependency.shuffleMergeFinalized = false_

 * _stage.shuffleDependency.getFinalizeTask_ is not empty

This leads the DAGScheduler to try scheduling finalization instead of 
triggering the completion of the stage. However, because of the last 
condition, it never even schedules the finalization, so the stage never 
completes.


> When push-based shuffle is enabled, a stage may not complete when retried
> -
>
> Key: SPARK-38973
> URL: https://issues.apache.org/jira/browse/SPARK-38973
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle, Spark Core
>Affects Versions: 3.2.0
>Reporter: Chandni Singh
>Priority: Major
>
> With push-based shuffle enabled and adaptive merge finalization, there are 
> scenarios where a re-attempt of ShuffleMapStage may not complete. 
> With Adaptive Merge Finalization, a stage may be triggered for finalization 
> when it is in the below state:
>  # The stage is *not* running ({*}not{*} in the _running_ set of the 
> DAGScheduler) - had failed or canceled or waiting, and
>  # The stage has no pending partitions (all the tasks completed at-least 
> once).
> For such a stage when the finalization completes, the stage will still not be 
> marked as {_}mergeFinalized{_}. 
> The state of the stage will be: 
>  * _stage.shuffleDependency.mergeFinalized = false_
>  * _stage.shuffleDependency.getFinalizeTask = finalizeTask_
>  * Merged statuses of the stage are unregistered
>  
> When the stage is resubmitted, the newer attempt of the stage will never 
> complete even though its tasks may be completed.

[jira] [Updated] (SPARK-38973) When push-based shuffle is enabled, a stage may not complete when retried

2022-04-20 Thread Chandni Singh (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-38973?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chandni Singh updated SPARK-38973:
--
Description: 
With push-based shuffle enabled and adaptive merge finalization, there are 
scenarios where a re-attempt of ShuffleMapStage may not complete. 

With Adaptive Merge Finalization, a stage may be triggered for finalization 
when it is in the below state:
 # The stage is *not* running ({*}not{*} in the _running_ set of the 
DAGScheduler) - had failed or canceled or waiting, and
 # The stage has no pending partitions - completed.

For such a stage when the finalization completes, the stage will still not be 
marked as {_}mergeFinalized{_}. 

The state of the stage will be: 
 * _stage.shuffleDependency.mergeFinalized = false_
 * _stage.shuffleDependency.getFinalizeTask = finalizeTask_
 * Merged statuses of the stage are unregistered

 

When the stage is resubmitted, the newer attempt of the stage will never 
complete even though its tasks may be completed. This is because the newer 
attempt of the stage will have {_}shuffleMergeEnabled = true{_}, since with the 
previous attempt the stage was never marked as {_}mergedFinalized{_}, and the 
_finalizeTask_ is present (from finalization attempt for previous stage 
attempt).

 

So, when all the tasks of the newer attempt complete, then these conditions 
will be true:
 * stage will be running
 * There will be no pending partitions since all the tasks completed

 * _stage.shuffleDependency.shuffleMergeEnabled = true_

 * _stage.shuffleDependency.shuffleMergeFinalized = false_

 * _stage.shuffleDependency.getFinalizeTask_ is not empty

This leads the DAGScheduler to try scheduling finalization instead of 
triggering the completion of the stage. However, because of the last 
condition, it never even schedules the finalization, so the stage never 
completes.

  was:
With push-based shuffle enabled and adaptive merge finalization, there are 
scenarios where a re-attempt of ShuffleMapStage may not complete. 

With Adaptive Merge Finalization, a stage may be triggered for finalization 
before it completes - and can subsequently fail or be canceled. 

The stage may be in the below state when it is finalized:
 # The stage is *not* running ({*}not{*} in the _running_ set of the 
DAGScheduler) - had failed or canceled, and
 # The stage has no pending partitions - completed.

For such a stage when the finalization completes, the stage will still not be 
marked as {_}mergeFinalized{_}. 

The state of the stage will be: 
 * _stage.shuffleDependency.mergeFinalized = false_
 * _stage.shuffleDependency.getFinalizeTask = finalizeTask_
 * Merged statuses of the stage are unregistered

 

When the stage is resubmitted, the newer attempt of the stage will never 
complete even though its tasks may be completed. This is because the newer 
attempt of the stage will have {_}shuffleMergeEnabled = true{_}, since with 
the previous attempt the stage was never marked as {_}mergedFinalized{_}, and 
the _finalizeTask_ is present (from finalization attempt for previous stage 
attempt).

 

So, when all the tasks of the newer attempt complete, then these conditions 
will be true:
 * stage will be running
 * There will be no pending partitions since all the tasks completed

 * _stage.shuffleDependency.shuffleMergeEnabled = true_

 * _stage.shuffleDependency.shuffleMergeFinalized = false_

 * _stage.shuffleDependency.getFinalizeTask_ is not empty

This leads the DAGScheduler to try scheduling finalization instead of 
triggering the completion of the stage. However, because of the last 
condition, it never even schedules the finalization, so the stage never 
completes.


> When push-based shuffle is enabled, a stage may not complete when retried
> -
>
> Key: SPARK-38973
> URL: https://issues.apache.org/jira/browse/SPARK-38973
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle, Spark Core
>Affects Versions: 3.2.0
>Reporter: Chandni Singh
>Priority: Major
>
> With push-based shuffle enabled and adaptive merge finalization, there are 
> scenarios where a re-attempt of ShuffleMapStage may not complete. 
> With Adaptive Merge Finalization, a stage may be triggered for finalization 
> when it is in the below state:
>  # The stage is *not* running ({*}not{*} in the _running_ set of the 
> DAGScheduler) - had failed or canceled or waiting, and
>  # The stage has no pending partitions - completed.
> For such a stage when the finalization completes, the stage will still not be 
> marked as {_}mergeFinalized{_}. 
> The state of the stage will be: 
>  * _stage.shuffleDependency.mergeFinalized = false_
>  * _stage.shuffleDependency.getFinalizeTask = finalizeTask_
>  * Merged statuses of the stage are unregistered
>  
> When the stage is resubmitted, the newer attempt of the stage will never 
> complete even though its tasks may be completed.

[jira] [Created] (SPARK-38973) When push-based shuffle is enabled, a stage may not complete when retried

2022-04-20 Thread Chandni Singh (Jira)
Chandni Singh created SPARK-38973:
-

 Summary: When push-based shuffle is enabled, a stage may not 
complete when retried
 Key: SPARK-38973
 URL: https://issues.apache.org/jira/browse/SPARK-38973
 Project: Spark
  Issue Type: Bug
  Components: Shuffle, Spark Core
Affects Versions: 3.2.0
Reporter: Chandni Singh


With push-based shuffle enabled and adaptive merge finalization, there are 
scenarios where a re-attempt of ShuffleMapStage may not complete. 

With Adaptive Merge Finalization, a stage may be triggered for finalization 
before it completes - and can subsequently fail or be canceled. 

The stage may be in the below state when it is finalized:
 # The stage is *not* running ({*}not{*} in the _running_ set of the 
DAGScheduler) - had failed or canceled, and
 # The stage has no pending partitions - completed.

For such a stage when the finalization completes, the stage will still not be 
marked as {_}mergeFinalized{_}. 

The state of the stage will be: 
 * _stage.shuffleDependency.mergeFinalized = false_
 * _stage.shuffleDependency.getFinalizeTask = finalizeTask_
 * Merged statuses of the stage are unregistered

 

When the stage is resubmitted, the newer attempt of the stage will never 
complete even though its tasks may be completed. This is because the newer 
attempt of the stage will have \{_}shuffleMergeEnabled = true{_}, since with 
the previous attempt the stage was never marked as {_}mergedFinalized{_}, and 
the _finalizeTask_ is present (from finalization attempt for previous stage 
attempt).

 

So, when all the tasks of the newer attempt complete, then these conditions 
will be true:
 * stage will be running
 * There will be no pending partitions since all the tasks completed

 * _stage.shuffleDependency.shuffleMergeEnabled = true_

 * _stage.shuffleDependency.shuffleMergeFinalized = false_

 * _stage.shuffleDependency.getFinalizeTask_ is not empty

This leads the DAGScheduler to try scheduling finalization instead of 
triggering the completion of the stage. However, because of the last 
condition, it never even schedules the finalization, so the stage never 
completes.
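
A compact model of the stuck condition (field names are illustrative 
simplifications of the ShuffleDependency state described above):
{code}
object StageCompletionSketch {
  // Illustrative model of the state that wedges the retried stage.
  case class ShuffleDepState(
      shuffleMergeEnabled: Boolean,
      shuffleMergeFinalized: Boolean,
      finalizeTaskPresent: Boolean)

  def onAllTasksDone(dep: ShuffleDepState): String =
    if (dep.shuffleMergeEnabled && !dep.shuffleMergeFinalized) {
      // The DAGScheduler wants to finalize rather than complete the stage,
      // but a leftover finalizeTask from the previous attempt stops it from
      // scheduling finalization, so nothing ever completes the stage.
      if (dep.finalizeTaskPresent) "stuck: neither finalized nor completed"
      else "schedule finalization"
    } else "mark stage complete"

  def main(args: Array[String]): Unit = {
    // The retried attempt lands in the stuck branch:
    assert(onAllTasksDone(ShuffleDepState(true, false, true)).startsWith("stuck"))
  }
}
{code}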






[jira] [Updated] (SPARK-37675) Push-based merge finalization bugs in the RemoteBlockPushResolver

2022-01-27 Thread Chandni Singh (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-37675?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chandni Singh updated SPARK-37675:
--
Description: 
We identified 3 issues in the handling of merge finalization requests in the 
RemoteBlockPushResolver:
1. Empty merge data
If the shuffle gets finalized while a reducer partition is still receiving its 
first block, then when the merger finalizes that partition we end up with no 
data in the files, as each file is truncated to the last good position (which 
will be 0 in this case).
Even though no data exists for the reducer, we still add it to the result 
(merged reducerIds).

2. Overwriting of the merged data file of a reduce partition after it is 
finalized
This is a more involved issue that requires a specific sequence of events, and 
it starts with how our check for a {{too late block}} is done 
[here|https://github.com/apache/spark/blob/50758ab1a3d6a5f73a2419149a1420d103930f77/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java#L180].
The example below gives more details, but in a nutshell we have the following 
for a DETERMINATE shuffle:
 # Merge starts, blocks are accepted.
 # Merge is finalized.
 ** Files closed, status reported to driver, appShuffleInfo.shuffles cleaned up.
 # Late block push from an executor received.
 ** The request is for a reducer for which the merger never received any data 
until then, so there are no on-disk files.
 ** Our check does not catch this case - we end up (re-) starting merge.
 # Executor could now push blocks for reducers which were finalized earlier.
 ** Files are truncated.
 # Reads will see inconsistent state due to the ongoing writes.

Explaining this with an example for a DETERMINATE shuffleId 1, 
shuffleMergeId 0, and reduce partitions 100 and 200:
 # shufflePush_1_0_0_100 is received by the RemoteBlockPushResolver.
 ## No meta information existed for shuffle 1 so shuffle service creates 
AppShuffleMergePartitionsInfo for shuffle 1 and shuffleMerge 0 to start merge.
 ## Merge starts with RemoteBlockPushResolver and it creates the data file for 
the merger request shuffleMerged_$APP_ID_1_0_100.data (along with index/meta 
files)
 # FinalizeShuffleMerge message for shuffleId 1 and shuffleMerged 0 is received 
by RemoteBlockPushResolver. In a thread safe manner:
 ## AppShuffleMergePartitionsInfo for shuffle 1 is removed from the map in 
memory.
 ## shuffleMerged_$APP_ID_1_0_100.data/index/meta files are closed.
 ## Driver is informed that partition 100 of shuffleId 1/mergeId 0 was merged.
 # shufflePush_1_0_0_200 is received by the RemoteBlockPushResolver.
 ## A new AppShuffleMergePartitionsInfo is added since:
 ### There is no AppShuffleMergePartitionsInfo for shuffle 1/merged id 0 - as 
it was removed during finalization, and
 ### The merger had never received data for partition 200 until then.
 ## With this, shuffleMerged…200.data is created, and on that merger, merge for 
shuffleId 1/mergeId 0 starts again.
 # shufflePush_1_0_5_100 is received by the RemoteBlockPushResolver. We 
randomize the order of pushes, so late pushes from an executor can end up 
pushing reducer 200 followed by data for reducer 100.
 ## Since the AppShuffleMergePartitionsInfo created for shuffle 1 and 
shuffleMergeId 0 in step 3.1 doesn't have the reduce id 100, the 
data/index/meta files for these partitions will be recreated (see the 
referenced code).

3. An exception is thrown during the finalization of a shuffle for which the 
shuffle server didn't receive any blocks.
For very small stages and with low minCompletedPushRatio/minShuffleSizeToWait, 
the driver can initiate the finalization of a shuffle right away. The shuffle 
server may not receive any push blocks, and so there will not be an 
{{AppShuffleMergePartitionsInfo}} instance corresponding to the shuffle in the 
state. In this case, we should mark the shuffle as finalized and return empty 
results.
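
A minimal sketch of that behavior, with simplified state and assumed names 
(the real resolver replies with a merge-statuses message):
{code:java}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: finalizing a shuffle with no pushed blocks returns an
// empty result instead of throwing.
class FinalizeNoBlocksSketch {
  private final Map<Integer, Object> shuffleMergeState = new ConcurrentHashMap<>();

  // Returns the reduce ids that were successfully merged for the shuffle.
  int[] finalizeShuffleMerge(int shuffleId) {
    Object state = shuffleMergeState.remove(shuffleId);
    if (state == null) {
      // No block was ever pushed for this shuffle: report zero merged
      // partitions so the reducers fall back to the unmerged blocks.
      return new int[0];
    }
    // Closing the data/index/meta files and collecting the merged reduce
    // ids is omitted from this sketch.
    return new int[0];
  }
}
{code}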

> Push-based merge finalization bugs in the RemoteBlockPushResolver
> -
>
> Key: SPARK-37675
> URL: https://issues.apache.org/jira/browse/SPARK-37675
> Project: Spark
>  Issue Type: Sub-task
>  Components: Shuffle
>Affects Versions: 3.2.0
>Reporter: Cheng Pan
>Priority: Major
>
> We identified 3 issues in the handling of merge finalization requests in the 
> RemoteBlockPushResolver:
> 1. Empty merge data
> If the shuffle gets finalized while a reducer partition is still receiving 
> its first block, then when the merger finalizes that partition, we end up 
> with no data in the files: the data file is truncated to the last good 
> position, which is 0 in this case.
> Even though no data exists for the reducer, we still add it to the result 
> (the merged reducerIds).
> 2. Overwriting of the merged data file of a reduce partition after it is 
> finalized
> This is a more 

[jira] [Updated] (SPARK-37675) Push-based merge finalization bugs in the RemoteBlockPushResolver

2022-01-27 Thread Chandni Singh (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-37675?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chandni Singh updated SPARK-37675:
--
Summary: Push-based merge finalization bugs in the RemoteBlockPushResolver  
(was: Return PushMergedRemoteMetaFailedFetchResult if no available push-merged 
block)

> Push-based merge finalization bugs in the RemoteBlockPushResolver
> -
>
> Key: SPARK-37675
> URL: https://issues.apache.org/jira/browse/SPARK-37675
> Project: Spark
>  Issue Type: Sub-task
>  Components: Shuffle
>Affects Versions: 3.2.0
>Reporter: Cheng Pan
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.1#820001)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-38005) Support cleaning up merged shuffle files and state from external shuffle service

2022-01-24 Thread Chandni Singh (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-38005?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chandni Singh updated SPARK-38005:
--
Parent: SPARK-33235
Issue Type: Sub-task  (was: Improvement)

> Support cleaning up merged shuffle files and state from external shuffle 
> service
> 
>
> Key: SPARK-38005
> URL: https://issues.apache.org/jira/browse/SPARK-38005
> Project: Spark
>  Issue Type: Sub-task
>  Components: Shuffle, Spark Core
>Affects Versions: 3.2.0
>Reporter: Chandni Singh
>Priority: Major
>
> Currently merged shuffle files and state are not cleaned up until an 
> application ends. SPARK-37618 handles the cleanup of regular shuffle files. 
> This Jira will address cleaning up merged shuffle files/state.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-38005) Support cleaning up merged shuffle files and state from external shuffle service

2022-01-24 Thread Chandni Singh (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-38005?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chandni Singh updated SPARK-38005:
--
Description: Currently merged shuffle files and state are not cleaned up 
until an application ends. SPARK-37618 handles the cleanup of regular shuffle 
files. This Jira will address cleaning up merged shuffle files/state.  (was: 
Currently merged shuffle files and state is not cleaned up until an application 
ends. 

But shuffle files will still stick around until an application completes. 
Dynamic allocation is commonly used for long runnin)

> Support cleaning up merged shuffle files and state from external shuffle 
> service
> 
>
> Key: SPARK-38005
> URL: https://issues.apache.org/jira/browse/SPARK-38005
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle, Spark Core
>Affects Versions: 3.2.0
>Reporter: Chandni Singh
>Priority: Major
>
> Currently merged shuffle files and state are not cleaned up until an 
> application ends. SPARK-37618 handles the cleanup of regular shuffle files. 
> This Jira will address cleaning up merged shuffle files/state.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-38005) Support cleaning up merged shuffle files and state from external shuffle service

2022-01-24 Thread Chandni Singh (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-38005?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chandni Singh updated SPARK-38005:
--
Description: 
Currently merged shuffle files and state is not cleaned up until an application 
ends. 

But shuffle files will still stick around until an application completes. 
Dynamic allocation is commonly used for long runnin

  was:
Currently shuffle data is not cleaned up when an external shuffle service is 
used and the associated executor has been deallocated before the shuffle is 
cleaned up. Shuffle data is only cleaned up once the application ends.

There have been various issues filed for this:

https://issues.apache.org/jira/browse/SPARK-26020

https://issues.apache.org/jira/browse/SPARK-17233

https://issues.apache.org/jira/browse/SPARK-4236

But shuffle files will still stick around until an application completes. 
Dynamic allocation is commonly used for long running jobs (such as structured 
streaming), so any long running jobs with a large shuffle involved will 
eventually fill up local disk space. The shuffle service already supports 
cleaning up shuffle service persisted RDDs, so it should be able to support 
cleaning up shuffle blocks as well once the shuffle is removed by the 
ContextCleaner. 

The current alternative is to use shuffle tracking instead of an external 
shuffle service, but this is less optimal from a resource perspective as all 
executors must be kept alive until the shuffle has been fully consumed and 
cleaned up (and with the default GC interval being 30 minutes this can waste a 
lot of time with executors held onto but not doing anything).


> Support cleaning up merged shuffle files and state from external shuffle 
> service
> 
>
> Key: SPARK-38005
> URL: https://issues.apache.org/jira/browse/SPARK-38005
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle, Spark Core
>Affects Versions: 3.2.0
>Reporter: Chandni Singh
>Priority: Major
>
> Currently merged shuffle files and state is not cleaned up until an 
> application ends. 
> But shuffle files will still stick around until an application completes. 
> Dynamic allocation is commonly used for long runnin



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-38005) Support cleaning up merged shuffle files and state from external shuffle service

2022-01-24 Thread Chandni Singh (Jira)
Chandni Singh created SPARK-38005:
-

 Summary: Support cleaning up merged shuffle files and state from 
external shuffle service
 Key: SPARK-38005
 URL: https://issues.apache.org/jira/browse/SPARK-38005
 Project: Spark
  Issue Type: Improvement
  Components: Shuffle, Spark Core
Affects Versions: 3.2.0
Reporter: Chandni Singh


Currently shuffle data is not cleaned up when an external shuffle service is 
used and the associated executor has been deallocated before the shuffle is 
cleaned up. Shuffle data is only cleaned up once the application ends.

There have been various issues filed for this:

https://issues.apache.org/jira/browse/SPARK-26020

https://issues.apache.org/jira/browse/SPARK-17233

https://issues.apache.org/jira/browse/SPARK-4236

But shuffle files will still stick around until an application completes. 
Dynamic allocation is commonly used for long running jobs (such as structured 
streaming), so any long running jobs with a large shuffle involved will 
eventually fill up local disk space. The shuffle service already supports 
cleaning up shuffle service persisted RDDs, so it should be able to support 
cleaning up shuffle blocks as well once the shuffle is removed by the 
ContextCleaner. 

The current alternative is to use shuffle tracking instead of an external 
shuffle service, but this is less optimal from a resource perspective as all 
executors must be kept alive until the shuffle has been fully consumed and 
cleaned up (and with the default GC interval being 30 minutes this can waste a 
lot of time with executors held onto but not doing anything).



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-36389) Revert the change that accepts negative mapId

2021-08-02 Thread Chandni Singh (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-36389?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chandni Singh updated SPARK-36389:
--
Description: With SPARK-32922, we added a change that {{ShuffleBlockId}} 
can have a negative mapId. This was to support push-based shuffle where {{-1}} 
as mapId indicated a push-merged block. However with SPARK-32923, a different 
type of {{BlockId}} was introduced - {{ShuffleMergedId}}, but reverting the 
change to {{ShuffleBlockId}} was missed.   (was: With SPARK-32922, we added a 
change that {{ShuffleBlockId}} can have a negative mapId. This was to support 
push-based shuffle where {{-1}} as mapId indicated a push-merged block. However 
with SPARK-32923, a different type of {{BlockId}} was introduce - 
{{ShuffleMergedId}}, but reverting the change to {{ShuffleBlockId}} was missed. 
)

> Revert the change that accepts negative mapId
> -
>
> Key: SPARK-36389
> URL: https://issues.apache.org/jira/browse/SPARK-36389
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle
>Affects Versions: 3.2.0
>Reporter: Chandni Singh
>Priority: Minor
>
> With SPARK-32922, we added a change that {{ShuffleBlockId}} can have a 
> negative mapId. This was to support push-based shuffle where {{-1}} as mapId 
> indicated a push-merged block. However with SPARK-32923, a different type of 
> {{BlockId}} was introduced - {{ShuffleMergedId}}, but reverting the change to 
> {{ShuffleBlockId}} was missed. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-36389) Revert the change that accepts negative mapId in ShuffleBlockId

2021-08-02 Thread Chandni Singh (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-36389?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chandni Singh updated SPARK-36389:
--
Summary: Revert the change that accepts negative mapId in ShuffleBlockId  
(was: Revert the change that accepts negative mapId)

> Revert the change that accepts negative mapId in ShuffleBlockId
> ---
>
> Key: SPARK-36389
> URL: https://issues.apache.org/jira/browse/SPARK-36389
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle
>Affects Versions: 3.2.0
>Reporter: Chandni Singh
>Priority: Minor
>
> With SPARK-32922, we added a change that {{ShuffleBlockId}} can have a 
> negative mapId. This was to support push-based shuffle where {{-1}} as mapId 
> indicated a push-merged block. However with SPARK-32923, a different type of 
> {{BlockId}} was introduced - {{ShuffleMergedId}}, but reverting the change to 
> {{ShuffleBlockId}} was missed. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-36389) Revert the change that accepts negative mapId

2021-08-02 Thread Chandni Singh (Jira)
Chandni Singh created SPARK-36389:
-

 Summary: Revert the change that accepts negative mapId
 Key: SPARK-36389
 URL: https://issues.apache.org/jira/browse/SPARK-36389
 Project: Spark
  Issue Type: Bug
  Components: Shuffle
Affects Versions: 3.2.0
Reporter: Chandni Singh


With SPARK-32922, we added a change that {{ShuffleBlockId}} can have a negative 
mapId. This was to support push-based shuffle where {{-1}} as mapId indicated a 
push-merged block. However, with SPARK-32923, a different type of {{BlockId}} 
was introduced - {{ShuffleMergedId}} - but reverting the change to 
{{ShuffleBlockId}} was missed. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-36255) FileNotFoundException from the shuffle push can cause the executor to terminate

2021-07-22 Thread Chandni Singh (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-36255?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chandni Singh updated SPARK-36255:
--
Description: 
Once the shuffle is cleaned up by the {{ContextCleaner}}, the shuffle files are 
deleted by the executors. In this case, the push of the shuffle data by the 
executors can throw a {{FileNotFoundException}}. When this exception is thrown 
from the {{shuffle-block-push-thread}}, it causes the executor to fail, because 
the default uncaught exception handler for Spark daemon threads terminates the 
executor on any uncaught exception in a daemon thread.
{code:java}
21/06/17 16:03:57 ERROR util.SparkUncaughtExceptionHandler: Uncaught exception 
in thread Thread[block-push-thread-1,5,main]
java.lang.Error: java.io.IOException: Error in opening FileSegmentManagedBuffer

{file=/application_1619720975011_11057757/blockmgr-560cb4cf-9918-4ea7-a007-a16c5e3a35fe/0a/shuffle_1_690_0.data,
 offset=10640, length=190}
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1155)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Error in opening 
FileSegmentManagedBuffer\{file=***/application_1619720975011_11057757/blockmgr-560cb4cf-9918-4ea7-a007-a16c5e3a35fe/0a/shuffle_1_690_0.data,
 offset=10640, length=190}

at 
org.apache.spark.network.buffer.FileSegmentManagedBuffer.nioByteBuffer(FileSegmentManagedBuffer.java:89)
at 
org.apache.spark.shuffle.ShuffleWriter.sliceReqBufferIntoBlockBuffers(ShuffleWriter.scala:294)
at 
org.apache.spark.shuffle.ShuffleWriter.org$apache$spark$shuffle$ShuffleWriter$$sendRequest(ShuffleWriter.scala:270)
at 
org.apache.spark.shuffle.ShuffleWriter.org$apache$spark$shuffle$ShuffleWriter$$pushUpToMax(ShuffleWriter.scala:191)
at 
org.apache.spark.shuffle.ShuffleWriter$$anon$2$$anon$4.run(ShuffleWriter.scala:244)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
... 2 more
Caused by: java.io.FileNotFoundException: 
**/application_1619720975011_11057757/blockmgr-560cb4cf-9918-4ea7-a007-a16c5e3a35fe/0a/shuffle_1_690_0.data
 (No such file or directory)
at java.io.RandomAccessFile.open0(Native Method)
at java.io.RandomAccessFile.open(RandomAccessFile.java:316)
at java.io.RandomAccessFile.<init>(RandomAccessFile.java:243)
at 
org.apache.spark.network.buffer.FileSegmentManagedBuffer.nioByteBuffer(FileSegmentManagedBuffer.java:62)
{code}
We can address the issue by handling {{FileNotFoundException}} in the push 
threads and Netty threads, stopping the push when it is encountered, as 
sketched below.
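
A minimal sketch of this handling, assuming a simplified push task (the real 
push logic lives in the Scala {{ShuffleBlockPusher}}/{{ShuffleWriter}} code): 
the exception is caught inside the task so it never reaches the daemon 
thread's uncaught-exception handler, and further pushes are stopped.
{code:java}
import java.io.FileNotFoundException;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch; only illustrates catching the exception inside the
// task instead of letting it propagate to the daemon thread.
class PushTaskSketch implements Runnable {
  private final AtomicBoolean stopPushing = new AtomicBoolean(false);

  @Override
  public void run() {
    if (stopPushing.get()) {
      return; // the shuffle files are already gone; skip remaining pushes
    }
    try {
      pushNextBlock();
    } catch (FileNotFoundException e) {
      // The shuffle files were cleaned up (e.g. by the ContextCleaner):
      // stop pushing instead of letting the error terminate the executor.
      stopPushing.set(true);
    }
  }

  private void pushNextBlock() throws FileNotFoundException {
    // Opening and pushing the local shuffle block file is omitted here.
  }
}
{code}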

  was:
When the shuffle files are cleaned up by the executors once a job in a Spark 
application completes, the push of the shuffle data by the executor can throw 
FileNotFound exception. When this exception is thrown from the 
{{shuffle-block-push-thread}}, it causes the executor to fail. This is because 
of the default uncaught exception handler for Spark daemon threads which 
terminates the executor when there are uncaught exceptions for the daemon 
threads.
{code:java}
21/06/17 16:03:57 ERROR util.SparkUncaughtExceptionHandler: Uncaught exception 
in thread Thread[block-push-thread-1,5,main]
java.lang.Error: java.io.IOException: Error in opening FileSegmentManagedBuffer

{file=/application_1619720975011_11057757/blockmgr-560cb4cf-9918-4ea7-a007-a16c5e3a35fe/0a/shuffle_1_690_0.data,
 offset=10640, length=190}
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1155)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Error in opening 
FileSegmentManagedBuffer\{file=***/application_1619720975011_11057757/blockmgr-560cb4cf-9918-4ea7-a007-a16c5e3a35fe/0a/shuffle_1_690_0.data,
 offset=10640, length=190}

at 
org.apache.spark.network.buffer.FileSegmentManagedBuffer.nioByteBuffer(FileSegmentManagedBuffer.java:89)
at 
org.apache.spark.shuffle.ShuffleWriter.sliceReqBufferIntoBlockBuffers(ShuffleWriter.scala:294)
at 
org.apache.spark.shuffle.ShuffleWriter.org$apache$spark$shuffle$ShuffleWriter$$sendRequest(ShuffleWriter.scala:270)
at 
org.apache.spark.shuffle.ShuffleWriter.org$apache$spark$shuffle$ShuffleWriter$$pushUpToMax(ShuffleWriter.scala:191)
at 
org.apache.spark.shuffle.ShuffleWriter$$anon$2$$anon$4.run(ShuffleWriter.scala:244)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
... 2 more
Caused by: java.io.FileNotFoundException: 
**/application_1619720975011_11057757/blockmgr-560cb4cf-9918-4ea7-a007-a16c5e3a35fe/0a/shuffle_1_690_0.data
 (No such file or directory)
at java.io.RandomAccessFile.open0(Native Method)
at java.io.RandomAccessFile.open(RandomAccessFile.java:316)
at 

[jira] [Updated] (SPARK-36255) FileNotFoundException from the shuffle push can cause the executor to terminate

2021-07-22 Thread Chandni Singh (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-36255?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chandni Singh updated SPARK-36255:
--
Summary: FileNotFoundException from the shuffle push can cause the executor 
to terminate  (was: FileNotFound exceptions from the shuffle push can cause the 
executor to terminate)

> FileNotFoundException from the shuffle push can cause the executor to 
> terminate
> ---
>
> Key: SPARK-36255
> URL: https://issues.apache.org/jira/browse/SPARK-36255
> Project: Spark
>  Issue Type: Sub-task
>  Components: Shuffle
>Affects Versions: 3.1.0
>Reporter: Chandni Singh
>Priority: Major
>
> When the shuffle files are cleaned up by the executors once a job in a Spark 
> application completes, the push of the shuffle data by the executor can throw 
> a FileNotFoundException. When this exception is thrown from the 
> {{shuffle-block-push-thread}}, it causes the executor to fail, because the 
> default uncaught exception handler for Spark daemon threads terminates the 
> executor on any uncaught exception in a daemon thread.
> {code:java}
> 21/06/17 16:03:57 ERROR util.SparkUncaughtExceptionHandler: Uncaught 
> exception in thread Thread[block-push-thread-1,5,main]
> java.lang.Error: java.io.IOException: Error in opening 
> FileSegmentManagedBuffer
> {file=/application_1619720975011_11057757/blockmgr-560cb4cf-9918-4ea7-a007-a16c5e3a35fe/0a/shuffle_1_690_0.data,
>  offset=10640, length=190}
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1155)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.IOException: Error in opening 
> FileSegmentManagedBuffer\{file=***/application_1619720975011_11057757/blockmgr-560cb4cf-9918-4ea7-a007-a16c5e3a35fe/0a/shuffle_1_690_0.data,
>  offset=10640, length=190}
> at 
> org.apache.spark.network.buffer.FileSegmentManagedBuffer.nioByteBuffer(FileSegmentManagedBuffer.java:89)
> at 
> org.apache.spark.shuffle.ShuffleWriter.sliceReqBufferIntoBlockBuffers(ShuffleWriter.scala:294)
> at 
> org.apache.spark.shuffle.ShuffleWriter.org$apache$spark$shuffle$ShuffleWriter$$sendRequest(ShuffleWriter.scala:270)
> at 
> org.apache.spark.shuffle.ShuffleWriter.org$apache$spark$shuffle$ShuffleWriter$$pushUpToMax(ShuffleWriter.scala:191)
> at 
> org.apache.spark.shuffle.ShuffleWriter$$anon$2$$anon$4.run(ShuffleWriter.scala:244)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> ... 2 more
> Caused by: java.io.FileNotFoundException: 
> **/application_1619720975011_11057757/blockmgr-560cb4cf-9918-4ea7-a007-a16c5e3a35fe/0a/shuffle_1_690_0.data
>  (No such file or directory)
> at java.io.RandomAccessFile.open0(Native Method)
> at java.io.RandomAccessFile.open(RandomAccessFile.java:316)
> at java.io.RandomAccessFile.<init>(RandomAccessFile.java:243)
> at 
> org.apache.spark.network.buffer.FileSegmentManagedBuffer.nioByteBuffer(FileSegmentManagedBuffer.java:62)
> {code}
> We can address the issue by handling {{FileNotFoundException}} in the push 
> threads and Netty threads, stopping the push when it is encountered.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-36255) FileNotFound exceptions from the shuffle push can cause the executor to terminate

2021-07-21 Thread Chandni Singh (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-36255?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chandni Singh updated SPARK-36255:
--
Summary: FileNotFound exceptions from the shuffle push can cause the 
executor to terminate  (was: FileNotFound exceptions in the Shuffle-push-thread 
can cause the executor to fail)

> FileNotFound exceptions from the shuffle push can cause the executor to 
> terminate
> -
>
> Key: SPARK-36255
> URL: https://issues.apache.org/jira/browse/SPARK-36255
> Project: Spark
>  Issue Type: Sub-task
>  Components: Shuffle
>Affects Versions: 3.1.0
>Reporter: Chandni Singh
>Priority: Major
>
> When the shuffle files are cleaned up by the executors once a job in a Spark 
> application completes, the push of the shuffle data by the executor can throw 
> a FileNotFoundException. When this exception is thrown from the 
> {{shuffle-block-push-thread}}, it causes the executor to fail, because the 
> default uncaught exception handler for Spark daemon threads terminates the 
> executor on any uncaught exception in a daemon thread.
> {code:java}
> 21/06/17 16:03:57 ERROR util.SparkUncaughtExceptionHandler: Uncaught 
> exception in thread Thread[block-push-thread-1,5,main]
> java.lang.Error: java.io.IOException: Error in opening 
> FileSegmentManagedBuffer
> {file=/application_1619720975011_11057757/blockmgr-560cb4cf-9918-4ea7-a007-a16c5e3a35fe/0a/shuffle_1_690_0.data,
>  offset=10640, length=190}
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1155)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.IOException: Error in opening 
> FileSegmentManagedBuffer\{file=***/application_1619720975011_11057757/blockmgr-560cb4cf-9918-4ea7-a007-a16c5e3a35fe/0a/shuffle_1_690_0.data,
>  offset=10640, length=190}
> at 
> org.apache.spark.network.buffer.FileSegmentManagedBuffer.nioByteBuffer(FileSegmentManagedBuffer.java:89)
> at 
> org.apache.spark.shuffle.ShuffleWriter.sliceReqBufferIntoBlockBuffers(ShuffleWriter.scala:294)
> at 
> org.apache.spark.shuffle.ShuffleWriter.org$apache$spark$shuffle$ShuffleWriter$$sendRequest(ShuffleWriter.scala:270)
> at 
> org.apache.spark.shuffle.ShuffleWriter.org$apache$spark$shuffle$ShuffleWriter$$pushUpToMax(ShuffleWriter.scala:191)
> at 
> org.apache.spark.shuffle.ShuffleWriter$$anon$2$$anon$4.run(ShuffleWriter.scala:244)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> ... 2 more
> Caused by: java.io.FileNotFoundException: 
> **/application_1619720975011_11057757/blockmgr-560cb4cf-9918-4ea7-a007-a16c5e3a35fe/0a/shuffle_1_690_0.data
>  (No such file or directory)
> at java.io.RandomAccessFile.open0(Native Method)
> at java.io.RandomAccessFile.open(RandomAccessFile.java:316)
> at java.io.RandomAccessFile.<init>(RandomAccessFile.java:243)
> at 
> org.apache.spark.network.buffer.FileSegmentManagedBuffer.nioByteBuffer(FileSegmentManagedBuffer.java:62)
> {code}
> We can address the issue by handling {{FileNotFoundException}} in the push 
> threads and Netty threads, stopping the push when it is encountered.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-36255) FileNotFound exceptions in the Shuffle-push-thread can cause the executor to fail

2021-07-21 Thread Chandni Singh (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-36255?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chandni Singh updated SPARK-36255:
--
Description: 
When the shuffle files are cleaned up by the executors once a job in a Spark 
application completes, the push of the shuffle data by the executor can throw 
a FileNotFoundException. When this exception is thrown from the 
{{shuffle-block-push-thread}}, it causes the executor to fail, because the 
default uncaught exception handler for Spark daemon threads terminates the 
executor on any uncaught exception in a daemon thread.
{code:java}
21/06/17 16:03:57 ERROR util.SparkUncaughtExceptionHandler: Uncaught exception 
in thread Thread[block-push-thread-1,5,main]
java.lang.Error: java.io.IOException: Error in opening FileSegmentManagedBuffer

{file=/application_1619720975011_11057757/blockmgr-560cb4cf-9918-4ea7-a007-a16c5e3a35fe/0a/shuffle_1_690_0.data,
 offset=10640, length=190}
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1155)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Error in opening 
FileSegmentManagedBuffer\{file=***/application_1619720975011_11057757/blockmgr-560cb4cf-9918-4ea7-a007-a16c5e3a35fe/0a/shuffle_1_690_0.data,
 offset=10640, length=190}

at 
org.apache.spark.network.buffer.FileSegmentManagedBuffer.nioByteBuffer(FileSegmentManagedBuffer.java:89)
at 
org.apache.spark.shuffle.ShuffleWriter.sliceReqBufferIntoBlockBuffers(ShuffleWriter.scala:294)
at 
org.apache.spark.shuffle.ShuffleWriter.org$apache$spark$shuffle$ShuffleWriter$$sendRequest(ShuffleWriter.scala:270)
at 
org.apache.spark.shuffle.ShuffleWriter.org$apache$spark$shuffle$ShuffleWriter$$pushUpToMax(ShuffleWriter.scala:191)
at 
org.apache.spark.shuffle.ShuffleWriter$$anon$2$$anon$4.run(ShuffleWriter.scala:244)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
... 2 more
Caused by: java.io.FileNotFoundException: 
**/application_1619720975011_11057757/blockmgr-560cb4cf-9918-4ea7-a007-a16c5e3a35fe/0a/shuffle_1_690_0.data
 (No such file or directory)
at java.io.RandomAccessFile.open0(Native Method)
at java.io.RandomAccessFile.open(RandomAccessFile.java:316)
at java.io.RandomAccessFile.<init>(RandomAccessFile.java:243)
at 
org.apache.spark.network.buffer.FileSegmentManagedBuffer.nioByteBuffer(FileSegmentManagedBuffer.java:62)
{code}
We can address the issue by handling {{FileNotFoundException}} in the push 
threads and Netty threads, stopping the push when it is encountered.

  was:
When the shuffle files are cleaned up by the executors once a job in a Spark 
application completes, the push of the shuffle data by the executor can throw 
FileNotFound exception. When this exception is thrown from the 
{{shuffle-block-push-thread}}, it causes the executor to fail. This is because 
of the default uncaught exception handler for Spark daemon threads which 
terminates the executor when there are exceptions for the daemon threads.
{code:java}
21/06/17 16:03:57 ERROR util.SparkUncaughtExceptionHandler: Uncaught exception 
in thread Thread[block-push-thread-1,5,main]
java.lang.Error: java.io.IOException: Error in opening FileSegmentManagedBuffer

{file=/application_1619720975011_11057757/blockmgr-560cb4cf-9918-4ea7-a007-a16c5e3a35fe/0a/shuffle_1_690_0.data,
 offset=10640, length=190}
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1155)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Error in opening 
FileSegmentManagedBuffer\{file=***/application_1619720975011_11057757/blockmgr-560cb4cf-9918-4ea7-a007-a16c5e3a35fe/0a/shuffle_1_690_0.data,
 offset=10640, length=190}

at 
org.apache.spark.network.buffer.FileSegmentManagedBuffer.nioByteBuffer(FileSegmentManagedBuffer.java:89)
at 
org.apache.spark.shuffle.ShuffleWriter.sliceReqBufferIntoBlockBuffers(ShuffleWriter.scala:294)
at 
org.apache.spark.shuffle.ShuffleWriter.org$apache$spark$shuffle$ShuffleWriter$$sendRequest(ShuffleWriter.scala:270)
at 
org.apache.spark.shuffle.ShuffleWriter.org$apache$spark$shuffle$ShuffleWriter$$pushUpToMax(ShuffleWriter.scala:191)
at 
org.apache.spark.shuffle.ShuffleWriter$$anon$2$$anon$4.run(ShuffleWriter.scala:244)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
... 2 more
Caused by: java.io.FileNotFoundException: 
**/application_1619720975011_11057757/blockmgr-560cb4cf-9918-4ea7-a007-a16c5e3a35fe/0a/shuffle_1_690_0.data
 (No such file or directory)
at java.io.RandomAccessFile.open0(Native Method)
at java.io.RandomAccessFile.open(RandomAccessFile.java:316)
at java.io.RandomAccessFile.<init>(RandomAccessFile.java:243)
at 

[jira] [Updated] (SPARK-36255) FileNotFound exceptions in the Shuffle-push-thread can cause the executor to fail

2021-07-21 Thread Chandni Singh (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-36255?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chandni Singh updated SPARK-36255:
--
Description: 
When the shuffle files are cleaned up by the executors once a job in a Spark 
application completes, the push of the shuffle data by the executor can throw 
FileNotFound exception. When this exception is thrown from the 
{{shuffle-block-push-thread}}, it causes the executor to fail. This is because 
of the default uncaught exception handler for Spark daemon threads which 
terminates the executor when there are exceptions for the daemon threads.
{code:java}
21/06/17 16:03:57 ERROR util.SparkUncaughtExceptionHandler: Uncaught exception 
in thread Thread[block-push-thread-1,5,main]
java.lang.Error: java.io.IOException: Error in opening FileSegmentManagedBuffer

{file=/application_1619720975011_11057757/blockmgr-560cb4cf-9918-4ea7-a007-a16c5e3a35fe/0a/shuffle_1_690_0.data,
 offset=10640, length=190}
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1155)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Error in opening 
FileSegmentManagedBuffer\{file=***/application_1619720975011_11057757/blockmgr-560cb4cf-9918-4ea7-a007-a16c5e3a35fe/0a/shuffle_1_690_0.data,
 offset=10640, length=190}

at 
org.apache.spark.network.buffer.FileSegmentManagedBuffer.nioByteBuffer(FileSegmentManagedBuffer.java:89)
at 
org.apache.spark.shuffle.ShuffleWriter.sliceReqBufferIntoBlockBuffers(ShuffleWriter.scala:294)
at 
org.apache.spark.shuffle.ShuffleWriter.org$apache$spark$shuffle$ShuffleWriter$$sendRequest(ShuffleWriter.scala:270)
at 
org.apache.spark.shuffle.ShuffleWriter.org$apache$spark$shuffle$ShuffleWriter$$pushUpToMax(ShuffleWriter.scala:191)
at 
org.apache.spark.shuffle.ShuffleWriter$$anon$2$$anon$4.run(ShuffleWriter.scala:244)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
... 2 more
Caused by: java.io.FileNotFoundException: 
**/application_1619720975011_11057757/blockmgr-560cb4cf-9918-4ea7-a007-a16c5e3a35fe/0a/shuffle_1_690_0.data
 (No such file or directory)
at java.io.RandomAccessFile.open0(Native Method)
at java.io.RandomAccessFile.open(RandomAccessFile.java:316)
at java.io.RandomAccessFile.<init>(RandomAccessFile.java:243)
at 
org.apache.spark.network.buffer.FileSegmentManagedBuffer.nioByteBuffer(FileSegmentManagedBuffer.java:62)
{code}
We can address the issue by handling {{FileNotFoundException}} in the push 
threads and Netty threads, stopping the push when it is encountered.

  was:When the shuffle files are cleaned up the executors once a job completes, 
the push of the shuffle data will throw FileNotFound exceptions. This exception 
when thrown from the {{shuffle-block-push-thread}} still causes the executor to 
fail. 


> FileNotFound exceptions in the Shuffle-push-thread can cause the executor to 
> fail
> -
>
> Key: SPARK-36255
> URL: https://issues.apache.org/jira/browse/SPARK-36255
> Project: Spark
>  Issue Type: Sub-task
>  Components: Shuffle
>Affects Versions: 3.1.0
>Reporter: Chandni Singh
>Priority: Major
>
> When the shuffle files are cleaned up by the executors once a job in a Spark 
> application completes, the push of the shuffle data by the executor can throw 
> a FileNotFoundException. When this exception is thrown from the 
> {{shuffle-block-push-thread}}, it causes the executor to fail, because the 
> default uncaught exception handler for Spark daemon threads terminates the 
> executor on any uncaught exception in a daemon thread.
> {code:java}
> 21/06/17 16:03:57 ERROR util.SparkUncaughtExceptionHandler: Uncaught 
> exception in thread Thread[block-push-thread-1,5,main]
> java.lang.Error: java.io.IOException: Error in opening 
> FileSegmentManagedBuffer
> {file=/application_1619720975011_11057757/blockmgr-560cb4cf-9918-4ea7-a007-a16c5e3a35fe/0a/shuffle_1_690_0.data,
>  offset=10640, length=190}
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1155)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.IOException: Error in opening 
> FileSegmentManagedBuffer\{file=***/application_1619720975011_11057757/blockmgr-560cb4cf-9918-4ea7-a007-a16c5e3a35fe/0a/shuffle_1_690_0.data,
>  offset=10640, length=190}
> at 
> org.apache.spark.network.buffer.FileSegmentManagedBuffer.nioByteBuffer(FileSegmentManagedBuffer.java:89)
> at 
> org.apache.spark.shuffle.ShuffleWriter.sliceReqBufferIntoBlockBuffers(ShuffleWriter.scala:294)
> at 
> 

[jira] [Updated] (SPARK-36255) FileNotFound exceptions in the Shuffle-push-thread can cause the executor to fail

2021-07-21 Thread Chandni Singh (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-36255?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chandni Singh updated SPARK-36255:
--
Parent: SPARK-30602
Issue Type: Sub-task  (was: Bug)

> FileNotFound exceptions in the Shuffle-push-thread can cause the executor to 
> fail
> -
>
> Key: SPARK-36255
> URL: https://issues.apache.org/jira/browse/SPARK-36255
> Project: Spark
>  Issue Type: Sub-task
>  Components: Shuffle
>Affects Versions: 3.1.0
>Reporter: Chandni Singh
>Priority: Major
>
> When the shuffle files are cleaned up by the executors once a job completes, 
> the push of the shuffle data will throw FileNotFoundExceptions. This 
> exception, when thrown from the {{shuffle-block-push-thread}}, still causes 
> the executor to fail. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-36255) FileNotFound exceptions in the Shuffle-push-thread can cause the executor to fail

2021-07-21 Thread Chandni Singh (Jira)
Chandni Singh created SPARK-36255:
-

 Summary: FileNotFound exceptions in the Shuffle-push-thread can 
cause the executor to fail
 Key: SPARK-36255
 URL: https://issues.apache.org/jira/browse/SPARK-36255
 Project: Spark
  Issue Type: Bug
  Components: Shuffle
Affects Versions: 3.1.0
Reporter: Chandni Singh


When the shuffle files are cleaned up by the executors once a job completes, 
the push of the shuffle data will throw FileNotFoundExceptions. This exception, 
when thrown from the {{shuffle-block-push-thread}}, still causes the executor 
to fail. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-35917) Disable push-based shuffle until the feature is complete

2021-06-29 Thread Chandni Singh (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-35917?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17371445#comment-17371445
 ] 

Chandni Singh commented on SPARK-35917:
---

[~code_kr_dev_s] I already opened a PR for this yesterday. It is linked to the 
Jira.

> Disable push-based shuffle until the feature is complete
> 
>
> Key: SPARK-35917
> URL: https://issues.apache.org/jira/browse/SPARK-35917
> Project: Spark
>  Issue Type: Sub-task
>  Components: Shuffle, Spark Core
>Affects Versions: 3.1.0
>Reporter: Chandni Singh
>Priority: Major
>
> Push-based shuffle is partially merged in Apache master, but some of the 
> tasks are still incomplete. Since the 3.2 branch is going to be cut soon, we 
> will not be able to get the pending tasks reviewed and merged. A few of the 
> pending tasks make protocol changes to the push-based shuffle protocols, so 
> we would like to prevent users from enabling push-based shuffle both on the 
> client and the server until the push-based shuffle implementation is 
> complete. 
> We can prevent push-based shuffle from being used by throwing an 
> {{UnsupportedOperationException}} (or something like that) both on the client 
> and the server when the user tries to enable it.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-35917) Disable push-based shuffle until the feature is complete

2021-06-28 Thread Chandni Singh (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-35917?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chandni Singh updated SPARK-35917:
--
Description: 
Push-based shuffle is partially merged in Apache master, but some of the tasks 
are still incomplete. Since the 3.2 branch is going to be cut soon, we will 
not be able to get the pending tasks reviewed and merged. A few of the pending 
tasks make protocol changes to the push-based shuffle protocols, so we would 
like to prevent users from enabling push-based shuffle both on the client and 
the server until the push-based shuffle implementation is complete. 
We can prevent push-based shuffle from being used by throwing an 
{{UnsupportedOperationException}} (or something like that) both on the client 
and the server when the user tries to enable it, as in the sketch below.
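
A minimal sketch of such a guard, called from both the client and the server 
configuration paths; the config key named in the message is an assumption 
about the final name, not a confirmed one:
{code:java}
// Hypothetical sketch of the fail-fast guard described above.
class PushBasedShuffleGuard {
  static void assertPushBasedShuffleDisabled(boolean pushBasedShuffleEnabled) {
    if (pushBasedShuffleEnabled) {
      throw new UnsupportedOperationException(
          "Push-based shuffle is not yet supported; please disable " +
          "spark.shuffle.push.enabled.");
    }
  }
}
{code}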

> Disable push-based shuffle until the feature is complete
> 
>
> Key: SPARK-35917
> URL: https://issues.apache.org/jira/browse/SPARK-35917
> Project: Spark
>  Issue Type: Sub-task
>  Components: Shuffle, Spark Core
>Affects Versions: 3.1.0
>Reporter: Chandni Singh
>Priority: Major
>
> Push-based shuffle is partially merged in Apache master, but some of the 
> tasks are still incomplete. Since the 3.2 branch is going to be cut soon, we 
> will not be able to get the pending tasks reviewed and merged. A few of the 
> pending tasks make protocol changes to the push-based shuffle protocols, so 
> we would like to prevent users from enabling push-based shuffle both on the 
> client and the server until the push-based shuffle implementation is 
> complete. 
> We can prevent push-based shuffle from being used by throwing an 
> {{UnsupportedOperationException}} (or something like that) both on the client 
> and the server when the user tries to enable it.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-35917) Disable push-based shuffle until the feature is complete

2021-06-28 Thread Chandni Singh (Jira)
Chandni Singh created SPARK-35917:
-

 Summary: Disable push-based shuffle until the feature is complete
 Key: SPARK-35917
 URL: https://issues.apache.org/jira/browse/SPARK-35917
 Project: Spark
  Issue Type: Sub-task
  Components: Shuffle, Spark Core
Affects Versions: 3.1.0
Reporter: Chandni Singh






--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-35836) Remove reference to spark.shuffle.push.based.enabled in ShuffleBlockPusherSuite

2021-06-21 Thread Chandni Singh (Jira)
Chandni Singh created SPARK-35836:
-

 Summary: Remove reference to spark.shuffle.push.based.enabled in 
ShuffleBlockPusherSuite
 Key: SPARK-35836
 URL: https://issues.apache.org/jira/browse/SPARK-35836
 Project: Spark
  Issue Type: Bug
  Components: Shuffle, Spark Core
Affects Versions: 3.1.0
Reporter: Chandni Singh


The test suite {{ShuffleBlockPusherSuite}} was added with SPARK-32917, and in 
this suite the configuration for push-based shuffle is incorrectly referenced 
as {{spark.shuffle.push.based.enabled}}. We need to remove this config from 
here.

{{ShuffleBlockPusher}} is created only when push-based shuffle is enabled, and 
this suite is for {{ShuffleBlockPusher}}, so no other change is required.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-35671) Add Support in the ESS to serve merged shuffle block meta and data to executors

2021-06-07 Thread Chandni Singh (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-35671?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chandni Singh updated SPARK-35671:
--
Description: 
With push-based shuffle enabled, the reducers send 2 different requests to the 
ESS:

1. A request to fetch the metadata of the merged shuffle block.
 2. Requests to fetch the data of the merged shuffle block in chunks, which 
are 2 MB in size by default.
 The ESS should be able to serve these requests, and this Jira targets all the 
changes in the network-common and network-shuffle modules needed to support 
this.

  was:
With push-based shuffle enabled, the reducers send 2 different requests to the 
ESS:

1. Request to fetch the metadata of the merged shuffle block.
2. Requests to fetch the data of the merged shuffle block in chunks which are 
by default 2MB in size.
The ESS should be able to serve these requests and this Jira targets all the 
changes in the ESS to be able to support this.


> Add Support in the ESS to serve merged shuffle block meta and data to 
> executors
> ---
>
> Key: SPARK-35671
> URL: https://issues.apache.org/jira/browse/SPARK-35671
> Project: Spark
>  Issue Type: Sub-task
>  Components: Shuffle
>Affects Versions: 3.1.0
>Reporter: Chandni Singh
>Priority: Major
>
> With push-based shuffle enabled, the reducers send 2 different requests to 
> the ESS:
> 1. A request to fetch the metadata of the merged shuffle block.
>  2. Requests to fetch the data of the merged shuffle block in chunks, which 
> are 2 MB in size by default.
>  The ESS should be able to serve these requests, and this Jira targets all 
> the changes in the network-common and network-shuffle modules needed to 
> support this.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-35671) Add Support in the ESS to serve merged shuffle block meta and data to executors

2021-06-07 Thread Chandni Singh (Jira)
Chandni Singh created SPARK-35671:
-

 Summary: Add Support in the ESS to serve merged shuffle block meta 
and data to executors
 Key: SPARK-35671
 URL: https://issues.apache.org/jira/browse/SPARK-35671
 Project: Spark
  Issue Type: Sub-task
  Components: Shuffle
Affects Versions: 3.1.0
Reporter: Chandni Singh


With push-based shuffle enabled, the reducers send 2 different requests to the 
ESS:

1. A request to fetch the metadata of the merged shuffle block.
2. Requests to fetch the data of the merged shuffle block in chunks, which are 
2 MB in size by default.
The ESS should be able to serve these requests, and this Jira targets all the 
changes in the ESS needed to support this. The sketch below illustrates the 
two-step flow.
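
Purely as illustration, a sketch of this two-step flow; every interface and 
method name here is a hypothetical stand-in, not the actual network-shuffle 
API:
{code:java}
import java.nio.ByteBuffer;

// Hypothetical client-side view of the two request types.
interface MergedShuffleClient {
  // 1. Fetch the metadata of the merged block, including its chunk count.
  int fetchMergedBlockMetaNumChunks(String appId, int shuffleId, int reduceId);

  // 2. Fetch the merged block's data one chunk at a time (2 MB by default).
  ByteBuffer fetchChunk(String appId, int shuffleId, int reduceId, int chunkId);
}

class MergedBlockFetchFlow {
  static void fetchMergedBlock(MergedShuffleClient client, String appId,
                               int shuffleId, int reduceId) {
    int numChunks = client.fetchMergedBlockMetaNumChunks(appId, shuffleId, reduceId);
    for (int chunkId = 0; chunkId < numChunks; chunkId++) {
      ByteBuffer chunk = client.fetchChunk(appId, shuffleId, reduceId, chunkId);
      // Handing each chunk off to the shuffle reader is omitted here.
    }
  }
}
{code}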



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-32922) Add support for ShuffleBlockFetcherIterator to read from merged shuffle partitions and to fallback to original shuffle blocks if encountering failures

2021-06-07 Thread Chandni Singh (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-32922?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17358768#comment-17358768
 ] 

Chandni Singh commented on SPARK-32922:
---

Splitting the changes into ESS server side/client side changes as per the 
comment here: https://github.com/apache/spark/pull/32140#issuecomment-856099709

> Add support for ShuffleBlockFetcherIterator to read from merged shuffle 
> partitions and to fallback to original shuffle blocks if encountering failures
> --
>
> Key: SPARK-32922
> URL: https://issues.apache.org/jira/browse/SPARK-32922
> Project: Spark
>  Issue Type: Sub-task
>  Components: Shuffle, Spark Core
>Affects Versions: 3.1.0
>Reporter: Min Shen
>Priority: Major
>
> With the extended MapOutputTracker, the reducers can now get the task input 
> data from the merged shuffle partitions for more efficient shuffle data 
> fetch. The reducers should also be able to fall back to fetching the original 
> unmerged blocks if they encounter failures when fetching the merged shuffle 
> partitions.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-35546) Enable push-based shuffle when multiple app attempts are enabled and manage concurrent access to the state in a better way

2021-06-02 Thread Chandni Singh (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-35546?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chandni Singh updated SPARK-35546:
--
Summary: Enable push-based shuffle when multiple app attempts are enabled 
and manage concurrent access to the state in a better way   (was: Properly 
handle race conditions in RemoteBlockPushResolver to support push based shuffle 
with multiple app attempts enabled)

> Enable push-based shuffle when multiple app attempts are enabled and manage 
> concurrent access to the state in a better way 
> ---
>
> Key: SPARK-35546
> URL: https://issues.apache.org/jira/browse/SPARK-35546
> Project: Spark
>  Issue Type: Sub-task
>  Components: Shuffle
>Affects Versions: 3.1.0
>Reporter: Ye Zhou
>Priority: Major
>
> In the current implementation of RemoteBlockPushResolver, two 
> ConcurrentHashMaps are used to store (1) applicationId -> 
> mergedShuffleLocalDirPath and (2) applicationId+attemptId+shuffleId -> 
> mergedShufflePartitionInfo. As the four types of messages 
> (ExecutorRegister, PushBlocks, FinalizeShuffleMerge, and ApplicationRemove) 
> trigger different types of operations on these two hashmaps, it is 
> required to maintain strong consistency between the information stored in 
> them. Otherwise, there will be either data corruption/correctness issues 
> or a memory leak in the shuffle server. 
> We should come up with a systematic way to resolve this, rather than spot 
> fixing the potential issues. One possible direction is sketched below.
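
One way to make this systematic, sketched with assumed names: collapse the 
per-application state into a single map entry, so all four message types 
operate on one object and application removal is a single atomic remove.
{code:java}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: one per-application object holds both the merged
// shuffle local dirs and the per-shuffle partition state.
class AppShuffleState {
  final String[] mergedShuffleLocalDirs;
  final Map<String, Object> shuffleMergePartitions = new ConcurrentHashMap<>();

  AppShuffleState(String[] localDirs) {
    this.mergedShuffleLocalDirs = localDirs;
  }
}

class ResolverStateSketch {
  private final Map<String, AppShuffleState> apps = new ConcurrentHashMap<>();

  void registerExecutor(String appId, String[] localDirs) {
    apps.putIfAbsent(appId, new AppShuffleState(localDirs));
  }

  void applicationRemoved(String appId) {
    // Removing the single entry drops the dirs and all partition state
    // together, avoiding the leak/corruption window of two separate maps.
    apps.remove(appId);
  }
}
{code}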



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-31689) ShuffleBlockFetchIterator keeps localBlocks in its memory even though it never uses it

2021-06-02 Thread Chandni Singh (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-31689?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chandni Singh resolved SPARK-31689.
---
Resolution: Incomplete

This will be addressed with SPARK-32922 where this change is required.

> ShuffleBlockFetchIterator keeps localBlocks in its memory even though it 
> never uses it
> --
>
> Key: SPARK-31689
> URL: https://issues.apache.org/jira/browse/SPARK-31689
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle, Spark Core
>Affects Versions: 2.4.5
>Reporter: Chandni Singh
>Priority: Minor
>
> {{localBlocks}} is created and used in the {{initialize}} method of 
> ShuffleBlockFetchIterator but is never used after that. 
> It can be local to the initialize method instead of being a field in the 
> {{ShuffleBlockFetchIterator}} instance. It holds on to memory for as long as 
> the iterator instance is alive, which is unnecessary.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-34840) Fix cases of corruption in merged shuffle blocks that are pushed

2021-06-01 Thread Chandni Singh (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-34840?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chandni Singh updated SPARK-34840:
--
Parent: SPARK-30602
Issue Type: Sub-task  (was: Bug)

> Fix cases of corruption in merged shuffle blocks that are pushed
> 
>
> Key: SPARK-34840
> URL: https://issues.apache.org/jira/browse/SPARK-34840
> Project: Spark
>  Issue Type: Sub-task
>  Components: Shuffle
>Affects Versions: 3.1.0
>Reporter: Chandni Singh
>Assignee: Chandni Singh
>Priority: Major
> Fix For: 3.1.2, 3.2.0
>
>
> The {{RemoteBlockPushResolver}}, which handles the pushed shuffle blocks and 
> merges them, was introduced in 
> [#30062|https://github.com/apache/spark/pull/30062]. We have identified 2 
> scenarios where the merged blocks get corrupted:
>  # {{StreamCallback.onFailure()}} is called more than once. Initially we 
> assumed that the onFailure callback would be called just once per stream. 
> However, we observed that it is called twice when a client connection is 
> reset. When the client connection is reset, there are 2 events that get 
> triggered, in this order:
>  * {{exceptionCaught}}. This event is propagated to {{StreamInterceptor}}. 
> {{StreamInterceptor.exceptionCaught()}} invokes 
> {{callback.onFailure(streamId, cause)}}. This is the first time 
> StreamCallback.onFailure() is invoked.
>  * {{channelInactive}}. Since the channel closes, the {{channelInactive}} 
> event gets triggered, which again is propagated to {{StreamInterceptor}}. 
> {{StreamInterceptor.channelInactive()}} invokes 
> {{callback.onFailure(streamId, new ClosedChannelException())}}. This is the 
> second time StreamCallback.onFailure() is invoked.
>  # The flag {{isWriting}} is set prematurely to true. This introduces an edge 
> case where a stream that is trying to merge a duplicate block (created 
> because of a speculative task) may interfere with an active stream if the 
> duplicate stream fails.
> We are also adding changes that improve the code:
>  # Using positional writes all the time, because this simplifies the code 
> and microbenchmarking hasn't shown any performance impact.
>  # Additional minor changes.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-34840) Fix cases of corruption in merged shuffle blocks that are pushed

2021-03-23 Thread Chandni Singh (Jira)
Chandni Singh created SPARK-34840:
-

 Summary: Fix cases of corruption in merged shuffle blocks that are 
pushed
 Key: SPARK-34840
 URL: https://issues.apache.org/jira/browse/SPARK-34840
 Project: Spark
  Issue Type: Bug
  Components: Shuffle
Affects Versions: 3.1.0
Reporter: Chandni Singh


The {{RemoteBlockPushResolver}}, which handles the pushed shuffle blocks and 
merges them, was introduced in 
[#30062|https://github.com/apache/spark/pull/30062]. We have identified 2 
scenarios where the merged blocks get corrupted:
 # {{StreamCallback.onFailure()}} is called more than once. Initially we 
assumed that the onFailure callback would be called just once per stream. 
However, we observed that it is called twice when a client connection is 
reset. When the client connection is reset, there are 2 events that get 
triggered, in this order.

 * {{exceptionCaught}}. This event is propagated to {{StreamInterceptor}}. 
{{StreamInterceptor.exceptionCaught()}} invokes {{callback.onFailure(streamId, 
cause)}}. This is the first time StreamCallback.onFailure() is invoked.
 * {{channelInactive}}. Since the channel closes, the {{channelInactive}} event 
gets triggered, which again is propagated to {{StreamInterceptor}}. 
{{StreamInterceptor.channelInactive()}} invokes {{callback.onFailure(streamId, 
new ClosedChannelException())}}. This is the second time 
StreamCallback.onFailure() is invoked (a sketch of an idempotent guard for 
this follows below).

 # The flag {{isWriting}} is set prematurely to true. This introduces an edge 
case where a stream that is trying to merge a duplicate block (created because 
of a speculative task) may interfere with an active stream if the duplicate 
stream fails.

We are also adding changes that improve the code:
 # Using positional writes all the time, because this simplifies the code and 
microbenchmarking hasn't shown any performance impact.
 # Additional minor changes.
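
For scenario 1, a minimal sketch of an idempotent failure guard; the shape is 
assumed, not taken from the actual patch:
{code:java}
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch: make failure handling idempotent so the
// exceptionCaught + channelInactive double invocation of onFailure()
// cannot corrupt the merged data twice.
class MergeStreamCallbackSketch {
  private final AtomicBoolean failureHandled = new AtomicBoolean(false);

  public void onFailure(String streamId, Throwable cause) {
    if (!failureHandled.compareAndSet(false, true)) {
      return; // second callback (e.g. from channelInactive) is a no-op
    }
    // Rolling back to the last good position and releasing the partition
    // state is omitted from this sketch.
  }
}
{code}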



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-33665) Enable ShuffleBlockPusher to stop pushing blocks for a particular shuffle partition

2020-12-04 Thread Chandni Singh (Jira)
Chandni Singh created SPARK-33665:
-

 Summary: Enable ShuffleBlockPusher to stop pushing blocks for a 
particular shuffle partition
 Key: SPARK-33665
 URL: https://issues.apache.org/jira/browse/SPARK-33665
 Project: Spark
  Issue Type: Sub-task
  Components: Shuffle
Affects Versions: 3.1.0
Reporter: Chandni Singh


{{ShuffleBlockPusher}}, which was introduced in SPARK-32917, stops pushing 
shuffle blocks for the entire shuffle when it receives a "too late" exception. 
However, with the change [https://github.com/apache/spark/pull/30433], there is 
also a need to stop pushing shuffle blocks for a particular reduce partition. 
Refer to https://github.com/apache/spark/pull/30433#discussion_r533694433

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-31689) ShuffleBlockFetchIterator keeps localBlocks in its memory even though it never uses it

2020-11-13 Thread Chandni Singh (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-31689?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chandni Singh updated SPARK-31689:
--
Description: 
{{localBlocks}} is created and used in the {{initialize}} method of 
ShuffleBlockFetchIterator but is never used after that. 
It can be local to the initialize method instead of being a field in the 
{{ShuffleBlockFetchIterator}} instance. It holds on to memory for as long as 
the iterator instance is alive, which is unnecessary.

  was:
The {{localBlocks}} is created and used in the {{initialize}} method of 
ShuffleBlockFetchIterator but is never used after that. 
It can be local to the initialize method instead of being a field in the 
{{ShuffleBlockFetchIterator}} instance. It hold son to memory until iterator 
instance is alive which is unnecessary.


> ShuffleBlockFetchIterator keeps localBlocks in its memory even though it 
> never uses it
> --
>
> Key: SPARK-31689
> URL: https://issues.apache.org/jira/browse/SPARK-31689
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle, Spark Core
>Affects Versions: 2.4.5
>Reporter: Chandni Singh
>Priority: Minor
>
> The {{localBlocks}} set is created and used in the {{initialize}} method of 
> {{ShuffleBlockFetcherIterator}} but is never used after that. 
>  It can be local to the {{initialize}} method instead of being a field in the 
> {{ShuffleBlockFetcherIterator}} instance. As a field it holds on to memory 
> for as long as the iterator instance is alive, which is unnecessary.






[jira] [Updated] (SPARK-33350) Add support to DiskBlockManager to create merge directory and to get the local shuffle merged data

2020-11-04 Thread Chandni Singh (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-33350?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chandni Singh updated SPARK-33350:
--
Summary: Add support to DiskBlockManager to create merge directory and to 
get the local shuffle merged data  (was: Add support to DiskBlockManager to 
create merge directory and the ability to get the shuffle merged data)

> Add support to DiskBlockManager to create merge directory and to get the 
> local shuffle merged data
> --
>
> Key: SPARK-33350
> URL: https://issues.apache.org/jira/browse/SPARK-33350
> Project: Spark
>  Issue Type: Sub-task
>  Components: Shuffle
>Affects Versions: 3.1.0
>Reporter: Chandni Singh
>Priority: Major
>
> DiskBlockManager should be able to create the {{merge_manager}} directory, 
> where the push-based merged shuffle files are written, and also create 
> sub-dirs under it. 
> It should also be able to serve the local merged shuffle data/index/meta 
> files.






[jira] [Created] (SPARK-33350) Add support to DiskBlockManager to create merge directory and the ability to get the shuffle merged data

2020-11-04 Thread Chandni Singh (Jira)
Chandni Singh created SPARK-33350:
-

 Summary: Add support to DiskBlockManager to create merge directory 
and the ability to get the shuffle merged data
 Key: SPARK-33350
 URL: https://issues.apache.org/jira/browse/SPARK-33350
 Project: Spark
  Issue Type: Sub-task
  Components: Shuffle
Affects Versions: 3.1.0
Reporter: Chandni Singh


DiskBlockManager should be able to create the {{merge_manager}} directory, 
where the push-based merged shuffle files are written, and also create sub-dirs 
under it. 

It should also be able to serve the local merged shuffle data/index/meta files.
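
A minimal sketch of what creating those directories could look like 
(hypothetical helper; the actual DiskBlockManager logic is Scala, and the 
naming convention below is an assumption):
{code:java}
import java.io.File;

// Hypothetical sketch: create merge_manager/<subDir> under each local dir,
// mirroring the block manager's two-digit-hex sub-directory convention.
class MergeDirInitializer {
  static void createMergeDirs(File[] localDirs, int subDirsPerLocalDir) {
    for (File localDir : localDirs) {
      File mergeDir = new File(localDir, "merge_manager");
      for (int i = 0; i < subDirsPerLocalDir; i++) {
        File subDir = new File(mergeDir, String.format("%02x", i));
        if (!subDir.exists() && !subDir.mkdirs()) {
          throw new IllegalStateException("Failed to create " + subDir);
        }
      }
    }
  }
}
{code}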






[jira] [Updated] (SPARK-33331) Limit the number of pending blocks in memory and store blocks that collide

2020-11-04 Thread Chandni Singh (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-33331?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chandni Singh updated SPARK-33331:
--
Description: 
This jira addresses the two points below:
 1. In {{RemoteBlockPushResolver}}, bytes that cannot be merged immediately are 
stored in memory. The stream callback maintains a list of {{deferredBufs}}; 
when a block cannot be merged, it is added to this list. Currently there isn't 
a limit on the number of pending blocks, so we can limit the number of pending 
blocks held in memory. There has been a discussion around this here:
[https://github.com/apache/spark/pull/30062#discussion_r514026014]

2. When a stream doesn't get an opportunity to merge, 
{{RemoteBlockPushResolver}} ignores the data from that stream. Another approach 
is to store the stream's data in {{AppShufflePartitionInfo}} when it reaches 
the worst-case scenario. This may increase the memory usage of the shuffle 
service, but given the limit introduced in point 1, we can try this out.
 More information can be found in this discussion:
 [https://github.com/apache/spark/pull/30062#discussion_r517524546]

  was:
This jira addresses the below two points:
 1. In {{RemoteBlockPushResolver}}, bytes that cannot be merged immediately are 
stored in memory. The stream callback maintains a list of {{deferredBufs}}. 
When a block cannot be merged it is added to this list. Currently, there isn't 
a limit on the number of pending blocks. We can limit the number of pending 
blocks in memory. There has been a discussion around this here:
 [https://github.com/apache/spark/pull/30062#discussion_r514026014]

2. When a stream doesn't get an opportunity to merge, then 
{{RemoteBlockPushResolver}} ignores the data from that stream. Another approach 
is to store the data of the stream in {{AppShufflePartitionInfo}} when it 
reaches the worst-case scenario. This may increase the memory usage of the 
shuffle service though. However, given a limit introduced with 1 we can try 
this out.
 More information can be found in this discussion:
 [https://github.com/apache/spark/pull/30062#discussion_r517524546]


> Limit the number of pending blocks in memory and store blocks that collide
> --
>
> Key: SPARK-33331
> URL: https://issues.apache.org/jira/browse/SPARK-33331
> Project: Spark
>  Issue Type: Sub-task
>  Components: Shuffle
>Affects Versions: 3.1.0
>Reporter: Chandni Singh
>Priority: Major
>
> This jira addresses the two points below:
>  1. In {{RemoteBlockPushResolver}}, bytes that cannot be merged immediately 
> are stored in memory. The stream callback maintains a list of 
> {{deferredBufs}}; when a block cannot be merged, it is added to this list. 
> Currently there isn't a limit on the number of pending blocks, so we can 
> limit the number of pending blocks held in memory. There has been a 
> discussion around this here:
> [https://github.com/apache/spark/pull/30062#discussion_r514026014]
> 2. When a stream doesn't get an opportunity to merge, 
> {{RemoteBlockPushResolver}} ignores the data from that stream. Another 
> approach is to store the stream's data in {{AppShufflePartitionInfo}} when it 
> reaches the worst-case scenario. This may increase the memory usage of the 
> shuffle service, but given the limit introduced in point 1, we can try this 
> out.
>  More information can be found in this discussion:
>  [https://github.com/apache/spark/pull/30062#discussion_r517524546]
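
A minimal sketch of the kind of bound point 1 proposes (the class and field 
names below are hypothetical, not the actual {{RemoteBlockPushResolver}} code):
{code:java}
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch: cap how many deferred blocks a stream may buffer.
class DeferredBlockBuffer {
  private final int maxPendingBlocks;
  private final Deque<ByteBuffer> deferredBufs = new ArrayDeque<>();

  DeferredBlockBuffer(int maxPendingBlocks) {
    this.maxPendingBlocks = maxPendingBlocks;
  }

  // Returns true if the block was deferred; false once the limit is reached,
  // in which case the caller should stop buffering this stream in memory.
  boolean tryDefer(ByteBuffer block) {
    if (deferredBufs.size() >= maxPendingBlocks) {
      return false;
    }
    // Copy the bytes so we do not pin the network layer's pooled buffer.
    ByteBuffer copy = ByteBuffer.allocate(block.remaining());
    copy.put(block);
    copy.flip();
    deferredBufs.addLast(copy);
    return true;
  }
}
{code}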






[jira] [Updated] (SPARK-33331) Limit the number of pending blocks in memory and store blocks that collide

2020-11-04 Thread Chandni Singh (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-33331?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chandni Singh updated SPARK-33331:
--
Description: 
This jira addresses the below two points:
 1. In {{RemoteBlockPushResolver}}, bytes that cannot be merged immediately are 
stored in memory. The stream callback maintains a list of {{deferredBufs}}. 
When a block cannot be merged it is added to this list. Currently, there isn't 
a limit on the number of pending blocks. We can limit the number of pending 
blocks in memory. There has been a discussion around this here:
 [https://github.com/apache/spark/pull/30062#discussion_r514026014]

2. When a stream doesn't get an opportunity to merge, then 
{{RemoteBlockPushResolver}} ignores the data from that stream. Another approach 
is to store the data of the stream in {{AppShufflePartitionInfo}} when it 
reaches the worst-case scenario. This may increase the memory usage of the 
shuffle service though. However, given a limit introduced with 1 we can try 
this out.
 More information can be found in this discussion:
 [https://github.com/apache/spark/pull/30062#discussion_r517524546]

  was:
1. In {{RemoteBlockPushResolver}},  bytes that cannot be merged immediately are 
stored in memory. The stream callback maintains a list of {{deferredBufs}}. 
When a block cannot be merged it is added to this list. Currently, there isn't 
a limit on the number of pending blocks. There has been a discussion around 
this here:
https://github.com/apache/spark/pull/30062#discussion_r514026014

2. When a stream doesn't get an opportunity to merge, then 
{{RemoteBlockPushResolver}} ignores the data from that stream. Another approach 
is to store the data of the stream in {{AppShufflePartitionInfo}} when it 
reaches the worst-case scenario. This may increase the memory usage of the 
shuffle service though. However, given a limit introduced with 1 we can try 
this out.



> Limit the number of pending blocks in memory and store blocks that collide
> --
>
> Key: SPARK-33331
> URL: https://issues.apache.org/jira/browse/SPARK-33331
> Project: Spark
>  Issue Type: Sub-task
>  Components: Shuffle
>Affects Versions: 3.1.0
>Reporter: Chandni Singh
>Priority: Major
>
> This jira addresses the below two points:
>  1. In {{RemoteBlockPushResolver}}, bytes that cannot be merged immediately 
> are stored in memory. The stream callback maintains a list of 
> {{deferredBufs}}. When a block cannot be merged it is added to this list. 
> Currently, there isn't a limit on the number of pending blocks. We can limit 
> the number of pending blocks in memory. There has been a discussion around 
> this here:
>  [https://github.com/apache/spark/pull/30062#discussion_r514026014]
> 2. When a stream doesn't get an opportunity to merge, then 
> {{RemoteBlockPushResolver}} ignores the data from that stream. Another 
> approach is to store the data of the stream in {{AppShufflePartitionInfo}} 
> when it reaches the worst-case scenario. This may increase the memory usage 
> of the shuffle service though. However, given a limit introduced with 1 we 
> can try this out.
>  More information can be found in this discussion:
>  [https://github.com/apache/spark/pull/30062#discussion_r517524546]






[jira] [Updated] (SPARK-33331) Limit the number of pending blocks in memory and store blocks that collide

2020-11-04 Thread Chandni Singh (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-33331?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chandni Singh updated SPARK-33331:
--
Summary: Limit the number of pending blocks in memory and store blocks that 
collide  (was: Limit the number of pending blocks in memory when 
RemoteBlockPushResolver defers a block)

> Limit the number of pending blocks in memory and store blocks that collide
> --
>
> Key: SPARK-33331
> URL: https://issues.apache.org/jira/browse/SPARK-33331
> Project: Spark
>  Issue Type: Sub-task
>  Components: Shuffle
>Affects Versions: 3.1.0
>Reporter: Chandni Singh
>Priority: Major
>
> 1. In {{RemoteBlockPushResolver}},  bytes that cannot be merged immediately 
> are stored in memory. The stream callback maintains a list of 
> {{deferredBufs}}. When a block cannot be merged it is added to this list. 
> Currently, there isn't a limit on the number of pending blocks. There has 
> been a discussion around this here:
> https://github.com/apache/spark/pull/30062#discussion_r514026014
> 2. When a stream doesn't get an opportunity to merge, then 
> {{RemoteBlockPushResolver}} ignores the data from that stream. Another 
> approach is to store the data of the stream in {{AppShufflePartitionInfo}} 
> when it reaches the worst-case scenario. This may increase the memory usage 
> of the shuffle service though. However, given a limit introduced with 1 we 
> can try this out.






[jira] [Updated] (SPARK-33331) Limit the number of pending blocks in memory when RemoteBlockPushResolver defers a block

2020-11-04 Thread Chandni Singh (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-33331?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chandni Singh updated SPARK-33331:
--
Description: 
1. In {{RemoteBlockPushResolver}},  bytes that cannot be merged immediately are 
stored in memory. The stream callback maintains a list of {{deferredBufs}}. 
When a block cannot be merged it is added to this list. Currently, there isn't 
a limit on the number of pending blocks. There has been a discussion around 
this here:
https://github.com/apache/spark/pull/30062#discussion_r514026014

2. When a stream doesn't get an opportunity to merge, then 
{{RemoteBlockPushResolver}} ignores the data from that stream. Another approach 
is to store the data of the stream in {{AppShufflePartitionInfo}} when it 
reaches the worst-case scenario. This may increase the memory usage of the 
shuffle service though. However, given a limit introduced with 1 we can try 
this out.


  was:
This is to address the comment here:
https://github.com/apache/spark/pull/30062#discussion_r514026014


> Limit the number of pending blocks in memory when RemoteBlockPushResolver 
> defers a block
> 
>
> Key: SPARK-33331
> URL: https://issues.apache.org/jira/browse/SPARK-33331
> Project: Spark
>  Issue Type: Sub-task
>  Components: Shuffle
>Affects Versions: 3.1.0
>Reporter: Chandni Singh
>Priority: Major
>
> 1. In {{RemoteBlockPushResolver}},  bytes that cannot be merged immediately 
> are stored in memory. The stream callback maintains a list of 
> {{deferredBufs}}. When a block cannot be merged it is added to this list. 
> Currently, there isn't a limit on the number of pending blocks. There has 
> been a discussion around this here:
> https://github.com/apache/spark/pull/30062#discussion_r514026014
> 2. When a stream doesn't get an opportunity to merge, then 
> {{RemoteBlockPushResolver}} ignores the data from that stream. Another 
> approach is to store the data of the stream in {{AppShufflePartitionInfo}} 
> when it reaches the worst-case scenario. This may increase the memory usage 
> of the shuffle service though. However, given a limit introduced with 1 we 
> can try this out.






[jira] [Updated] (SPARK-33331) Limit the number of pending blocks in memory when RemoteBlockPushResolver defers a block

2020-11-03 Thread Chandni Singh (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-33331?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chandni Singh updated SPARK-33331:
--
Summary: Limit the number of pending blocks in memory when 
RemoteBlockPushResolver defers a block  (was: Limit the number of pending 
blocks in memory when {{RemoteBlockPushResolver}} defers a block)

> Limit the number of pending blocks in memory when RemoteBlockPushResolver 
> defers a block
> 
>
> Key: SPARK-33331
> URL: https://issues.apache.org/jira/browse/SPARK-33331
> Project: Spark
>  Issue Type: Sub-task
>  Components: Shuffle
>Affects Versions: 3.1.0
>Reporter: Chandni Singh
>Priority: Major
>
> This is to address the comment here:
> https://github.com/apache/spark/pull/30062#discussion_r514026014






[jira] [Created] (SPARK-33331) Limit the number of pending blocks in memory when {{RemoteBlockPushResolver}} defers a block

2020-11-03 Thread Chandni Singh (Jira)
Chandni Singh created SPARK-33331:
-

 Summary: Limit the number of pending blocks in memory when 
{{RemoteBlockPushResolver}} defers a block
 Key: SPARK-33331
 URL: https://issues.apache.org/jira/browse/SPARK-33331
 Project: Spark
  Issue Type: Sub-task
  Components: Shuffle
Affects Versions: 3.1.0
Reporter: Chandni Singh


This is to address the comment here:
https://github.com/apache/spark/pull/30062#discussion_r514026014






[jira] [Created] (SPARK-33236) Enable Push-based shuffle service to store state in NM level DB for work preserving restart

2020-10-25 Thread Chandni Singh (Jira)
Chandni Singh created SPARK-33236:
-

 Summary: Enable Push-based shuffle service to store state in NM 
level DB for work preserving restart
 Key: SPARK-33236
 URL: https://issues.apache.org/jira/browse/SPARK-33236
 Project: Spark
  Issue Type: Sub-task
  Components: Shuffle
Affects Versions: 3.1.0
Reporter: Chandni Singh









[jira] [Updated] (SPARK-33235) Push-based Shuffle Phase 2 Tasks

2020-10-25 Thread Chandni Singh (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-33235?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chandni Singh updated SPARK-33235:
--
Description: 
This is the parent jira for the phase 2 (follow-up) tasks for supporting 
Push-based shuffle. Refer to SPARK-30602.


  was:
In a large deployment of a Spark compute infrastructure, Spark shuffle is 
becoming a potential scaling bottleneck and a source of inefficiency in the 
cluster. When doing Spark on YARN for a large-scale deployment, people usually 
enable Spark external shuffle service and store the intermediate shuffle files 
on HDD. Because the number of blocks generated for a particular shuffle grows 
quadratically compared to the size of shuffled data (# mappers and reducers 
grows linearly with the size of shuffled data, but # blocks is # mappers * # 
reducers), one general trend we have observed is that the more data a Spark 
application processes, the smaller the block size becomes. In a few production 
clusters we have seen, the average shuffle block size is only 10s of KBs. 
Because of the inefficiency of performing random reads on HDD for small amount 
of data, the overall efficiency of the Spark external shuffle services serving 
the shuffle blocks degrades as we see an increasing # of Spark applications 
processing an increasing amount of data. In addition, because Spark external 
shuffle service is a shared service in a multi-tenancy cluster, the 
inefficiency with one Spark application could propagate to other applications 
as well.

In this ticket, we propose a solution to improve Spark shuffle efficiency in 
above mentioned environments with push-based shuffle. With push-based shuffle, 
shuffle is performed at the end of mappers and blocks get pre-merged and move 
towards reducers. In our prototype implementation, we have seen significant 
efficiency improvements when performing large shuffles. We take a Spark-native 
approach to achieve this, i.e., extending Spark’s existing shuffle netty 
protocol, and the behaviors of Spark mappers, reducers and drivers. This way, 
we can bring the benefits of more efficient shuffle in Spark without incurring 
the dependency or overhead of either specialized storage layer or external 
infrastructure pieces.

 

Link to dev mailing list discussion: 
http://apache-spark-developers-list.1001551.n3.nabble.com/Enabling-push-based-shuffle-in-Spark-td28732.html


> Push-based Shuffle Phase 2 Tasks
> 
>
> Key: SPARK-33235
> URL: https://issues.apache.org/jira/browse/SPARK-33235
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle, Spark Core
>Affects Versions: 3.1.0
>Reporter: Chandni Singh
>Priority: Major
>  Labels: release-notes
>
> This is the parent jira for the phase 2 (follow-up) tasks for supporting 
> Push-based shuffle. Refer to SPARK-30602.






[jira] [Created] (SPARK-33235) Push-based Shuffle Phase 2 Tasks

2020-10-25 Thread Chandni Singh (Jira)
Chandni Singh created SPARK-33235:
-

 Summary: Push-based Shuffle Phase 2 Tasks
 Key: SPARK-33235
 URL: https://issues.apache.org/jira/browse/SPARK-33235
 Project: Spark
  Issue Type: Improvement
  Components: Shuffle, Spark Core
Affects Versions: 3.1.0
Reporter: Chandni Singh


In a large deployment of a Spark compute infrastructure, Spark shuffle is 
becoming a potential scaling bottleneck and a source of inefficiency in the 
cluster. When doing Spark on YARN for a large-scale deployment, people usually 
enable Spark external shuffle service and store the intermediate shuffle files 
on HDD. Because the number of blocks generated for a particular shuffle grows 
quadratically compared to the size of shuffled data (# mappers and reducers 
grows linearly with the size of shuffled data, but # blocks is # mappers * # 
reducers), one general trend we have observed is that the more data a Spark 
application processes, the smaller the block size becomes. In a few production 
clusters we have seen, the average shuffle block size is only 10s of KBs. 
Because of the inefficiency of performing random reads on HDD for small 
amounts of data, the overall efficiency of the Spark external shuffle services 
serving
the shuffle blocks degrades as we see an increasing # of Spark applications 
processing an increasing amount of data. In addition, because Spark external 
shuffle service is a shared service in a multi-tenancy cluster, the 
inefficiency with one Spark application could propagate to other applications 
as well.

In this ticket, we propose a solution to improve Spark shuffle efficiency in 
the above-mentioned environments with push-based shuffle. With push-based shuffle,
shuffle is performed at the end of mappers and blocks get pre-merged and move 
towards reducers. In our prototype implementation, we have seen significant 
efficiency improvements when performing large shuffles. We take a Spark-native 
approach to achieve this, i.e., extending Spark’s existing shuffle netty 
protocol, and the behaviors of Spark mappers, reducers and drivers. This way, 
we can bring the benefits of more efficient shuffle in Spark without incurring 
the dependency or overhead of either specialized storage layer or external 
infrastructure pieces.

 

Link to dev mailing list discussion: 
http://apache-spark-developers-list.1001551.n3.nabble.com/Enabling-push-based-shuffle-in-Spark-td28732.html






[jira] [Commented] (SPARK-30602) SPIP: Support push-based shuffle to improve shuffle efficiency

2020-09-19 Thread Chandni Singh (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-30602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17198679#comment-17198679
 ] 

Chandni Singh commented on SPARK-30602:
---

The reference PR for the consolidated change:
https://github.com/apache/spark/pull/29808

> SPIP: Support push-based shuffle to improve shuffle efficiency
> --
>
> Key: SPARK-30602
> URL: https://issues.apache.org/jira/browse/SPARK-30602
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle, Spark Core
>Affects Versions: 3.1.0
>Reporter: Min Shen
>Priority: Major
> Attachments: Screen Shot 2020-06-23 at 11.31.22 AM.jpg, 
> vldb_magnet_final.pdf
>
>
> In a large deployment of a Spark compute infrastructure, Spark shuffle is 
> becoming a potential scaling bottleneck and a source of inefficiency in the 
> cluster. When doing Spark on YARN for a large-scale deployment, people 
> usually enable Spark external shuffle service and store the intermediate 
> shuffle files on HDD. Because the number of blocks generated for a particular 
> shuffle grows quadratically compared to the size of shuffled data (# mappers 
> and reducers grows linearly with the size of shuffled data, but # blocks is # 
> mappers * # reducers), one general trend we have observed is that the more 
> data a Spark application processes, the smaller the block size becomes. In a 
> few production clusters we have seen, the average shuffle block size is only 
> 10s of KBs. Because of the inefficiency of performing random reads on HDD for 
> small amounts of data, the overall efficiency of the Spark external shuffle 
> services serving the shuffle blocks degrades as we see an increasing # of 
> Spark applications processing an increasing amount of data. In addition, 
> because Spark external shuffle service is a shared service in a multi-tenancy 
> cluster, the inefficiency with one Spark application could propagate to other 
> applications as well.
> In this ticket, we propose a solution to improve Spark shuffle efficiency in 
> the above-mentioned environments with push-based shuffle. With push-based 
> shuffle, shuffle is performed at the end of mappers and blocks get pre-merged 
> and move towards reducers. In our prototype implementation, we have seen 
> significant efficiency improvements when performing large shuffles. We take a 
> Spark-native approach to achieve this, i.e., extending Spark’s existing 
> shuffle netty protocol, and the behaviors of Spark mappers, reducers and 
> drivers. This way, we can bring the benefits of more efficient shuffle in 
> Spark without incurring the dependency or overhead of either specialized 
> storage layer or external infrastructure pieces.
>  
> Link to dev mailing list discussion: 
> http://apache-spark-developers-list.1001551.n3.nabble.com/Enabling-push-based-shuffle-in-Spark-td28732.html






[jira] [Created] (SPARK-32664) Getting local shuffle block clutters the executor logs

2020-08-20 Thread Chandni Singh (Jira)
Chandni Singh created SPARK-32664:
-

 Summary: Getting local shuffle block clutters the executor logs
 Key: SPARK-32664
 URL: https://issues.apache.org/jira/browse/SPARK-32664
 Project: Spark
  Issue Type: Bug
  Components: Shuffle
Affects Versions: 3.1.0
Reporter: Chandni Singh


The log statement below in {{BlockManager.getLocalBlockData}} should be at 
debug level:
{code:java}
logInfo(s"Getting local shuffle block ${blockId}")
{code}
Currently, the executor logs get cluttered with entries like this:
{code:java}
20/08/20 02:07:52 INFO storage.BlockManager: Getting local shuffle block 
shuffle_0_6103_4964
20/08/20 02:07:52 INFO storage.BlockManager: Getting local shuffle block 
shuffle_0_6132_4964
20/08/20 02:07:52 INFO storage.BlockManager: Getting local shuffle block 
shuffle_0_6137_4964
20/08/20 02:07:52 INFO storage.BlockManager: Getting local shuffle block 
shuffle_0_6312_4964
20/08/20 02:07:52 INFO storage.BlockManager: Getting local shuffle block 
shuffle_0_6323_4964
20/08/20 02:07:52 INFO storage.BlockManager: Getting local shuffle block 
shuffle_0_6402_4964
20/08/20 02:07:52 INFO storage.BlockManager: Getting local shuffle block 
shuffle_0_6413_4964
20/08/20 02:07:52 INFO storage.BlockManager: Getting local shuffle block 
shuffle_0_6694_4964
20/08/20 02:07:52 INFO storage.BlockManager: Getting local shuffle block 
shuffle_0_6709_4964
20/08/20 02:07:52 INFO storage.BlockManager: Getting local shuffle block 
shuffle_0_6753_4964
20/08/20 02:07:52 INFO storage.BlockManager: Getting local shuffle block 
shuffle_0_6822_4964
20/08/20 02:07:52 INFO storage.BlockManager: Getting local shuffle block 
shuffle_0_6894_4964
20/08/20 02:07:52 INFO storage.BlockManager: Getting local shuffle block 
shuffle_0_6913_4964
20/08/20 02:07:52 INFO storage.BlockManager: Getting local shuffle block 
shuffle_0_7052_4964
20/08/20 02:07:52 INFO storage.BlockManager: Getting local shuffle block 
shuffle_0_7073_4964
20/08/20 02:07:52 INFO storage.BlockManager: Getting local shuffle block 
shuffle_0_7167_4964
20/08/20 02:07:52 INFO storage.BlockManager: Getting local shuffle block 
shuffle_0_7194_4964
{code}
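
The fix is simply to lower the level of that statement, i.e. the same statement 
at debug level:
{code:java}
logDebug(s"Getting local shuffle block ${blockId}")
{code}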

This was added with SPARK-20629.
cc. [~holden]







[jira] [Updated] (SPARK-32663) TransportClient getting closed when there are outstanding requests to the server

2020-08-19 Thread Chandni Singh (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-32663?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chandni Singh updated SPARK-32663:
--
Description: 
The implementation of {{removeBlocks}} and {{getHostLocalDirs}} in 
{{ExternalBlockStoreClient}} closes the client after processing a response in 
the callback. 

This is a cached client which will be re-used for other responses. There could 
be other outstanding requests to the shuffle service, so it should not be 
closed after processing a response. 
This seems like a bug introduced with SPARK-27651 and SPARK-27677. 

The older methods {{registerWithShuffleServer}} and {{fetchBlocks}} didn't 
close the client.

cc [~attilapiros] [~vanzin] [~mridulm80]

  was:
The implementation of {{removeBlocks}} and {{getHostLocalDirs}} in 
{{ExternalBlockStoreClient}} closes the client after processing a response in 
the callback. 

This is a cached client which will be re-used for other responses. There could 
be other outstanding request to the shuffle service, so it should not be closed 
after processing a response. 
Seems like this is a bug introduced with SPARK-27651 and SPARK-27677. 

The older methods  {{registerWithShuffleServer}} and {{fetchBlocks}} didn't 
close the client.

cc [~attilapiros] [~vanzin] [~mridul]


> TransportClient getting closed when there are outstanding requests to the 
> server
> 
>
> Key: SPARK-32663
> URL: https://issues.apache.org/jira/browse/SPARK-32663
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle
>Affects Versions: 3.0.0
>Reporter: Chandni Singh
>Priority: Major
>
> The implementation of {{removeBlocks}} and {{getHostLocalDirs}} in 
> {{ExternalBlockStoreClient}} closes the client after processing a response in 
> the callback. 
> This is a cached client which will be re-used for other responses. There 
> could be other outstanding requests to the shuffle service, so it should not 
> be closed after processing a response. 
> This seems like a bug introduced with SPARK-27651 and SPARK-27677. 
> The older methods {{registerWithShuffleServer}} and {{fetchBlocks}} didn't 
> close the client.
> cc [~attilapiros] [~vanzin] [~mridulm80]






[jira] [Created] (SPARK-32663) TransportClient getting closed when there are outstanding requests to the server

2020-08-19 Thread Chandni Singh (Jira)
Chandni Singh created SPARK-32663:
-

 Summary: TransportClient getting closed when there are outstanding 
requests to the server
 Key: SPARK-32663
 URL: https://issues.apache.org/jira/browse/SPARK-32663
 Project: Spark
  Issue Type: Bug
  Components: Shuffle
Affects Versions: 3.0.0
Reporter: Chandni Singh


The implementation of {{removeBlocks}} and {{getHostLocalDirs}} in 
{{ExternalBlockStoreClient}} closes the client after processing a response in 
the callback. 

This is a cached client which will be re-used for other responses. There could 
be other outstanding requests to the shuffle service, so it should not be 
closed after processing a response. 
This seems like a bug introduced with SPARK-27651 and SPARK-27677. 

The older methods {{registerWithShuffleServer}} and {{fetchBlocks}} didn't 
close the client.

cc [~attilapiros] [~vanzin] [~mridul]
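
A minimal sketch of the intended pattern, assuming Spark's {{TransportClient}} 
and {{RpcResponseCallback}} from {{org.apache.spark.network.client}} (the 
wrapper class below is hypothetical):
{code:java}
import java.nio.ByteBuffer;

import org.apache.spark.network.client.RpcResponseCallback;
import org.apache.spark.network.client.TransportClient;

// Hypothetical sketch: handle an RPC response without closing the cached client.
class NonClosingRpcExample {
  static void send(TransportClient client, ByteBuffer request) {
    client.sendRpc(request, new RpcResponseCallback() {
      @Override
      public void onSuccess(ByteBuffer response) {
        // Process the response. Do NOT call client.close() here: the client
        // is cached and may have other outstanding requests on the channel.
      }

      @Override
      public void onFailure(Throwable e) {
        // Report the failure; leave connection management to the factory.
      }
    });
  }
}
{code}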






[jira] [Created] (SPARK-31689) ShuffleBlockFetchIterator keeps localBlocks in its memory even though it never uses it

2020-05-12 Thread Chandni Singh (Jira)
Chandni Singh created SPARK-31689:
-

 Summary: ShuffleBlockFetchIterator keeps localBlocks in its memory 
even though it never uses it
 Key: SPARK-31689
 URL: https://issues.apache.org/jira/browse/SPARK-31689
 Project: Spark
  Issue Type: Bug
  Components: Shuffle
Affects Versions: 2.4.5
Reporter: Chandni Singh


The {{localBlocks}} set is created and used in the {{initialize}} method of 
{{ShuffleBlockFetcherIterator}} but is never used after that. 
It can be local to the {{initialize}} method instead of being a field in the 
{{ShuffleBlockFetcherIterator}} instance. As a field it holds on to memory for 
as long as the iterator instance is alive, which is unnecessary.
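
An illustrative sketch of the refactor (in Java for brevity; the real iterator 
is Scala, and the helper names below are hypothetical placeholders):
{code:java}
import java.util.HashSet;
import java.util.Set;

class FetchIteratorSketch {
  // Before: a field kept alive for the iterator's whole lifetime.
  // private final Set<String> localBlocks = new HashSet<>();

  void initialize(Iterable<String> blockIds) {
    // After: scoped to initialize(), so the set becomes collectable as soon
    // as this method returns.
    Set<String> localBlocks = new HashSet<>();
    for (String id : blockIds) {
      if (isLocal(id)) {
        localBlocks.add(id);
      }
    }
    fetchLocalBlocks(localBlocks);
  }

  // Placeholders standing in for the real block-locality check and fetch.
  private boolean isLocal(String id) { return id.startsWith("local_"); }
  private void fetchLocalBlocks(Set<String> ids) { /* no-op in this sketch */ }
}
{code}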






[jira] [Commented] (SPARK-30512) Use a dedicated boss event group loop in the netty pipeline for external shuffle service

2020-01-16 Thread Chandni Singh (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-30512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17017447#comment-17017447
 ] 

Chandni Singh commented on SPARK-30512:
---

[https://github.com/apache/spark/pull/27240]

> Use a dedicated boss event group loop in the netty pipeline for external 
> shuffle service
> 
>
> Key: SPARK-30512
> URL: https://issues.apache.org/jira/browse/SPARK-30512
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle
>Affects Versions: 3.0.0
>Reporter: Chandni Singh
>Priority: Major
>
> We have been seeing a large number of SASL authentication (RPC requests) 
> timing out with the external shuffle service.
>  The issue and all the analysis we did are described here:
>  [https://github.com/netty/netty/issues/9890]
> I added a {{LoggingHandler}} to the netty pipeline and realized that even the 
> channel registration is delayed by 30 seconds. 
>  In the Spark External Shuffle service, the boss event group and the worker 
> event group are the same, which causes this delay.
> {code:java}
> EventLoopGroup bossGroup =
>   NettyUtils.createEventLoop(ioMode, conf.serverThreads(), 
> conf.getModuleName() + "-server");
> EventLoopGroup workerGroup = bossGroup;
> bootstrap = new ServerBootstrap()
>   .group(bossGroup, workerGroup)
>   .channel(NettyUtils.getServerChannelClass(ioMode))
>   .option(ChannelOption.ALLOCATOR, allocator)
>   .childOption(ChannelOption.ALLOCATOR, allocator);
> {code}
> When the load at the shuffle service increases, since the worker threads are 
> busy with existing channels, registering new channels gets delayed.
> The fix is simple. I created a dedicated boss event loop group with a single 
> thread.
> {code:java}
> EventLoopGroup bossGroup = NettyUtils.createEventLoop(ioMode, 1,
>   conf.getModuleName() + "-boss");
> EventLoopGroup workerGroup =  NettyUtils.createEventLoop(ioMode, 
> conf.serverThreads(),
> conf.getModuleName() + "-server");
> bootstrap = new ServerBootstrap()
>   .group(bossGroup, workerGroup)
>   .channel(NettyUtils.getServerChannelClass(ioMode))
>   .option(ChannelOption.ALLOCATOR, allocator)
> {code}
> This fixed the issue.
>  We just need 1 thread in the boss group because there is only a single 
> server bootstrap.
>  






[jira] [Issue Comment Deleted] (SPARK-30512) Use a dedicated boss event group loop in the netty pipeline for external shuffle service

2020-01-16 Thread Chandni Singh (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-30512?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chandni Singh updated SPARK-30512:
--
Comment: was deleted

(was: [https://github.com/apache/spark/pull/27240])

> Use a dedicated boss event group loop in the netty pipeline for external 
> shuffle service
> 
>
> Key: SPARK-30512
> URL: https://issues.apache.org/jira/browse/SPARK-30512
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle
>Affects Versions: 3.0.0
>Reporter: Chandni Singh
>Priority: Major
>
> We have been seeing a large number of SASL authentication (RPC requests) 
> timing out with the external shuffle service.
>  The issue and all the analysis we did are described here:
>  [https://github.com/netty/netty/issues/9890]
> I added a {{LoggingHandler}} to the netty pipeline and realized that even the 
> channel registration is delayed by 30 seconds. 
>  In the Spark External Shuffle service, the boss event group and the worker 
> event group are the same, which causes this delay.
> {code:java}
> EventLoopGroup bossGroup =
>   NettyUtils.createEventLoop(ioMode, conf.serverThreads(), 
> conf.getModuleName() + "-server");
> EventLoopGroup workerGroup = bossGroup;
> bootstrap = new ServerBootstrap()
>   .group(bossGroup, workerGroup)
>   .channel(NettyUtils.getServerChannelClass(ioMode))
>   .option(ChannelOption.ALLOCATOR, allocator)
>   .childOption(ChannelOption.ALLOCATOR, allocator);
> {code}
> When the load at the shuffle service increases, since the worker threads are 
> busy with existing channels, registering new channels gets delayed.
> The fix is simple. I created a dedicated boss event loop group with a single 
> thread.
> {code:java}
> EventLoopGroup bossGroup = NettyUtils.createEventLoop(ioMode, 1,
>   conf.getModuleName() + "-boss");
> EventLoopGroup workerGroup =  NettyUtils.createEventLoop(ioMode, 
> conf.serverThreads(),
> conf.getModuleName() + "-server");
> bootstrap = new ServerBootstrap()
>   .group(bossGroup, workerGroup)
>   .channel(NettyUtils.getServerChannelClass(ioMode))
>   .option(ChannelOption.ALLOCATOR, allocator)
> {code}
> This fixed the issue.
>  We just need 1 thread in the boss group because there is only a single 
> server bootstrap.
>  






[jira] [Commented] (SPARK-30512) Use a dedicated boss event group loop in the netty pipeline for external shuffle service

2020-01-14 Thread Chandni Singh (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-30512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17015362#comment-17015362
 ] 

Chandni Singh commented on SPARK-30512:
---

Please assign the issue to me so I can open up a PR.

> Use a dedicated boss event group loop in the netty pipeline for external 
> shuffle service
> 
>
> Key: SPARK-30512
> URL: https://issues.apache.org/jira/browse/SPARK-30512
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle
>Affects Versions: 3.0.0
>Reporter: Chandni Singh
>Priority: Major
>
> We have been seeing a large number of SASL authentication (RPC requests) 
> timing out with the external shuffle service.
>  The issue and all the analysis we did are described here:
>  [https://github.com/netty/netty/issues/9890]
> I added a {{LoggingHandler}} to the netty pipeline and realized that even the 
> channel registration is delayed by 30 seconds. 
>  In the Spark External Shuffle service, the boss event group and the worker 
> event group are the same, which causes this delay.
> {code:java}
> EventLoopGroup bossGroup =
>   NettyUtils.createEventLoop(ioMode, conf.serverThreads(), 
> conf.getModuleName() + "-server");
> EventLoopGroup workerGroup = bossGroup;
> bootstrap = new ServerBootstrap()
>   .group(bossGroup, workerGroup)
>   .channel(NettyUtils.getServerChannelClass(ioMode))
>   .option(ChannelOption.ALLOCATOR, allocator)
>   .childOption(ChannelOption.ALLOCATOR, allocator);
> {code}
> When the load at the shuffle service increases, since the worker threads are 
> busy with existing channels, registering new channels gets delayed.
> The fix is simple. I created a dedicated boss event loop group with a single 
> thread.
> {code:java}
> EventLoopGroup bossGroup = NettyUtils.createEventLoop(ioMode, 1,
>   conf.getModuleName() + "-boss");
> EventLoopGroup workerGroup =  NettyUtils.createEventLoop(ioMode, 
> conf.serverThreads(),
> conf.getModuleName() + "-server");
> bootstrap = new ServerBootstrap()
>   .group(bossGroup, workerGroup)
>   .channel(NettyUtils.getServerChannelClass(ioMode))
>   .option(ChannelOption.ALLOCATOR, allocator)
> {code}
> This fixed the issue.
>  We just need 1 thread in the boss group because there is only a single 
> server bootstrap.
>  






[jira] [Updated] (SPARK-30512) Use a dedicated boss event group loop in the netty pipeline for external shuffle service

2020-01-14 Thread Chandni Singh (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-30512?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chandni Singh updated SPARK-30512:
--
Description: 
We have been seeing a large number of SASL authentication (RPC requests) timing 
out with the external shuffle service.
 The issue and all the analysis we did are described here:
 [https://github.com/netty/netty/issues/9890]

I added a {{LoggingHandler}} to the netty pipeline and realized that even the 
channel registration is delayed by 30 seconds. 
 In the Spark External Shuffle service, the boss event group and the worker 
event group are the same, which causes this delay.
{code:java}
EventLoopGroup bossGroup =
  NettyUtils.createEventLoop(ioMode, conf.serverThreads(), 
conf.getModuleName() + "-server");
EventLoopGroup workerGroup = bossGroup;

bootstrap = new ServerBootstrap()
  .group(bossGroup, workerGroup)
  .channel(NettyUtils.getServerChannelClass(ioMode))
  .option(ChannelOption.ALLOCATOR, allocator)
  .childOption(ChannelOption.ALLOCATOR, allocator);
{code}
When the load at the shuffle service increases, since the worker threads are 
busy with existing channels, registering new channels gets delayed.

The fix is simple. I created a dedicated boss event loop group with a single 
thread.
{code:java}
EventLoopGroup bossGroup = NettyUtils.createEventLoop(ioMode, 1,
  conf.getModuleName() + "-boss");
EventLoopGroup workerGroup =  NettyUtils.createEventLoop(ioMode, 
conf.serverThreads(),
conf.getModuleName() + "-server");

bootstrap = new ServerBootstrap()
  .group(bossGroup, workerGroup)
  .channel(NettyUtils.getServerChannelClass(ioMode))
  .option(ChannelOption.ALLOCATOR, allocator)
{code}
This fixed the issue.
 We just need 1 thread in the boss group because there is only a single server 
bootstrap.

 

  was:
We have been seeing a large number of SASL authentication (RPC requests) timing 
out with the external shuffle service.
 The issue and all the analysis we did is described here:
 [https://github.com/netty/netty/issues/9890]

I added a {{LoggingHandler}} to netty pipeline and realized that even the 
channel registration is delayed by 30 seconds. 
 In the Spark External Shuffle service, the boss event group and the worker 
event group are same which is causing this delay.
{code:java}
EventLoopGroup bossGroup =
  NettyUtils.createEventLoop(ioMode, conf.serverThreads(), 
conf.getModuleName() + "-server");
EventLoopGroup workerGroup = bossGroup;

bootstrap = new ServerBootstrap()
  .group(bossGroup, workerGroup)
  .channel(NettyUtils.getServerChannelClass(ioMode))
  .option(ChannelOption.ALLOCATOR, allocator)
  .childOption(ChannelOption.ALLOCATOR, allocator);
{code}
When the load at the shuffle service increases, since the worker threads are 
busy with existing channels, registering new channels gets delayed.

The fix is simple. I created a dedicated boss thread event loop group with 1 
thread.
{code:java}
EventLoopGroup bossGroup = NettyUtils.createEventLoop(ioMode, 1,
  conf.getModuleName() + "-boss");
EventLoopGroup workerGroup =  NettyUtils.createEventLoop(ioMode, 
conf.serverThreads(),
conf.getModuleName() + "-server");

bootstrap = new ServerBootstrap()
  .group(bossGroup, workerGroup)
  .channel(NettyUtils.getServerChannelClass(ioMode))
  .option(ChannelOption.ALLOCATOR, allocator)
{code}
This fixed the issue.
 We just need 1 thread in the boss group because there is only a single server 
bootstrap.

Please assign the issue to me so I can open up a PR.


> Use a dedicated boss event group loop in the netty pipeline for external 
> shuffle service
> 
>
> Key: SPARK-30512
> URL: https://issues.apache.org/jira/browse/SPARK-30512
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle
>Affects Versions: 3.0.0
>Reporter: Chandni Singh
>Priority: Major
>
> We have been seeing a large number of SASL authentication (RPC requests) 
> timing out with the external shuffle service.
>  The issue and all the analysis we did are described here:
>  [https://github.com/netty/netty/issues/9890]
> I added a {{LoggingHandler}} to the netty pipeline and realized that even the 
> channel registration is delayed by 30 seconds. 
>  In the Spark External Shuffle service, the boss event group and the worker 
> event group are the same, which causes this delay.
> {code:java}
> EventLoopGroup bossGroup =
>   NettyUtils.createEventLoop(ioMode, conf.serverThreads(), 
> conf.getModuleName() + "-server");
> EventLoopGroup workerGroup = bossGroup;
> bootstrap = new ServerBootstrap()
>   .group(bossGroup, workerGroup)
>   .channel(NettyUtils.getServerChannelClass(ioMode))
>   

[jira] [Created] (SPARK-30512) Use a dedicated boss event group loop in the netty pipeline for external shuffle service

2020-01-14 Thread Chandni Singh (Jira)
Chandni Singh created SPARK-30512:
-

 Summary: Use a dedicated boss event group loop in the netty 
pipeline for external shuffle service
 Key: SPARK-30512
 URL: https://issues.apache.org/jira/browse/SPARK-30512
 Project: Spark
  Issue Type: Bug
  Components: Shuffle
Affects Versions: 3.0.0
Reporter: Chandni Singh


We have been seeing a large number of SASL authentication (RPC requests) timing 
out with the external shuffle service.
 The issue and all the analysis we did are described here:
 [https://github.com/netty/netty/issues/9890]

I added a {{LoggingHandler}} to the netty pipeline and realized that even the 
channel registration is delayed by 30 seconds. 
 In the Spark External Shuffle service, the boss event group and the worker 
event group are the same, which causes this delay.
{code:java}
EventLoopGroup bossGroup =
  NettyUtils.createEventLoop(ioMode, conf.serverThreads(), 
conf.getModuleName() + "-server");
EventLoopGroup workerGroup = bossGroup;

bootstrap = new ServerBootstrap()
  .group(bossGroup, workerGroup)
  .channel(NettyUtils.getServerChannelClass(ioMode))
  .option(ChannelOption.ALLOCATOR, allocator)
  .childOption(ChannelOption.ALLOCATOR, allocator);
{code}
When the load at the shuffle service increases, since the worker threads are 
busy with existing channels, registering new channels gets delayed.

The fix is simple. I created a dedicated boss event loop group with a single 
thread.
{code:java}
EventLoopGroup bossGroup = NettyUtils.createEventLoop(ioMode, 1,
  conf.getModuleName() + "-boss");
EventLoopGroup workerGroup =  NettyUtils.createEventLoop(ioMode, 
conf.serverThreads(),
conf.getModuleName() + "-server");

bootstrap = new ServerBootstrap()
  .group(bossGroup, workerGroup)
  .channel(NettyUtils.getServerChannelClass(ioMode))
  .option(ChannelOption.ALLOCATOR, allocator)
{code}
This fixed the issue.
 We just need 1 thread in the boss group because there is only a single server 
bootstrap.

Please assign the issue to me so I can open up a PR.


