This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 96b591abdba5 [SPARK-45318][SHELL][TESTS] Merge test cases from `SingletonRepl2Suite/Repl2Suite` back into `SingletonReplSuite/ReplSuite`
96b591abdba5 is described below

commit 96b591abdba58be1e3cb38c8c19885f6ceb17fd1
Author: yangjie01 <yangji...@baidu.com>
AuthorDate: Mon Sep 25 20:02:10 2023 -0700

    [SPARK-45318][SHELL][TESTS] Merge test cases from `SingletonRepl2Suite/Repl2Suite` back into `SingletonReplSuite/ReplSuite`
    
    ### What changes were proposed in this pull request?
    This PR aims to merge the test cases from `SingletonRepl2Suite/Repl2Suite` back into `SingletonReplSuite/ReplSuite` to reduce duplicate code.
    
    ### Why are the changes needed?
    https://github.com/apache/spark/pull/28545 split the relevant test cases out of `SingletonReplSuite/ReplSuite` into `SingletonRepl2Suite/Repl2Suite` to keep the Scala 2.12 and Scala 2.13 versions of the tests separate.
    
    Since Spark 4.0 no longer supports Scala 2.12, these test cases can be merged back into the original files to reduce duplicate code.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Pass GitHub Actions
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #43104 from LuciferYang/SPARK-45318.
    
    Authored-by: yangjie01 <yangji...@baidu.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../scala/org/apache/spark/repl/Repl2Suite.scala   |  46 ------
 .../scala/org/apache/spark/repl/ReplSuite.scala    |  24 ++-
 .../apache/spark/repl/SingletonRepl2Suite.scala    | 171 ---------------------
 .../org/apache/spark/repl/SingletonReplSuite.scala |  65 ++++++++
 4 files changed, 88 insertions(+), 218 deletions(-)

diff --git a/repl/src/test/scala/org/apache/spark/repl/Repl2Suite.scala b/repl/src/test/scala/org/apache/spark/repl/Repl2Suite.scala
deleted file mode 100644
index d55ac91e466f..000000000000
--- a/repl/src/test/scala/org/apache/spark/repl/Repl2Suite.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.repl
-
-import java.io._
-
-import org.apache.spark.{SparkContext, SparkFunSuite}
-
-class Repl2Suite extends SparkFunSuite {
-  test("propagation of local properties") {
-    // A mock ILoop that doesn't install the SIGINT handler.
-    class ILoop(out: PrintWriter) extends SparkILoop(null, out)
-
-    val out = new StringWriter()
-    Main.interp = new ILoop(new PrintWriter(out))
-    Main.sparkContext = new SparkContext("local", "repl-test")
-    val settings = new scala.tools.nsc.Settings
-    settings.usejavacp.value = true
-    Main.interp.createInterpreter(settings)
-
-    Main.sparkContext.setLocalProperty("someKey", "someValue")
-
-    // Make sure the value we set in the caller to interpret is propagated in the thread that
-    // interprets the command.
-    Main.interp.interpret("org.apache.spark.repl.Main.sparkContext.getLocalProperty(\"someKey\")")
-    assert(out.toString.contains("someValue"))
-
-    Main.sparkContext.stop()
-    System.clearProperty("spark.driver.port")
-  }
-}
diff --git a/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala b/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala
index bb2a85cfa0de..b9f44a707465 100644
--- a/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala
+++ b/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala
@@ -23,7 +23,7 @@ import java.nio.file.Files
 import org.apache.logging.log4j.{Level, LogManager}
 import org.apache.logging.log4j.core.{Logger, LoggerContext}
 
-import org.apache.spark.SparkFunSuite
+import org.apache.spark.{SparkContext, SparkFunSuite}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
@@ -398,4 +398,26 @@ class ReplSuite extends SparkFunSuite {
     assertContains(infoLogMessage2, out)
     assertContains(debugLogMessage1, out)
   }
