This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 0446363  [SPARK-26660] Add warning logs when broadcasting large task binary
0446363 is described below

commit 0446363ef466922a42de019ca14fada62959c1f3
Author: Liupengcheng <liupengch...@xiaomi.com>
AuthorDate: Wed Jan 23 08:51:39 2019 -0600

    [SPARK-26660] Add warning logs when broadcasting large task binary
    
    ## What changes were proposed in this pull request?
    
    Currently, some ML libraries may generate very large models. If such a model is
    referenced in a task closure, the driver broadcasts a large task binary, and an
    executor may fail to deserialize it and hit OOM failures (for instance, when the
    executor's memory is insufficient). This problem is not limited to apps using ML
    libraries: any user-specified closure or function that references large data can
    hit it as well.
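
[Editor's note, not part of the commit: a minimal, self-contained Scala sketch of how a large model captured by a task closure inflates the serialized task binary. `TaskClosure` and `serializedSize` are hypothetical helpers standing in for Spark's closure serialization; no Spark dependency is needed.]

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Hypothetical stand-in for a task closure that captures a large ML model.
case class TaskClosure(model: Array[Double]) extends Serializable

object ClosureSizeDemo {
  // Measure the Java-serialized size of an object: roughly what the driver
  // pays when it serializes the task binary before broadcasting it.
  def serializedSize(obj: AnyRef): Long = {
    val buf = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buf)
    out.writeObject(obj)
    out.close()
    buf.size().toLong
  }

  def main(args: Array[String]): Unit = {
    val model = Array.fill(1 << 20)(0.0) // ~8 MiB of coefficients on the driver
    val size = serializedSize(TaskClosure(model))
    assert(size > 8L * 1024 * 1024) // the captured model dominates the task size
    println(s"serialized task closure: $size bytes")
  }
}
```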
    
    To facilitate debugging memory problems caused by a large taskBinary
    broadcast, this change adds warning logs for it.
    
    This PR adds a warning log on the driver side when broadcasting a large
    task binary, and it also includes some minor log changes on the broadcast
    read path.
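
[Editor's note, not part of the commit: the driver-side check described above can be sketched in plain Scala. The threshold name mirrors `TaskSetManager.TASK_SIZE_TO_WARN_KB`, and `bytesToString` is a simplified stand-in for `org.apache.spark.util.Utils.bytesToString`; both are assumptions for this sketch, not the committed code.]

```scala
object BroadcastWarn {
  // Mirrors TaskSetManager.TASK_SIZE_TO_WARN_KB (assumed 100 here).
  val TaskSizeToWarnKb = 100

  // Simplified stand-in for Utils.bytesToString (binary units, one decimal).
  def bytesToString(size: Long): String =
    if (size >= (1L << 20)) f"${size.toDouble / (1L << 20)}%.1f MiB"
    else if (size >= (1L << 10)) f"${size.toDouble / (1L << 10)}%.1f KiB"
    else s"$size B"

  // Returns the warning message to log when the task binary exceeds the
  // KB threshold (converted to bytes), or None when it is small enough.
  def warnIfLarge(taskBinaryLength: Long): Option[String] =
    if (taskBinaryLength > TaskSizeToWarnKb.toLong * 1024)
      Some(s"Broadcasting large task binary with size ${bytesToString(taskBinaryLength)}")
    else None
}
```

For example, a 5 MiB binary (`BroadcastWarn.warnIfLarge(5L << 20)`) produces the warning, while a 50 KiB binary produces `None`.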
    
    ## How was this patch tested?
    N/A; log changes only.
    
    Please review http://spark.apache.org/contributing.html before opening a pull request.
    
    Closes #23580 from liupc/Add-warning-logs-for-large-taskBinary-size.
    
    Authored-by: Liupengcheng <liupengch...@xiaomi.com>
    Signed-off-by: Sean Owen <sean.o...@databricks.com>
---
 core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala | 4 +++-
 core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala     | 4 ++++
 2 files changed, 7 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
index 6410866..7680587 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
@@ -236,7 +236,9 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
              throw new SparkException(s"Failed to get locally stored broadcast data: $broadcastId")
             }
           case None =>
-            logInfo("Started reading broadcast variable " + id)
+            val estimatedTotalSize = Utils.bytesToString(numBlocks * blockSize)
+            logInfo(s"Started reading broadcast variable $id with $numBlocks pieces " +
+              s"(estimated total size $estimatedTotalSize)")
             val startTimeMs = System.currentTimeMillis()
             val blocks = readBlocks()
             logInfo("Reading broadcast variable " + id + " took" + Utils.getUsedTimeMs(startTimeMs))
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index f6ade18..ecb8ac0 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1162,6 +1162,10 @@ private[spark] class DAGScheduler(
         partitions = stage.rdd.partitions
       }
 
+      if (taskBinaryBytes.length > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024) {
+        logWarning(s"Broadcasting large task binary with size " +
+          s"${Utils.bytesToString(taskBinaryBytes.length)}")
+      }
       taskBinary = sc.broadcast(taskBinaryBytes)
     } catch {
       // In the case of a failure during serialization, abort the stage.
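
[Editor's note, not part of the commit: in the TorrentBroadcast hunk above, the estimated total size is just the block count times the configured block size. A self-contained sketch of that log line follows; the message format matches the diff, while `bytesToString` is a simplified stand-in for `Utils.bytesToString`.]

```scala
object BroadcastReadLog {
  // Simplified stand-in for Utils.bytesToString (binary units, one decimal).
  def bytesToString(size: Long): String =
    if (size >= (1L << 20)) f"${size.toDouble / (1L << 20)}%.1f MiB"
    else if (size >= (1L << 10)) f"${size.toDouble / (1L << 10)}%.1f KiB"
    else s"$size B"

  // Builds the read-start log line. The estimate is an upper bound, since
  // the last block may be smaller than the configured block size.
  def readStartMessage(id: Long, numBlocks: Int, blockSize: Int): String = {
    val estimatedTotalSize = bytesToString(numBlocks.toLong * blockSize)
    s"Started reading broadcast variable $id with $numBlocks pieces " +
      s"(estimated total size $estimatedTotalSize)"
  }
}
```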


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org