This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new f27c951  [SPARK-27198][CORE] Heartbeat interval mismatch in driver and executor
f27c951 is described below

commit f27c9514ca5560acf652f944821320424a6f3860
Author: Ajith <ajith2...@gmail.com>
AuthorDate: Mon Mar 25 15:38:07 2019 -0500

    [SPARK-27198][CORE] Heartbeat interval mismatch in driver and executor
    
    ## What changes were proposed in this pull request?
    
    When the heartbeat interval is configured via spark.executor.heartbeatInterval without specifying units, there is a time mismatch between the driver (which interprets the value in seconds) and the executor (which interprets it in milliseconds).
    
    ## How was this patch tested?
    
    Will add UTs
    
    Closes #24140 from ajithme/intervalissue.
    
    Authored-by: Ajith <ajith2...@gmail.com>
    Signed-off-by: Sean Owen <sean.o...@databricks.com>
---
 core/src/main/scala/org/apache/spark/SparkConf.scala         | 4 +++-
 core/src/main/scala/org/apache/spark/executor/Executor.scala | 5 ++++-
 2 files changed, 7 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 6c4c5c9..854dfbd 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.ConcurrentHashMap
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.LinkedHashSet
+import scala.concurrent.duration._
 
 import org.apache.avro.{Schema, SchemaNormalization}
 
@@ -610,7 +611,8 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
       s"${NETWORK_AUTH_ENABLED.key} must be enabled when enabling encryption.")
 
     val executorTimeoutThreshold = getTimeAsSeconds("spark.network.timeout", "120s")
-    val executorHeartbeatInterval = getTimeAsSeconds("spark.executor.heartbeatInterval", "10s")
+    val executorHeartbeatInterval =
+      getTimeAsMs("spark.executor.heartbeatInterval", "10s").millis.toSeconds
     // If spark.executor.heartbeatInterval bigger than spark.network.timeout,
     // it will almost always cause ExecutorLostFailure. See SPARK-22754.
     require(executorTimeoutThreshold > executorHeartbeatInterval, "The value of " +
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 97dfcc4..9ac8063 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -28,6 +28,7 @@ import javax.annotation.concurrent.GuardedBy
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.{ArrayBuffer, HashMap, Map}
+import scala.concurrent.duration._
 import scala.util.control.NonFatal
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder
@@ -831,9 +832,11 @@ private[spark] class Executor(
     }
 
     val message = Heartbeat(executorId, accumUpdates.toArray, env.blockManager.blockManagerId)
+    val heartbeatIntervalInSec =
+      conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s").millis.toSeconds.seconds
     try {
       val response = heartbeatReceiverRef.askSync[HeartbeatResponse](
-          message, RpcTimeout(conf, "spark.executor.heartbeatInterval", "10s"))
+          message, new RpcTimeout(heartbeatIntervalInSec, "spark.executor.heartbeatInterval"))
       if (response.reregisterBlockManager) {
         logInfo("Told to re-register on heartbeat")
         env.blockManager.reregister()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to