This is an automated email from the ASF dual-hosted git repository.

joshrosen pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 3502fda  [SPARK-36774][CORE][TESTS] Move SparkSubmitTestUtils to core module and use it in SparkSubmitSuite
3502fda is described below

commit 3502fda78395a92ef6320431cc9bb78569d7d796
Author: Josh Rosen <joshro...@databricks.com>
AuthorDate: Thu Sep 16 14:28:47 2021 -0700

    [SPARK-36774][CORE][TESTS] Move SparkSubmitTestUtils to core module and use it in SparkSubmitSuite
    
    ### What changes were proposed in this pull request?
    
    This PR refactors test code in order to improve the debuggability of `SparkSubmitSuite`.
    
    The `sql/hive` module contains a `SparkSubmitTestUtils` helper class which launches `spark-submit` and captures its output in order to display better error messages when tests fail. This helper is currently used by `HiveSparkSubmitSuite` and `HiveExternalCatalogVersionsSuite`, but isn't used by `SparkSubmitSuite`.
    
    In this PR, I moved `SparkSubmitTestUtils` and `ProcessTestUtils` into the `core` module and updated `SparkSubmitSuite`, `BufferHolderSparkSubmitSuite`, and `WholeStageCodegenSparkSubmitSuite` to use the relocated helper classes. This required me to change `SparkSubmitTestUtils` to make its timeouts configurable and to generalize its method for locating the `spark-submit` binary.
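
    As a quick sketch of the relocated helper's API (the `runSparkSubmit` `timeout` parameter and the overridable `defaultSparkSubmitTimeout` are shown in the diff below; the suite name `MySparkSubmitSuite`, the class `com.example.MyApp`, and the jar path are hypothetical, for illustration only):

    ```scala
    import org.scalatest.time.Span
    import org.scalatest.time.SpanSugar._

    import org.apache.spark.deploy.SparkSubmitTestUtils

    class MySparkSubmitSuite extends SparkSubmitTestUtils {
      // Suites whose submissions are slow can override the default 1-minute timeout...
      override protected val defaultSparkSubmitTimeout: Span = 5.minutes

      test("launch an application via spark-submit") {
        // ...or pass a per-invocation timeout explicitly.
        runSparkSubmit(
          Seq("--class", "com.example.MyApp", "--master", "local", "/path/to/app.jar"),
          timeout = 3.minutes)
      }
    }
    ```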
    
    ### Why are the changes needed?
    
    Previously, `SparkSubmitSuite` tests would fail with messages like:
    
    ```
    [info] - launch simple application with spark-submit *** FAILED *** (1 second, 832 milliseconds)
    [info]   Process returned with exit code 101. See the log4j logs for more detail. (SparkSubmitSuite.scala:1551)
    [info]   org.scalatest.exceptions.TestFailedException:
    [info]   at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
    ```
    
    which required the Spark developer to hunt through the log4j logs in order to view the output from the failed `spark-submit` command.
    
    After this change, those tests will fail with detailed error messages that include the text of the failed command plus timestamped logs captured from the failed process:
    
    ```
    [info] - launch simple application with spark-submit *** FAILED *** (2 seconds, 800 milliseconds)
    [info]   spark-submit returned with exit code 101.
    [info]   Command line: '/Users/joshrosen/oss-spark/bin/spark-submit' '--class' 'invalidClassName' '--name' 'testApp' '--master' 'local' '--conf' 'spark.ui.enabled=false' '--conf' 'spark.master.rest.enabled=false' 'file:/Users/joshrosen/oss-spark/target/tmp/spark-0a8a0c93-3aaf-435d-9cf3-b97abd318d91/testJar-1631768004882.jar'
    [info]
    [info]   2021-09-15 21:53:26.041 - stderr> SLF4J: Class path contains multiple SLF4J bindings.
    [info]   2021-09-15 21:53:26.042 - stderr> SLF4J: Found binding in [jar:file:/Users/joshrosen/oss-spark/assembly/target/scala-2.12/jars/slf4j-log4j12-1.7.30.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    [info]   2021-09-15 21:53:26.042 - stderr> SLF4J: Found binding in [jar:file:/Users/joshrosen/.m2/repository/org/slf4j/slf4j-log4j12/1.7.30/slf4j-log4j12-1.7.30.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    [info]   2021-09-15 21:53:26.042 - stderr> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
    [info]   2021-09-15 21:53:26.042 - stderr> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
    [info]   2021-09-15 21:53:26.619 - stderr> Error: Failed to load class invalidClassName. (SparkSubmitTestUtils.scala:97)
    [info]   org.scalatest.exceptions.TestFailedException:
    [info]   at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    I manually ran the affected test suites.
    
    Closes #34013 from JoshRosen/SPARK-36774-move-SparkSubmitTestUtils-to-core.
    
    Authored-by: Josh Rosen <joshro...@databricks.com>
    Signed-off-by: Josh Rosen <joshro...@databricks.com>
    (cherry picked from commit 3ae6e6775beae8225f8cb7404bd1a2ea961dd339)
    Signed-off-by: Josh Rosen <joshro...@databricks.com>
---
 .../scala/org/apache/spark}/ProcessTestUtils.scala |  2 +-
 .../org/apache/spark/deploy/SparkSubmitSuite.scala | 41 +---------------------
 .../spark/deploy}/SparkSubmitTestUtils.scala       | 18 +++++-----
 .../codegen/BufferHolderSparkSubmitSuite.scala     |  8 ++---
 .../WholeStageCodegenSparkSubmitSuite.scala        |  8 ++---
 .../spark/sql/hive/thriftserver/CliSuite.scala     |  2 +-
 .../thriftserver/HiveThriftServer2Suites.scala     |  2 +-
 .../hive/HiveExternalCatalogVersionsSuite.scala    |  6 +++-
 .../spark/sql/hive/HiveSparkSubmitSuite.scala      |  5 +++
 9 files changed, 32 insertions(+), 60 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/ProcessTestUtils.scala b/core/src/test/scala/org/apache/spark/ProcessTestUtils.scala
similarity index 97%
rename from sql/core/src/test/scala/org/apache/spark/sql/test/ProcessTestUtils.scala
rename to core/src/test/scala/org/apache/spark/ProcessTestUtils.scala
index df530d8..e85f5cd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/ProcessTestUtils.scala
+++ b/core/src/test/scala/org/apache/spark/ProcessTestUtils.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.test
+package org.apache.spark
 
 import java.io.{InputStream, IOException}
 
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index eade41a..c4e3d6ae 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -30,11 +30,8 @@ import org.apache.commons.io.FileUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, FSDataInputStream, Path}
 import org.scalatest.BeforeAndAfterEach
-import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits}
 import org.scalatest.matchers.must.Matchers
 import org.scalatest.matchers.should.Matchers._
-import org.scalatest.time.Span
-import org.scalatest.time.SpanSugar._
 
 import org.apache.spark._
 import org.apache.spark.TestUtils
@@ -107,18 +104,12 @@ trait TestPrematureExit {
 // Note: this suite mixes in ResetSystemProperties because SparkSubmit.main() sets a bunch
 // of properties that needed to be cleared after tests.
 class SparkSubmitSuite
-  extends SparkFunSuite
+  extends SparkSubmitTestUtils
   with Matchers
   with BeforeAndAfterEach
   with ResetSystemProperties
-  with TimeLimits
   with TestPrematureExit {
 
-  import SparkSubmitSuite._
-
-  // Necessary to make ScalaTest 3.x interrupt a thread on the JVM like ScalaTest 2.2.x
-  implicit val defaultSignaler: Signaler = ThreadSignaler
-
   private val emptyIvySettings = File.createTempFile("ivy", ".xml")
   FileUtils.write(emptyIvySettings, "<ivysettings />", StandardCharsets.UTF_8)
 
@@ -1527,36 +1518,6 @@ class SparkSubmitSuite
   }
 }
 
-object SparkSubmitSuite extends SparkFunSuite with TimeLimits {
-
-  // Necessary to make ScalaTest 3.x interrupt a thread on the JVM like ScalaTest 2.2.x
-  implicit val defaultSignaler: Signaler = ThreadSignaler
-
-  // NOTE: This is an expensive operation in terms of time (10 seconds+). Use sparingly.
-  def runSparkSubmit(args: Seq[String], root: String = "..", timeout: Span = 1.minute): Unit = {
-    val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!"))
-    val sparkSubmitFile = if (Utils.isWindows) {
-      new File(s"$root\\bin\\spark-submit.cmd")
-    } else {
-      new File(s"$root/bin/spark-submit")
-    }
-    val process = Utils.executeCommand(
-      Seq(sparkSubmitFile.getCanonicalPath) ++ args,
-      new File(sparkHome),
-      Map("SPARK_TESTING" -> "1", "SPARK_HOME" -> sparkHome))
-
-    try {
-      val exitCode = failAfter(timeout) { process.waitFor() }
-      if (exitCode != 0) {
-        fail(s"Process returned with exit code $exitCode. See the log4j logs for more detail.")
-      }
-    } finally {
-      // Ensure we still kill the process in case it timed out
-      process.destroy()
-    }
-  }
-}
-
 object JarCreationTest extends Logging {
   def main(args: Array[String]): Unit = {
     TestUtils.configTestLog4j("INFO")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/SparkSubmitTestUtils.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitTestUtils.scala
similarity index 89%
rename from sql/hive/src/test/scala/org/apache/spark/sql/hive/SparkSubmitTestUtils.scala
rename to core/src/test/scala/org/apache/spark/deploy/SparkSubmitTestUtils.scala
index 889f81b..2ab2e17 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/SparkSubmitTestUtils.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitTestUtils.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.hive
+package org.apache.spark.deploy
 
 import java.io.File
 import java.sql.Timestamp
@@ -25,33 +25,35 @@ import scala.collection.mutable.ArrayBuffer
 
 import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits}
 import org.scalatest.exceptions.TestFailedDueToTimeoutException
+import org.scalatest.time.Span
 import org.scalatest.time.SpanSugar._
 
+import org.apache.spark.ProcessTestUtils.ProcessOutputCapturer
 import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.test.ProcessTestUtils.ProcessOutputCapturer
 import org.apache.spark.util.Utils
 
 trait SparkSubmitTestUtils extends SparkFunSuite with TimeLimits {
 
+  protected val defaultSparkSubmitTimeout: Span = 1.minute
+
   // Necessary to make ScalaTest 3.x interrupt a thread on the JVM like ScalaTest 2.2.x
   implicit val defaultSignaler: Signaler = ThreadSignaler
 
   // NOTE: This is an expensive operation in terms of time (10 seconds+). Use sparingly.
-  // This is copied from org.apache.spark.deploy.SparkSubmitSuite
   protected def runSparkSubmit(
       args: Seq[String],
       sparkHomeOpt: Option[String] = None,
+      timeout: Span = defaultSparkSubmitTimeout,
       isSparkTesting: Boolean = true): Unit = {
     val sparkHome = sparkHomeOpt.getOrElse(
       sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!")))
     val history = ArrayBuffer.empty[String]
     val sparkSubmit = if (Utils.isWindows) {
-      // On Windows, `ProcessBuilder.directory` does not change the current working directory.
-      new File("..\\..\\bin\\spark-submit.cmd").getAbsolutePath
+      new File(new File(sparkHome, "bin"), "spark-submit.cmd")
     } else {
-      "./bin/spark-submit"
+      new File(new File(sparkHome, "bin"), "spark-submit")
     }
-    val commands = Seq(sparkSubmit) ++ args
+    val commands = Seq(sparkSubmit.getCanonicalPath) ++ args
     val commandLine = commands.mkString("'", "' '", "'")
 
     val builder = new ProcessBuilder(commands: _*).directory(new File(sparkHome))
@@ -85,7 +87,7 @@ trait SparkSubmitTestUtils extends SparkFunSuite with TimeLimits {
     new ProcessOutputCapturer(process.getErrorStream, captureOutput("stderr")).start()
 
     try {
-      val exitCode = failAfter(300.seconds) { process.waitFor() }
+      val exitCode = failAfter(timeout) { process.waitFor() }
       if (exitCode != 0) {
        // include logs in output. Note that logging is async and may not have completed
         // at the time this exception is raised
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolderSparkSubmitSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolderSparkSubmitSuite.scala
index 972a832..fd28a1d 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolderSparkSubmitSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolderSparkSubmitSuite.scala
@@ -20,8 +20,8 @@ package org.apache.spark.sql.catalyst.expressions.codegen
 import org.scalatest.{Assertions, BeforeAndAfterEach}
 import org.scalatest.matchers.must.Matchers
 
-import org.apache.spark.{SparkFunSuite, TestUtils}
-import org.apache.spark.deploy.SparkSubmitSuite
+import org.apache.spark.TestUtils
+import org.apache.spark.deploy.SparkSubmitTestUtils
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.unsafe.array.ByteArrayMethods
 import org.apache.spark.util.ResetSystemProperties
@@ -29,7 +29,7 @@ import org.apache.spark.util.ResetSystemProperties
 // A test for growing the buffer holder to nearly 2GB. Due to the heap size limitation of the Spark
 // unit tests JVM, the actually test code is running as a submit job.
 class BufferHolderSparkSubmitSuite
-  extends SparkFunSuite
+  extends SparkSubmitTestUtils
     with Matchers
     with BeforeAndAfterEach
     with ResetSystemProperties {
@@ -46,7 +46,7 @@ class BufferHolderSparkSubmitSuite
       "--conf", "spark.master.rest.enabled=false",
       "--conf", "spark.driver.extraJavaOptions=-ea",
       unusedJar.toString)
-    SparkSubmitSuite.runSparkSubmit(argsForSparkSubmit, "../..")
+    runSparkSubmit(argsForSparkSubmit)
   }
 }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSparkSubmitSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSparkSubmitSuite.scala
index ee5e0e0..2f626f7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSparkSubmitSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSparkSubmitSuite.scala
@@ -21,8 +21,8 @@ import org.scalatest.{Assertions, BeforeAndAfterEach}
 import org.scalatest.matchers.must.Matchers
 import org.scalatest.time.SpanSugar._
 
-import org.apache.spark.{SparkFunSuite, TestUtils}
-import org.apache.spark.deploy.SparkSubmitSuite
+import org.apache.spark.TestUtils
+import org.apache.spark.deploy.SparkSubmitTestUtils
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{QueryTest, Row, SparkSession}
 import org.apache.spark.sql.functions.{array, col, count, lit}
@@ -31,7 +31,7 @@ import org.apache.spark.unsafe.Platform
 import org.apache.spark.util.ResetSystemProperties
 
 // Due to the need to set driver's extraJavaOptions, this test needs to use actual SparkSubmit.
-class WholeStageCodegenSparkSubmitSuite extends SparkFunSuite
+class WholeStageCodegenSparkSubmitSuite extends SparkSubmitTestUtils
   with Matchers
   with BeforeAndAfterEach
   with ResetSystemProperties {
@@ -51,7 +51,7 @@ class WholeStageCodegenSparkSubmitSuite extends SparkFunSuite
       "--conf", "spark.executor.extraJavaOptions=-XX:+UseCompressedOops",
       "--conf", "spark.sql.adaptive.enabled=false",
       unusedJar.toString)
-    SparkSubmitSuite.runSparkSubmit(argsForSparkSubmit, "../..", 3.minutes)
+    runSparkSubmit(argsForSparkSubmit, timeout = 3.minutes)
   }
 }
 
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
index 7067b65..2ef2700 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
@@ -29,12 +29,12 @@ import scala.concurrent.duration._
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 import org.scalatest.BeforeAndAfterAll
 
+import org.apache.spark.ProcessTestUtils.ProcessOutputCapturer
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.hive.HiveUtils._
 import org.apache.spark.sql.hive.test.HiveTestJars
 import org.apache.spark.sql.internal.StaticSQLConf
-import org.apache.spark.sql.test.ProcessTestUtils.ProcessOutputCapturer
 import org.apache.spark.util.{ThreadUtils, Utils}
 
 /**
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
index 9798189..8e939a5 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
@@ -44,12 +44,12 @@ import org.scalatest.BeforeAndAfterAll
 import org.scalatest.concurrent.Eventually._
 
 import org.apache.spark.{SparkException, SparkFunSuite}
+import org.apache.spark.ProcessTestUtils.ProcessOutputCapturer
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.hive.HiveUtils
 import org.apache.spark.sql.hive.test.HiveTestJars
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.StaticSQLConf.HIVE_THRIFT_SERVER_SINGLESESSION
-import org.apache.spark.sql.test.ProcessTestUtils.ProcessOutputCapturer
 import org.apache.spark.util.{ShutdownHookManager, ThreadUtils, Utils}
 
 object TestData {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala
index 250c460..7eba0d3 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala
@@ -26,8 +26,11 @@ import scala.util.control.NonFatal
 
 import org.apache.commons.lang3.{JavaVersion, SystemUtils}
 import org.apache.hadoop.conf.Configuration
+import org.scalatest.time.Span
+import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.{SparkConf, TestUtils}
+import org.apache.spark.deploy.SparkSubmitTestUtils
 import org.apache.spark.internal.config.MASTER_REST_SERVER_ENABLED
 import org.apache.spark.internal.config.UI.UI_ENABLED
 import org.apache.spark.sql.{QueryTest, Row, SparkSession}
@@ -52,6 +55,7 @@ import org.apache.spark.util.Utils
 @ExtendedHiveTest
 class HiveExternalCatalogVersionsSuite extends SparkSubmitTestUtils {
   import HiveExternalCatalogVersionsSuite._
+  override protected val defaultSparkSubmitTimeout: Span = 5.minutes
   private val wareHousePath = Utils.createTempDir(namePrefix = "warehouse")
   private val tmpDataDir = Utils.createTempDir(namePrefix = "test-data")
   // For local test, you can set `spark.test.cache-dir` to a static value like `/tmp/test-spark`, to
@@ -216,7 +220,7 @@ class HiveExternalCatalogVersionsSuite extends SparkSubmitTestUtils {
         "--conf", s"spark.sql.test.version.index=$index",
         "--driver-java-options", s"-Dderby.system.home=${wareHousePath.getCanonicalPath}",
         tempPyFile.getCanonicalPath)
-      runSparkSubmit(args, Some(sparkHome.getCanonicalPath), false)
+      runSparkSubmit(args, Some(sparkHome.getCanonicalPath), isSparkTesting = false)
     }
 
     tempPyFile.delete()
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
index d56d7f3..90752e7 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
@@ -27,8 +27,11 @@ import org.apache.hadoop.hive.common.FileUtils
 import org.scalatest.Assertions._
 import org.scalatest.BeforeAndAfterEach
 import org.scalatest.matchers.must.Matchers
+import org.scalatest.time.Span
+import org.scalatest.time.SpanSugar._
 
 import org.apache.spark._
+import org.apache.spark.deploy.SparkSubmitTestUtils
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.UI.UI_ENABLED
 import org.apache.spark.sql.{QueryTest, Row, SparkSession}
@@ -54,6 +57,8 @@ class HiveSparkSubmitSuite
   with BeforeAndAfterEach
   with ResetSystemProperties {
 
+  override protected val defaultSparkSubmitTimeout: Span = 5.minutes
+
   override protected val enableAutoThreadAudit = false
 
   override def beforeEach(): Unit = {