+
+  test("propagation of local properties") {
+    // A mock ILoop that doesn't install the SIGINT handler.
+    class ILoop(out: PrintWriter) extends SparkILoop(null, out)
+
+    val out = new StringWriter()
+    Main.interp = new ILoop(new PrintWriter(out))
+    Main.sparkContext = new SparkContext("local", "repl-test")
+    val settings = new scala.tools.nsc.Settings
+    settings.usejavacp.value = true
+    Main.interp.createInterpreter(settings)
+
+    Main.sparkContext.setLocalProperty("someKey", "someValue")
+
+    // Make sure the value we set in the caller to interpret is propagated in the thread that
+    // interprets the command.
+    Main.interp.interpret("org.apache.spark.repl.Main.sparkContext.getLocalProperty(\"someKey\")")
+    assert(out.toString.contains("someValue"))
+
+    Main.sparkContext.stop()
+    System.clearProperty("spark.driver.port")
+  }
 }
diff --git a/repl/src/test/scala/org/apache/spark/repl/SingletonRepl2Suite.scala b/repl/src/test/scala/org/apache/spark/repl/SingletonRepl2Suite.scala
deleted file mode 100644
index b153a0261aaf..000000000000
--- a/repl/src/test/scala/org/apache/spark/repl/SingletonRepl2Suite.scala
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.repl
-
-import java.io._
-
-import org.apache.spark.SparkFunSuite
-
-/**
- * A special test suite for REPL that all test cases share one REPL instance.
- */
-class SingletonRepl2Suite extends SparkFunSuite {
-
-  private val out = new StringWriter()
-  private val in = new PipedOutputStream()
-  private var thread: Thread = _
-
-  private val CONF_EXECUTOR_CLASSPATH = "spark.executor.extraClassPath"
-  private val oldExecutorClasspath = System.getProperty(CONF_EXECUTOR_CLASSPATH)
-
-  override def beforeAll(): Unit = {
-    super.beforeAll()
-
-    val classpath = System.getProperty("java.class.path")
-    System.setProperty(CONF_EXECUTOR_CLASSPATH, classpath)
-
-    Main.conf.set("spark.master", "local-cluster[2,1,1024]")
-    val interp = new SparkILoop(
-      new BufferedReader(new InputStreamReader(new PipedInputStream(in))),
-      new PrintWriter(out))
-
-    // Forces to create new SparkContext
-    Main.sparkContext = null
-    Main.sparkSession = null
-
-    // Starts a new thread to run the REPL interpreter, so that we won't block.
-    thread = new Thread(() => Main.doMain(Array("-classpath", classpath), interp))
-    thread.setDaemon(true)
-    thread.start()
-
-    waitUntil(() => out.toString.contains("Type :help for more information"))
-  }
-
-  override def afterAll(): Unit = {
-    in.close()
-    thread.join()
-    if (oldExecutorClasspath != null) {
-      System.setProperty(CONF_EXECUTOR_CLASSPATH, oldExecutorClasspath)
-    } else {
-      System.clearProperty(CONF_EXECUTOR_CLASSPATH)
-    }
-    super.afterAll()
-  }
-
-  private def waitUntil(cond: () => Boolean): Unit = {
-    import scala.concurrent.duration._
-    import org.scalatest.concurrent.Eventually._
-
-    eventually(timeout(50.seconds), interval(500.millis)) {
-      assert(cond(), "current output: " + out.toString)
-    }
-  }
-
-  /**
-   * Run the given commands string in a globally shared interpreter instance. Note that the given
-   * commands should not crash the interpreter, to not affect other test cases.
-   */
-  def runInterpreter(input: String): String = {
-    val currentOffset = out.getBuffer.length()
-    // append a special statement to the end of the given code, so that we can know what's
-    // the final output of this code snippet and rely on it to wait until the output is ready.
-    val timestamp = System.currentTimeMillis()
-    in.write((input + s"\nval _result_$timestamp = 1\n").getBytes)
-    in.flush()
-    val stopMessage = s"_result_$timestamp: Int = 1"
-    waitUntil(() => out.getBuffer.substring(currentOffset).contains(stopMessage))
-    out.getBuffer.substring(currentOffset)
-  }
-
-  def assertContains(message: String, output: String): Unit = {
-    val isContain = output.contains(message)
-    assert(isContain,
-      "Interpreter output did not contain '" + message + "':\n" + output)
-  }
-
-  def assertDoesNotContain(message: String, output: String): Unit = {
-    val isContain = output.contains(message)
-    assert(!isContain,
-      "Interpreter output contained '" + message + "':\n" + output)
-  }
-
-  test("SPARK-31399: should clone+clean line object w/ non-serializable state 
in ClosureCleaner") {
-    // Test ClosureCleaner when a closure captures the enclosing `this` REPL 
line object, and that
-    // object contains an unused non-serializable field.
-    // Specifically, the closure in this test case contains a directly nested 
closure, and the
-    // capture is triggered by the inner closure.
-    // `ns` should be nulled out, but `topLevelValue` should stay intact.
-
-    // Can't use :paste mode because PipedOutputStream/PipedInputStream doesn't work well with the
-    // EOT control character (i.e. Ctrl+D).
-    // Just write things on a single line to emulate :paste mode.
-
-    // NOTE: in order for this test case to trigger the intended scenario, the following three
-    //       variables need to be in the same "input", which will make the REPL pack them into the
-    //       same REPL line object:
-    //         - ns: a non-serializable state, not accessed by the closure;
-    //         - topLevelValue: a serializable state, accessed by the closure;
-    //         - closure: the starting closure, captures the enclosing REPL line object.
-    val output = runInterpreter(
-      """
-        |class NotSerializableClass(val x: Int)
-        |val ns = new NotSerializableClass(42); val topLevelValue = "someValue"; val closure =
-        |(j: Int) => {
-        |  (1 to j).flatMap { x =>
-        |    (1 to x).map { y => y + topLevelValue }
-        |  }
-        |}
-        |val r = sc.parallelize(0 to 2).map(closure).collect
-      """.stripMargin)
-//    assertContains("r: Array[scala.collection.immutable.IndexedSeq[String]] 
= " +
-//      "Array(Vector(), Vector(1someValue), Vector(1someValue, 1someValue, 
2someValue))", output)
-    assertContains("r: Array[IndexedSeq[String]] = " +
-      "Array(Vector(), Vector(1someValue), Vector(1someValue, 1someValue, 
2someValue))", output)
-    assertDoesNotContain("Exception", output)
-  }
-
-  test("SPARK-31399: ClosureCleaner should discover indirectly nested closure 
in inner class") {
-    // Similar to the previous test case, but with indirect closure nesting 
instead.
-    // There's still nested closures involved, but the inner closure is 
indirectly nested in the
-    // outer closure, with a level of inner class in between them.
-    // This changes how the inner closure references/captures the outer closure/enclosing `this`
-    // REPL line object, and covers a different code path in inner closure discovery.
-
-    // `ns` should be nulled out, but `topLevelValue` should stay intact.
-
-    val output = runInterpreter(
-      """
-        |class NotSerializableClass(val x: Int)
-        |val ns = new NotSerializableClass(42); val topLevelValue = "someValue"; val closure =
-        |(j: Int) => {
-        |  class InnerFoo {
-        |    val innerClosure = (x: Int) => (1 to x).map { y => y + topLevelValue }
-        |  }
-        |  val innerFoo = new InnerFoo
-        |  (1 to j).flatMap(innerFoo.innerClosure)
-        |}
-        |val r = sc.parallelize(0 to 2).map(closure).collect
-      """.stripMargin)
-//    assertContains("r: Array[scala.collection.immutable.IndexedSeq[String]] 
= " +
-//       "Array(Vector(), Vector(1someValue), Vector(1someValue, 1someValue, 
2someValue))", output)
-    assertContains("r: Array[IndexedSeq[String]] = " +
-       "Array(Vector(), Vector(1someValue), Vector(1someValue, 1someValue, 
2someValue))", output)
-    assertDoesNotContain("Array(Vector(), Vector(1null), Vector(1null, 1null, 
2null)", output)
-    assertDoesNotContain("Exception", output)
-  }
-}
diff --git a/repl/src/test/scala/org/apache/spark/repl/SingletonReplSuite.scala b/repl/src/test/scala/org/apache/spark/repl/SingletonReplSuite.scala
index 0e3bfcfa89dd..f520e5566405 100644
--- a/repl/src/test/scala/org/apache/spark/repl/SingletonReplSuite.scala
+++ b/repl/src/test/scala/org/apache/spark/repl/SingletonReplSuite.scala
@@ -412,4 +412,69 @@ class SingletonReplSuite extends SparkFunSuite {
     assertDoesNotContain("error:", output)
     assertDoesNotContain("Exception", output)
   }
