This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new e6be3002fe8 [SPARK-41376][CORE] Correct the Netty preferDirectBufs 
check logic on executor start
e6be3002fe8 is described below

commit e6be3002fe8fbba6b29783363c91dcb0982c4ddb
Author: Cheng Pan <cheng...@apache.org>
AuthorDate: Wed Dec 7 18:15:21 2022 -0600

    [SPARK-41376][CORE] Correct the Netty preferDirectBufs check logic on 
executor start
    
    ### What changes were proposed in this pull request?
    
    Fix the condition that determines whether Netty prefers direct memory on 
executor start; the logic should match 
`org.apache.spark.network.client.TransportClientFactory`.
    
    ### Why are the changes needed?
    
    The check logic was added in SPARK-27991. The original intention was to 
avoid a potential Netty OOM issue when Netty uses direct memory to consume 
shuffle data, but the condition is not sufficient: it only consults Netty's 
`PlatformDependent.directBufferPreferred()` and ignores whether Spark itself 
allows direct buffers. This PR updates the logic to match 
`org.apache.spark.network.client.TransportClientFactory`.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Manual testing.
    
    Closes #38901 from pan3793/SPARK-41376.
    
    Authored-by: Cheng Pan <cheng...@apache.org>
    Signed-off-by: Sean Owen <sro...@gmail.com>
---
 .../java/org/apache/spark/network/util/NettyUtils.java     | 14 ++++++++++++++
 .../spark/executor/CoarseGrainedExecutorBackend.scala      |  5 ++++-
 2 files changed, 18 insertions(+), 1 deletion(-)

diff --git 
a/common/network-common/src/main/java/org/apache/spark/network/util/NettyUtils.java
 
b/common/network-common/src/main/java/org/apache/spark/network/util/NettyUtils.java
index 4f070f02a12..cc4657efe39 100644
--- 
a/common/network-common/src/main/java/org/apache/spark/network/util/NettyUtils.java
+++ 
b/common/network-common/src/main/java/org/apache/spark/network/util/NettyUtils.java
@@ -179,4 +179,18 @@ public class NettyUtils {
       allowCache ? PooledByteBufAllocator.defaultUseCacheForAllThreads() : 
false
     );
   }
+
+  /**
+   * ByteBuf allocator prefers to allocate direct ByteBuf iif both Spark 
allows to create direct
+   * ByteBuf and Netty enables directBufferPreferred.
+   */
+  public static boolean preferDirectBufs(TransportConf conf) {
+    boolean allowDirectBufs;
+    if (conf.sharedByteBufAllocators()) {
+      allowDirectBufs = conf.preferDirectBufsForSharedByteBufAllocators();
+    } else {
+      allowDirectBufs = conf.preferDirectBufs();
+    }
+    return allowDirectBufs && PlatformDependent.directBufferPreferred();
+  }
 }
diff --git 
a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
 
b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index a94e63656e1..4903421f906 100644
--- 
a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ 
b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -35,6 +35,8 @@ import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.deploy.worker.WorkerWatcher
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
+import org.apache.spark.network.netty.SparkTransportConf
+import org.apache.spark.network.util.NettyUtils
 import org.apache.spark.resource.ResourceInformation
 import org.apache.spark.resource.ResourceProfile
 import org.apache.spark.resource.ResourceProfile._
@@ -85,7 +87,8 @@ private[spark] class CoarseGrainedExecutorBackend(
 
     logInfo("Connecting to driver: " + driverUrl)
     try {
-      if (PlatformDependent.directBufferPreferred() &&
+      val shuffleClientTransportConf = 
SparkTransportConf.fromSparkConf(env.conf, "shuffle")
+      if (NettyUtils.preferDirectBufs(shuffleClientTransportConf) &&
           PlatformDependent.maxDirectMemory() < 
env.conf.get(MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM)) {
         throw new SparkException(s"Netty direct memory should at least be 
bigger than " +
           s"'${MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM.key}', but got " +


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to