[jira] [Commented] (SPARK-44215) Client receives zero number of chunks in merge meta response which doesn't trigger fallback to unmerged blocks
[ https://issues.apache.org/jira/browse/SPARK-44215?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17740023#comment-17740023 ]

Chandni Singh commented on SPARK-44215:
---------------------------------------

PR to backport the change to 3.3: https://github.com/apache/spark/pull/41859

> Client receives zero number of chunks in merge meta response which doesn't
> trigger fallback to unmerged blocks
> --------------------------------------------------------------------------
>
>                 Key: SPARK-44215
>                 URL: https://issues.apache.org/jira/browse/SPARK-44215
>             Project: Spark
>          Issue Type: Bug
>          Components: Shuffle
>    Affects Versions: 3.2.0
>            Reporter: Chandni Singh
>            Assignee: Chandni Singh
>            Priority: Major
>             Fix For: 3.5.0, 3.4.2
>
> We still see instances of the server returning 0 {{numChunks}} in
> {{mergedMetaResponse}}, which causes the executor to fail with an
> {{ArithmeticException}}:
> {code}
> java.lang.ArithmeticException: / by zero
> 	at org.apache.spark.storage.PushBasedFetchHelper.createChunkBlockInfosFromMetaResponse(PushBasedFetchHelper.scala:128)
> 	at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:1047)
> 	at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:90)
> 	at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:29)
> 	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
> 	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
> 	at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
> 	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
> 	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
> {code}
> Here the executor doesn't fall back to fetching unmerged blocks, and this
> also doesn't result in a {{FetchFailure}}, so the application fails.
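The failure mode described here suggests a simple client-side guard. The sketch below is illustrative only, in plain Java with invented names (it is not Spark's actual {{PushBasedFetchHelper}} code): a zero-chunk meta response is treated as invalid so the caller can fall back to fetching the original unmerged blocks instead of dividing by zero.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: reject a zero-chunk merged-meta response up front so
// the caller falls back to unmerged blocks, rather than computing
// blockSize / numChunks and crashing with ArithmeticException.
public class MergedMetaFallback {
    // Returns per-chunk approximate sizes, or null to signal "fall back".
    static List<Long> chunkSizes(long blockSize, int numChunks) {
        if (numChunks <= 0) {
            return null; // invalid response: caller fetches original blocks
        }
        long approxChunkSize = blockSize / numChunks;
        List<Long> sizes = new ArrayList<>();
        for (int i = 0; i < numChunks; i++) {
            sizes.add(approxChunkSize);
        }
        return sizes;
    }

    public static void main(String[] args) {
        System.out.println(chunkSizes(100L, 4)); // [25, 25, 25, 25]
        System.out.println(chunkSizes(100L, 0)); // null -> fallback path
    }
}
```

Returning a sentinel (or throwing a typed exception) keeps the decision to fall back with the fetch iterator, which is the component that knows how to request the original blocks.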
--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-44215) Client receives zero number of chunks in merge meta response which doesn't trigger fallback to unmerged blocks
Chandni Singh created SPARK-44215:
-------------------------------------

             Summary: Client receives zero number of chunks in merge meta response which doesn't trigger fallback to unmerged blocks
                 Key: SPARK-44215
                 URL: https://issues.apache.org/jira/browse/SPARK-44215
             Project: Spark
          Issue Type: Bug
          Components: Shuffle
    Affects Versions: 3.2.0
            Reporter: Chandni Singh


We still see instances of the server returning 0 {{numChunks}} in {{mergedMetaResponse}}, which causes the executor to fail with an {{ArithmeticException}}:
{code}
java.lang.ArithmeticException: / by zero
	at org.apache.spark.storage.PushBasedFetchHelper.createChunkBlockInfosFromMetaResponse(PushBasedFetchHelper.scala:128)
	at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:1047)
	at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:90)
	at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:29)
	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
{code}
Here the executor doesn't fall back to fetching unmerged blocks, and this also doesn't result in a {{FetchFailure}}, so the application fails.
[jira] [Comment Edited] (SPARK-43179) Add option for applications to control saving of metadata in the External Shuffle Service LevelDB
[ https://issues.apache.org/jira/browse/SPARK-43179?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17730307#comment-17730307 ]

Chandni Singh edited comment on SPARK-43179 at 6/7/23 11:30 PM:
----------------------------------------------------------------

Found a bug, so re-opening the issue. The change introduced a bug that impacts recovery of applications:
{code:java}
ERROR org.apache.spark.network.server.TransportRequestHandler: Error while invoking RpcHandler#receive() on RPC id 5764589675121231159
java.lang.RuntimeException: javax.security.sasl.SaslException: DIGEST-MD5: digest response format violation. Mismatched response.
	at org.sparkproject.guava.base.Throwables.propagate(Throwables.java:160)
	at org.apache.spark.network.sasl.SparkSaslServer.response(SparkSaslServer.java:121)
	at org.apache.spark.network.sasl.SaslRpcHandler.doAuthChallenge(SaslRpcHandler.java:94)
	at org.apache.spark.network.crypto.AuthRpcHandler.doAuthChallenge(AuthRpcHandler.java:81)
	at org.apache.spark.network.server.AbstractAuthRpcHandler.receive(AbstractAuthRpcHandler.java:59)
	at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:163)
	at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:109)
	at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:140)
	at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:53)
	at org.sparkproject.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
	at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at org.sparkproject.io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
	at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at org.sparkproject.io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
	at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:102)
	at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at org.sparkproject.io.netty.channel.DefaultChannelPipeline
{code}

was (Author: csingh): Found a bug so re-opening the issue.
> Add option for applications to control saving of metadata in the External
> Shuffle Service LevelDB
> -------------------------------------------------------------------------
>
>                 Key: SPARK-43179
>                 URL: https://issues.apache.org/jira/browse/SPARK-43179
>             Project: Spark
>          Issue Type: Improvement
>          Components: Shuffle
>    Affects Versions: 3.4.0
>            Reporter: Chandni Singh
>            Assignee: Chandni Singh
>            Priority: Major
>             Fix For: 3.5.0
>
> Currently, the External Shuffle Service stores application metadata in
> LevelDB. This is necessary to enable the shuffle server to resume serving
> shuffle data for an application whose executors registered before the
> NodeManager restarts. However, the metadata includes the application secret,
> which is stored in LevelDB without encryption. This is a potential security
> risk, particularly for applications with high security requirements. While
> filesystem access control lists (ACLs) can help protect keys and
> certificates, they may not be sufficient for some use cases. In response, we
> have decided not to store metadata for these high-security applications in
> LevelDB. As a result, these applications may experience more failures in the
> event of a node restart, but we believe this trade-off is acceptable given
> the increased security risk.
[jira] [Reopened] (SPARK-43179) Add option for applications to control saving of metadata in the External Shuffle Service LevelDB
[ https://issues.apache.org/jira/browse/SPARK-43179?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chandni Singh reopened SPARK-43179:
-----------------------------------

Found a bug so re-opening the issue.

> Add option for applications to control saving of metadata in the External
> Shuffle Service LevelDB
> -------------------------------------------------------------------------
>
>                 Key: SPARK-43179
>                 URL: https://issues.apache.org/jira/browse/SPARK-43179
>             Project: Spark
>          Issue Type: Improvement
>          Components: Shuffle
>    Affects Versions: 3.4.0
>            Reporter: Chandni Singh
>            Assignee: Chandni Singh
>            Priority: Major
>             Fix For: 3.5.0
>
> Currently, the External Shuffle Service stores application metadata in
> LevelDB. This is necessary to enable the shuffle server to resume serving
> shuffle data for an application whose executors registered before the
> NodeManager restarts. However, the metadata includes the application secret,
> which is stored in LevelDB without encryption. This is a potential security
> risk, particularly for applications with high security requirements. While
> filesystem access control lists (ACLs) can help protect keys and
> certificates, they may not be sufficient for some use cases. In response, we
> have decided not to store metadata for these high-security applications in
> LevelDB. As a result, these applications may experience more failures in the
> event of a node restart, but we believe this trade-off is acceptable given
> the increased security risk.
[jira] [Created] (SPARK-43583) When encryption is enabled on the External Shuffle Service, then processing of push meta requests throws NPE
Chandni Singh created SPARK-43583:
-------------------------------------

             Summary: When encryption is enabled on the External Shuffle Service, then processing of push meta requests throws NPE
                 Key: SPARK-43583
                 URL: https://issues.apache.org/jira/browse/SPARK-43583
             Project: Spark
          Issue Type: Improvement
          Components: Shuffle
    Affects Versions: 3.2.0
            Reporter: Chandni Singh


After enabling support for over-the-wire encryption for Spark shuffle services, the meta requests for push-merged blocks fail with this error:
{code:java}
java.lang.RuntimeException: java.lang.NullPointerException
	at org.apache.spark.network.server.AbstractAuthRpcHandler.getMergedBlockMetaReqHandler(AbstractAuthRpcHandler.java:110)
	at org.apache.spark.network.crypto.AuthRpcHandler.getMergedBlockMetaReqHandler(AuthRpcHandler.java:144)
	at org.apache.spark.network.server.TransportRequestHandler.processMergedBlockMetaRequest(TransportRequestHandler.java:275)
	at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:117)
	at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:140)
	at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:53)
	at org.sparkproject.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
	at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at org.sparkproject.io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
{code}
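The NPE above surfaces when a merged-block-meta handler is requested before it has been initialized. A minimal illustration of the defensive pattern, with invented class and method names (not Spark's actual {{AbstractAuthRpcHandler}}): fail with a descriptive error rather than letting a null delegate propagate as a {{NullPointerException}} deep in the request pipeline.

```java
// Hypothetical sketch of guarding a delegate handler: throw a clear
// IllegalStateException instead of letting a null delegate surface as a
// NullPointerException inside the transport layer.
public class SafeMetaHandler {
    interface MetaReqHandler { String handle(String req); }

    private final MetaReqHandler delegate; // may be null if never initialized

    SafeMetaHandler(MetaReqHandler delegate) { this.delegate = delegate; }

    String handleMetaRequest(String req) {
        if (delegate == null) {
            throw new IllegalStateException(
                "merged-block-meta handler not initialized; cannot serve " + req);
        }
        return delegate.handle(req);
    }
}
```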
[jira] [Updated] (SPARK-43179) Add option for applications to control saving of metadata in the External Shuffle Service LevelDB
[ https://issues.apache.org/jira/browse/SPARK-43179?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chandni Singh updated SPARK-43179:
----------------------------------
    Summary: Add option for applications to control saving of metadata in the External Shuffle Service LevelDB  (was: Add option for applications to control saving of metadata in External Shuffle Service LevelDB)

> Add option for applications to control saving of metadata in the External
> Shuffle Service LevelDB
> -------------------------------------------------------------------------
>
>                 Key: SPARK-43179
>                 URL: https://issues.apache.org/jira/browse/SPARK-43179
>             Project: Spark
>          Issue Type: Improvement
>          Components: Shuffle
>    Affects Versions: 3.4.0
>            Reporter: Chandni Singh
>            Priority: Major
>
> Currently, the External Shuffle Service stores application metadata in
> LevelDB. This is necessary to enable the shuffle server to resume serving
> shuffle data for an application whose executors registered before the
> NodeManager restarts. However, the metadata includes the application secret,
> which is stored in LevelDB without encryption. This is a potential security
> risk, particularly for applications with high security requirements. While
> filesystem access control lists (ACLs) can help protect keys and
> certificates, they may not be sufficient for some use cases. In response, we
> have decided not to store metadata for these high-security applications in
> LevelDB. As a result, these applications may experience more failures in the
> event of a node restart, but we believe this trade-off is acceptable given
> the increased security risk.
[jira] [Updated] (SPARK-43179) Add option for applications to control saving of metadata in External Shuffle Service LevelDB
[ https://issues.apache.org/jira/browse/SPARK-43179?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chandni Singh updated SPARK-43179:
----------------------------------
    Summary: Add option for applications to control saving of metadata in External Shuffle Service LevelDB  (was: Allow applications to control whether their metadata gets saved by the shuffle server in the db)

> Add option for applications to control saving of metadata in External Shuffle
> Service LevelDB
> -----------------------------------------------------------------------------
>
>                 Key: SPARK-43179
>                 URL: https://issues.apache.org/jira/browse/SPARK-43179
>             Project: Spark
>          Issue Type: Improvement
>          Components: Shuffle
>    Affects Versions: 3.4.0
>            Reporter: Chandni Singh
>            Priority: Major
>
> Currently, the External Shuffle Service stores application metadata in
> LevelDB. This is necessary to enable the shuffle server to resume serving
> shuffle data for an application whose executors registered before the
> NodeManager restarts. However, the metadata includes the application secret,
> which is stored in LevelDB without encryption. This is a potential security
> risk, particularly for applications with high security requirements. While
> filesystem access control lists (ACLs) can help protect keys and
> certificates, they may not be sufficient for some use cases. In response, we
> have decided not to store metadata for these high-security applications in
> LevelDB. As a result, these applications may experience more failures in the
> event of a node restart, but we believe this trade-off is acceptable given
> the increased security risk.
[jira] [Created] (SPARK-43179) Allow applications to control whether their metadata gets saved by the shuffle server in the db
Chandni Singh created SPARK-43179:
-------------------------------------

             Summary: Allow applications to control whether their metadata gets saved by the shuffle server in the db
                 Key: SPARK-43179
                 URL: https://issues.apache.org/jira/browse/SPARK-43179
             Project: Spark
          Issue Type: Improvement
          Components: Shuffle
    Affects Versions: 3.4.0
            Reporter: Chandni Singh


Currently, the External Shuffle Service stores application metadata in LevelDB. This is necessary to enable the shuffle server to resume serving shuffle data for an application whose executors registered before the NodeManager restarts. However, the metadata includes the application secret, which is stored in LevelDB without encryption. This is a potential security risk, particularly for applications with high security requirements. While filesystem access control lists (ACLs) can help protect keys and certificates, they may not be sufficient for some use cases. In response, we have decided not to store metadata for these high-security applications in LevelDB. As a result, these applications may experience more failures in the event of a node restart, but we believe this trade-off is acceptable given the increased security risk.
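The opt-out described above can be pictured as a per-application flag consulted at registration time. Everything below is an assumption-laden sketch: the config key name, the {{Map}} standing in for LevelDB, and the method names are all invented for illustration and do not correspond to Spark's actual API.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: persist app metadata (including the secret) only when
// the application has NOT opted out. The key name below is invented; the
// HashMap merely stands in for the LevelDB store.
public class MetadataOptOut {
    static final String SKIP_DB_KEY = "shuffle.server.skip.metadata.db"; // invented key

    final Map<String, String> db = new HashMap<>(); // stand-in for LevelDB

    void registerApp(String appId, String secret, Map<String, String> appConf) {
        boolean skipDb = Boolean.parseBoolean(appConf.getOrDefault(SKIP_DB_KEY, "false"));
        if (!skipDb) {
            // Persisted metadata lets the ESS serve this app across a
            // NodeManager restart, at the cost of storing the secret on disk.
            db.put(appId, secret);
        }
        // In-memory registration (not shown) happens either way; opted-out
        // apps simply lose recovery after a restart.
    }
}
```

This mirrors the trade-off stated in the description: opted-out applications avoid having their secret written to disk but see more failures if the node restarts.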
[jira] [Commented] (SPARK-42834) Divided by zero occurs in PushBasedFetchHelper.createChunkBlockInfosFromMetaResponse
[ https://issues.apache.org/jira/browse/SPARK-42834?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17701908#comment-17701908 ]

Chandni Singh commented on SPARK-42834:
---------------------------------------

We don't expect {{numChunks}} to be zero or {{bitmaps}} to be empty. There was a bug in 3.2.0 which was fixed with https://issues.apache.org/jira/browse/SPARK-37675. Can you please check whether you have this fix?

> Divided by zero occurs in
> PushBasedFetchHelper.createChunkBlockInfosFromMetaResponse
> ------------------------------------------------------------------------------------
>
>                 Key: SPARK-42834
>                 URL: https://issues.apache.org/jira/browse/SPARK-42834
>             Project: Spark
>          Issue Type: Bug
>          Components: Shuffle
>    Affects Versions: 3.2.0
>            Reporter: Li Ying
>            Priority: Major
>
> Sometimes when we run a SQL job with push-based shuffle, an exception occurs
> as below. It seems that there's no element in the bitmaps which store the
> merged chunk meta.
> Is it a bug that we should not call createChunkBlockInfos when bitmaps is
> empty, or should the bitmaps never be empty here?
>
> {code:java}
> Caused by: java.lang.ArithmeticException: / by zero
> 	at org.apache.spark.storage.PushBasedFetchHelper.createChunkBlockInfosFromMetaResponse(PushBasedFetchHelper.scala:117)
> 	at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:980)
> 	at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:84)
> {code}
> Related code:
> {code:java}
> def createChunkBlockInfosFromMetaResponse(
>     shuffleId: Int,
>     shuffleMergeId: Int,
>     reduceId: Int,
>     blockSize: Long,
>     bitmaps: Array[RoaringBitmap]): ArrayBuffer[(BlockId, Long, Int)] = {
>   val approxChunkSize = blockSize / bitmaps.length
>   val blocksToFetch = new ArrayBuffer[(BlockId, Long, Int)]()
>   for (i <- bitmaps.indices) {
>     val blockChunkId = ShuffleBlockChunkId(shuffleId, shuffleMergeId, reduceId, i)
>     chunksMetaMap.put(blockChunkId, bitmaps(i))
>     logDebug(s"adding block chunk $blockChunkId of size $approxChunkSize")
>     blocksToFetch += ((blockChunkId, approxChunkSize, SHUFFLE_PUSH_MAP_ID))
>   }
>   blocksToFetch
> } {code}
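The quoted Scala divides {{blockSize}} by {{bitmaps.length}} with no guard, which is exactly where the `/ by zero` originates when the meta response is empty. Below is a simplified Java rendering of that division with the missing check added; names are simplified for illustration, and the comment above points to SPARK-37675 for the actual fix.

```java
// Simplified Java rendering of the unguarded division in
// createChunkBlockInfosFromMetaResponse: an empty bitmaps array must be
// rejected before computing blockSize / bitmaps.length.
public class ChunkMeta {
    static long approxChunkSize(long blockSize, int numBitmaps) {
        if (numBitmaps == 0) {
            // Surface a descriptive error (or trigger fallback to the
            // original blocks) instead of "ArithmeticException: / by zero".
            throw new IllegalStateException(
                "merged-chunk meta has no bitmaps; fall back to original blocks");
        }
        return blockSize / numBitmaps;
    }
}
```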
[jira] [Commented] (SPARK-42817) Spark driver logs are filled with Initializing service data for shuffle service using name
[ https://issues.apache.org/jira/browse/SPARK-42817?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17700803#comment-17700803 ]

Chandni Singh commented on SPARK-42817:
---------------------------------------

Created PR: https://github.com/apache/spark/pull/40448

> Spark driver logs are filled with Initializing service data for shuffle
> service using name
> -----------------------------------------------------------------------
>
>                 Key: SPARK-42817
>                 URL: https://issues.apache.org/jira/browse/SPARK-42817
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.2.0
>            Reporter: Chandni Singh
>            Priority: Major
>
> With SPARK-34828, we made the shuffle service name configurable and added a log
> [here|https://github.com/apache/spark/blob/8860f69455e5a722626194c4797b4b42cccd4510/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala#L118]
> that prints the shuffle service name. However, this line is emitted in the
> driver logs whenever a new executor is launched, which pollutes the log:
> {code}
> 22/08/03 20:42:07 INFO ExecutorRunnable: Initializing service data for shuffle service using name 'spark_shuffle_311'
> 22/08/03 20:42:07 INFO ExecutorRunnable: Initializing service data for shuffle service using name 'spark_shuffle_311'
> 22/08/03 20:42:07 INFO ExecutorRunnable: Initializing service data for shuffle service using name 'spark_shuffle_311'
> 22/08/03 20:42:07 INFO ExecutorRunnable: Initializing service data for shuffle service using name 'spark_shuffle_311'
> 22/08/03 20:42:07 INFO ExecutorRunnable: Initializing service data for shuffle service using name 'spark_shuffle_311'
> 22/08/03 20:42:07 INFO ExecutorRunnable: Initializing service data for shuffle service using name 'spark_shuffle_311'
> 22/08/03 20:42:07 INFO ExecutorRunnable: Initializing service data for shuffle service using name 'spark_shuffle_311'
> 22/08/03 20:42:07 INFO ExecutorRunnable: Initializing service data for shuffle service using name 'spark_shuffle_311'
> 22/08/03 20:42:07 INFO ExecutorRunnable: Initializing service data for shuffle service using name 'spark_shuffle_311'
> 22/08/03 20:42:07 INFO ExecutorRunnable: Initializing service data for shuffle service using name 'spark_shuffle_311'
> 22/08/03 20:42:07 INFO ExecutorRunnable: Initializing service data for shuffle service using name 'spark_shuffle_311'
> 22/08/03 20:42:07 INFO ExecutorRunnable: Initializing service data for shuffle service using name 'spark_shuffle_311'
> 22/08/03 20:42:07 INFO ExecutorRunnable: Initializing service data for shuffle service using name 'spark_shuffle_311'
> 22/08/03 20:42:07 INFO ExecutorRunnable: Initializing service data for shuffle service using name 'spark_shuffle_311'
> 22/08/03 20:42:07 INFO ExecutorRunnable: Initializing service data for shuffle service using name 'spark_shuffle_311'
> 22/08/03 20:42:07 INFO ExecutorRunnable: Initializing service data for shuffle service using name 'spark_shuffle_311'
> 22/08/03 20:42:07 INFO ExecutorRunnable: Initializing service data for shuffle service using name 'spark_shuffle_311'
> 22/08/03 20:42:07 INFO ExecutorRunnable: Initializing service data for shuffle service using name 'spark_shuffle_311'
> 22/08/03 20:42:07 INFO ExecutorRunnable: Initializing service data for shuffle service using name 'spark_shuffle_311'
> 22/08/03 20:42:07 INFO ExecutorRunnable: Initializing service data for shuffle service using name 'spark_shuffle_311'
> {code}
> We can just log this once in the driver.
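The "log this once" suggestion can be implemented with a process-wide flag. A sketch with an invented class name (this is not the actual {{ExecutorRunnable}} change in the PR above):

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch: emit the shuffle-service-name log line only for the
// first executor launch in this driver process, even with concurrent callers.
public class LogOnce {
    private static final AtomicBoolean logged = new AtomicBoolean(false);

    static boolean shouldLogServiceName() {
        // compareAndSet(false, true) succeeds for exactly one caller.
        return logged.compareAndSet(false, true);
    }

    static void launchExecutor(String serviceName) {
        if (shouldLogServiceName()) {
            System.out.println(
                "Initializing service data for shuffle service using name '"
                + serviceName + "'");
        }
        // ... rest of executor launch ...
    }
}
```

The {{AtomicBoolean}} makes the guard safe even when many executor launches race, which is what produces the identical-timestamp bursts shown in the description.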
[jira] [Updated] (SPARK-42817) Spark driver logs are filled with Initializing service data for shuffle service using name
[ https://issues.apache.org/jira/browse/SPARK-42817?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chandni Singh updated SPARK-42817:
----------------------------------
    Description:
With SPARK-34828, we made the shuffle service name configurable and added a log [here|https://github.com/apache/spark/blob/8860f69455e5a722626194c4797b4b42cccd4510/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala#L118] that prints the shuffle service name. However, this line is emitted in the driver logs whenever a new executor is launched, which pollutes the log:
{code}
22/08/03 20:42:07 INFO ExecutorRunnable: Initializing service data for shuffle service using name 'spark_shuffle_311'
22/08/03 20:42:07 INFO ExecutorRunnable: Initializing service data for shuffle service using name 'spark_shuffle_311'
22/08/03 20:42:07 INFO ExecutorRunnable: Initializing service data for shuffle service using name 'spark_shuffle_311'
22/08/03 20:42:07 INFO ExecutorRunnable: Initializing service data for shuffle service using name 'spark_shuffle_311'
22/08/03 20:42:07 INFO ExecutorRunnable: Initializing service data for shuffle service using name 'spark_shuffle_311'
22/08/03 20:42:07 INFO ExecutorRunnable: Initializing service data for shuffle service using name 'spark_shuffle_311'
22/08/03 20:42:07 INFO ExecutorRunnable: Initializing service data for shuffle service using name 'spark_shuffle_311'
22/08/03 20:42:07 INFO ExecutorRunnable: Initializing service data for shuffle service using name 'spark_shuffle_311'
22/08/03 20:42:07 INFO ExecutorRunnable: Initializing service data for shuffle service using name 'spark_shuffle_311'
22/08/03 20:42:07 INFO ExecutorRunnable: Initializing service data for shuffle service using name 'spark_shuffle_311'
22/08/03 20:42:07 INFO ExecutorRunnable: Initializing service data for shuffle service using name 'spark_shuffle_311'
22/08/03 20:42:07 INFO ExecutorRunnable: Initializing service data for shuffle service using name 'spark_shuffle_311'
22/08/03 20:42:07 INFO ExecutorRunnable: Initializing service data for shuffle service using name 'spark_shuffle_311'
22/08/03 20:42:07 INFO ExecutorRunnable: Initializing service data for shuffle service using name 'spark_shuffle_311'
22/08/03 20:42:07 INFO ExecutorRunnable: Initializing service data for shuffle service using name 'spark_shuffle_311'
22/08/03 20:42:07 INFO ExecutorRunnable: Initializing service data for shuffle service using name 'spark_shuffle_311'
22/08/03 20:42:07 INFO ExecutorRunnable: Initializing service data for shuffle service using name 'spark_shuffle_311'
22/08/03 20:42:07 INFO ExecutorRunnable: Initializing service data for shuffle service using name 'spark_shuffle_311'
22/08/03 20:42:07 INFO ExecutorRunnable: Initializing service data for shuffle service using name 'spark_shuffle_311'
22/08/03 20:42:07 INFO ExecutorRunnable: Initializing service data for shuffle service using name 'spark_shuffle_311'
{code}
We can just log this once in the driver.

> Spark driver logs are filled with Initializing service data for shuffle
> service using name
> -----------------------------------------------------------------------
>
>                 Key: SPARK-42817
>                 URL: https://issues.apache.org/jira/browse/SPARK-42817
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.2.0
>            Reporter: Chandni Singh
>            Priority: Major
>
> With SPARK-34828, we made the shuffle service name configurable and added a log
> [here|https://github.com/apache/spark/blob/8860f69455e5a722626194c4797b4b42cccd4510/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala#L118]
> that prints the shuffle service name. However, this line is emitted in the
> driver logs whenever a new executor is launched, which pollutes the log:
> {code}
> 22/08/03 20:42:07 INFO ExecutorRunnable: Initializing service data for shuffle service using name 'spark_shuffle_311'
> 22/08/03 20:42:07 INFO ExecutorRunnable: Initializing service data for shuffle service using name 'spark_shuffle_311'
> 22/08/03 20:42:07 INFO ExecutorRunnable: Initializing service data for shuffle service using name 'spark_shuffle_311'
> 22/08/03 20:42:07 INFO ExecutorRunnable: Initializing service data for shuffle service using name 'spark_shuffle_311'
> 22/08/03 20:42:07 INFO ExecutorRunnable: Initializing service data for shuffle service using name 'spark_shuffle_311'
> 22/08/03 20:42:07 INFO ExecutorRunnable: Initializing service data for shuffle service using name 'spark_shuffle_311'
> 22/08/03 20:42:07 INFO ExecutorRunnable: Initializing service data for shuffle service using name 'spark_shuffle_311'
> 22/08/03 20:42:07 INFO ExecutorRunnable: Initializing service data for shuffle service using name 'spark_shuffle_311'
[jira] [Created] (SPARK-42817) Spark driver logs are filled with Initializing service data for shuffle service using name
Chandni Singh created SPARK-42817:
-------------------------------------

             Summary: Spark driver logs are filled with Initializing service data for shuffle service using name
                 Key: SPARK-42817
                 URL: https://issues.apache.org/jira/browse/SPARK-42817
             Project: Spark
          Issue Type: Bug
          Components: Spark Core
    Affects Versions: 3.2.0
            Reporter: Chandni Singh
[jira] [Updated] (SPARK-39647) Block push fails with java.lang.IllegalArgumentException: Active local dirs list has not been updated by any executor registration even when the NodeManager hasn't been
[ https://issues.apache.org/jira/browse/SPARK-39647?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chandni Singh updated SPARK-39647:
----------------------------------
    Summary: Block push fails with java.lang.IllegalArgumentException: Active local dirs list has not been updated by any executor registration even when the NodeManager hasn't been restarted  (was: Block push fails with java.lang.IllegalArgumentException: Active local dirs list has not been updated by any executor registration even when NodeManager hasn't been restarted)

> Block push fails with java.lang.IllegalArgumentException: Active local dirs
> list has not been updated by any executor registration even when the
> NodeManager hasn't been restarted
> ---------------------------------------------------------------------------
>
>                 Key: SPARK-39647
>                 URL: https://issues.apache.org/jira/browse/SPARK-39647
>             Project: Spark
>          Issue Type: Bug
>          Components: Shuffle
>    Affects Versions: 3.2.0
>            Reporter: Chandni Singh
>            Priority: Major
>
> We saw these exceptions during block push:
> {code:java}
> 22/06/24 13:29:14 ERROR RetryingBlockFetcher: Failed to fetch block shuffle_170_568_174, and will not retry (0 retries)
> org.apache.spark.network.shuffle.BlockPushException: !application_1653753500486_3193550shuffle_170_568_174java.lang.IllegalArgumentException: Active local dirs list has not been updated by any executor registration
> 	at org.spark_project.guava.base.Preconditions.checkArgument(Preconditions.java:92)
> 	at org.apache.spark.network.shuffle.RemoteBlockPushResolver.getActiveLocalDirs(RemoteBlockPushResolver.java:300)
> 	at org.apache.spark.network.shuffle.RemoteBlockPushResolver.getFile(RemoteBlockPushResolver.java:290)
> 	at org.apache.spark.network.shuffle.RemoteBlockPushResolver.getMergedShuffleFile(RemoteBlockPushResolver.java:312)
> 	at org.apache.spark.network.shuffle.RemoteBlockPushResolver.lambda$getOrCreateAppShufflePartitionInfo$1(RemoteBlockPushResolver.java:168)
> 22/06/24 13:29:14 WARN UnsafeShuffleWriter: Pushing block shuffle_170_568_174 to BlockManagerId(, node-x, 7337, None) failed.
> {code}
> Note: the NodeManager on node-x (the node against which this exception was
> seen) was not restarted.
> The reason this happened is that the executor registers the block manager
> with {{BlockManagerMaster}} before it registers with the ESS. In push-based
> shuffle, a block manager is selected by the driver as a merger for the
> shuffle push. However, the ESS on that node can successfully merge blocks
> only if it has received the metadata about merged directories from the local
> executor (sent when the local executor registers with the ESS). If this
> local executor registration is delayed but the ESS host gets picked as a
> merger, it will fail to merge the blocks pushed to it, which is what
> happened here.
> The local executor on node-x is executor 754, and the block manager
> registration happened at 13:28:11:
> {code:java}
> 22/06/24 13:28:11 INFO ExecutorAllocationManager: New executor 754 has registered (new total is 1200)
> 22/06/24 13:28:11 INFO BlockManagerMasterEndpoint: Registering block manager node-x:16747 with 2004.6 MB RAM, BlockManagerId(754, node-x, 16747, None)
> {code}
> The application got registered with the shuffle server at node-x at 13:29:40:
> {code:java}
> 2022-06-24 13:29:40,343 INFO org.apache.spark.network.shuffle.RemoteBlockPushResolver: Updated the active local dirs [/grid/i/tmp/yarn/, /grid/g/tmp/yarn/, /grid/b/tmp/yarn/, /grid/e/tmp/yarn/, /grid/h/tmp/yarn/, /grid/f/tmp/yarn/, /grid/d/tmp/yarn/, /grid/c/tmp/yarn/] for application application_1653753500486_3193550
> {code}
> node-x was selected as a merger by the driver after 13:28:11, and when the
> executors started pushing to it, all those pushes failed until 13:29:40.
> We can fix this by having the executor register with the ESS before it
> registers the block manager with the {{BlockManagerMaster}}.
[jira] [Created] (SPARK-39647) Block push fails with java.lang.IllegalArgumentException: Active local dirs list has not been updated by any executor registration even when NodeManager hasn't been restarted
Chandni Singh created SPARK-39647: - Summary: Block push fails with java.lang.IllegalArgumentException: Active local dirs list has not been updated by any executor registration even when NodeManager hasn't been restarted Key: SPARK-39647 URL: https://issues.apache.org/jira/browse/SPARK-39647 Project: Spark Issue Type: Bug Components: Shuffle Affects Versions: 3.2.0 Reporter: Chandni Singh We saw these exceptions during block push: {code:java} 22/06/24 13:29:14 ERROR RetryingBlockFetcher: Failed to fetch block shuffle_170_568_174, and will not retry (0 retries) org.apache.spark.network.shuffle.BlockPushException: !application_1653753500486_3193550shuffle_170_568_174java.lang.IllegalArgumentException: Active local dirs list has not been updated by any executor registration at org.spark_project.guava.base.Preconditions.checkArgument(Preconditions.java:92) at org.apache.spark.network.shuffle.RemoteBlockPushResolver.getActiveLocalDirs(RemoteBlockPushResolver.java:300) at org.apache.spark.network.shuffle.RemoteBlockPushResolver.getFile(RemoteBlockPushResolver.java:290) at org.apache.spark.network.shuffle.RemoteBlockPushResolver.getMergedShuffleFile(RemoteBlockPushResolver.java:312) at org.apache.spark.network.shuffle.RemoteBlockPushResolver.lambda$getOrCreateAppShufflePartitionInfo$1(RemoteBlockPushResolver.java:168) 22/06/24 13:29:14 WARN UnsafeShuffleWriter: Pushing block shuffle_170_568_174 to BlockManagerId(, node-x, 7337, None) failed. {code} Note: The NodeManager on node-x (the node against which this exception was seen) was not restarted. This happened because the executor registers the block manager with {{BlockManagerMaster}} before it registers with the ESS. In push-based shuffle, a block manager is selected by the driver as a merger for the shuffle push.
However, the ESS on that node can successfully merge the block only if it has received the metadata about merged directories from the local executor (sent when the local executor registers with the ESS). If this local executor registration is delayed but the ESS host has already been picked as a merger, it will fail to merge the blocks pushed to it, which is what happened here. The local executor on node-x is executor 754, and the block manager registration happened at 13:28:11: {code:java} 22/06/24 13:28:11 INFO ExecutorAllocationManager: New executor 754 has registered (new total is 1200) 22/06/24 13:28:11 INFO BlockManagerMasterEndpoint: Registering block manager node-x:16747 with 2004.6 MB RAM, BlockManagerId(754, node-x, 16747, None) {code} The application registered with the shuffle server on node-x at 13:29:40: {code:java} 2022-06-24 13:29:40,343 INFO org.apache.spark.network.shuffle.RemoteBlockPushResolver: Updated the active local dirs [/grid/i/tmp/yarn/, /grid/g/tmp/yarn/, /grid/b/tmp/yarn/, /grid/e/tmp/yarn/, /grid/h/tmp/yarn/, /grid/f/tmp/yarn/, /grid/d/tmp/yarn/, /grid/c/tmp/yarn/] for application application_1653753500486_3193550 {code} node-x was selected as a merger by the driver after 13:28:11, and when the executors started pushing to it, all those pushes failed until 13:29:40. We can fix this by having the executor register with the ESS before it registers the block manager with the {{BlockManagerMaster}}.
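The proposed ordering fix can be sketched as below. This is an illustrative model, not Spark's actual BlockManager initialization code; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the proposed fix: the executor registers with the
// external shuffle service (ESS) *before* it registers its block manager with
// the BlockManagerMaster. The driver can only pick this host as a merger once
// the block manager is registered, so by then the ESS is guaranteed to already
// have the merged-directory metadata it needs to merge pushed blocks.
public class RegistrationOrder {
    final List<String> events = new ArrayList<>();

    // Sends the merged-dir metadata to the ESS.
    void registerWithEss() { events.add("ess"); }

    // After this call, the driver may select the host as a merger.
    void registerWithBlockManagerMaster() { events.add("master"); }

    // Fixed initialization order.
    List<String> initialize() {
        registerWithEss();
        registerWithBlockManagerMaster();
        return events;
    }
}
```

With the buggy order reversed, there is a window (13:28:11 to 13:29:40 in the logs above) during which pushes to the host fail.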
[jira] [Resolved] (SPARK-39471) ExternalShuffleService port is extracted from the confs every time a block manager gets registered
[ https://issues.apache.org/jira/browse/SPARK-39471?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chandni Singh resolved SPARK-39471. --- Resolution: Not A Problem Created by mistake. This is not a problem in master. > ExternalShuffleService port is extracted from the confs every time a block > manager gets registered > - > > Key: SPARK-39471 > URL: https://issues.apache.org/jira/browse/SPARK-39471 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 3.2.0 >Reporter: Chandni Singh >Priority: Major > > We are extracting the shuffle service port from the spark and hadoop > configuration with every block manager registration. This could be avoided by > using the cached external service port. > Also, we should avoid calling addMerger when push-based shuffle is disabled.
[jira] [Created] (SPARK-39471) ExternalShuffleService port is extracted from the confs every time a block manager gets registered
Chandni Singh created SPARK-39471: - Summary: ExternalShuffleService port is extracted from the confs every time a block manager gets registered Key: SPARK-39471 URL: https://issues.apache.org/jira/browse/SPARK-39471 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 3.2.0 Reporter: Chandni Singh We are extracting the shuffle service port from the spark and hadoop configuration with every block manager registration. This could be avoided by using the cached external service port. Also, we should avoid calling addMerger when push-based shuffle is disabled.
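The caching suggested above can be sketched as follows. The names are illustrative (not Spark's actual configuration API): the port is read from the configuration once, and every later block manager registration reuses the cached value.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntSupplier;

// Sketch: cache the external shuffle service port after the first lookup so
// that repeated block manager registrations do not re-parse the Spark/Hadoop
// configuration each time.
public class ShuffleServicePort {
    private static final int UNSET = -1;
    private final AtomicInteger cached = new AtomicInteger(UNSET);
    private final IntSupplier confLookup; // stands in for the expensive conf read

    public ShuffleServicePort(IntSupplier confLookup) {
        this.confLookup = confLookup;
    }

    public int get() {
        int p = cached.get();
        if (p == UNSET) {
            // Only the first lookup (per race) pays the conf-parsing cost;
            // compareAndSet keeps concurrent callers consistent.
            cached.compareAndSet(UNSET, confLookup.getAsInt());
            p = cached.get();
        }
        return p;
    }
}
```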
[jira] [Updated] (SPARK-38973) When push-based shuffle is enabled, a stage may not complete when retried
[ https://issues.apache.org/jira/browse/SPARK-38973?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chandni Singh updated SPARK-38973: -- Description: With push-based shuffle enabled and adaptive merge finalization, there are scenarios where a re-attempt of ShuffleMapStage may not complete. With Adaptive Merge Finalization, a stage may be triggered for finalization when it is in the below state: # The stage is *not* running ({*}not{*} in the _running_ set of the DAGScheduler) - it had failed, been canceled, or was waiting, and # The stage has no pending partitions (all the tasks completed at least once). For such a stage, when the finalization completes, the stage will still not be marked as {_}mergeFinalized{_}. The state of the stage will be: * _stage.shuffleDependency.mergeFinalized = false_ * _stage.shuffleDependency.getFinalizeTask = finalizeTask_ * Merged statuses of the stage are unregistered When the stage is resubmitted, the newer attempt of the stage will never complete even though its tasks may be completed. This is because the newer attempt of the stage will have {_}shuffleMergeEnabled = true{_}, since with the previous attempt the stage was never marked as {_}mergeFinalized{_}, and the _finalizeTask_ is present (from the finalization attempt for the previous stage attempt). So, when all the tasks of the newer attempt complete, these conditions will be true: * The stage will be running * There will be no pending partitions since all the tasks completed * _stage.shuffleDependency.shuffleMergeEnabled = true_ * _stage.shuffleDependency.shuffleMergeFinalized = false_ * _stage.shuffleDependency.getFinalizeTask_ is not empty This leads the DAGScheduler to try to schedule finalization instead of triggering the completion of the stage. However, because of the last condition, it never even schedules the finalization, and the stage never completes.
was: With push-based shuffle enabled and adaptive merge finalization, there are scenarios where a re-attempt of ShuffleMapStage may not complete. With Adaptive Merge Finalization, a stage may be triggered for finalization when it is in the below state: # The stage is *not* running ({*}not{*} in the _running_ set of the DAGScheduler) - had failed or canceled or waiting, and # The stage has no pending partitions - completed. For such a stage when the finalization completes, the stage will still not be marked as {_}mergeFinalized{_}. The stage of the stage will be: * _stage.shuffleDependency.mergeFinalized = false_ * _stage.shuffleDependency.getFinalizeTask = finalizeTask_ * Merged statuses of the state are unregistered When the stage is resubmitted, the newer attempt of the stage will never complete even though its tasks may be completed. This is because the newer attempt of the stage will have {_}shuffleMergeEnabled = true{_}, since with the previous attempt the stage was never marked as {_}mergedFinalized{_}, and the _finalizeTask_ is present (from finalization attempt for previous stage attempt). So, when all the tasks of the newer attempt complete, then these conditions will be true: * stage will be running * There will be no pending partitions since all the tasks completed * _stage.shuffleDependency.shuffleMergeEnabled = true_ * _stage.shuffleDependency.shuffleMergeFinalized = false_ * _stage.shuffleDependency.getFinalizeTask_ is not empty This leads the DAGScheduler to try scheduling finalization and not trigger the completion of the Stage. However because of the last condition it never even schedules the finalization and the stage never completes. 
> When push-based shuffle is enabled, a stage may not complete when retried > - > > Key: SPARK-38973 > URL: https://issues.apache.org/jira/browse/SPARK-38973 > Project: Spark > Issue Type: Bug > Components: Shuffle, Spark Core >Affects Versions: 3.2.0 >Reporter: Chandni Singh >Priority: Major >
[jira] [Updated] (SPARK-38973) When push-based shuffle is enabled, a stage may not complete when retried
[ https://issues.apache.org/jira/browse/SPARK-38973?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chandni Singh updated SPARK-38973: -- Description: With push-based shuffle enabled and adaptive merge finalization, there are scenarios where a re-attempt of ShuffleMapStage may not complete. With Adaptive Merge Finalization, a stage may be triggered for finalization when it is in the below state: # The stage is *not* running ({*}not{*} in the _running_ set of the DAGScheduler) - it had failed, been canceled, or was waiting, and # The stage has no pending partitions - completed. For such a stage, when the finalization completes, the stage will still not be marked as {_}mergeFinalized{_}. The state of the stage will be: * _stage.shuffleDependency.mergeFinalized = false_ * _stage.shuffleDependency.getFinalizeTask = finalizeTask_ * Merged statuses of the stage are unregistered When the stage is resubmitted, the newer attempt of the stage will never complete even though its tasks may be completed. This is because the newer attempt of the stage will have {_}shuffleMergeEnabled = true{_}, since with the previous attempt the stage was never marked as {_}mergeFinalized{_}, and the _finalizeTask_ is present (from the finalization attempt for the previous stage attempt). So, when all the tasks of the newer attempt complete, these conditions will be true: * The stage will be running * There will be no pending partitions since all the tasks completed * _stage.shuffleDependency.shuffleMergeEnabled = true_ * _stage.shuffleDependency.shuffleMergeFinalized = false_ * _stage.shuffleDependency.getFinalizeTask_ is not empty This leads the DAGScheduler to try to schedule finalization and not trigger the completion of the stage. However, because of the last condition, it never even schedules the finalization, and the stage never completes. was: With push-based shuffle enabled and adaptive merge finalization, there are scenarios where a re-attempt of ShuffleMapStage may not complete.
With Adaptive Merge Finalization, a stage may be triggered for finalization before it completes - and can subsequently fail or be canceled. The stage may be in the below state when it is finalized: # The stage is *not* running ({*}not{*} in the _running_ set of the DAGScheduler) - had failed or canceled, and # The stage has no pending partitions - completed. For such a stage when the finalization completes, the stage will still not be marked as {_}mergeFinalized{_}. The stage of the stage will be: * _stage.shuffleDependency.mergeFinalized = false_ * _stage.shuffleDependency.getFinalizeTask = finalizeTask_ * Merged statuses of the state are unregistered When the stage is resubmitted, the newer attempt of the stage will never complete even though its tasks may be completed. This is because the newer attempt of the stage will have \{_}shuffleMergeEnabled = true{_}, since with the previous attempt the stage was never marked as {_}mergedFinalized{_}, and the _finalizeTask_ is present (from finalization attempt for previous stage attempt). So, when all the tasks of the newer attempt complete, then these conditions will be true: * stage will be running * There will be no pending partitions since all the tasks completed * _stage.shuffleDependency.shuffleMergeEnabled = true_ * _stage.shuffleDependency.shuffleMergeFinalized = false_ * _stage.shuffleDependency.getFinalizeTask_ is not empty This leads the DAGScheduler to try scheduling finalization and not trigger the completion of the Stage. However because of the last condition it never even schedules the finalization and the stage never completes. 
> When push-based shuffle is enabled, a stage may not complete when retried > - > > Key: SPARK-38973 > URL: https://issues.apache.org/jira/browse/SPARK-38973 > Project: Spark > Issue Type: Bug > Components: Shuffle, Spark Core >Affects Versions: 3.2.0 >Reporter: Chandni Singh >Priority: Major >
[jira] [Created] (SPARK-38973) When push-based shuffle is enabled, a stage may not complete when retried
Chandni Singh created SPARK-38973: - Summary: When push-based shuffle is enabled, a stage may not complete when retried Key: SPARK-38973 URL: https://issues.apache.org/jira/browse/SPARK-38973 Project: Spark Issue Type: Bug Components: Shuffle, Spark Core Affects Versions: 3.2.0 Reporter: Chandni Singh With push-based shuffle enabled and adaptive merge finalization, there are scenarios where a re-attempt of ShuffleMapStage may not complete. With Adaptive Merge Finalization, a stage may be triggered for finalization before it completes - and can subsequently fail or be canceled. The stage may be in the below state when it is finalized: # The stage is *not* running ({*}not{*} in the _running_ set of the DAGScheduler) - it had failed or been canceled, and # The stage has no pending partitions - completed. For such a stage, when the finalization completes, the stage will still not be marked as {_}mergeFinalized{_}. The state of the stage will be: * _stage.shuffleDependency.mergeFinalized = false_ * _stage.shuffleDependency.getFinalizeTask = finalizeTask_ * Merged statuses of the stage are unregistered When the stage is resubmitted, the newer attempt of the stage will never complete even though its tasks may be completed. This is because the newer attempt of the stage will have {_}shuffleMergeEnabled = true{_}, since with the previous attempt the stage was never marked as {_}mergeFinalized{_}, and the _finalizeTask_ is present (from the finalization attempt for the previous stage attempt). So, when all the tasks of the newer attempt complete, these conditions will be true: * The stage will be running * There will be no pending partitions since all the tasks completed * _stage.shuffleDependency.shuffleMergeEnabled = true_ * _stage.shuffleDependency.shuffleMergeFinalized = false_ * _stage.shuffleDependency.getFinalizeTask_ is not empty This leads the DAGScheduler to try to schedule finalization and not trigger the completion of the stage.
However, because of the last condition, it never even schedules the finalization, and the stage never completes.
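The stuck state described above can be modeled as below. This is an illustrative sketch, not the actual DAGScheduler logic; the field names follow the description.

```java
// Model of the retried stage's state: all tasks of the new attempt are done,
// shuffle merge is still enabled and not finalized, and a stale finalizeTask
// is left over from the previous attempt's finalization.
public class RetriedStageState {
    boolean running = true;                 // stage is in the running set
    boolean hasPendingPartitions = false;   // all tasks completed
    boolean shuffleMergeEnabled = true;     // previous attempt never marked mergeFinalized
    boolean shuffleMergeFinalized = false;
    boolean finalizeTaskPresent = true;     // from the previous attempt

    // In this state the scheduler prefers finalization over completing the stage...
    boolean triesToFinalizeInsteadOfCompleting() {
        return running && !hasPendingPartitions
                && shuffleMergeEnabled && !shuffleMergeFinalized;
    }

    // ...but skips actually scheduling finalization because a finalizeTask
    // already exists, so neither finalization nor stage completion ever happens.
    boolean actuallySchedulesFinalization() {
        return triesToFinalizeInsteadOfCompleting() && !finalizeTaskPresent;
    }
}
```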
[jira] [Updated] (SPARK-37675) Push-based merge finalization bugs in the RemoteBlockPushResolver
[ https://issues.apache.org/jira/browse/SPARK-37675?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chandni Singh updated SPARK-37675: -- Description: We identified 3 issues in the handling of merge finalization requests in the RemoteBlockPushResolver: 1. Empty merge data If the shuffle gets finalized while a reducer partition is still receiving its first block, then when the merger finalizes that partition, we will end up with no data in the files - the file gets truncated to the last good position (which will be 0 in this case). Even though no data exists for the reducer, we still add it to the result (merged reducerIds). 2. Overwriting of the merged data file of a reduce partition after it is finalized This is a more involved issue that requires a specific sequence of events, and it starts with how our check for a {{too late block}} is done [here|https://github.com/apache/spark/blob/50758ab1a3d6a5f73a2419149a1420d103930f77/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java#L180]. The example below gives more details, but in a nutshell we have the following for a DETERMINATE shuffle: # Merge starts, blocks are accepted. # Merge is finalized. ** Files closed, status reported to driver, appShuffleInfo.shuffles cleaned up. # Late block push from an executor received. ** It is a request for a reducer for which the merger never received any data until then - so there are no on-disk files. ** Our check does not catch this case - we end up (re-)starting the merge. # The executor can now push blocks for reducers which were finalized earlier. ** Files are truncated. # Reads will see inconsistent state due to the ongoing writes. Explaining this with an example for a DETERMINATE shuffleId 1, shuffleMergeId 0, and reduce partitions 100 and 200: # shufflePush_1_0_0_100 is received by the RemoteBlockPushResolver.
## No meta information existed for shuffle 1, so the shuffle service creates AppShuffleMergePartitionsInfo for shuffle 1 and shuffleMerge 0 to start the merge. ## The merge starts with the RemoteBlockPushResolver, which creates the data file for the merge request, shuffleMerged_$APP_ID_1_0_100.data (along with index/meta files). # The FinalizeShuffleMerge message for shuffleId 1 and shuffleMergeId 0 is received by the RemoteBlockPushResolver. In a thread-safe manner: ## AppShuffleMergePartitionsInfo for shuffle 1 is removed from the map in memory. ## The shuffleMerged_$APP_ID_1_0_100.data/index/meta files are closed. ## The driver is informed that partition 100 of shuffleId 1/mergeId 0 was merged. # shufflePush_1_0_0_200 is received by the RemoteBlockPushResolver. ## A new AppShuffleMergePartitionsInfo is added since: ### There is no AppShuffleMergePartitionsInfo for shuffle 1/merge id 0 - it was removed during finalization, and ### The merger had never received data for partition 200 until then. ## With this, shuffleMerged…200.data is created, and on that merger, the merge for shuffleId 1/mergeId 0 starts again. # shufflePush_1_0_5_100 is received by the RemoteBlockPushResolver. We randomize the order of pushes, so late pushes from an executor can end up pushing reducer 200 followed by data for reducer 100. ## Since the AppShuffleMergePartitionsInfo created for shuffle 1 and shuffleMerge 0 in step 3-1 doesn't have the reduce id 100, the data/index/meta files for these partitions will be recreated. Reference code. 3. Throwing an exception in the finalization of a shuffle for which the shuffle server didn't receive any blocks For very small stages and with a low minCompletedPushRatio/minShuffleSizeToWait, the driver can initiate the finalization of a shuffle right away. The shuffle server may not receive any pushed blocks, and so there will not be an {{AppShuffleMergePartitionsInfo}} instance corresponding to the shuffle in the state.
In this case, we should mark the shuffle as finalized and return empty results. > Push-based merge finalization bugs in the RemoteBlockPushResolver > - > > Key: SPARK-37675 > URL: https://issues.apache.org/jira/browse/SPARK-37675 > Project: Spark > Issue Type: Sub-task > Components: Shuffle >Affects Versions: 3.2.0 >Reporter: Cheng Pan >Priority: Major >
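Issue 2 above boils down to the resolver forgetting, after finalization, which shuffles were finalized. A minimal sketch of a check that closes that gap is below; it is illustrative only, and the real RemoteBlockPushResolver state is considerably more involved.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Sketch of a "too late block" check that survives finalization. The flawed
// behavior described above arises when the only state consulted is the map of
// in-flight merges, which finalization removes: a late push for a reducer the
// merger never saw then looks like a fresh merge and re-creates (truncates)
// already-finalized files. Tracking finalized shuffle ids separately lets the
// resolver reject such pushes.
public class LateBlockCheck {
    private final Map<Integer, Set<Integer>> activeMerges = new HashMap<>();
    private final Set<Integer> finalizedShuffles = new HashSet<>();

    public void finalizeShuffle(int shuffleId) {
        activeMerges.remove(shuffleId);     // in-flight state is dropped...
        finalizedShuffles.add(shuffleId);   // ...but remember it was finalized
    }

    public boolean acceptsPush(int shuffleId, int reducerId) {
        if (finalizedShuffles.contains(shuffleId)) {
            return false; // too late: merged files are closed and reported
        }
        activeMerges.computeIfAbsent(shuffleId, k -> new HashSet<>()).add(reducerId);
        return true;
    }
}
```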
[jira] [Updated] (SPARK-37675) Push-based merge finalization bugs in the RemoteBlockPushResolver
[ https://issues.apache.org/jira/browse/SPARK-37675?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chandni Singh updated SPARK-37675: -- Summary: Push-based merge finalization bugs in the RemoteBlockPushResolver (was: Return PushMergedRemoteMetaFailedFetchResult if no available push-merged block) > Push-based merge finalization bugs in the RemoteBlockPushResolver > - > > Key: SPARK-37675 > URL: https://issues.apache.org/jira/browse/SPARK-37675 > Project: Spark > Issue Type: Sub-task > Components: Shuffle >Affects Versions: 3.2.0 >Reporter: Cheng Pan >Priority: Major >
[jira] [Updated] (SPARK-38005) Support cleaning up merged shuffle files and state from external shuffle service
[ https://issues.apache.org/jira/browse/SPARK-38005?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chandni Singh updated SPARK-38005: -- Parent: SPARK-33235 Issue Type: Sub-task (was: Improvement) > Support cleaning up merged shuffle files and state from external shuffle > service > > > Key: SPARK-38005 > URL: https://issues.apache.org/jira/browse/SPARK-38005 > Project: Spark > Issue Type: Sub-task > Components: Shuffle, Spark Core >Affects Versions: 3.2.0 >Reporter: Chandni Singh >Priority: Major > > Currently merged shuffle files and state is not cleaned up until an > application ends. SPARK-37618 handles the cleanup of regular shuffle files. > This jira will address cleaning up of merged shuffle files/state.
[jira] [Updated] (SPARK-38005) Support cleaning up merged shuffle files and state from external shuffle service
[ https://issues.apache.org/jira/browse/SPARK-38005?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chandni Singh updated SPARK-38005: -- Description: Currently, merged shuffle files and state are not cleaned up until an application ends. SPARK-37618 handles the cleanup of regular shuffle files. This jira will address the cleanup of merged shuffle files/state. (was: Currently merged shuffle files and state is not cleaned up until an application ends. But shuffle files will still stick around until an application completes. Dynamic allocation is commonly used for long runnin) > Support cleaning up merged shuffle files and state from external shuffle > service > > > Key: SPARK-38005 > URL: https://issues.apache.org/jira/browse/SPARK-38005 > Project: Spark > Issue Type: Improvement > Components: Shuffle, Spark Core >Affects Versions: 3.2.0 >Reporter: Chandni Singh >Priority: Major > > Currently, merged shuffle files and state are not cleaned up until an > application ends. SPARK-37618 handles the cleanup of regular shuffle files. > This jira will address the cleanup of merged shuffle files/state.
[jira] [Updated] (SPARK-38005) Support cleaning up merged shuffle files and state from external shuffle service
[ https://issues.apache.org/jira/browse/SPARK-38005?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chandni Singh updated SPARK-38005: -- Description: Currently merged shuffle files and state is not cleaned up until an application ends. But shuffle files will still stick around until an application completes. Dynamic allocation is commonly used for long runnin was: Currently shuffle data is not cleaned up when an external shuffle service is used and the associated executor has been deallocated before the shuffle is cleaned up. Shuffle data is only cleaned up once the application ends. There have been various issues filed for this: https://issues.apache.org/jira/browse/SPARK-26020 https://issues.apache.org/jira/browse/SPARK-17233 https://issues.apache.org/jira/browse/SPARK-4236 But shuffle files will still stick around until an application completes. Dynamic allocation is commonly used for long running jobs (such as structured streaming), so any long running jobs with a large shuffle involved will eventually fill up local disk space. The shuffle service already supports cleaning up shuffle service persisted RDDs, so it should be able to support cleaning up shuffle blocks as well once the shuffle is removed by the ContextCleaner. The current alternative is to use shuffle tracking instead of an external shuffle service, but this is less optimal from a resource perspective as all executors must be kept alive until the shuffle has been fully consumed and cleaned up (and with the default GC interval being 30 minutes this can waste a lot of time with executors held onto but not doing anything). 
> Support cleaning up merged shuffle files and state from external shuffle > service > > > Key: SPARK-38005 > URL: https://issues.apache.org/jira/browse/SPARK-38005 > Project: Spark > Issue Type: Improvement > Components: Shuffle, Spark Core >Affects Versions: 3.2.0 >Reporter: Chandni Singh >Priority: Major > > Currently merged shuffle files and state is not cleaned up until an > application ends. > But shuffle files will still stick around until an application completes. > Dynamic allocation is commonly used for long runnin
[jira] [Created] (SPARK-38005) Support cleaning up merged shuffle files and state from external shuffle service
Chandni Singh created SPARK-38005: - Summary: Support cleaning up merged shuffle files and state from external shuffle service Key: SPARK-38005 URL: https://issues.apache.org/jira/browse/SPARK-38005 Project: Spark Issue Type: Improvement Components: Shuffle, Spark Core Affects Versions: 3.2.0 Reporter: Chandni Singh Currently shuffle data is not cleaned up when an external shuffle service is used and the associated executor has been deallocated before the shuffle is cleaned up. Shuffle data is only cleaned up once the application ends. There have been various issues filed for this: https://issues.apache.org/jira/browse/SPARK-26020 https://issues.apache.org/jira/browse/SPARK-17233 https://issues.apache.org/jira/browse/SPARK-4236 But shuffle files will still stick around until an application completes. Dynamic allocation is commonly used for long running jobs (such as structured streaming), so any long running jobs with a large shuffle involved will eventually fill up local disk space. The shuffle service already supports cleaning up shuffle service persisted RDDs, so it should be able to support cleaning up shuffle blocks as well once the shuffle is removed by the ContextCleaner. The current alternative is to use shuffle tracking instead of an external shuffle service, but this is less optimal from a resource perspective as all executors must be kept alive until the shuffle has been fully consumed and cleaned up (and with the default GC interval being 30 minutes this can waste a lot of time with executors held onto but not doing anything).
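A rough sketch of the direction this jira proposes, assuming a cleanup request analogous to the existing removal of shuffle-service-persisted RDDs (all names here are hypothetical, not Spark's actual API): when the ContextCleaner garbage-collects a shuffle, the driver asks each shuffle service host to drop that shuffle's merged files and state.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical model of merged-shuffle state on the shuffle service side:
// state is tracked per (appId, shuffleId) and removed when the driver reports
// the shuffle as no longer referenced, instead of waiting for application end.
public class MergedShuffleState {
    private final Map<String, Set<Integer>> mergedShuffles = new HashMap<>();

    public void registerMerge(String appId, int shuffleId) {
        mergedShuffles.computeIfAbsent(appId, k -> new HashSet<>()).add(shuffleId);
    }

    // Invoked by a (hypothetical) remove-shuffle-merge request from the driver
    // when the ContextCleaner collects the shuffle. Deleting the merged
    // data/index/meta files from disk would happen at this point.
    public boolean removeShuffleMerge(String appId, int shuffleId) {
        Set<Integer> shuffles = mergedShuffles.get(appId);
        return shuffles != null && shuffles.remove(shuffleId);
    }

    public boolean hasState(String appId, int shuffleId) {
        Set<Integer> shuffles = mergedShuffles.get(appId);
        return shuffles != null && shuffles.contains(shuffleId);
    }
}
```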
[jira] [Updated] (SPARK-36389) Revert the change that accepts negative mapId
[ https://issues.apache.org/jira/browse/SPARK-36389?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chandni Singh updated SPARK-36389: -- Description: With SPARK-32922, we added a change that {{ShuffleBlockId}} can have a negative mapId. This was to support push-based shuffle where {{-1}} as mapId indicated a push-merged block. However with SPARK-32923, a different type of {{BlockId}} was introduced - {{ShuffleMergedId}}, but reverting the change to {{ShuffleBlockId}} was missed. (was: With SPARK-32922, we added a change that {{ShuffleBlockId}} can have a negative mapId. This was to support push-based shuffle where {{-1}} as mapId indicated a push-merged block. However with SPARK-32923, a different type of {{BlockId}} was introduce - {{ShuffleMergedId}}, but reverting the change to {{ShuffleBlockId}} was missed. ) > Revert the change that accepts negative mapId > - > > Key: SPARK-36389 > URL: https://issues.apache.org/jira/browse/SPARK-36389 > Project: Spark > Issue Type: Bug > Components: Shuffle >Affects Versions: 3.2.0 >Reporter: Chandni Singh >Priority: Minor > > With SPARK-32922, we added a change that {{ShuffleBlockId}} can have a > negative mapId. This was to support push-based shuffle where {{-1}} as mapId > indicated a push-merged block. However with SPARK-32923, a different type of > {{BlockId}} was introduced - {{ShuffleMergedId}}, but reverting the change to > {{ShuffleBlockId}} was missed.
[jira] [Updated] (SPARK-36389) Revert the change that accepts negative mapId in ShuffleBlockId
[ https://issues.apache.org/jira/browse/SPARK-36389?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chandni Singh updated SPARK-36389: -- Summary: Revert the change that accepts negative mapId in ShuffleBlockId (was: Revert the change that accepts negative mapId) > Revert the change that accepts negative mapId in ShuffleBlockId > --- > > Key: SPARK-36389 > URL: https://issues.apache.org/jira/browse/SPARK-36389 > Project: Spark > Issue Type: Bug > Components: Shuffle >Affects Versions: 3.2.0 >Reporter: Chandni Singh >Priority: Minor > > With SPARK-32922, we added a change that {{ShuffleBlockId}} can have a > negative mapId. This was to support push-based shuffle where {{-1}} as mapId > indicated a push-merged block. However with SPARK-32923, a different type of > {{BlockId}} was introduced - {{ShuffleMergedId}}, but reverting the change to > {{ShuffleBlockId}} was missed.
[jira] [Created] (SPARK-36389) Revert the change that accepts negative mapId
Chandni Singh created SPARK-36389: - Summary: Revert the change that accepts negative mapId Key: SPARK-36389 URL: https://issues.apache.org/jira/browse/SPARK-36389 Project: Spark Issue Type: Bug Components: Shuffle Affects Versions: 3.2.0 Reporter: Chandni Singh With SPARK-32922, we added a change that {{ShuffleBlockId}} can have a negative mapId. This was to support push-based shuffle where {{-1}} as mapId indicated a push-merged block. However with SPARK-32923, a different type of {{BlockId}} was introduced - {{ShuffleMergedId}}, but reverting the change to {{ShuffleBlockId}} was missed.
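The revert described above can be illustrated with a small sketch. The class, method names, and the merged-block id format below are illustrative only, not Spark's actual `BlockId` parser: the point is that once SPARK-32923's dedicated merged-block id type exists, the plain shuffle-block pattern no longer needs to accept a `-1` mapId sentinel.

```java
import java.util.regex.Pattern;

// Hypothetical sketch of the block-id distinction; not Spark's real parser.
public class BlockIdSketch {
    // SPARK-32922 had relaxed this pattern to "shuffle_(\\d+)_(-?\\d+)_(\\d+)"
    // so that mapId = -1 could mark a push-merged block. With a dedicated
    // merged-block id type, the mapId can be restricted to non-negative again.
    private static final Pattern SHUFFLE_BLOCK =
        Pattern.compile("shuffle_(\\d+)_(\\d+)_(\\d+)");
    // Illustrative format for the dedicated merged-block id.
    private static final Pattern MERGED_BLOCK =
        Pattern.compile("shuffleMerged_(\\d+)_(\\d+)");

    public static boolean isShuffleBlock(String id) {
        return SHUFFLE_BLOCK.matcher(id).matches();
    }

    public static boolean isMergedBlock(String id) {
        return MERGED_BLOCK.matcher(id).matches();
    }
}
```

With the revert, an id like `shuffle_1_-1_3` is simply rejected by the shuffle-block pattern rather than being treated as a merged block.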
[jira] [Updated] (SPARK-36255) FileNotFoundException from the shuffle push can cause the executor to terminate
[ https://issues.apache.org/jira/browse/SPARK-36255?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chandni Singh updated SPARK-36255: -- Description: Once the shuffle is cleaned up by the {{ContextCleaner}}, the shuffle files are deleted by the executors. In this case, the push of the shuffle data by the executors can throw {{FileNotFoundException}}. When this exception is thrown from the {{shuffle-block-push-thread}}, it causes the executor to fail. This is because of the default uncaught exception handler for Spark daemon threads which terminates the executor when there are uncaught exceptions for the daemon threads. {code:java} 21/06/17 16:03:57 ERROR util.SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[block-push-thread-1,5,main] java.lang.Error: java.io.IOException: Error in opening FileSegmentManagedBuffer {file=/application_1619720975011_11057757/blockmgr-560cb4cf-9918-4ea7-a007-a16c5e3a35fe/0a/shuffle_1_690_0.data, offset=10640, length=190} at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1155) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: java.io.IOException: Error in opening FileSegmentManagedBuffer\{file=***/application_1619720975011_11057757/blockmgr-560cb4cf-9918-4ea7-a007-a16c5e3a35fe/0a/shuffle_1_690_0.data, offset=10640, length=190} at org.apache.spark.network.buffer.FileSegmentManagedBuffer.nioByteBuffer(FileSegmentManagedBuffer.java:89) at org.apache.spark.shuffle.ShuffleWriter.sliceReqBufferIntoBlockBuffers(ShuffleWriter.scala:294) at org.apache.spark.shuffle.ShuffleWriter.org$apache$spark$shuffle$ShuffleWriter$$sendRequest(ShuffleWriter.scala:270) at org.apache.spark.shuffle.ShuffleWriter.org$apache$spark$shuffle$ShuffleWriter$$pushUpToMax(ShuffleWriter.scala:191) at org.apache.spark.shuffle.ShuffleWriter$$anon$2$$anon$4.run(ShuffleWriter.scala:244) at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ... 2 more Caused by: java.io.FileNotFoundException: **/application_1619720975011_11057757/blockmgr-560cb4cf-9918-4ea7-a007-a16c5e3a35fe/0a/shuffle_1_690_0.data (No such file or directory) at java.io.RandomAccessFile.open0(Native Method) at java.io.RandomAccessFile.open(RandomAccessFile.java:316) at java.io.RandomAccessFile.<init>(RandomAccessFile.java:243) at org.apache.spark.network.buffer.FileSegmentManagedBuffer.nioByteBuffer(FileSegmentManagedBuffer.java:62) {code} We can address the issue by handling "FileNotFound" exceptions in the push threads and netty threads by stopping the push when {{FileNotFound}} is encountered. was: When the shuffle files are cleaned up by the executors once a job in a Spark application completes, the push of the shuffle data by the executor can throw FileNotFound exception. When this exception is thrown from the {{shuffle-block-push-thread}}, it causes the executor to fail. This is because of the default uncaught exception handler for Spark daemon threads which terminates the executor when there are uncaught exceptions for the daemon threads. 
{code:java} 21/06/17 16:03:57 ERROR util.SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[block-push-thread-1,5,main] java.lang.Error: java.io.IOException: Error in opening FileSegmentManagedBuffer {file=/application_1619720975011_11057757/blockmgr-560cb4cf-9918-4ea7-a007-a16c5e3a35fe/0a/shuffle_1_690_0.data, offset=10640, length=190} at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1155) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: java.io.IOException: Error in opening FileSegmentManagedBuffer\{file=***/application_1619720975011_11057757/blockmgr-560cb4cf-9918-4ea7-a007-a16c5e3a35fe/0a/shuffle_1_690_0.data, offset=10640, length=190} at org.apache.spark.network.buffer.FileSegmentManagedBuffer.nioByteBuffer(FileSegmentManagedBuffer.java:89) at org.apache.spark.shuffle.ShuffleWriter.sliceReqBufferIntoBlockBuffers(ShuffleWriter.scala:294) at org.apache.spark.shuffle.ShuffleWriter.org$apache$spark$shuffle$ShuffleWriter$$sendRequest(ShuffleWriter.scala:270) at org.apache.spark.shuffle.ShuffleWriter.org$apache$spark$shuffle$ShuffleWriter$$pushUpToMax(ShuffleWriter.scala:191) at org.apache.spark.shuffle.ShuffleWriter$$anon$2$$anon$4.run(ShuffleWriter.scala:244) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ... 2 more Caused by: java.io.FileNotFoundException: **/application_1619720975011_11057757/blockmgr-560cb4cf-9918-4ea7-a007-a16c5e3a35fe/0a/shuffle_1_690_0.data (No such file or directory) at java.io.RandomAccessFile.open0(Native Method) at java.io.RandomAccessFile.open(RandomAccessFile.java:316) at
[jira] [Updated] (SPARK-36255) FileNotFoundException from the shuffle push can cause the executor to terminate
[ https://issues.apache.org/jira/browse/SPARK-36255?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chandni Singh updated SPARK-36255: -- Summary: FileNotFoundException from the shuffle push can cause the executor to terminate (was: FileNotFound exceptions from the shuffle push can cause the executor to terminate) > FileNotFoundException from the shuffle push can cause the executor to > terminate > --- > > Key: SPARK-36255 > URL: https://issues.apache.org/jira/browse/SPARK-36255 > Project: Spark > Issue Type: Sub-task > Components: Shuffle >Affects Versions: 3.1.0 >Reporter: Chandni Singh >Priority: Major > > When the shuffle files are cleaned up by the executors once a job in a Spark > application completes, the push of the shuffle data by the executor can throw > FileNotFound exception. When this exception is thrown from the > {{shuffle-block-push-thread}}, it causes the executor to fail. This is > because of the default uncaught exception handler for Spark daemon threads > which terminates the executor when there are uncaught exceptions for the > daemon threads. 
> {code:java} > 21/06/17 16:03:57 ERROR util.SparkUncaughtExceptionHandler: Uncaught > exception in thread Thread[block-push-thread-1,5,main] > java.lang.Error: java.io.IOException: Error in opening > FileSegmentManagedBuffer > {file=/application_1619720975011_11057757/blockmgr-560cb4cf-9918-4ea7-a007-a16c5e3a35fe/0a/shuffle_1_690_0.data, > offset=10640, length=190} > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1155) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.io.IOException: Error in opening > FileSegmentManagedBuffer\{file=***/application_1619720975011_11057757/blockmgr-560cb4cf-9918-4ea7-a007-a16c5e3a35fe/0a/shuffle_1_690_0.data, > offset=10640, length=190} > at > org.apache.spark.network.buffer.FileSegmentManagedBuffer.nioByteBuffer(FileSegmentManagedBuffer.java:89) > at > org.apache.spark.shuffle.ShuffleWriter.sliceReqBufferIntoBlockBuffers(ShuffleWriter.scala:294) > at > org.apache.spark.shuffle.ShuffleWriter.org$apache$spark$shuffle$ShuffleWriter$$sendRequest(ShuffleWriter.scala:270) > at > org.apache.spark.shuffle.ShuffleWriter.org$apache$spark$shuffle$ShuffleWriter$$pushUpToMax(ShuffleWriter.scala:191) > at > org.apache.spark.shuffle.ShuffleWriter$$anon$2$$anon$4.run(ShuffleWriter.scala:244) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > ... 
2 more > Caused by: java.io.FileNotFoundException: > **/application_1619720975011_11057757/blockmgr-560cb4cf-9918-4ea7-a007-a16c5e3a35fe/0a/shuffle_1_690_0.data > (No such file or directory) > at java.io.RandomAccessFile.open0(Native Method) > at java.io.RandomAccessFile.open(RandomAccessFile.java:316) > at java.io.RandomAccessFile.<init>(RandomAccessFile.java:243) > at > org.apache.spark.network.buffer.FileSegmentManagedBuffer.nioByteBuffer(FileSegmentManagedBuffer.java:62) > {code} > We can address the issue by handling "FileNotFound" exceptions in the push > threads and netty threads by stopping the push when {{FileNotFound}} is > encountered.
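The mitigation proposed at the end of the description can be sketched as follows. The class and method names are hypothetical, not Spark's actual push-thread API: the idea is only that the push task detects a `FileNotFoundException` (the shuffle files were already cleaned up) and stops pushing, instead of letting the exception escape to the default uncaught-exception handler, which would terminate the executor.

```java
import java.io.FileNotFoundException;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of the fix; not the actual Spark patch.
public class PushTaskSketch {
    private final AtomicBoolean stopPushing = new AtomicBoolean(false);

    // Wraps one push attempt. A missing shuffle file means the shuffle was
    // already cleaned up, so further pushes are pointless: halt quietly rather
    // than surfacing an uncaught exception that kills the executor.
    public boolean pushBlock(Runnable push) {
        if (stopPushing.get()) return false;
        try {
            push.run();
            return true;
        } catch (RuntimeException e) {
            if (e.getCause() instanceof FileNotFoundException) {
                stopPushing.set(true); // shuffle files are gone; give up pushing
                return false;
            }
            throw e; // anything else is still surfaced normally
        }
    }
}
```

Once `stopPushing` flips, all later push attempts are skipped, mirroring the "stop the push when FileNotFound is encountered" behavior described above.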
[jira] [Updated] (SPARK-36255) FileNotFound exceptions from the shuffle push can cause the executor to terminate
[ https://issues.apache.org/jira/browse/SPARK-36255?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chandni Singh updated SPARK-36255: -- Summary: FileNotFound exceptions from the shuffle push can cause the executor to terminate (was: FileNotFound exceptions in the Shuffle-push-thread can cause the executor to fail) > FileNotFound exceptions from the shuffle push can cause the executor to > terminate > - > > Key: SPARK-36255 > URL: https://issues.apache.org/jira/browse/SPARK-36255 > Project: Spark > Issue Type: Sub-task > Components: Shuffle >Affects Versions: 3.1.0 >Reporter: Chandni Singh >Priority: Major > > When the shuffle files are cleaned up by the executors once a job in a Spark > application completes, the push of the shuffle data by the executor can throw > FileNotFound exception. When this exception is thrown from the > {{shuffle-block-push-thread}}, it causes the executor to fail. This is > because of the default uncaught exception handler for Spark daemon threads > which terminates the executor when there are uncaught exceptions for the > daemon threads. 
> {code:java} > 21/06/17 16:03:57 ERROR util.SparkUncaughtExceptionHandler: Uncaught > exception in thread Thread[block-push-thread-1,5,main] > java.lang.Error: java.io.IOException: Error in opening > FileSegmentManagedBuffer > {file=/application_1619720975011_11057757/blockmgr-560cb4cf-9918-4ea7-a007-a16c5e3a35fe/0a/shuffle_1_690_0.data, > offset=10640, length=190} > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1155) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.io.IOException: Error in opening > FileSegmentManagedBuffer\{file=***/application_1619720975011_11057757/blockmgr-560cb4cf-9918-4ea7-a007-a16c5e3a35fe/0a/shuffle_1_690_0.data, > offset=10640, length=190} > at > org.apache.spark.network.buffer.FileSegmentManagedBuffer.nioByteBuffer(FileSegmentManagedBuffer.java:89) > at > org.apache.spark.shuffle.ShuffleWriter.sliceReqBufferIntoBlockBuffers(ShuffleWriter.scala:294) > at > org.apache.spark.shuffle.ShuffleWriter.org$apache$spark$shuffle$ShuffleWriter$$sendRequest(ShuffleWriter.scala:270) > at > org.apache.spark.shuffle.ShuffleWriter.org$apache$spark$shuffle$ShuffleWriter$$pushUpToMax(ShuffleWriter.scala:191) > at > org.apache.spark.shuffle.ShuffleWriter$$anon$2$$anon$4.run(ShuffleWriter.scala:244) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > ... 
2 more > Caused by: java.io.FileNotFoundException: > **/application_1619720975011_11057757/blockmgr-560cb4cf-9918-4ea7-a007-a16c5e3a35fe/0a/shuffle_1_690_0.data > (No such file or directory) > at java.io.RandomAccessFile.open0(Native Method) > at java.io.RandomAccessFile.open(RandomAccessFile.java:316) > at java.io.RandomAccessFile.<init>(RandomAccessFile.java:243) > at > org.apache.spark.network.buffer.FileSegmentManagedBuffer.nioByteBuffer(FileSegmentManagedBuffer.java:62) > {code} > We can address the issue by handling "FileNotFound" exceptions in the push > threads and netty threads by stopping the push when {{FileNotFound}} is > encountered.
[jira] [Updated] (SPARK-36255) FileNotFound exceptions in the Shuffle-push-thread can cause the executor to fail
[ https://issues.apache.org/jira/browse/SPARK-36255?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chandni Singh updated SPARK-36255: -- Description: When the shuffle files are cleaned up by the executors once a job in a Spark application completes, the push of the shuffle data by the executor can throw FileNotFound exception. When this exception is thrown from the {{shuffle-block-push-thread}}, it causes the executor to fail. This is because of the default uncaught exception handler for Spark daemon threads which terminates the executor when there are uncaught exceptions for the daemon threads. {code:java} 21/06/17 16:03:57 ERROR util.SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[block-push-thread-1,5,main] java.lang.Error: java.io.IOException: Error in opening FileSegmentManagedBuffer {file=/application_1619720975011_11057757/blockmgr-560cb4cf-9918-4ea7-a007-a16c5e3a35fe/0a/shuffle_1_690_0.data, offset=10640, length=190} at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1155) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: java.io.IOException: Error in opening FileSegmentManagedBuffer\{file=***/application_1619720975011_11057757/blockmgr-560cb4cf-9918-4ea7-a007-a16c5e3a35fe/0a/shuffle_1_690_0.data, offset=10640, length=190} at org.apache.spark.network.buffer.FileSegmentManagedBuffer.nioByteBuffer(FileSegmentManagedBuffer.java:89) at org.apache.spark.shuffle.ShuffleWriter.sliceReqBufferIntoBlockBuffers(ShuffleWriter.scala:294) at org.apache.spark.shuffle.ShuffleWriter.org$apache$spark$shuffle$ShuffleWriter$$sendRequest(ShuffleWriter.scala:270) at org.apache.spark.shuffle.ShuffleWriter.org$apache$spark$shuffle$ShuffleWriter$$pushUpToMax(ShuffleWriter.scala:191) at org.apache.spark.shuffle.ShuffleWriter$$anon$2$$anon$4.run(ShuffleWriter.scala:244) at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ... 2 more Caused by: java.io.FileNotFoundException: **/application_1619720975011_11057757/blockmgr-560cb4cf-9918-4ea7-a007-a16c5e3a35fe/0a/shuffle_1_690_0.data (No such file or directory) at java.io.RandomAccessFile.open0(Native Method) at java.io.RandomAccessFile.open(RandomAccessFile.java:316) at java.io.RandomAccessFile.<init>(RandomAccessFile.java:243) at org.apache.spark.network.buffer.FileSegmentManagedBuffer.nioByteBuffer(FileSegmentManagedBuffer.java:62) {code} We can address the issue by handling "FileNotFound" exceptions in the push threads and netty threads by stopping the push when {{FileNotFound}} is encountered. was: When the shuffle files are cleaned up by the executors once a job in a Spark application completes, the push of the shuffle data by the executor can throw FileNotFound exception. When this exception is thrown from the {{shuffle-block-push-thread}}, it causes the executor to fail. This is because of the default uncaught exception handler for Spark daemon threads which terminates the executor when there are exceptions for the daemon threads. 
{code:java} 21/06/17 16:03:57 ERROR util.SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[block-push-thread-1,5,main] java.lang.Error: java.io.IOException: Error in opening FileSegmentManagedBuffer {file=/application_1619720975011_11057757/blockmgr-560cb4cf-9918-4ea7-a007-a16c5e3a35fe/0a/shuffle_1_690_0.data, offset=10640, length=190} at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1155) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: java.io.IOException: Error in opening FileSegmentManagedBuffer\{file=***/application_1619720975011_11057757/blockmgr-560cb4cf-9918-4ea7-a007-a16c5e3a35fe/0a/shuffle_1_690_0.data, offset=10640, length=190} at org.apache.spark.network.buffer.FileSegmentManagedBuffer.nioByteBuffer(FileSegmentManagedBuffer.java:89) at org.apache.spark.shuffle.ShuffleWriter.sliceReqBufferIntoBlockBuffers(ShuffleWriter.scala:294) at org.apache.spark.shuffle.ShuffleWriter.org$apache$spark$shuffle$ShuffleWriter$$sendRequest(ShuffleWriter.scala:270) at org.apache.spark.shuffle.ShuffleWriter.org$apache$spark$shuffle$ShuffleWriter$$pushUpToMax(ShuffleWriter.scala:191) at org.apache.spark.shuffle.ShuffleWriter$$anon$2$$anon$4.run(ShuffleWriter.scala:244) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ... 2 more Caused by: java.io.FileNotFoundException: **/application_1619720975011_11057757/blockmgr-560cb4cf-9918-4ea7-a007-a16c5e3a35fe/0a/shuffle_1_690_0.data (No such file or directory) at java.io.RandomAccessFile.open0(Native Method) at java.io.RandomAccessFile.open(RandomAccessFile.java:316) at java.io.RandomAccessFile.<init>(RandomAccessFile.java:243) at
[jira] [Updated] (SPARK-36255) FileNotFound exceptions in the Shuffle-push-thread can cause the executor to fail
[ https://issues.apache.org/jira/browse/SPARK-36255?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chandni Singh updated SPARK-36255: -- Description: When the shuffle files are cleaned up by the executors once a job in a Spark application completes, the push of the shuffle data by the executor can throw FileNotFound exception. When this exception is thrown from the {{shuffle-block-push-thread}}, it causes the executor to fail. This is because of the default uncaught exception handler for Spark daemon threads which terminates the executor when there are exceptions for the daemon threads. {code:java} 21/06/17 16:03:57 ERROR util.SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[block-push-thread-1,5,main] java.lang.Error: java.io.IOException: Error in opening FileSegmentManagedBuffer {file=/application_1619720975011_11057757/blockmgr-560cb4cf-9918-4ea7-a007-a16c5e3a35fe/0a/shuffle_1_690_0.data, offset=10640, length=190} at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1155) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: java.io.IOException: Error in opening FileSegmentManagedBuffer\{file=***/application_1619720975011_11057757/blockmgr-560cb4cf-9918-4ea7-a007-a16c5e3a35fe/0a/shuffle_1_690_0.data, offset=10640, length=190} at org.apache.spark.network.buffer.FileSegmentManagedBuffer.nioByteBuffer(FileSegmentManagedBuffer.java:89) at org.apache.spark.shuffle.ShuffleWriter.sliceReqBufferIntoBlockBuffers(ShuffleWriter.scala:294) at org.apache.spark.shuffle.ShuffleWriter.org$apache$spark$shuffle$ShuffleWriter$$sendRequest(ShuffleWriter.scala:270) at org.apache.spark.shuffle.ShuffleWriter.org$apache$spark$shuffle$ShuffleWriter$$pushUpToMax(ShuffleWriter.scala:191) at org.apache.spark.shuffle.ShuffleWriter$$anon$2$$anon$4.run(ShuffleWriter.scala:244) at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ... 2 more Caused by: java.io.FileNotFoundException: **/application_1619720975011_11057757/blockmgr-560cb4cf-9918-4ea7-a007-a16c5e3a35fe/0a/shuffle_1_690_0.data (No such file or directory) at java.io.RandomAccessFile.open0(Native Method) at java.io.RandomAccessFile.open(RandomAccessFile.java:316) at java.io.RandomAccessFile.<init>(RandomAccessFile.java:243) at org.apache.spark.network.buffer.FileSegmentManagedBuffer.nioByteBuffer(FileSegmentManagedBuffer.java:62) {code} We can address the issue by handling "FileNotFound" exceptions in the push threads and netty threads by stopping the push when {{FileNotFound}} is encountered. was:When the shuffle files are cleaned up the executors once a job completes, the push of the shuffle data will throw FileNotFound exceptions. This exception when thrown from the {{shuffle-block-push-thread}} still causes the executor to fail. > FileNotFound exceptions in the Shuffle-push-thread can cause the executor to > fail > - > > Key: SPARK-36255 > URL: https://issues.apache.org/jira/browse/SPARK-36255 > Project: Spark > Issue Type: Sub-task > Components: Shuffle >Affects Versions: 3.1.0 >Reporter: Chandni Singh >Priority: Major > > When the shuffle files are cleaned up by the executors once a job in a Spark > application completes, the push of the shuffle data by the executor can throw > FileNotFound exception. When this exception is thrown from the > {{shuffle-block-push-thread}}, it causes the executor to fail. This is > because of the default uncaught exception handler for Spark daemon threads > which terminates the executor when there are exceptions for the daemon > threads. 
> {code:java} > 21/06/17 16:03:57 ERROR util.SparkUncaughtExceptionHandler: Uncaught > exception in thread Thread[block-push-thread-1,5,main] > java.lang.Error: java.io.IOException: Error in opening > FileSegmentManagedBuffer > {file=/application_1619720975011_11057757/blockmgr-560cb4cf-9918-4ea7-a007-a16c5e3a35fe/0a/shuffle_1_690_0.data, > offset=10640, length=190} > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1155) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.io.IOException: Error in opening > FileSegmentManagedBuffer\{file=***/application_1619720975011_11057757/blockmgr-560cb4cf-9918-4ea7-a007-a16c5e3a35fe/0a/shuffle_1_690_0.data, > offset=10640, length=190} > at > org.apache.spark.network.buffer.FileSegmentManagedBuffer.nioByteBuffer(FileSegmentManagedBuffer.java:89) > at > org.apache.spark.shuffle.ShuffleWriter.sliceReqBufferIntoBlockBuffers(ShuffleWriter.scala:294) > at >
[jira] [Updated] (SPARK-36255) FileNotFound exceptions in the Shuffle-push-thread can cause the executor to fail
[ https://issues.apache.org/jira/browse/SPARK-36255?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chandni Singh updated SPARK-36255: -- Parent: SPARK-30602 Issue Type: Sub-task (was: Bug) > FileNotFound exceptions in the Shuffle-push-thread can cause the executor to > fail > - > > Key: SPARK-36255 > URL: https://issues.apache.org/jira/browse/SPARK-36255 > Project: Spark > Issue Type: Sub-task > Components: Shuffle >Affects Versions: 3.1.0 >Reporter: Chandni Singh >Priority: Major > > When the shuffle files are cleaned up by the executors once a job completes, > the push of the shuffle data will throw FileNotFound exceptions. When this > exception is thrown from the {{shuffle-block-push-thread}}, it still causes > the executor to fail.
[jira] [Created] (SPARK-36255) FileNotFound exceptions in the Shuffle-push-thread can cause the executor to fail
Chandni Singh created SPARK-36255: - Summary: FileNotFound exceptions in the Shuffle-push-thread can cause the executor to fail Key: SPARK-36255 URL: https://issues.apache.org/jira/browse/SPARK-36255 Project: Spark Issue Type: Bug Components: Shuffle Affects Versions: 3.1.0 Reporter: Chandni Singh When the shuffle files are cleaned up by the executors once a job completes, the push of the shuffle data will throw FileNotFound exceptions. When this exception is thrown from the {{shuffle-block-push-thread}}, it still causes the executor to fail.
[jira] [Commented] (SPARK-35917) Disable push-based shuffle until the feature is complete
[ https://issues.apache.org/jira/browse/SPARK-35917?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17371445#comment-17371445 ] Chandni Singh commented on SPARK-35917: --- [~code_kr_dev_s] I already opened a PR for this yesterday. It is linked to the Jira. > Disable push-based shuffle until the feature is complete > > > Key: SPARK-35917 > URL: https://issues.apache.org/jira/browse/SPARK-35917 > Project: Spark > Issue Type: Sub-task > Components: Shuffle, Spark Core >Affects Versions: 3.1.0 >Reporter: Chandni Singh >Priority: Major > > Push-based shuffle is partially merged in apache master but some of the tasks > are still incomplete. Since 3.2 is going to be cut soon, we will not be able to > get the pending tasks reviewed and merged. A few of the pending tasks make > protocol changes to the push-based shuffle protocols, so we would like to > prevent users from enabling push-based shuffle both on the client and the > server until push-based shuffle implementation is complete. > We can prevent push-based shuffle from being used by throwing > {{UnsupportedOperationException}} (or something like that) both on the client > and the server when the user tries to enable it.
[jira] [Updated] (SPARK-35917) Disable push-based shuffle until the feature is complete
[ https://issues.apache.org/jira/browse/SPARK-35917?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chandni Singh updated SPARK-35917: -- Description: Push-based shuffle is partially merged in apache master but some of the tasks are still incomplete. Since 3.2 is going to be cut soon, we will not be able to get the pending tasks reviewed and merged. A few of the pending tasks make protocol changes to the push-based shuffle protocols, so we would like to prevent users from enabling push-based shuffle both on the client and the server until push-based shuffle implementation is complete. We can prevent push-based shuffle from being used by throwing {{UnsupportedOperationException}} (or something like that) both on the client and the server when the user tries to enable it. > Disable push-based shuffle until the feature is complete > > > Key: SPARK-35917 > URL: https://issues.apache.org/jira/browse/SPARK-35917 > Project: Spark > Issue Type: Sub-task > Components: Shuffle, Spark Core >Affects Versions: 3.1.0 >Reporter: Chandni Singh >Priority: Major > > Push-based shuffle is partially merged in apache master but some of the tasks > are still incomplete. Since 3.2 is going to be cut soon, we will not be able to > get the pending tasks reviewed and merged. A few of the pending tasks make > protocol changes to the push-based shuffle protocols, so we would like to > prevent users from enabling push-based shuffle both on the client and the > server until push-based shuffle implementation is complete. > We can prevent push-based shuffle from being used by throwing > {{UnsupportedOperationException}} (or something like that) both on the client > and the server when the user tries to enable it.
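The guard described in the issue could look like this minimal sketch. The class and method names are illustrative, not the actual patch; the real Spark config key for the feature is `spark.shuffle.push.enabled`.

```java
// Hypothetical sketch of the proposed guard; not the actual Spark change.
public class PushShuffleGuard {
    // Called on both the client and the server side during startup: refuse to
    // run with the still-incomplete feature enabled, so it cannot be turned on.
    public static void validate(boolean pushBasedShuffleEnabled) {
        if (pushBasedShuffleEnabled) {
            throw new UnsupportedOperationException(
                "Push-based shuffle is not yet fully implemented and cannot be enabled.");
        }
    }
}
```

Throwing eagerly at configuration time fails fast with a clear message instead of exposing users to the incomplete protocol at shuffle time.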
[jira] [Created] (SPARK-35917) Disable push-based shuffle until the feature is complete
Chandni Singh created SPARK-35917: - Summary: Disable push-based shuffle until the feature is complete Key: SPARK-35917 URL: https://issues.apache.org/jira/browse/SPARK-35917 Project: Spark Issue Type: Sub-task Components: Shuffle, Spark Core Affects Versions: 3.1.0 Reporter: Chandni Singh
[jira] [Created] (SPARK-35836) Remove reference to spark.shuffle.push.based.enabled in ShuffleBlockPusherSuite
Chandni Singh created SPARK-35836: - Summary: Remove reference to spark.shuffle.push.based.enabled in ShuffleBlockPusherSuite Key: SPARK-35836 URL: https://issues.apache.org/jira/browse/SPARK-35836 Project: Spark Issue Type: Bug Components: Shuffle, Spark Core Affects Versions: 3.1.0 Reporter: Chandni Singh The test suite for ShuffleBlockPusherSuite was added with SPARK-32917 and in this suite, the configuration for push-based shuffle is incorrectly referenced as {{spark.shuffle.push.based.enabled}}. We need to remove this config from here. {{ShuffleBlockPusher}} is created only when push based shuffle is enabled and this suite is for {{ShuffleBlockPusher}}, so no other change is required.
[jira] [Updated] (SPARK-35671) Add Support in the ESS to serve merged shuffle block meta and data to executors
[ https://issues.apache.org/jira/browse/SPARK-35671?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chandni Singh updated SPARK-35671: -- Description: With push-based shuffle enabled, the reducers send 2 different requests to the ESS: 1. Request to fetch the metadata of the merged shuffle block. 2. Requests to fetch the data of the merged shuffle block in chunks which are by default 2MB in size. The ESS should be able to serve these requests and this Jira targets all the changes in the network-common and network-shuffle modules to be able to support this. was: With push-based shuffle enabled, the reducers send 2 different requests to the ESS: 1. Request to fetch the metadata of the merged shuffle block. 2. Requests to fetch the data of the merged shuffle block in chunks which are by default 2MB in size. The ESS should be able to serve these requests and this Jira targets all the changes in the ESS to be able to support this. > Add Support in the ESS to serve merged shuffle block meta and data to > executors > --- > > Key: SPARK-35671 > URL: https://issues.apache.org/jira/browse/SPARK-35671 > Project: Spark > Issue Type: Sub-task > Components: Shuffle >Affects Versions: 3.1.0 >Reporter: Chandni Singh >Priority: Major > > With push-based shuffle enabled, the reducers send 2 different requests to > the ESS: > 1. Request to fetch the metadata of the merged shuffle block. > 2. Requests to fetch the data of the merged shuffle block in chunks which > are by default 2MB in size. > The ESS should be able to serve these requests and this Jira targets all the > changes in the network-common and network-shuffle modules to be able to > support this.
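The meta request above essentially tells the reducer how many fixed-size chunks it needs to fetch. A hedged sketch of that chunk-count arithmetic (the class and method are illustrative, not the ESS implementation; see SPARK-44215 for why a zero chunk count must make the client fall back to unmerged blocks rather than divide by it):

```java
// Hypothetical sketch of deriving the chunk count in the merged-block meta.
public class MergedBlockMetaSketch {
    static final long DEFAULT_CHUNK_SIZE = 2L * 1024 * 1024; // 2MB default

    // Number of fetch chunks for a merged shuffle block of the given size.
    // A result of 0 (empty or missing merged file) is a signal that the
    // client should fall back to fetching the original unmerged blocks.
    public static int numChunks(long mergedFileSize, long chunkSize) {
        if (mergedFileSize <= 0) return 0;
        return (int) ((mergedFileSize + chunkSize - 1) / chunkSize); // ceiling division
    }
}
```

For example, a 5MB merged block served in 2MB chunks yields 3 chunks (2MB + 2MB + 1MB).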
[jira] [Created] (SPARK-35671) Add Support in the ESS to serve merged shuffle block meta and data to executors
Chandni Singh created SPARK-35671: - Summary: Add Support in the ESS to serve merged shuffle block meta and data to executors Key: SPARK-35671 URL: https://issues.apache.org/jira/browse/SPARK-35671 Project: Spark Issue Type: Sub-task Components: Shuffle Affects Versions: 3.1.0 Reporter: Chandni Singh With push-based shuffle enabled, the reducers send 2 different requests to the ESS: 1. Request to fetch the metadata of the merged shuffle block. 2. Requests to fetch the data of the merged shuffle block in chunks which are by default 2MB in size. The ESS should be able to serve these requests and this Jira targets all the changes in the ESS to be able to support this.
[jira] [Commented] (SPARK-32922) Add support for ShuffleBlockFetcherIterator to read from merged shuffle partitions and to fallback to original shuffle blocks if encountering failures
[ https://issues.apache.org/jira/browse/SPARK-32922?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17358768#comment-17358768 ] Chandni Singh commented on SPARK-32922: --- Splitting the changes into ESS server side/client side changes as per the comment here: https://github.com/apache/spark/pull/32140#issuecomment-856099709 > Add support for ShuffleBlockFetcherIterator to read from merged shuffle > partitions and to fallback to original shuffle blocks if encountering failures > -- > > Key: SPARK-32922 > URL: https://issues.apache.org/jira/browse/SPARK-32922 > Project: Spark > Issue Type: Sub-task > Components: Shuffle, Spark Core >Affects Versions: 3.1.0 >Reporter: Min Shen >Priority: Major > > With the extended MapOutputTracker, the reducers can now get the task input > data from the merged shuffle partitions for more efficient shuffle data > fetch. The reducers should also be able to fall back to fetching the original > unmerged blocks if they encounter failures when fetching the merged shuffle > partitions.
[jira] [Updated] (SPARK-35546) Enable push-based shuffle when multiple app attempts are enabled and manage concurrent access to the state in a better way
[ https://issues.apache.org/jira/browse/SPARK-35546?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chandni Singh updated SPARK-35546: -- Summary: Enable push-based shuffle when multiple app attempts are enabled and manage concurrent access to the state in a better way (was: Properly handle race conditions in RemoteBlockPushResolver to support push based shuffle with multiple app attempts enabled) > Enable push-based shuffle when multiple app attempts are enabled and manage > concurrent access to the state in a better way > --- > > Key: SPARK-35546 > URL: https://issues.apache.org/jira/browse/SPARK-35546 > Project: Spark > Issue Type: Sub-task > Components: Shuffle >Affects Versions: 3.1.0 >Reporter: Ye Zhou >Priority: Major > > In the current implementation of RemoteBlockPushResolver, two > ConcurrentHashMaps are used to store #1 applicationId -> > mergedShuffleLocalDirPath #2 applicationId+attemptId+shuffleID -> > mergedShufflePartitionInfo. As the four types of messages > (ExecutorRegister, PushBlocks, FinalizeShuffleMerge and ApplicationRemove) > trigger different types of operations within these two hashmaps, it is > required to maintain strong consistency of the information stored in > these two hashmaps. Otherwise, there will be either data > corruption/correctness issues or a memory leak in the shuffle server. > We should come up with a systematic way to resolve this, rather than spot-fixing > the potential issues.
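The consistency concern above can be pictured with a minimal sketch: one ConcurrentHashMap whose entries are registered and removed atomically, so that a concurrent push can detect application removal instead of resurrecting stale state. All names are hypothetical; this is not the RemoteBlockPushResolver implementation:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Simplified model of keeping per-app merge state consistent across
// register/push/finalize/remove events; not Spark's actual classes.
class MergeState {
    final ConcurrentMap<String, String> appDirs = new ConcurrentHashMap<>();

    void registerExecutor(String appId, String localDir) {
        // putIfAbsent keeps the first registration; later registrations must
        // not silently overwrite state an active merge may be using.
        appDirs.putIfAbsent(appId, localDir);
    }

    boolean removeApplication(String appId) {
        // A single atomic remove: concurrent pushes can observe that the
        // entry is gone instead of re-creating it.
        return appDirs.remove(appId) != null;
    }
}
```

The point of the issue is that real state lives in two maps keyed differently, so single-map atomicity is not enough and a systematic locking/ownership scheme is needed.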
[jira] [Resolved] (SPARK-31689) ShuffleBlockFetchIterator keeps localBlocks in its memory even though it never uses it
[ https://issues.apache.org/jira/browse/SPARK-31689?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chandni Singh resolved SPARK-31689. --- Resolution: Incomplete This will be addressed with SPARK-32922 where this change is required. > ShuffleBlockFetchIterator keeps localBlocks in its memory even though it > never uses it > -- > > Key: SPARK-31689 > URL: https://issues.apache.org/jira/browse/SPARK-31689 > Project: Spark > Issue Type: Bug > Components: Shuffle, Spark Core >Affects Versions: 2.4.5 >Reporter: Chandni Singh >Priority: Minor > > The {{localBlocks}} is created and used in the {{initialize}} method of > ShuffleBlockFetchIterator but is never used after that. > It can be local to the initialize method instead of being a field in the > {{ShuffleBlockFetchIterator}} instance. It unnecessarily holds on to memory > for as long as the iterator instance is alive.
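The refactor the issue asks for, moving localBlocks from a field into the initialize method so it becomes garbage-collectible after setup, can be sketched as follows (a hypothetical simplification, not the real ShuffleBlockFetcherIterator):

```java
import java.util.HashSet;
import java.util.Set;

// Sketch: the set is a local variable of the setup method, not a field,
// so it is eligible for GC as soon as initialization completes.
class FetchIteratorSketch {
    static int initialize(Iterable<String> blockIds) {
        Set<String> localBlocks = new HashSet<>(); // local, not a field
        for (String id : blockIds) {
            if (id.startsWith("local")) localBlocks.add(id);
        }
        return localBlocks.size(); // used only during setup, then discarded
    }
}
```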
[jira] [Updated] (SPARK-34840) Fix cases of corruption in merged shuffle blocks that are pushed
[ https://issues.apache.org/jira/browse/SPARK-34840?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chandni Singh updated SPARK-34840: -- Parent: SPARK-30602 Issue Type: Sub-task (was: Bug) > Fix cases of corruption in merged shuffle blocks that are pushed > > > Key: SPARK-34840 > URL: https://issues.apache.org/jira/browse/SPARK-34840 > Project: Spark > Issue Type: Sub-task > Components: Shuffle >Affects Versions: 3.1.0 >Reporter: Chandni Singh >Assignee: Chandni Singh >Priority: Major > Fix For: 3.1.2, 3.2.0 > > > The {{RemoteBlockPushResolver}} which handles the shuffle push blocks and > merges them was introduced in > [#30062|https://github.com/apache/spark/pull/30062]. We have identified 2 > scenarios where the merged blocks get corrupted: > # {{StreamCallback.onFailure()}} is called more than once. Initially we > assumed that the onFailure callback will be called just once per stream. > However, we observed that this is called twice when a client connection is > reset. When the client connection is reset then there are 2 events that get > triggered in this order. > * {{exceptionCaught}}. This event is propagated to {{StreamInterceptor}}. > {{StreamInterceptor.exceptionCaught()}} invokes > {{callback.onFailure(streamId, cause)}}. This is the first time > StreamCallback.onFailure() will be invoked. > * {{channelInactive}}. Since the channel closes, the {{channelInactive}} > event gets triggered which again is propagated to {{StreamInterceptor}}. > {{StreamInterceptor.channelInactive()}} invokes > {{callback.onFailure(streamId, new ClosedChannelException())}}. This is the > second time StreamCallback.onFailure() will be invoked. > # The flag {{isWriting}} is set prematurely to true. This introduces an edge > case where a stream that is trying to merge a duplicate block (created > because of a speculative task) may interfere with an active stream if the > duplicate stream fails. > Also adding additional changes that improve the code. 
> # Using positional writes all the time because this simplifies the code and > with microbenchmarking haven't seen any performance impact. > # Additional minor changes.
[jira] [Created] (SPARK-34840) Fix cases of corruption in merged shuffle blocks that are pushed
Chandni Singh created SPARK-34840: - Summary: Fix cases of corruption in merged shuffle blocks that are pushed Key: SPARK-34840 URL: https://issues.apache.org/jira/browse/SPARK-34840 Project: Spark Issue Type: Bug Components: Shuffle Affects Versions: 3.1.0 Reporter: Chandni Singh The {{RemoteBlockPushResolver}} which handles the shuffle push blocks and merges them was introduced in [#30062|https://github.com/apache/spark/pull/30062]. We have identified 2 scenarios where the merged blocks get corrupted: # {{StreamCallback.onFailure()}} is called more than once. Initially we assumed that the onFailure callback will be called just once per stream. However, we observed that this is called twice when a client connection is reset. When the client connection is reset then there are 2 events that get triggered in this order. * {{exceptionCaught}}. This event is propagated to {{StreamInterceptor}}. {{StreamInterceptor.exceptionCaught()}} invokes {{callback.onFailure(streamId, cause)}}. This is the first time StreamCallback.onFailure() will be invoked. * {{channelInactive}}. Since the channel closes, the {{channelInactive}} event gets triggered which again is propagated to {{StreamInterceptor}}. {{StreamInterceptor.channelInactive()}} invokes {{callback.onFailure(streamId, new ClosedChannelException())}}. This is the second time StreamCallback.onFailure() will be invoked. # The flag {{isWriting}} is set prematurely to true. This introduces an edge case where a stream that is trying to merge a duplicate block (created because of a speculative task) may interfere with an active stream if the duplicate stream fails. Also adding additional changes that improve the code. # Using positional writes all the time because this simplifies the code and with microbenchmarking haven't seen any performance impact. # Additional minor changes. 
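The double-onFailure scenario described in SPARK-34840 above (exceptionCaught followed by channelInactive on a connection reset) is the classic case for an idempotent failure guard. A hedged sketch with hypothetical names; the actual fix in the PR may differ:

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch of guarding a stream-failure callback so the second invocation
// (channelInactive after exceptionCaught) is a no-op. Illustrative only.
class GuardedStreamCallback {
    private final AtomicBoolean failed = new AtomicBoolean(false);
    int cleanupCount = 0;

    void onFailure(String streamId, Throwable cause) {
        // compareAndSet ensures the failure path runs exactly once even if
        // the callback fires from both Netty events.
        if (failed.compareAndSet(false, true)) {
            cleanupCount++;
        }
    }
}
```

Without such a guard, the second invocation can release or corrupt merge state that another stream already took over, which is exactly the corruption the issue describes.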
[jira] [Created] (SPARK-33665) Enable ShuffleBlockPusher to stop pushing blocks for a particular shuffle partition
Chandni Singh created SPARK-33665: - Summary: Enable ShuffleBlockPusher to stop pushing blocks for a particular shuffle partition Key: SPARK-33665 URL: https://issues.apache.org/jira/browse/SPARK-33665 Project: Spark Issue Type: Sub-task Components: Shuffle Affects Versions: 3.1.0 Reporter: Chandni Singh {{ShuffleBlockPusher}}, which was introduced in SPARK-32917, stops pushing shuffle blocks for the entire shuffle when it receives a "too late" exception. However, with the change [https://github.com/apache/spark/pull/30433], there is also a need to stop pushing shuffle blocks for a particular reduce partition. Refer https://github.com/apache/spark/pull/30433#discussion_r533694433
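One way to picture the requested behavior, stopping pushes for a single reduce partition while keeping the existing whole-shuffle stop, is the following sketch; the names are illustrative, not the ShuffleBlockPusher API:

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Sketch: track reduce partitions the pusher should skip after the server
// rejects them, alongside the existing whole-shuffle "too late" stop.
class PushSkipTracker {
    private final Set<Integer> stoppedPartitions = ConcurrentHashMap.newKeySet();
    private volatile boolean shuffleStopped = false;

    void stopPartition(int reduceId) { stoppedPartitions.add(reduceId); }

    void stopShuffle() { shuffleStopped = true; }

    // Consulted before each push: skip if the whole shuffle or just this
    // reduce partition has been stopped.
    boolean shouldPush(int reduceId) {
        return !shuffleStopped && !stoppedPartitions.contains(reduceId);
    }
}
```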
[jira] [Updated] (SPARK-31689) ShuffleBlockFetchIterator keeps localBlocks in its memory even though it never uses it
[ https://issues.apache.org/jira/browse/SPARK-31689?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chandni Singh updated SPARK-31689: -- Description: The {{localBlocks}} is created and used in the {{initialize}} method of ShuffleBlockFetchIterator but is never used after that. It can be local to the initialize method instead of being a field in the {{ShuffleBlockFetchIterator}} instance. It holds on to memory until iterator instance is alive which is unnecessary. was: The {{localBlocks}} is created and used in the {{initialize}} method of ShuffleBlockFetchIterator but is never used after that. It can be local to the initialize method instead of being a field in the {{ShuffleBlockFetchIterator}} instance. It hold son to memory until iterator instance is alive which is unnecessary. > ShuffleBlockFetchIterator keeps localBlocks in its memory even though it > never uses it > -- > > Key: SPARK-31689 > URL: https://issues.apache.org/jira/browse/SPARK-31689 > Project: Spark > Issue Type: Bug > Components: Shuffle, Spark Core >Affects Versions: 2.4.5 >Reporter: Chandni Singh >Priority: Minor > > The {{localBlocks}} is created and used in the {{initialize}} method of > ShuffleBlockFetchIterator but is never used after that. > It can be local to the initialize method instead of being a field in the > {{ShuffleBlockFetchIterator}} instance. It holds on to memory until iterator > instance is alive which is unnecessary.
[jira] [Updated] (SPARK-33350) Add support to DiskBlockManager to create merge directory and to get the local shuffle merged data
[ https://issues.apache.org/jira/browse/SPARK-33350?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chandni Singh updated SPARK-33350: -- Summary: Add support to DiskBlockManager to create merge directory and to get the local shuffle merged data (was: Add support to DiskBlockManager to create merge directory and the ability to get the shuffle merged data) > Add support to DiskBlockManager to create merge directory and to get the > local shuffle merged data > -- > > Key: SPARK-33350 > URL: https://issues.apache.org/jira/browse/SPARK-33350 > Project: Spark > Issue Type: Sub-task > Components: Shuffle >Affects Versions: 3.1.0 >Reporter: Chandni Singh >Priority: Major > > DiskBlockManager should be able to create the {{merge_manager}} directory, > where the push-based merged shuffle files are written and also create > sub-dirs under it. > It should also be able to serve the local merged shuffle data/index/meta > files.
[jira] [Created] (SPARK-33350) Add support to DiskBlockManager to create merge directory and the ability to get the shuffle merged data
Chandni Singh created SPARK-33350: - Summary: Add support to DiskBlockManager to create merge directory and the ability to get the shuffle merged data Key: SPARK-33350 URL: https://issues.apache.org/jira/browse/SPARK-33350 Project: Spark Issue Type: Sub-task Components: Shuffle Affects Versions: 3.1.0 Reporter: Chandni Singh DiskBlockManager should be able to create the {{merge_manager}} directory, where the push-based merged shuffle files are written and also create sub-dirs under it. It should also be able to serve the local merged shuffle data/index/meta files.
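The {{merge_manager}} directory with hashed sub-dirs described above might look like the sketch below. The directory name comes from the issue; the hashing scheme and helper are assumptions for illustration only, not Spark's exact layout:

```java
// Sketch of a hashed sub-directory layout under a merge_manager root.
// The "merge_manager" name is from the issue; the hashing into hex-named
// sub-dirs is an illustrative assumption.
class MergeDirLayout {
    static String subDirPath(String root, String blockId, int subDirs) {
        // floorMod keeps the bucket non-negative even for negative hashCodes.
        int bucket = Math.floorMod(blockId.hashCode(), subDirs);
        return String.format("%s/merge_manager/%02x/%s", root, bucket, blockId);
    }
}
```

The same deterministic mapping is used when writing merged data and when later serving the local merged data/index/meta files.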
[jira] [Updated] (SPARK-33331) Limit the number of pending blocks in memory and store blocks that collide
[ https://issues.apache.org/jira/browse/SPARK-33331?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chandni Singh updated SPARK-33331: -- Description: This jira addresses the below two points: 1. In {{RemoteBlockPushResolver}}, bytes that cannot be merged immediately are stored in memory. The stream callback maintains a list of {{deferredBufs}}. When a block cannot be merged it is added to this list. Currently, there isn't a limit on the number of pending blocks. We can limit the number of pending blocks in memory. There has been a discussion around this here: [https://github.com/apache/spark/pull/30062#discussion_r514026014] 2. When a stream doesn't get an opportunity to merge, then {{RemoteBlockPushResolver}} ignores the data from that stream. Another approach is to store the data of the stream in {{AppShufflePartitionInfo}} when it reaches the worst-case scenario. This may increase the memory usage of the shuffle service though. However, given a limit introduced with 1 we can try this out. More information can be found in this discussion: [https://github.com/apache/spark/pull/30062#discussion_r517524546] was: This jira addresses the below two points: 1. In {{RemoteBlockPushResolver}}, bytes that cannot be merged immediately are stored in memory. The stream callback maintains a list of {{deferredBufs}}. When a block cannot be merged it is added to this list. Currently, there isn't a limit on the number of pending blocks. We can limit the number of pending blocks in memory. There has been a discussion around this here: [https://github.com/apache/spark/pull/30062#discussion_r514026014 ] 2. When a stream doesn't get an opportunity to merge, then {{RemoteBlockPushResolver}} ignores the data from that stream. Another approach is to store the data of the stream in {{AppShufflePartitionInfo}} when it reaches the worst-case scenario. This may increase the memory usage of the shuffle service though. However, given a limit introduced with 1 we can try this out. 
More information can be found in this discussion: [https://github.com/apache/spark/pull/30062#discussion_r517524546] > Limit the number of pending blocks in memory and store blocks that collide > -- > > Key: SPARK-33331 > URL: https://issues.apache.org/jira/browse/SPARK-33331 > Project: Spark > Issue Type: Sub-task > Components: Shuffle >Affects Versions: 3.1.0 >Reporter: Chandni Singh >Priority: Major > > This jira addresses the below two points: > 1. In {{RemoteBlockPushResolver}}, bytes that cannot be merged immediately > are stored in memory. The stream callback maintains a list of > {{deferredBufs}}. When a block cannot be merged it is added to this list. > Currently, there isn't a limit on the number of pending blocks. We can limit > the number of pending blocks in memory. There has been a discussion around > this here: > [https://github.com/apache/spark/pull/30062#discussion_r514026014] > 2. When a stream doesn't get an opportunity to merge, then > {{RemoteBlockPushResolver}} ignores the data from that stream. Another > approach is to store the data of the stream in {{AppShufflePartitionInfo}} > when it reaches the worst-case scenario. This may increase the memory usage > of the shuffle service though. However, given a limit introduced with 1 we > can try this out. > More information can be found in this discussion: > [https://github.com/apache/spark/pull/30062#discussion_r517524546]
[jira] [Updated] (SPARK-33331) Limit the number of pending blocks in memory and store blocks that collide
[ https://issues.apache.org/jira/browse/SPARK-33331?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chandni Singh updated SPARK-33331: -- Description: This jira addresses the below two points: 1. In {{RemoteBlockPushResolver}}, bytes that cannot be merged immediately are stored in memory. The stream callback maintains a list of {{deferredBufs}}. When a block cannot be merged it is added to this list. Currently, there isn't a limit on the number of pending blocks. We can limit the number of pending blocks in memory. There has been a discussion around this here: [https://github.com/apache/spark/pull/30062#discussion_r514026014 ] 2. When a stream doesn't get an opportunity to merge, then {{RemoteBlockPushResolver}} ignores the data from that stream. Another approach is to store the data of the stream in {{AppShufflePartitionInfo}} when it reaches the worst-case scenario. This may increase the memory usage of the shuffle service though. However, given a limit introduced with 1 we can try this out. More information can be found in this discussion: [https://github.com/apache/spark/pull/30062#discussion_r517524546] was: 1. In {{RemoteBlockPushResolver}}, bytes that cannot be merged immediately are stored in memory. The stream callback maintains a list of {{deferredBufs}}. When a block cannot be merged it is added to this list. Currently, there isn't a limit on the number of pending blocks. There has been a discussion around this here: https://github.com/apache/spark/pull/30062#discussion_r514026014 2. When a stream doesn't get an opportunity to merge, then {{RemoteBlockPushResolver}} ignores the data from that stream. Another approach is to store the data of the stream in {{AppShufflePartitionInfo}} when it reaches the worst-case scenario. This may increase the memory usage of the shuffle service though. However, given a limit introduced with 1 we can try this out. 
> Limit the number of pending blocks in memory and store blocks that collide > -- > > Key: SPARK-33331 > URL: https://issues.apache.org/jira/browse/SPARK-33331 > Project: Spark > Issue Type: Sub-task > Components: Shuffle >Affects Versions: 3.1.0 >Reporter: Chandni Singh >Priority: Major > > This jira addresses the below two points: > 1. In {{RemoteBlockPushResolver}}, bytes that cannot be merged immediately > are stored in memory. The stream callback maintains a list of > {{deferredBufs}}. When a block cannot be merged it is added to this list. > Currently, there isn't a limit on the number of pending blocks. We can limit > the number of pending blocks in memory. There has been a discussion around > this here: > [https://github.com/apache/spark/pull/30062#discussion_r514026014 > ] > 2. When a stream doesn't get an opportunity to merge, then > {{RemoteBlockPushResolver}} ignores the data from that stream. Another > approach is to store the data of the stream in {{AppShufflePartitionInfo}} > when it reaches the worst-case scenario. This may increase the memory usage > of the shuffle service though. However, given a limit introduced with 1 we > can try this out. > More information can be found in this discussion: > [https://github.com/apache/spark/pull/30062#discussion_r517524546]
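Point 1 above, capping the {{deferredBufs}} list, can be sketched as a bounded queue that rejects blocks once the limit is hit, leaving the reducer to fall back to fetching the original un-merged block. The cap, names, and reject-on-full policy are illustrative assumptions, not Spark's final behavior:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Sketch of capping deferred (not-yet-mergeable) buffers per stream.
class DeferredBufs {
    private final Deque<byte[]> deferred = new ArrayDeque<>();
    private final int maxPending;

    DeferredBufs(int maxPending) { this.maxPending = maxPending; }

    // Returns false when the block must be dropped; the caller then lets
    // the reducer fetch the original, un-merged block instead.
    boolean defer(byte[] buf) {
        if (deferred.size() >= maxPending) return false;
        deferred.add(buf);
        return true;
    }

    int pendingCount() { return deferred.size(); }
}
```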
[jira] [Updated] (SPARK-33331) Limit the number of pending blocks in memory and store blocks that collide
[ https://issues.apache.org/jira/browse/SPARK-33331?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chandni Singh updated SPARK-33331: -- Summary: Limit the number of pending blocks in memory and store blocks that collide (was: Limit the number of pending blocks in memory when RemoteBlockPushResolver defers a block) > Limit the number of pending blocks in memory and store blocks that collide > -- > > Key: SPARK-33331 > URL: https://issues.apache.org/jira/browse/SPARK-33331 > Project: Spark > Issue Type: Sub-task > Components: Shuffle >Affects Versions: 3.1.0 >Reporter: Chandni Singh >Priority: Major > > 1. In {{RemoteBlockPushResolver}}, bytes that cannot be merged immediately > are stored in memory. The stream callback maintains a list of > {{deferredBufs}}. When a block cannot be merged it is added to this list. > Currently, there isn't a limit on the number of pending blocks. There has > been a discussion around this here: > https://github.com/apache/spark/pull/30062#discussion_r514026014 > 2. When a stream doesn't get an opportunity to merge, then > {{RemoteBlockPushResolver}} ignores the data from that stream. Another > approach is to store the data of the stream in {{AppShufflePartitionInfo}} > when it reaches the worst-case scenario. This may increase the memory usage > of the shuffle service though. However, given a limit introduced with 1 we > can try this out.
[jira] [Updated] (SPARK-33331) Limit the number of pending blocks in memory when RemoteBlockPushResolver defers a block
[ https://issues.apache.org/jira/browse/SPARK-33331?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chandni Singh updated SPARK-33331: -- Description: 1. In {{RemoteBlockPushResolver}}, bytes that cannot be merged immediately are stored in memory. The stream callback maintains a list of {{deferredBufs}}. When a block cannot be merged it is added to this list. Currently, there isn't a limit on the number of pending blocks. There has been a discussion around this here: https://github.com/apache/spark/pull/30062#discussion_r514026014 2. When a stream doesn't get an opportunity to merge, then {{RemoteBlockPushResolver}} ignores the data from that stream. Another approach is to store the data of the stream in {{AppShufflePartitionInfo}} when it reaches the worst-case scenario. This may increase the memory usage of the shuffle service though. However, given a limit introduced with 1 we can try this out. was: This is to address the comment here: https://github.com/apache/spark/pull/30062#discussion_r514026014 > Limit the number of pending blocks in memory when RemoteBlockPushResolver > defers a block > > > Key: SPARK-33331 > URL: https://issues.apache.org/jira/browse/SPARK-33331 > Project: Spark > Issue Type: Sub-task > Components: Shuffle >Affects Versions: 3.1.0 >Reporter: Chandni Singh >Priority: Major > > 1. In {{RemoteBlockPushResolver}}, bytes that cannot be merged immediately > are stored in memory. The stream callback maintains a list of > {{deferredBufs}}. When a block cannot be merged it is added to this list. > Currently, there isn't a limit on the number of pending blocks. There has > been a discussion around this here: > https://github.com/apache/spark/pull/30062#discussion_r514026014 > 2. When a stream doesn't get an opportunity to merge, then > {{RemoteBlockPushResolver}} ignores the data from that stream. Another > approach is to store the data of the stream in {{AppShufflePartitionInfo}} > when it reaches the worst-case scenario. 
This may increase the memory usage > of the shuffle service though. However, given a limit introduced with 1 we > can try this out.
[jira] [Updated] (SPARK-33331) Limit the number of pending blocks in memory when RemoteBlockPushResolver defers a block
[ https://issues.apache.org/jira/browse/SPARK-33331?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chandni Singh updated SPARK-33331: -- Summary: Limit the number of pending blocks in memory when RemoteBlockPushResolver defers a block (was: Limit the number of pending blocks in memory when {{RemoteBlockPushResolver}} defers a block) > Limit the number of pending blocks in memory when RemoteBlockPushResolver > defers a block > > > Key: SPARK-33331 > URL: https://issues.apache.org/jira/browse/SPARK-33331 > Project: Spark > Issue Type: Sub-task > Components: Shuffle >Affects Versions: 3.1.0 >Reporter: Chandni Singh >Priority: Major > > This is to address the comment here: > https://github.com/apache/spark/pull/30062#discussion_r514026014
[jira] [Created] (SPARK-33331) Limit the number of pending blocks in memory when {{RemoteBlockPushResolver}} defers a block
Chandni Singh created SPARK-33331: - Summary: Limit the number of pending blocks in memory when {{RemoteBlockPushResolver}} defers a block Key: SPARK-33331 URL: https://issues.apache.org/jira/browse/SPARK-33331 Project: Spark Issue Type: Sub-task Components: Shuffle Affects Versions: 3.1.0 Reporter: Chandni Singh This is to address the comment here: https://github.com/apache/spark/pull/30062#discussion_r514026014
[jira] [Created] (SPARK-33236) Enable Push-based shuffle service to store state in NM level DB for work preserving restart
Chandni Singh created SPARK-33236: - Summary: Enable Push-based shuffle service to store state in NM level DB for work preserving restart Key: SPARK-33236 URL: https://issues.apache.org/jira/browse/SPARK-33236 Project: Spark Issue Type: Sub-task Components: Shuffle Affects Versions: 3.1.0 Reporter: Chandni Singh
[jira] [Updated] (SPARK-33235) Push-based Shuffle Phase 2 Tasks
[ https://issues.apache.org/jira/browse/SPARK-33235?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chandni Singh updated SPARK-33235: -- Description: This is the parent jira for the phase 2 or follow-up tasks for supporting Push-based shuffle. Refer SPARK-30602. was: In a large deployment of a Spark compute infrastructure, Spark shuffle is becoming a potential scaling bottleneck and a source of inefficiency in the cluster. When doing Spark on YARN for a large-scale deployment, people usually enable Spark external shuffle service and store the intermediate shuffle files on HDD. Because the number of blocks generated for a particular shuffle grows quadratically compared to the size of shuffled data (# mappers and reducers grows linearly with the size of shuffled data, but # blocks is # mappers * # reducers), one general trend we have observed is that the more data a Spark application processes, the smaller the block size becomes. In a few production clusters we have seen, the average shuffle block size is only 10s of KBs. Because of the inefficiency of performing random reads on HDD for small amount of data, the overall efficiency of the Spark external shuffle services serving the shuffle blocks degrades as we see an increasing # of Spark applications processing an increasing amount of data. In addition, because Spark external shuffle service is a shared service in a multi-tenancy cluster, the inefficiency with one Spark application could propagate to other applications as well. In this ticket, we propose a solution to improve Spark shuffle efficiency in above mentioned environments with push-based shuffle. With push-based shuffle, shuffle is performed at the end of mappers and blocks get pre-merged and move towards reducers. In our prototype implementation, we have seen significant efficiency improvements when performing large shuffles. 
We take a Spark-native approach to achieve this, i.e., extending Spark’s existing shuffle netty protocol, and the behaviors of Spark mappers, reducers and drivers. This way, we can bring the benefits of more efficient shuffle in Spark without incurring the dependency or overhead of either specialized storage layer or external infrastructure pieces. Link to dev mailing list discussion: http://apache-spark-developers-list.1001551.n3.nabble.com/Enabling-push-based-shuffle-in-Spark-td28732.html > Push-based Shuffle Phase 2 Tasks > > > Key: SPARK-33235 > URL: https://issues.apache.org/jira/browse/SPARK-33235 > Project: Spark > Issue Type: Improvement > Components: Shuffle, Spark Core >Affects Versions: 3.1.0 >Reporter: Chandni Singh >Priority: Major > Labels: release-notes > > This is the parent jira for the phase 2 or follow-up tasks for supporting > Push-based shuffle. Refer SPARK-30602.
[jira] [Created] (SPARK-33235) Push-based Shuffle Phase 2 Tasks
Chandni Singh created SPARK-33235: - Summary: Push-based Shuffle Phase 2 Tasks Key: SPARK-33235 URL: https://issues.apache.org/jira/browse/SPARK-33235 Project: Spark Issue Type: Improvement Components: Shuffle, Spark Core Affects Versions: 3.1.0 Reporter: Chandni Singh In a large deployment of a Spark compute infrastructure, Spark shuffle is becoming a potential scaling bottleneck and a source of inefficiency in the cluster. When doing Spark on YARN for a large-scale deployment, people usually enable Spark external shuffle service and store the intermediate shuffle files on HDD. Because the number of blocks generated for a particular shuffle grows quadratically compared to the size of shuffled data (# mappers and reducers grows linearly with the size of shuffled data, but # blocks is # mappers * # reducers), one general trend we have observed is that the more data a Spark application processes, the smaller the block size becomes. In a few production clusters we have seen, the average shuffle block size is only 10s of KBs. Because of the inefficiency of performing random reads on HDD for small amount of data, the overall efficiency of the Spark external shuffle services serving the shuffle blocks degrades as we see an increasing # of Spark applications processing an increasing amount of data. In addition, because Spark external shuffle service is a shared service in a multi-tenancy cluster, the inefficiency with one Spark application could propagate to other applications as well. In this ticket, we propose a solution to improve Spark shuffle efficiency in above mentioned environments with push-based shuffle. With push-based shuffle, shuffle is performed at the end of mappers and blocks get pre-merged and move towards reducers. In our prototype implementation, we have seen significant efficiency improvements when performing large shuffles. 
We take a Spark-native approach to achieve this, i.e., extending Spark’s existing shuffle netty protocol and the behaviors of Spark mappers, reducers, and drivers. This way, we can bring the benefits of more efficient shuffle to Spark without incurring the dependency or overhead of either a specialized storage layer or external infrastructure pieces. Link to dev mailing list discussion: http://apache-spark-developers-list.1001551.n3.nabble.com/Enabling-push-based-shuffle-in-Spark-td28732.html
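The quadratic block-count relationship described above can be sketched with hypothetical figures (the class name and numbers below are illustrative, not measurements from any production cluster): total shuffle data grows linearly with the number of mappers, but the block count is mappers times reducers, so the average block shrinks as the job scales up.

```java
// Hypothetical arithmetic: # blocks = # mappers * # reducers, while total
// shuffled bytes grow only linearly, so per-block size shrinks as jobs scale.
public class BlockSizeSketch {
    static long avgBlockSizeBytes(long totalShuffleBytes, long mappers, long reducers) {
        return totalShuffleBytes / (mappers * reducers);
    }

    public static void main(String[] args) {
        // 1 TB shuffled across 1,000 mappers x 1,000 reducers -> 1,000,000 blocks
        System.out.println(avgBlockSizeBytes(1_000_000_000_000L, 1_000, 1_000));   // 1000000 (~1 MB/block)
        // 10 TB across 10,000 mappers x 10,000 reducers -> 100,000,000 blocks
        System.out.println(avgBlockSizeBytes(10_000_000_000_000L, 10_000, 10_000)); // 100000 (~100 KB/block)
    }
}
```

A 10x larger job ends up with blocks a tenth the size, which is exactly the "10s of KBs" regime where HDD random reads become the bottleneck.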
[jira] [Commented] (SPARK-30602) SPIP: Support push-based shuffle to improve shuffle efficiency
[ https://issues.apache.org/jira/browse/SPARK-30602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17198679#comment-17198679 ] Chandni Singh commented on SPARK-30602: --- The reference PR for the consolidated change: https://github.com/apache/spark/pull/29808 > SPIP: Support push-based shuffle to improve shuffle efficiency > -- > > Key: SPARK-30602 > URL: https://issues.apache.org/jira/browse/SPARK-30602 > Project: Spark > Issue Type: Improvement > Components: Shuffle, Spark Core >Affects Versions: 3.1.0 >Reporter: Min Shen >Priority: Major > Attachments: Screen Shot 2020-06-23 at 11.31.22 AM.jpg, > vldb_magnet_final.pdf > > > In a large deployment of a Spark compute infrastructure, Spark shuffle is > becoming a potential scaling bottleneck and a source of inefficiency in the > cluster. When doing Spark on YARN for a large-scale deployment, people > usually enable Spark external shuffle service and store the intermediate > shuffle files on HDD. Because the number of blocks generated for a particular > shuffle grows quadratically compared to the size of shuffled data (# mappers > and reducers grows linearly with the size of shuffled data, but # blocks is # > mappers * # reducers), one general trend we have observed is that the more > data a Spark application processes, the smaller the block size becomes. In a > few production clusters we have seen, the average shuffle block size is only > 10s of KBs. Because of the inefficiency of performing random reads on HDD for > small amount of data, the overall efficiency of the Spark external shuffle > services serving the shuffle blocks degrades as we see an increasing # of > Spark applications processing an increasing amount of data. In addition, > because Spark external shuffle service is a shared service in a multi-tenancy > cluster, the inefficiency with one Spark application could propagate to other > applications as well. 
> In this ticket, we propose a solution to improve Spark shuffle efficiency in > above mentioned environments with push-based shuffle. With push-based > shuffle, shuffle is performed at the end of mappers and blocks get pre-merged > and move towards reducers. In our prototype implementation, we have seen > significant efficiency improvements when performing large shuffles. We take a > Spark-native approach to achieve this, i.e., extending Spark’s existing > shuffle netty protocol, and the behaviors of Spark mappers, reducers and > drivers. This way, we can bring the benefits of more efficient shuffle in > Spark without incurring the dependency or overhead of either specialized > storage layer or external infrastructure pieces. > > Link to dev mailing list discussion: > http://apache-spark-developers-list.1001551.n3.nabble.com/Enabling-push-based-shuffle-in-Spark-td28732.html -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-32664) Getting local shuffle block clutters the executor logs
Chandni Singh created SPARK-32664: - Summary: Getting local shuffle block clutters the executor logs Key: SPARK-32664 URL: https://issues.apache.org/jira/browse/SPARK-32664 Project: Spark Issue Type: Bug Components: Shuffle Affects Versions: 3.1.0 Reporter: Chandni Singh The below log statement in {{BlockManager.getLocalBlockData}} should be at debug level {code:java} logInfo(s"Getting local shuffle block ${blockId}") {code} Currently, the executor logs get cluttered with this {code:java} 20/08/20 02:07:52 INFO storage.BlockManager: Getting local shuffle block shuffle_0_6103_4964 20/08/20 02:07:52 INFO storage.BlockManager: Getting local shuffle block shuffle_0_6132_4964 20/08/20 02:07:52 INFO storage.BlockManager: Getting local shuffle block shuffle_0_6137_4964 20/08/20 02:07:52 INFO storage.BlockManager: Getting local shuffle block shuffle_0_6312_4964 20/08/20 02:07:52 INFO storage.BlockManager: Getting local shuffle block shuffle_0_6323_4964 20/08/20 02:07:52 INFO storage.BlockManager: Getting local shuffle block shuffle_0_6402_4964 20/08/20 02:07:52 INFO storage.BlockManager: Getting local shuffle block shuffle_0_6413_4964 20/08/20 02:07:52 INFO storage.BlockManager: Getting local shuffle block shuffle_0_6694_4964 20/08/20 02:07:52 INFO storage.BlockManager: Getting local shuffle block shuffle_0_6709_4964 20/08/20 02:07:52 INFO storage.BlockManager: Getting local shuffle block shuffle_0_6753_4964 20/08/20 02:07:52 INFO storage.BlockManager: Getting local shuffle block shuffle_0_6822_4964 20/08/20 02:07:52 INFO storage.BlockManager: Getting local shuffle block shuffle_0_6894_4964 20/08/20 02:07:52 INFO storage.BlockManager: Getting local shuffle block shuffle_0_6913_4964 20/08/20 02:07:52 INFO storage.BlockManager: Getting local shuffle block shuffle_0_7052_4964 20/08/20 02:07:52 INFO storage.BlockManager: Getting local shuffle block shuffle_0_7073_4964 20/08/20 02:07:52 INFO storage.BlockManager: Getting local shuffle block shuffle_0_7167_4964 20/08/20 02:07:52 
INFO storage.BlockManager: Getting local shuffle block shuffle_0_7194_4964 {code} This was added with SPARK-20629. cc. [~holden]
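The fix is a one-line demotion of the statement from info to debug level, so the message disappears under the default logging configuration. The general pattern can be sketched with java.util.logging (the class and method names below are illustrative, not Spark's actual Logging trait, and FINE plays the role of Spark's debug level):

```java
import java.util.logging.Level;
import java.util.logging.Logger;

// Sketch: per-block log statements on a hot path should go out at debug
// level so they are suppressed unless explicitly enabled.
public class LogLevelSketch {
    private static final Logger logger = Logger.getLogger("BlockManagerSketch");

    static void getLocalBlockData(String blockId) {
        // Demoted from INFO: this fires once per locally fetched block,
        // potentially thousands of times per second on a busy executor.
        // The Supplier form also avoids building the string when suppressed.
        logger.log(Level.FINE, () -> "Getting local shuffle block " + blockId);
    }

    public static void main(String[] args) {
        logger.setLevel(Level.INFO);              // the usual default
        getLocalBlockData("shuffle_0_6103_4964"); // suppressed: nothing is logged
    }
}
```

The lambda-based overload matters on a hot path: the message string is never concatenated unless FINE is actually enabled.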
[jira] [Updated] (SPARK-32663) TransportClient getting closed when there are outstanding requests to the server
[ https://issues.apache.org/jira/browse/SPARK-32663?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chandni Singh updated SPARK-32663: -- Description: The implementation of {{removeBlocks}} and {{getHostLocalDirs}} in {{ExternalBlockStoreClient}} closes the client after processing a response in the callback. This is a cached client which will be re-used for other responses. There could be other outstanding request to the shuffle service, so it should not be closed after processing a response. Seems like this is a bug introduced with SPARK-27651 and SPARK-27677. The older methods {{registerWithShuffleServer}} and {{fetchBlocks}} didn't close the client. cc [~attilapiros] [~vanzin] [~mridulm80] was: The implementation of {{removeBlocks}} and {{getHostLocalDirs}} in {{ExternalBlockStoreClient}} closes the client after processing a response in the callback. This is a cached client which will be re-used for other responses. There could be other outstanding request to the shuffle service, so it should not be closed after processing a response. Seems like this is a bug introduced with SPARK-27651 and SPARK-27677. The older methods {{registerWithShuffleServer}} and {{fetchBlocks}} didn't close the client. cc [~attilapiros] [~vanzin] [~mridul] > TransportClient getting closed when there are outstanding requests to the > server > > > Key: SPARK-32663 > URL: https://issues.apache.org/jira/browse/SPARK-32663 > Project: Spark > Issue Type: Bug > Components: Shuffle >Affects Versions: 3.0.0 >Reporter: Chandni Singh >Priority: Major > > The implementation of {{removeBlocks}} and {{getHostLocalDirs}} in > {{ExternalBlockStoreClient}} closes the client after processing a response in > the callback. > This is a cached client which will be re-used for other responses. There > could be other outstanding request to the shuffle service, so it should not > be closed after processing a response. > Seems like this is a bug introduced with SPARK-27651 and SPARK-27677. 
> The older methods {{registerWithShuffleServer}} and {{fetchBlocks}} didn't > close the client. > cc [~attilapiros] [~vanzin] [~mridulm80]
[jira] [Created] (SPARK-32663) TransportClient getting closed when there are outstanding requests to the server
Chandni Singh created SPARK-32663: - Summary: TransportClient getting closed when there are outstanding requests to the server Key: SPARK-32663 URL: https://issues.apache.org/jira/browse/SPARK-32663 Project: Spark Issue Type: Bug Components: Shuffle Affects Versions: 3.0.0 Reporter: Chandni Singh The implementation of {{removeBlocks}} and {{getHostLocalDirs}} in {{ExternalBlockStoreClient}} closes the client after processing a response in the callback. This is a cached client which will be re-used for other responses. There could be other outstanding requests to the shuffle service, so it should not be closed after processing a response. This seems to be a bug introduced with SPARK-27651 and SPARK-27677. The older methods {{registerWithShuffleServer}} and {{fetchBlocks}} didn't close the client. cc [~attilapiros] [~vanzin] [~mridul]
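The hazard here is that the client is pooled: several in-flight RPCs share one connection, so a per-response callback that closes the client tears the connection out from under the other requests. A minimal sketch (this is an illustrative stand-in, not Spark's actual TransportClientFactory or its API):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch of a cached-client pool: the same Client instance is
// handed out for every request to a given host, so callbacks must never
// close it while other requests may still be outstanding.
public class ClientPoolSketch {
    static class Client {
        private boolean closed = false;
        int outstandingRequests = 0;
        void close() { closed = true; }
        boolean isActive() { return !closed; }
    }

    private final Map<String, Client> pool = new ConcurrentHashMap<>();

    Client getClient(String host) {
        // Cached: repeat calls for the same host return the same connection.
        return pool.computeIfAbsent(host, h -> new Client());
    }

    public static void main(String[] args) {
        ClientPoolSketch factory = new ClientPoolSketch();
        Client c = factory.getClient("shuffle-host");
        c.outstandingRequests = 2;   // two RPCs in flight on one connection
        c.outstandingRequests--;     // first response arrives and is processed
        // BUG (what SPARK-32663 fixed): calling c.close() here would kill the
        // connection while the second request is still outstanding.
        System.out.println(c.isActive()); // true: the cached client stays open
    }
}
```

The fix in the ticket follows the older {{registerWithShuffleServer}}/{{fetchBlocks}} convention: let the pool own the client's lifecycle, not the response callback.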
[jira] [Created] (SPARK-31689) ShuffleBlockFetchIterator keeps localBlocks in its memory even though it never uses it
Chandni Singh created SPARK-31689: - Summary: ShuffleBlockFetchIterator keeps localBlocks in its memory even though it never uses it Key: SPARK-31689 URL: https://issues.apache.org/jira/browse/SPARK-31689 Project: Spark Issue Type: Bug Components: Shuffle Affects Versions: 2.4.5 Reporter: Chandni Singh The {{localBlocks}} collection is created and used in the {{initialize}} method of ShuffleBlockFetchIterator but is never used after that. It can be local to the {{initialize}} method instead of being a field on the iterator instance. It holds on to memory for as long as the iterator instance is alive, which is unnecessary.
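The fix is a scoping change: data needed only during setup should be a local variable, not a field, so the garbage collector can reclaim it while the iterator itself lives on for the whole fetch. A before/after sketch (class and method names below are illustrative, not Spark's):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the SPARK-31689 fix: initialization-only data as a field vs. a local.
public class IteratorRetentionSketch {

    // Before: localBlocks is a field, so it stays reachable (and on the heap)
    // for the entire lifetime of the iterator instance.
    static class FieldHolding {
        private final List<String> localBlocks = new ArrayList<>();
        FieldHolding(List<String> blocks) {
            localBlocks.addAll(blocks); // only ever used during initialization
        }
        int retainedBlocks() { return localBlocks.size(); }
    }

    // After: the list is scoped to the constructor; once it returns, nothing
    // references the list and it becomes eligible for collection.
    static class LocallyScoped {
        private final int initializedCount;
        LocallyScoped(List<String> blocks) {
            List<String> localBlocks = new ArrayList<>(blocks);
            initializedCount = localBlocks.size(); // list dies at end of ctor
        }
        int retainedBlocks() { return 0; }
        int initialized() { return initializedCount; }
    }

    public static void main(String[] args) {
        List<String> blocks = List.of("shuffle_0_1_2", "shuffle_0_3_2");
        System.out.println(new FieldHolding(blocks).retainedBlocks());  // block names kept alive
        System.out.println(new LocallyScoped(blocks).retainedBlocks()); // nothing retained
    }
}
```

With tens of thousands of block IDs per shuffle, the retained strings add up across concurrently alive iterators, which is why the ticket calls the field out as a leak-shaped waste rather than a correctness bug.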
[jira] [Commented] (SPARK-30512) Use a dedicated boss event group loop in the netty pipeline for external shuffle service
[ https://issues.apache.org/jira/browse/SPARK-30512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17017447#comment-17017447 ] Chandni Singh commented on SPARK-30512: --- [https://github.com/apache/spark/pull/27240] > Use a dedicated boss event group loop in the netty pipeline for external > shuffle service > > > Key: SPARK-30512 > URL: https://issues.apache.org/jira/browse/SPARK-30512 > Project: Spark > Issue Type: Bug > Components: Shuffle >Affects Versions: 3.0.0 >Reporter: Chandni Singh >Priority: Major > > We have been seeing a large number of SASL authentication (RPC requests) > timing out with the external shuffle service. > The issue and all the analysis we did is described here: > [https://github.com/netty/netty/issues/9890] > I added a {{LoggingHandler}} to netty pipeline and realized that even the > channel registration is delayed by 30 seconds. > In the Spark External Shuffle service, the boss event group and the worker > event group are same which is causing this delay. > {code:java} > EventLoopGroup bossGroup = > NettyUtils.createEventLoop(ioMode, conf.serverThreads(), > conf.getModuleName() + "-server"); > EventLoopGroup workerGroup = bossGroup; > bootstrap = new ServerBootstrap() > .group(bossGroup, workerGroup) > .channel(NettyUtils.getServerChannelClass(ioMode)) > .option(ChannelOption.ALLOCATOR, allocator) > .childOption(ChannelOption.ALLOCATOR, allocator); > {code} > When the load at the shuffle service increases, since the worker threads are > busy with existing channels, registering new channels gets delayed. > The fix is simple. I created a dedicated boss thread event loop group with 1 > thread. 
> {code:java} > EventLoopGroup bossGroup = NettyUtils.createEventLoop(ioMode, 1, > conf.getModuleName() + "-boss"); > EventLoopGroup workerGroup = NettyUtils.createEventLoop(ioMode, > conf.serverThreads(), > conf.getModuleName() + "-server"); > bootstrap = new ServerBootstrap() > .group(bossGroup, workerGroup) > .channel(NettyUtils.getServerChannelClass(ioMode)) > .option(ChannelOption.ALLOCATOR, allocator) > {code} > This fixed the issue. > We just need 1 thread in the boss group because there is only a single > server bootstrap. > -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Issue Comment Deleted] (SPARK-30512) Use a dedicated boss event group loop in the netty pipeline for external shuffle service
[ https://issues.apache.org/jira/browse/SPARK-30512?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chandni Singh updated SPARK-30512: -- Comment: was deleted (was: [https://github.com/apache/spark/pull/27240]) > Use a dedicated boss event group loop in the netty pipeline for external > shuffle service > > > Key: SPARK-30512 > URL: https://issues.apache.org/jira/browse/SPARK-30512 > Project: Spark > Issue Type: Bug > Components: Shuffle >Affects Versions: 3.0.0 >Reporter: Chandni Singh >Priority: Major > > We have been seeing a large number of SASL authentication (RPC requests) > timing out with the external shuffle service. > The issue and all the analysis we did is described here: > [https://github.com/netty/netty/issues/9890] > I added a {{LoggingHandler}} to netty pipeline and realized that even the > channel registration is delayed by 30 seconds. > In the Spark External Shuffle service, the boss event group and the worker > event group are same which is causing this delay. > {code:java} > EventLoopGroup bossGroup = > NettyUtils.createEventLoop(ioMode, conf.serverThreads(), > conf.getModuleName() + "-server"); > EventLoopGroup workerGroup = bossGroup; > bootstrap = new ServerBootstrap() > .group(bossGroup, workerGroup) > .channel(NettyUtils.getServerChannelClass(ioMode)) > .option(ChannelOption.ALLOCATOR, allocator) > .childOption(ChannelOption.ALLOCATOR, allocator); > {code} > When the load at the shuffle service increases, since the worker threads are > busy with existing channels, registering new channels gets delayed. > The fix is simple. I created a dedicated boss thread event loop group with 1 > thread. 
> {code:java} > EventLoopGroup bossGroup = NettyUtils.createEventLoop(ioMode, 1, > conf.getModuleName() + "-boss"); > EventLoopGroup workerGroup = NettyUtils.createEventLoop(ioMode, > conf.serverThreads(), > conf.getModuleName() + "-server"); > bootstrap = new ServerBootstrap() > .group(bossGroup, workerGroup) > .channel(NettyUtils.getServerChannelClass(ioMode)) > .option(ChannelOption.ALLOCATOR, allocator) > {code} > This fixed the issue. > We just need 1 thread in the boss group because there is only a single > server bootstrap. > -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-30512) Use a dedicated boss event group loop in the netty pipeline for external shuffle service
[ https://issues.apache.org/jira/browse/SPARK-30512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17015362#comment-17015362 ] Chandni Singh commented on SPARK-30512: --- Please assign the issue to me so I can open up a PR. > Use a dedicated boss event group loop in the netty pipeline for external > shuffle service > > > Key: SPARK-30512 > URL: https://issues.apache.org/jira/browse/SPARK-30512 > Project: Spark > Issue Type: Bug > Components: Shuffle >Affects Versions: 3.0.0 >Reporter: Chandni Singh >Priority: Major > > We have been seeing a large number of SASL authentication (RPC requests) > timing out with the external shuffle service. > The issue and all the analysis we did is described here: > [https://github.com/netty/netty/issues/9890] > I added a {{LoggingHandler}} to netty pipeline and realized that even the > channel registration is delayed by 30 seconds. > In the Spark External Shuffle service, the boss event group and the worker > event group are same which is causing this delay. > {code:java} > EventLoopGroup bossGroup = > NettyUtils.createEventLoop(ioMode, conf.serverThreads(), > conf.getModuleName() + "-server"); > EventLoopGroup workerGroup = bossGroup; > bootstrap = new ServerBootstrap() > .group(bossGroup, workerGroup) > .channel(NettyUtils.getServerChannelClass(ioMode)) > .option(ChannelOption.ALLOCATOR, allocator) > .childOption(ChannelOption.ALLOCATOR, allocator); > {code} > When the load at the shuffle service increases, since the worker threads are > busy with existing channels, registering new channels gets delayed. > The fix is simple. I created a dedicated boss thread event loop group with 1 > thread. 
> {code:java} > EventLoopGroup bossGroup = NettyUtils.createEventLoop(ioMode, 1, > conf.getModuleName() + "-boss"); > EventLoopGroup workerGroup = NettyUtils.createEventLoop(ioMode, > conf.serverThreads(), > conf.getModuleName() + "-server"); > bootstrap = new ServerBootstrap() > .group(bossGroup, workerGroup) > .channel(NettyUtils.getServerChannelClass(ioMode)) > .option(ChannelOption.ALLOCATOR, allocator) > {code} > This fixed the issue. > We just need 1 thread in the boss group because there is only a single > server bootstrap. > -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-30512) Use a dedicated boss event group loop in the netty pipeline for external shuffle service
[ https://issues.apache.org/jira/browse/SPARK-30512?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chandni Singh updated SPARK-30512: -- Description: We have been seeing a large number of SASL authentication (RPC requests) timing out with the external shuffle service. The issue and all the analysis we did is described here: [https://github.com/netty/netty/issues/9890] I added a {{LoggingHandler}} to netty pipeline and realized that even the channel registration is delayed by 30 seconds. In the Spark External Shuffle service, the boss event group and the worker event group are same which is causing this delay. {code:java} EventLoopGroup bossGroup = NettyUtils.createEventLoop(ioMode, conf.serverThreads(), conf.getModuleName() + "-server"); EventLoopGroup workerGroup = bossGroup; bootstrap = new ServerBootstrap() .group(bossGroup, workerGroup) .channel(NettyUtils.getServerChannelClass(ioMode)) .option(ChannelOption.ALLOCATOR, allocator) .childOption(ChannelOption.ALLOCATOR, allocator); {code} When the load at the shuffle service increases, since the worker threads are busy with existing channels, registering new channels gets delayed. The fix is simple. I created a dedicated boss thread event loop group with 1 thread. {code:java} EventLoopGroup bossGroup = NettyUtils.createEventLoop(ioMode, 1, conf.getModuleName() + "-boss"); EventLoopGroup workerGroup = NettyUtils.createEventLoop(ioMode, conf.serverThreads(), conf.getModuleName() + "-server"); bootstrap = new ServerBootstrap() .group(bossGroup, workerGroup) .channel(NettyUtils.getServerChannelClass(ioMode)) .option(ChannelOption.ALLOCATOR, allocator) {code} This fixed the issue. We just need 1 thread in the boss group because there is only a single server bootstrap. was: We have been seeing a large number of SASL authentication (RPC requests) timing out with the external shuffle service. 
The issue and all the analysis we did is described here: [https://github.com/netty/netty/issues/9890] I added a {{LoggingHandler}} to netty pipeline and realized that even the channel registration is delayed by 30 seconds. In the Spark External Shuffle service, the boss event group and the worker event group are same which is causing this delay. {code:java} EventLoopGroup bossGroup = NettyUtils.createEventLoop(ioMode, conf.serverThreads(), conf.getModuleName() + "-server"); EventLoopGroup workerGroup = bossGroup; bootstrap = new ServerBootstrap() .group(bossGroup, workerGroup) .channel(NettyUtils.getServerChannelClass(ioMode)) .option(ChannelOption.ALLOCATOR, allocator) .childOption(ChannelOption.ALLOCATOR, allocator); {code} When the load at the shuffle service increases, since the worker threads are busy with existing channels, registering new channels gets delayed. The fix is simple. I created a dedicated boss thread event loop group with 1 thread. {code:java} EventLoopGroup bossGroup = NettyUtils.createEventLoop(ioMode, 1, conf.getModuleName() + "-boss"); EventLoopGroup workerGroup = NettyUtils.createEventLoop(ioMode, conf.serverThreads(), conf.getModuleName() + "-server"); bootstrap = new ServerBootstrap() .group(bossGroup, workerGroup) .channel(NettyUtils.getServerChannelClass(ioMode)) .option(ChannelOption.ALLOCATOR, allocator) {code} This fixed the issue. We just need 1 thread in the boss group because there is only a single server bootstrap. Please assign the issue to me so I can open up a PR. > Use a dedicated boss event group loop in the netty pipeline for external > shuffle service > > > Key: SPARK-30512 > URL: https://issues.apache.org/jira/browse/SPARK-30512 > Project: Spark > Issue Type: Bug > Components: Shuffle >Affects Versions: 3.0.0 >Reporter: Chandni Singh >Priority: Major > > We have been seeing a large number of SASL authentication (RPC requests) > timing out with the external shuffle service. 
> The issue and all the analysis we did is described here: > [https://github.com/netty/netty/issues/9890] > I added a {{LoggingHandler}} to netty pipeline and realized that even the > channel registration is delayed by 30 seconds. > In the Spark External Shuffle service, the boss event group and the worker > event group are same which is causing this delay. > {code:java} > EventLoopGroup bossGroup = > NettyUtils.createEventLoop(ioMode, conf.serverThreads(), > conf.getModuleName() + "-server"); > EventLoopGroup workerGroup = bossGroup; > bootstrap = new ServerBootstrap() > .group(bossGroup, workerGroup) > .channel(NettyUtils.getServerChannelClass(ioMode)) >
[jira] [Created] (SPARK-30512) Use a dedicated boss event group loop in the netty pipeline for external shuffle service
Chandni Singh created SPARK-30512: - Summary: Use a dedicated boss event group loop in the netty pipeline for external shuffle service Key: SPARK-30512 URL: https://issues.apache.org/jira/browse/SPARK-30512 Project: Spark Issue Type: Bug Components: Shuffle Affects Versions: 3.0.0 Reporter: Chandni Singh We have been seeing a large number of SASL authentication (RPC requests) timing out with the external shuffle service. The issue and all the analysis we did are described here: [https://github.com/netty/netty/issues/9890] I added a {{LoggingHandler}} to the netty pipeline and realized that even the channel registration is delayed by 30 seconds. In the Spark External Shuffle service, the boss event group and the worker event group are the same, which is causing this delay. {code:java} EventLoopGroup bossGroup = NettyUtils.createEventLoop(ioMode, conf.serverThreads(), conf.getModuleName() + "-server"); EventLoopGroup workerGroup = bossGroup; bootstrap = new ServerBootstrap() .group(bossGroup, workerGroup) .channel(NettyUtils.getServerChannelClass(ioMode)) .option(ChannelOption.ALLOCATOR, allocator) .childOption(ChannelOption.ALLOCATOR, allocator); {code} When the load at the shuffle service increases, since the worker threads are busy with existing channels, registering new channels gets delayed. The fix is simple. I created a dedicated boss thread event loop group with 1 thread. {code:java} EventLoopGroup bossGroup = NettyUtils.createEventLoop(ioMode, 1, conf.getModuleName() + "-boss"); EventLoopGroup workerGroup = NettyUtils.createEventLoop(ioMode, conf.serverThreads(), conf.getModuleName() + "-server"); bootstrap = new ServerBootstrap() .group(bossGroup, workerGroup) .channel(NettyUtils.getServerChannelClass(ioMode)) .option(ChannelOption.ALLOCATOR, allocator) {code} This fixed the issue. We just need 1 thread in the boss group because there is only a single server bootstrap. Please assign the issue to me so I can open up a PR. 
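The Netty before/after code is shown in the issue above; the underlying principle can also be illustrated without Netty, using plain executors (this analogy is a sketch, not the actual fix): when a single pool both accepts new connections and serves existing traffic, acceptance queues behind busy workers, whereas a dedicated single-thread "boss" stays responsive no matter how loaded the workers are.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Plain-executor analogy for the SPARK-30512 fix: separate the thread that
// registers new connections from the pool that serves channel I/O.
public class BossWorkerSketch {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService boss = Executors.newSingleThreadExecutor();  // accepts/registers only
        ExecutorService workers = Executors.newFixedThreadPool(4);   // serves existing channels

        CountDownLatch registered = new CountDownLatch(1);
        // Even if every worker thread were saturated with channel I/O,
        // the boss thread is free to register the new connection promptly.
        boss.submit(registered::countDown);

        boolean ok = registered.await(5, TimeUnit.SECONDS);
        System.out.println(ok ? "registered" : "timed out");
        boss.shutdown();
        workers.shutdown();
    }
}
```

One boss thread suffices for the same reason the ticket gives: there is only a single server bootstrap, so acceptance is inherently serial.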