This is an automated email from the ASF dual-hosted git repository.

tgraves pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new e5b5b7e  [SPARK-32175][CORE] Fix the order between initialization for ExecutorPlugin and starting heartbeat thread
e5b5b7e is described below

commit e5b5b7e507ab974bbca3abb0bbf56bf67696d53e
Author: Kousuke Saruta <saru...@oss.nttdata.com>
AuthorDate: Wed Jul 29 08:44:56 2020 -0500

    [SPARK-32175][CORE] Fix the order between initialization for ExecutorPlugin and starting heartbeat thread
    
    ### What changes were proposed in this pull request?
    
    This PR changes the order between ExecutorPlugin initialization and the start of the heartbeat thread in Executor.
    
    ### Why are the changes needed?
    
    In the current master, the heartbeat thread in an executor starts after plugin initialization, so if the initialization takes a long time, no heartbeat is sent to the driver and the executor will be removed from the cluster.
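    As a minimal sketch of the failure mode (this hypothetical plugin is illustrative only and not part of the patch): under the old ordering, an executor loading a plugin like the one below would send no heartbeats until init() returned, and the driver would expire it.

    ```scala
    import java.util.{Map => JMap}

    import org.apache.spark.api.plugin.{ExecutorPlugin, PluginContext}

    // Hypothetical plugin: init blocks far longer than the heartbeat timeout.
    // Before this fix, the heartbeat thread had not started yet at this point,
    // so the driver received no heartbeats while init was sleeping.
    class SlowInitPlugin extends ExecutorPlugin {
      override def init(ctx: PluginContext, extraConf: JMap[String, String]): Unit = {
        Thread.sleep(180 * 1000) // e.g. waiting on an external service
      }
    }
    ```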
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. Plugins for executors are now allowed to take a long time to initialize.
    
    ### How was this patch tested?
    
    New test case.
    
    Closes #29002 from sarutak/fix-heartbeat-issue.
    
    Authored-by: Kousuke Saruta <saru...@oss.nttdata.com>
    Signed-off-by: Thomas Graves <tgra...@apache.org>
    (cherry picked from commit 9be088357eff4328248b29a3a49a816756745345)
    Signed-off-by: Thomas Graves <tgra...@apache.org>
---
 .../main/scala/org/apache/spark/TestUtils.scala    | 15 ++++-
 .../scala/org/apache/spark/executor/Executor.scala | 12 ++--
 .../org/apache/spark/executor/ExecutorSuite.scala  | 72 +++++++++++++++++++++-
 3 files changed, 89 insertions(+), 10 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/TestUtils.scala b/core/src/main/scala/org/apache/spark/TestUtils.scala
index d459627..1e00769 100644
--- a/core/src/main/scala/org/apache/spark/TestUtils.scala
+++ b/core/src/main/scala/org/apache/spark/TestUtils.scala
@@ -179,11 +179,20 @@ private[spark] object TestUtils {
       destDir: File,
       toStringValue: String = "",
       baseClass: String = null,
-      classpathUrls: Seq[URL] = Seq.empty): File = {
+      classpathUrls: Seq[URL] = Seq.empty,
+      implementsClasses: Seq[String] = Seq.empty,
+      extraCodeBody: String = ""): File = {
     val extendsText = Option(baseClass).map { c => s" extends ${c}" }.getOrElse("")
+    val implementsText =
+      "implements " + (implementsClasses :+ 
"java.io.Serializable").mkString(", ")
     val sourceFile = new JavaSourceFromString(className,
-      "public class " + className + extendsText + " implements 
java.io.Serializable {" +
-      "  @Override public String toString() { return \"" + toStringValue + 
"\"; }}")
+      s"""
+         |public class $className $extendsText $implementsText {
+         |  @Override public String toString() { return "$toStringValue"; }
+         |
+         |  $extraCodeBody
+         |}
+        """.stripMargin)
     createCompiledClass(className, destDir, sourceFile, classpathUrls)
   }
 
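As context for the hunk above, a usage sketch of the extended helper (the class name, temp directory, and interface here are hypothetical, chosen for illustration): `java.io.Serializable` is always appended to the implements clause, and `extraCodeBody` is spliced into the generated class body.

```scala
import java.io.File

import org.apache.spark.TestUtils

// Hypothetical example: generate and compile a class that implements
// Callable<String> and adds a call() method via extraCodeBody.
val destDir = new File(System.getProperty("java.io.tmpdir"), "testutils-demo")
destDir.mkdirs()
val clazz: File = TestUtils.createCompiledClass(
  "MyCallable",
  destDir,
  toStringValue = "hello",
  baseClass = null,
  classpathUrls = Seq.empty,
  implementsClasses = Seq("java.util.concurrent.Callable<String>"),
  extraCodeBody = "@Override public String call() { return toString(); }")
```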
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 8aeb16f..e9f1d9c 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -153,11 +153,6 @@ private[spark] class Executor(
   // for fetching remote cached RDD blocks, so need to make sure it uses the right classloader too.
   env.serializerManager.setDefaultClassLoader(replClassLoader)
 
-  // Plugins need to load using a class loader that includes the executor's user classpath
-  private val plugins: Option[PluginContainer] = Utils.withContextClassLoader(replClassLoader) {
-    PluginContainer(env, resources.asJava)
-  }
-
   // Max size of direct result. If task result is bigger than this, we use the block manager
   // to send the result back.
   private val maxDirectResultSize = Math.min(
@@ -218,6 +213,13 @@ private[spark] class Executor(
 
   heartbeater.start()
 
+  // Plugins need to load using a class loader that includes the executor's user classpath.
+  // Plugins also need to be initialized after the heartbeater has started
+  // to avoid blocking heartbeat sends (see SPARK-32175).
+  private val plugins: Option[PluginContainer] = Utils.withContextClassLoader(replClassLoader) {
+    PluginContainer(env, resources.asJava)
+  }
+
   metricsPoller.start()
 
   private[executor] def numRunningTasks: Int = runningTasks.size()
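Net effect of the two hunks above, condensed into the resulting order in the Executor constructor (a sketch; unrelated setup elided):

```scala
// Resulting initialization order in Executor after this patch (condensed):
heartbeater.start()                        // 1. heartbeats to the driver begin

// 2. Plugin initialization may still take a long time, but it no longer
//    delays heartbeats (SPARK-32175).
private val plugins: Option[PluginContainer] =
  Utils.withContextClassLoader(replClassLoader) {
    PluginContainer(env, resources.asJava)
  }

metricsPoller.start()                      // 3. metrics polling starts last
```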
diff --git a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
index 31049d1..b198448 100644
--- a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.executor
 
-import java.io.{Externalizable, ObjectInput, ObjectOutput}
+import java.io.{Externalizable, File, ObjectInput, ObjectOutput}
 import java.lang.Thread.UncaughtExceptionHandler
 import java.nio.ByteBuffer
 import java.util.Properties
@@ -41,6 +41,7 @@ import org.scalatestplus.mockito.MockitoSugar
 import org.apache.spark._
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.deploy.{SimpleApplicationTest, SparkSubmitSuite}
 import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.UI._
 import org.apache.spark.memory.TestMemoryManager
@@ -52,7 +53,7 @@ import org.apache.spark.scheduler.{DirectTaskResult, FakeTask, ResultTask, Task,
 import org.apache.spark.serializer.{JavaSerializer, SerializerInstance, SerializerManager}
 import org.apache.spark.shuffle.FetchFailedException
 import org.apache.spark.storage.{BlockManager, BlockManagerId}
-import org.apache.spark.util.{LongAccumulator, UninterruptibleThread}
+import org.apache.spark.util.{LongAccumulator, UninterruptibleThread, Utils}
 
 class ExecutorSuite extends SparkFunSuite
    with LocalSparkContext with MockitoSugar with Eventually with PrivateMethodTester {
@@ -402,6 +403,73 @@ class ExecutorSuite extends SparkFunSuite
     assert(taskMetrics.getMetricValue("JVMHeapMemory") > 0)
   }
 
+  test("SPARK-32175: Plugin initialization should start after heartbeater 
started") {
+    withTempDir { tempDir =>
+      val sparkPluginCodeBody =
+        """
+          |@Override
+          |public org.apache.spark.api.plugin.ExecutorPlugin executorPlugin() {
+          |  return new TestExecutorPlugin();
+          |}
+          |
+          |@Override
+          |public org.apache.spark.api.plugin.DriverPlugin driverPlugin() { return null; }
+        """.stripMargin
+      val executorPluginBody =
+        """
+          |@Override
+          |public void init(
+          |    org.apache.spark.api.plugin.PluginContext ctx,
+          |    java.util.Map<String, String> extraConf) {
+          |  try {
+          |    Thread.sleep(8 * 1000);
+          |  } catch (InterruptedException e) {
+          |    throw new RuntimeException(e);
+          |  }
+          |}
+        """.stripMargin
+
+      val compiledExecutorPlugin = TestUtils.createCompiledClass(
+        "TestExecutorPlugin",
+        tempDir,
+        "",
+        null,
+        Seq.empty,
+        Seq("org.apache.spark.api.plugin.ExecutorPlugin"),
+        executorPluginBody)
+
+      val thisClassPath =
+        sys.props("java.class.path").split(File.pathSeparator).map(p => new File(p).toURI.toURL)
+      val compiledSparkPlugin = TestUtils.createCompiledClass(
+        "TestSparkPlugin",
+        tempDir,
+        "",
+        null,
+        Seq(tempDir.toURI.toURL) ++ thisClassPath,
+        Seq("org.apache.spark.api.plugin.SparkPlugin"),
+        sparkPluginCodeBody)
+
+      val jarUrl = TestUtils.createJar(
+        Seq(compiledSparkPlugin, compiledExecutorPlugin),
+        new File(tempDir, "testPlugin.jar"))
+
+      val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
+      val args = Seq(
+        "--class", SimpleApplicationTest.getClass.getName.stripSuffix("$"),
+        "--name", "testApp",
+        "--master", "local-cluster[1,1,1024]",
+        "--conf", "spark.plugins=TestSparkPlugin",
+        "--conf", "spark.storage.blockManagerSlaveTimeoutMs=" + 5 * 1000,
+        "--conf", "spark.network.timeoutInterval=" + 1000,
+        "--conf", "spark.executor.heartbeatInterval=" + 1000,
+        "--conf", "spark.executor.extraClassPath=" + jarUrl.toString,
+        "--conf", "spark.driver.extraClassPath=" + jarUrl.toString,
+        "--conf", "spark.ui.enabled=false",
+        unusedJar.toString)
+      SparkSubmitSuite.runSparkSubmit(args, timeout = 30.seconds)
+    }
+  }
+
   private def createMockEnv(conf: SparkConf, serializer: JavaSerializer): SparkEnv = {
     val mockEnv = mock[SparkEnv]
     val mockRpcEnv = mock[RpcEnv]


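The timing in the new test is the crux: the plugin's init sleeps for 8 seconds, while spark.storage.blockManagerSlaveTimeoutMs is set to 5 seconds and heartbeats are sent every second. Under the old ordering no heartbeat would arrive for 8 s > 5 s, so the driver would remove the executor and the submitted application would fail; with heartbeater.start() running first, heartbeats keep flowing during init and the run completes within the 30-second runSparkSubmit timeout.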
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
