This is an automated email from the ASF dual-hosted git repository. mridulm80 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 999473b [SPARK-36705][SHUFFLE] Disable push based shuffle when IO encryption is enabled or serializer is not relocatable
999473b is described below

commit 999473b1a5bad4ae2ae345df8abf018100c9d918
Author: Minchu Yang <miny...@minyang-mn3.linkedin.biz>
AuthorDate: Mon Sep 13 16:14:35 2021 -0500

[SPARK-36705][SHUFFLE] Disable push based shuffle when IO encryption is enabled or serializer is not relocatable

### What changes were proposed in this pull request?
Disable push-based shuffle when IO encryption is enabled or the serializer does not support relocation of serialized objects.

### Why are the changes needed?
Push-based shuffle is not compatible with IO encryption or non-relocatable serialization.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Added some tests to check whether push-based shuffle can be disabled successfully when IO encryption is enabled or a serializer that does not support relocation of serialized objects is used.

Closes #33976 from rmcyang/SPARK-36705.
Authored-by: Minchu Yang <miny...@minyang-mn3.linkedin.biz> Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com> --- .../main/scala/org/apache/spark/util/Utils.scala | 30 +++++++++++++++++----- .../scala/org/apache/spark/util/UtilsSuite.scala | 11 ++++++-- 2 files changed, 32 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 5bbb4790..bbff56c 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -70,7 +70,7 @@ import org.apache.spark.internal.config.UI._ import org.apache.spark.internal.config.Worker._ import org.apache.spark.launcher.SparkLauncher import org.apache.spark.network.util.JavaUtils -import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance} +import org.apache.spark.serializer.{DeserializationStream, SerializationStream, Serializer, SerializerInstance} import org.apache.spark.status.api.v1.{StackTrace, ThreadStackTrace} import org.apache.spark.util.io.ChunkedByteBufferOutputStream @@ -2597,14 +2597,30 @@ private[spark] object Utils extends Logging { } /** - * Push based shuffle can only be enabled when the application is submitted - * to run in YARN mode, with external shuffle service enabled + * Push based shuffle can only be enabled when below conditions are met: + * - the application is submitted to run in YARN mode + * - external shuffle service enabled + * - IO encryption disabled + * - serializer(such as KryoSerializer) supports relocation of serialized objects */ def isPushBasedShuffleEnabled(conf: SparkConf): Boolean = { - conf.get(PUSH_BASED_SHUFFLE_ENABLED) && - (conf.get(IS_TESTING).getOrElse(false) || - (conf.get(SHUFFLE_SERVICE_ENABLED) && - conf.get(SparkLauncher.SPARK_MASTER, null) == "yarn")) + val serializer = Utils.classForName(conf.get(SERIALIZER)).getConstructor(classOf[SparkConf]) + 
.newInstance(conf).asInstanceOf[Serializer] + val canDoPushBasedShuffle = + conf.get(PUSH_BASED_SHUFFLE_ENABLED) && + (conf.get(IS_TESTING).getOrElse(false) || + (conf.get(SHUFFLE_SERVICE_ENABLED) && + conf.get(SparkLauncher.SPARK_MASTER, null) == "yarn" && + // TODO: [SPARK-36744] needs to support IO encryption for push-based shuffle + !conf.get(IO_ENCRYPTION_ENABLED) && + serializer.supportsRelocationOfSerializedObjects)) + + if (!canDoPushBasedShuffle) { + logWarning("Push-based shuffle can only be enabled when the application is submitted" + + "to run in YARN mode, with external shuffle service enabled, IO encryption disabled, and" + + "relocation of serialized objects supported.") + } + canDoPushBasedShuffle } /** diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala index c1b7b5f..a4df5cd 100644 --- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala @@ -1509,10 +1509,17 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging { assert(Utils.isPushBasedShuffleEnabled(conf) === false) conf.set(SHUFFLE_SERVICE_ENABLED, true) conf.set(SparkLauncher.SPARK_MASTER, "yarn") - conf.set("spark.yarn.maxAttempts", "1") + conf.set("spark.yarn.maxAppAttempts", "1") + conf.set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer") assert(Utils.isPushBasedShuffleEnabled(conf) === true) - conf.set("spark.yarn.maxAttempts", "2") + conf.set("spark.yarn.maxAppAttempts", "2") assert(Utils.isPushBasedShuffleEnabled(conf) === true) + conf.set(IO_ENCRYPTION_ENABLED, true) + assert(Utils.isPushBasedShuffleEnabled(conf) === false) + conf.set(IO_ENCRYPTION_ENABLED, false) + assert(Utils.isPushBasedShuffleEnabled(conf) === true) + conf.set(SERIALIZER, "org.apache.spark.serializer.JavaSerializer") + assert(Utils.isPushBasedShuffleEnabled(conf) === false) } } 
--------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org