Repository: spark
Updated Branches:
  refs/heads/master a28146568 -> 9362c5cc2


[SPARK-25449][CORE] Heartbeat shouldn't include accumulators for zero metrics

## What changes were proposed in this pull request?

Heartbeat shouldn't include accumulators for zero metrics.

Heartbeats sent from executors to the driver every 10 seconds contain metrics 
and are generally on the order of a few KBs. However, for large jobs with lots 
of tasks, heartbeats can grow to tens of MBs, causing tasks to die with 
heartbeat failures. We can mitigate this by not sending zero metrics to the 
driver: accumulator updates that satisfy `isZero` are filtered out of the 
heartbeat, controlled by the new internal conf 
`spark.executor.heartbeat.dropZeroAccumulatorUpdates` (default `true`).
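
To illustrate the core idea, here is a minimal sketch (not the exact patch; `filterForHeartbeat` is a hypothetical helper standing in for the filtering now done in `Executor.reportHeartBeat`):

```scala
import org.apache.spark.util.{AccumulatorV2, LongAccumulator}

// Keep only accumulators that have actually been updated; ones still at their
// zero value carry no information for the driver, so dropping them shrinks the
// heartbeat payload for jobs with many tasks.
def filterForHeartbeat(
    accums: Seq[AccumulatorV2[_, _]],
    dropZeroes: Boolean): Seq[AccumulatorV2[_, _]] =
  if (dropZeroes) accums.filterNot(_.isZero) else accums

// Example: only the updated accumulator survives when dropping is enabled.
val touched = new LongAccumulator
touched.add(1)
val untouched = new LongAccumulator
assert(filterForHeartbeat(Seq(touched, untouched), dropZeroes = true) == Seq(touched))
```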

## How was this patch tested?

Added unit tests to ExecutorSuite that exercise heartbeat reporting with the 
new conf both enabled ("Heartbeat should drop zero accumulator updates") and 
disabled ("Heartbeat should not drop zero accumulator updates when the conf is 
disabled").
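
The dropping behavior is on by default. As a sketch of how an operator could opt back into the old behavior (the first key below is the internal conf added by this patch; the second already existed and is only standardized as a ConfigEntry here):

```scala
import org.apache.spark.SparkConf

// Hypothetical opt-out: restore pre-patch behavior by keeping zero
// accumulator updates in heartbeats.
val conf = new SparkConf()
  .set("spark.executor.heartbeat.dropZeroAccumulatorUpdates", "false")
  .set("spark.executor.heartbeatInterval", "10s")
```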

Closes #22473 from mukulmurthy/25449-heartbeat.

Authored-by: Mukul Murthy <mukul.mur...@gmail.com>
Signed-off-by: Shixiong Zhu <zsxw...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9362c5cc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9362c5cc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9362c5cc

Branch: refs/heads/master
Commit: 9362c5cc273fdd09f9b3b512e2f6b64bcefc25ab
Parents: a281465
Author: Mukul Murthy <mukul.mur...@gmail.com>
Authored: Fri Sep 28 16:34:17 2018 -0700
Committer: Shixiong Zhu <zsxw...@gmail.com>
Committed: Fri Sep 28 16:34:17 2018 -0700

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/SparkConf.scala |  11 +-
 .../scala/org/apache/spark/SparkContext.scala   |   2 +-
 .../org/apache/spark/executor/Executor.scala    |  40 +++++--
 .../apache/spark/internal/config/package.scala  |  14 +++
 .../apache/spark/executor/ExecutorSuite.scala   | 111 +++++++++++++++++--
 .../MesosCoarseGrainedSchedulerBackend.scala    |   3 +-
 6 files changed, 154 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9362c5cc/core/src/main/scala/org/apache/spark/SparkConf.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index e0f98f1..81aa31d 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -609,13 +609,14 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
     require(!encryptionEnabled || get(NETWORK_AUTH_ENABLED),
       s"${NETWORK_AUTH_ENABLED.key} must be enabled when enabling encryption.")
 
-    val executorTimeoutThreshold = getTimeAsSeconds("spark.network.timeout", "120s")
-    val executorHeartbeatInterval = getTimeAsSeconds("spark.executor.heartbeatInterval", "10s")
+    val executorTimeoutThresholdMs =
+      getTimeAsSeconds("spark.network.timeout", "120s") * 1000
+    val executorHeartbeatIntervalMs = get(EXECUTOR_HEARTBEAT_INTERVAL)
     // If spark.executor.heartbeatInterval bigger than spark.network.timeout,
     // it will almost always cause ExecutorLostFailure. See SPARK-22754.
-    require(executorTimeoutThreshold > executorHeartbeatInterval, "The value of " +
-      s"spark.network.timeout=${executorTimeoutThreshold}s must be no less than the value of " +
-      s"spark.executor.heartbeatInterval=${executorHeartbeatInterval}s.")
+    require(executorTimeoutThresholdMs > executorHeartbeatIntervalMs, "The value of " +
+      s"spark.network.timeout=${executorTimeoutThresholdMs}ms must be no less than the value of " +
+      s"spark.executor.heartbeatInterval=${executorHeartbeatIntervalMs}ms.")
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/9362c5cc/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index d943087..0a66dae 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -499,7 +499,7 @@ class SparkContext(config: SparkConf) extends Logging {
 
     // create and start the heartbeater for collecting memory metrics
     _heartbeater = new Heartbeater(env.memoryManager, reportHeartBeat, "driver-heartbeater",
-      conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s"))
+      conf.get(EXECUTOR_HEARTBEAT_INTERVAL))
     _heartbeater.start()
 
     // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's

http://git-wip-us.apache.org/repos/asf/spark/blob/9362c5cc/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index eba708d..61deb54 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -28,6 +28,7 @@ import javax.annotation.concurrent.GuardedBy
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.{ArrayBuffer, HashMap, Map}
+import scala.concurrent.duration._
 import scala.util.control.NonFatal
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder
@@ -120,7 +121,7 @@ private[spark] class Executor(
   }
 
   // Whether to load classes in user jars before those in Spark jars
-  private val userClassPathFirst = conf.getBoolean("spark.executor.userClassPathFirst", false)
+  private val userClassPathFirst = conf.get(EXECUTOR_USER_CLASS_PATH_FIRST)
 
   // Whether to monitor killed / interrupted tasks
   private val taskReaperEnabled = conf.getBoolean("spark.task.reaper.enabled", false)
@@ -170,22 +171,33 @@ private[spark] class Executor(
   // Maintains the list of running tasks.
   private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]
 
+  /**
+   * When an executor is unable to send heartbeats to the driver more than `HEARTBEAT_MAX_FAILURES`
+   * times, it should kill itself. The default value is 60. It means we will retry to send
+   * heartbeats about 10 minutes because the heartbeat interval is 10s.
+   */
+  private val HEARTBEAT_MAX_FAILURES = conf.get(EXECUTOR_HEARTBEAT_MAX_FAILURES)
+
+  /**
+   * Whether to drop empty accumulators from heartbeats sent to the driver. Including the empty
+   * accumulators (that satisfy isZero) can make the size of the heartbeat message very large.
+   */
+  private val HEARTBEAT_DROP_ZEROES = conf.get(EXECUTOR_HEARTBEAT_DROP_ZERO_ACCUMULATOR_UPDATES)
+
+  /**
+   * Interval to send heartbeats, in milliseconds
+   */
+  private val HEARTBEAT_INTERVAL_MS = conf.get(EXECUTOR_HEARTBEAT_INTERVAL)
+
   // Executor for the heartbeat task.
   private val heartbeater = new Heartbeater(env.memoryManager, reportHeartBeat,
-    "executor-heartbeater", 
conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s"))
+    "executor-heartbeater", HEARTBEAT_INTERVAL_MS)
 
   // must be initialized before running startDriverHeartbeat()
   private val heartbeatReceiverRef =
     RpcUtils.makeDriverRef(HeartbeatReceiver.ENDPOINT_NAME, conf, env.rpcEnv)
 
   /**
-   * When an executor is unable to send heartbeats to the driver more than `HEARTBEAT_MAX_FAILURES`
-   * times, it should kill itself. The default value is 60. It means we will retry to send
-   * heartbeats about 10 minutes because the heartbeat interval is 10s.
-   */
-  private val HEARTBEAT_MAX_FAILURES = conf.getInt("spark.executor.heartbeat.maxFailures", 60)
-
-  /**
    * Count the failure times of heartbeat. It should only be accessed in the heartbeat thread. Each
    * successful heartbeat will reset it to 0.
    */
@@ -834,7 +846,13 @@ private[spark] class Executor(
       if (taskRunner.task != null) {
         taskRunner.task.metrics.mergeShuffleReadMetrics()
         taskRunner.task.metrics.setJvmGCTime(curGCTime - taskRunner.startGCTime)
-        accumUpdates += ((taskRunner.taskId, taskRunner.task.metrics.accumulators()))
+        val accumulatorsToReport =
+          if (HEARTBEAT_DROP_ZEROES) {
+            taskRunner.task.metrics.accumulators().filterNot(_.isZero)
+          } else {
+            taskRunner.task.metrics.accumulators()
+          }
+        accumUpdates += ((taskRunner.taskId, accumulatorsToReport))
       }
     }
 
@@ -842,7 +860,7 @@ private[spark] class Executor(
       executorUpdates)
     try {
       val response = heartbeatReceiverRef.askSync[HeartbeatResponse](
-          message, RpcTimeout(conf, "spark.executor.heartbeatInterval", "10s"))
+        message, new RpcTimeout(HEARTBEAT_INTERVAL_MS.millis, EXECUTOR_HEARTBEAT_INTERVAL.key))
       if (response.reregisterBlockManager) {
         logInfo("Told to re-register on heartbeat")
         env.blockManager.reregister()

http://git-wip-us.apache.org/repos/asf/spark/blob/9362c5cc/core/src/main/scala/org/apache/spark/internal/config/package.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 7f63422..e8b1d88 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -83,6 +83,20 @@ package object config {
   private[spark] val EXECUTOR_CLASS_PATH =
     ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_CLASSPATH).stringConf.createOptional
 
+  private[spark] val EXECUTOR_HEARTBEAT_DROP_ZERO_ACCUMULATOR_UPDATES =
+    ConfigBuilder("spark.executor.heartbeat.dropZeroAccumulatorUpdates")
+      .internal()
+      .booleanConf
+      .createWithDefault(true)
+
+  private[spark] val EXECUTOR_HEARTBEAT_INTERVAL =
+    ConfigBuilder("spark.executor.heartbeatInterval")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createWithDefaultString("10s")
+
+  private[spark] val EXECUTOR_HEARTBEAT_MAX_FAILURES =
+    ConfigBuilder("spark.executor.heartbeat.maxFailures").internal().intConf.createWithDefault(60)
+
   private[spark] val EXECUTOR_JAVA_OPTIONS =
     ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS).stringConf.createOptional
 

http://git-wip-us.apache.org/repos/asf/spark/blob/9362c5cc/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
index 77a7668..1f8a657 100644
--- a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
@@ -21,9 +21,10 @@ import java.io.{Externalizable, ObjectInput, ObjectOutput}
 import java.lang.Thread.UncaughtExceptionHandler
 import java.nio.ByteBuffer
 import java.util.Properties
-import java.util.concurrent.{CountDownLatch, TimeUnit}
+import java.util.concurrent.{ConcurrentHashMap, CountDownLatch, TimeUnit}
 import java.util.concurrent.atomic.AtomicBoolean
 
+import scala.collection.mutable.ArrayBuffer
 import scala.collection.mutable.Map
 import scala.concurrent.duration._
 import scala.language.postfixOps
@@ -33,22 +34,25 @@ import org.mockito.Matchers.{any, eq => meq}
 import org.mockito.Mockito.{inOrder, verify, when}
 import org.mockito.invocation.InvocationOnMock
 import org.mockito.stubbing.Answer
+import org.scalatest.PrivateMethodTester
 import org.scalatest.concurrent.Eventually
 import org.scalatest.mockito.MockitoSugar
 
 import org.apache.spark._
 import org.apache.spark.TaskState.TaskState
-import org.apache.spark.memory.MemoryManager
+import org.apache.spark.internal.config._
+import org.apache.spark.memory.TestMemoryManager
 import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.rdd.RDD
-import org.apache.spark.rpc.RpcEnv
-import org.apache.spark.scheduler.{FakeTask, ResultTask, TaskDescription}
+import org.apache.spark.rpc.{RpcEndpointRef, RpcEnv, RpcTimeout}
+import org.apache.spark.scheduler.{FakeTask, ResultTask, Task, TaskDescription}
 import org.apache.spark.serializer.{JavaSerializer, SerializerManager}
 import org.apache.spark.shuffle.FetchFailedException
-import org.apache.spark.storage.BlockManagerId
-import org.apache.spark.util.UninterruptibleThread
+import org.apache.spark.storage.{BlockManager, BlockManagerId}
+import org.apache.spark.util.{LongAccumulator, UninterruptibleThread}
 
-class ExecutorSuite extends SparkFunSuite with LocalSparkContext with MockitoSugar with Eventually {
+class ExecutorSuite extends SparkFunSuite
+    with LocalSparkContext with MockitoSugar with Eventually with PrivateMethodTester {
 
   test("SPARK-15963: Catch `TaskKilledException` correctly in 
Executor.TaskRunner") {
     // mock some objects to make Executor.launchTask() happy
@@ -252,18 +256,107 @@ class ExecutorSuite extends SparkFunSuite with 
LocalSparkContext with MockitoSug
     }
   }
 
+  test("Heartbeat should drop zero accumulator updates") {
+    heartbeatZeroAccumulatorUpdateTest(true)
+  }
+
+  test("Heartbeat should not drop zero accumulator updates when the conf is 
disabled") {
+    heartbeatZeroAccumulatorUpdateTest(false)
+  }
+
+  private def withHeartbeatExecutor(confs: (String, String)*)
+      (f: (Executor, ArrayBuffer[Heartbeat]) => Unit): Unit = {
+    val conf = new SparkConf
+    confs.foreach { case (k, v) => conf.set(k, v) }
+    val serializer = new JavaSerializer(conf)
+    val env = createMockEnv(conf, serializer)
+    val executor =
+      new Executor("id", "localhost", SparkEnv.get, userClassPath = Nil, 
isLocal = true)
+    val executorClass = classOf[Executor]
+
+    // Save all heartbeats sent into an ArrayBuffer for verification
+    val heartbeats = ArrayBuffer[Heartbeat]()
+    val mockReceiver = mock[RpcEndpointRef]
+    when(mockReceiver.askSync(any[Heartbeat], any[RpcTimeout])(any))
+      .thenAnswer(new Answer[HeartbeatResponse] {
+        override def answer(invocation: InvocationOnMock): HeartbeatResponse = {
+          val args = invocation.getArguments()
+          val mock = invocation.getMock
+          heartbeats += args(0).asInstanceOf[Heartbeat]
+          HeartbeatResponse(false)
+        }
+      })
+    val receiverRef = executorClass.getDeclaredField("heartbeatReceiverRef")
+    receiverRef.setAccessible(true)
+    receiverRef.set(executor, mockReceiver)
+
+    f(executor, heartbeats)
+  }
+
+  private def heartbeatZeroAccumulatorUpdateTest(dropZeroMetrics: Boolean): Unit = {
+    val c = EXECUTOR_HEARTBEAT_DROP_ZERO_ACCUMULATOR_UPDATES.key -> dropZeroMetrics.toString
+    withHeartbeatExecutor(c) { (executor, heartbeats) =>
+      val reportHeartbeat = PrivateMethod[Unit]('reportHeartBeat)
+
+      // When no tasks are running, there should be no accumulators sent in heartbeat
+      executor.invokePrivate(reportHeartbeat())
+      // invokeReportHeartbeat(executor)
+      assert(heartbeats.length == 1)
+      assert(heartbeats(0).accumUpdates.length == 0,
+        "No updates should be sent when no tasks are running")
+
+      // When we start a task with a nonzero accumulator, that should end up in the heartbeat
+      val metrics = new TaskMetrics()
+      val nonZeroAccumulator = new LongAccumulator()
+      nonZeroAccumulator.add(1)
+      metrics.registerAccumulator(nonZeroAccumulator)
+
+      val executorClass = classOf[Executor]
+      val tasksMap = {
+        val field =
+          executorClass.getDeclaredField("org$apache$spark$executor$Executor$$runningTasks")
+        field.setAccessible(true)
+        field.get(executor).asInstanceOf[ConcurrentHashMap[Long, executor.TaskRunner]]
+      }
+      val mockTaskRunner = mock[executor.TaskRunner]
+      val mockTask = mock[Task[Any]]
+      when(mockTask.metrics).thenReturn(metrics)
+      when(mockTaskRunner.taskId).thenReturn(6)
+      when(mockTaskRunner.task).thenReturn(mockTask)
+      when(mockTaskRunner.startGCTime).thenReturn(1)
+      tasksMap.put(6, mockTaskRunner)
+
+      executor.invokePrivate(reportHeartbeat())
+      assert(heartbeats.length == 2)
+      val updates = heartbeats(1).accumUpdates
+      assert(updates.length == 1 && updates(0)._1 == 6,
+        "Heartbeat should only send update for the one task running")
+      val accumsSent = updates(0)._2.length
+      assert(accumsSent > 0, "The nonzero accumulator we added should be sent")
+      if (dropZeroMetrics) {
+        assert(accumsSent == metrics.accumulators().count(!_.isZero),
+          "The number of accumulators sent should match the number of nonzero 
accumulators")
+      } else {
+        assert(accumsSent == metrics.accumulators().length,
+          "The number of accumulators sent should match the number of total 
accumulators")
+      }
+    }
+  }
+
   private def createMockEnv(conf: SparkConf, serializer: JavaSerializer): SparkEnv = {
     val mockEnv = mock[SparkEnv]
     val mockRpcEnv = mock[RpcEnv]
     val mockMetricsSystem = mock[MetricsSystem]
-    val mockMemoryManager = mock[MemoryManager]
+    val mockBlockManager = mock[BlockManager]
     when(mockEnv.conf).thenReturn(conf)
     when(mockEnv.serializer).thenReturn(serializer)
     when(mockEnv.serializerManager).thenReturn(mock[SerializerManager])
     when(mockEnv.rpcEnv).thenReturn(mockRpcEnv)
     when(mockEnv.metricsSystem).thenReturn(mockMetricsSystem)
-    when(mockEnv.memoryManager).thenReturn(mockMemoryManager)
+    when(mockEnv.memoryManager).thenReturn(new TestMemoryManager(conf))
     when(mockEnv.closureSerializer).thenReturn(serializer)
+    when(mockBlockManager.blockManagerId).thenReturn(BlockManagerId("1", 
"hostA", 1234))
+    when(mockEnv.blockManager).thenReturn(mockBlockManager)
     SparkEnv.set(mockEnv)
     mockEnv
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/9362c5cc/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index 178de30..bac0246 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -33,6 +33,7 @@ import org.apache.mesos.SchedulerDriver
 import org.apache.spark.{SecurityManager, SparkConf, SparkContext, SparkException, TaskState}
 import org.apache.spark.deploy.mesos.config._
 import org.apache.spark.internal.config
+import org.apache.spark.internal.config.EXECUTOR_HEARTBEAT_INTERVAL
 import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle}
 import org.apache.spark.network.netty.SparkTransportConf
 import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient
@@ -635,7 +636,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
             externalShufflePort,
             sc.conf.getTimeAsMs("spark.storage.blockManagerSlaveTimeoutMs",
               s"${sc.conf.getTimeAsSeconds("spark.network.timeout", 
"120s")}s"),
-            sc.conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s"))
+            sc.conf.get(EXECUTOR_HEARTBEAT_INTERVAL))
         slave.shuffleRegistered = true
       }
 