+
+  test("SPARK-31399: should clone+clean line object w/ non-serializable state 
in ClosureCleaner") {
+    // Test ClosureCleaner when a closure captures the enclosing `this` REPL 
line object, and that
+    // object contains an unused non-serializable field.
+    // Specifically, the closure in this test case contains a directly nested 
closure, and the
+    // capture is triggered by the inner closure.
+    // `ns` should be nulled out, but `topLevelValue` should stay intact.
+
+    // Can't use :paste mode because PipedOutputStream/PipedInputStream doesn't work well with the
+    // EOT control character (i.e. Ctrl+D).
+    // Just write things on a single line to emulate :paste mode.
+
+    // NOTE: in order for this test case to trigger the intended scenario, the following three
+    //       variables need to be in the same "input", which will make the REPL pack them into the
+    //       same REPL line object:
+    //         - ns: a non-serializable state, not accessed by the closure;
+    //         - topLevelValue: a serializable state, accessed by the closure;
+    //         - closure: the starting closure, captures the enclosing REPL line object.
+    val output = runInterpreter(
+      """
+        |class NotSerializableClass(val x: Int)
+        |val ns = new NotSerializableClass(42); val topLevelValue = "someValue"; val closure =
+        |(j: Int) => {
+        |  (1 to j).flatMap { x =>
+        |    (1 to x).map { y => y + topLevelValue }
+        |  }
+        |}
+        |val r = sc.parallelize(0 to 2).map(closure).collect
+      """.stripMargin)
+    //  assertContains("r: 
Array[scala.collection.immutable.IndexedSeq[String]] = " +
+    //    "Array(Vector(), Vector(1someValue), Vector(1someValue, 1someValue, 
2someValue))", output)
+    assertContains("r: Array[IndexedSeq[String]] = " +
+      "Array(Vector(), Vector(1someValue), Vector(1someValue, 1someValue, 
2someValue))", output)
+    assertDoesNotContain("Exception", output)
+  }
+
+  test("SPARK-31399: ClosureCleaner should discover indirectly nested closure 
in inner class") {
+    // Similar to the previous test case, but with indirect closure nesting 
instead.
+    // There's still nested closures involved, but the inner closure is 
indirectly nested in the
+    // outer closure, with a level of inner class in between them.
+    // This changes how the inner closure references/captures the outer closure/enclosing `this`
+    // REPL line object, and covers a different code path in inner closure discovery.
+
+    // `ns` should be nulled out, but `topLevelValue` should stay intact.
+
+    val output = runInterpreter(
+      """
+        |class NotSerializableClass(val x: Int)
+        |val ns = new NotSerializableClass(42); val topLevelValue = "someValue"; val closure =
+        |(j: Int) => {
+        |  class InnerFoo {
+        |    val innerClosure = (x: Int) => (1 to x).map { y => y + topLevelValue }
+        |  }
+        |  val innerFoo = new InnerFoo
+        |  (1 to j).flatMap(innerFoo.innerClosure)
+        |}
+        |val r = sc.parallelize(0 to 2).map(closure).collect
+      """.stripMargin)
+    //  assertContains("r: 
Array[scala.collection.immutable.IndexedSeq[String]] = " +
+    //    "Array(Vector(), Vector(1someValue), Vector(1someValue, 1someValue, 
2someValue))", output)
+    assertContains("r: Array[IndexedSeq[String]] = " +
+      "Array(Vector(), Vector(1someValue), Vector(1someValue, 1someValue, 
2someValue))", output)
+    assertDoesNotContain("Array(Vector(), Vector(1null), Vector(1null, 1null, 
2null)", output)
+    assertDoesNotContain("Exception", output)
+  }
 }

