This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 8c635a0fa558 [SPARK-44259][CONNECT][TESTS] Make `connect-client-jvm` pass on Java 21 except `RemoteSparkSession`-based tests
8c635a0fa558 is described below

commit 8c635a0fa5584b35d6dd2e5fb774a2a8de7201a2
Author: yangjie01 <yangji...@baidu.com>
AuthorDate: Fri Jun 30 17:30:20 2023 -0700

    [SPARK-44259][CONNECT][TESTS] Make `connect-client-jvm` pass on Java 21 except `RemoteSparkSession`-based tests
    
    ### What changes were proposed in this pull request?
    This PR ignores all tests that inherit `RemoteSparkSession` by default on Java 21, by overriding the `test` function in `RemoteSparkSession`. They are all Arrow-based tests, because Spark Connect uses the Arrow data format for RPC communication, and on Java 21 they currently fail as follows:
    
    ```
    23/06/30 11:45:41 ERROR SparkConnectService: Error during: execute. UserId: . SessionId: e7479b73-d02c-47e9-85c8-40b3e9315561.
    java.lang.UnsupportedOperationException: sun.misc.Unsafe or java.nio.DirectByteBuffer.<init>(long, int) not available
            at org.apache.arrow.memory.util.MemoryUtil.directBuffer(MemoryUtil.java:174)
            at org.apache.arrow.memory.ArrowBuf.getDirectBuffer(ArrowBuf.java:229)
            at org.apache.arrow.memory.ArrowBuf.nioBuffer(ArrowBuf.java:224)
            at org.apache.arrow.vector.ipc.WriteChannel.write(WriteChannel.java:133)
            at org.apache.arrow.vector.ipc.message.MessageSerializer.writeBatchBuffers(MessageSerializer.java:303)
            at org.apache.arrow.vector.ipc.message.MessageSerializer.serialize(MessageSerializer.java:276)
            at org.apache.arrow.vector.ipc.message.MessageSerializer.serialize(MessageSerializer.java:237)
            at org.apache.spark.sql.execution.arrow.ArrowConverters$ArrowBatchWithSchemaIterator.$anonfun$next$3(ArrowConverters.scala:174)
            at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
            at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1487)
            at org.apache.spark.sql.execution.arrow.ArrowConverters$ArrowBatchWithSchemaIterator.next(ArrowConverters.scala:181)
            at org.apache.spark.sql.execution.arrow.ArrowConverters$ArrowBatchWithSchemaIterator.next(ArrowConverters.scala:128)
            at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
            at scala.collection.Iterator.foreach(Iterator.scala:943)
            at scala.collection.Iterator.foreach$(Iterator.scala:943)
            at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
            at org.apache.spark.sql.connect.service.SparkConnectStreamHandler$.processAsArrowBatches(SparkConnectStreamHandler.scala:178)
            at org.apache.spark.sql.connect.service.SparkConnectStreamHandler.handlePlan(SparkConnectStreamHandler.scala:104)
            at org.apache.spark.sql.connect.service.SparkConnectStreamHandler.$anonfun$handle$1(SparkConnectStreamHandler.scala:86)
            at org.apache.spark.sql.connect.service.SparkConnectStreamHandler.$anonfun$handle$1$adapted(SparkConnectStreamHandler.scala:53)
            at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$3(SessionHolder.scala:152)
            at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:857)
            at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$2(SessionHolder.scala:152)
            at org.apache.spark.JobArtifactSet$.withActive(JobArtifactSet.scala:109)
            at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withContext$1(SessionHolder.scala:122)
            at org.apache.spark.util.Utils$.withContextClassLoader(Utils.scala:209)
            at org.apache.spark.sql.connect.service.SessionHolder.withContext(SessionHolder.scala:121)
            at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$1(SessionHolder.scala:151)
            at org.apache.spark.sql.connect.service.SessionHolder.withSessionBasedPythonPaths(SessionHolder.scala:137)
            at org.apache.spark.sql.connect.service.SessionHolder.withSession(SessionHolder.scala:150)
            at org.apache.spark.sql.connect.service.SparkConnectStreamHandler.handle(SparkConnectStreamHandler.scala:53)
            at org.apache.spark.sql.connect.service.SparkConnectService.executePlan(SparkConnectService.scala:166)
            at org.apache.spark.connect.proto.SparkConnectServiceGrpc$MethodHandlers.invoke(SparkConnectServiceGrpc.java:584)
            at org.sparkproject.connect.grpc.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:182)
            at org.sparkproject.connect.grpc.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:346)
            at org.sparkproject.connect.grpc.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:860)
            at org.sparkproject.connect.grpc.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
            at org.sparkproject.connect.grpc.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
            at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
            at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
            at java.base/java.lang.Thread.run(Thread.java:1583)
    ```
    
    All of the ignored tests are related to https://github.com/apache/arrow/issues/35053, so we should wait for an upgrade to a new Arrow version and then re-enable them for Java 21. The following TODO JIRA has been created for that:
    
    - Re-enable Arrow-based connect tests in Java 21: https://issues.apache.org/jira/browse/SPARK-44121
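    
    For reference, the gating pattern boils down to the minimal ScalaTest sketch below. This is an illustrative sketch, not the committed code: the suite name `ExampleSuite` is hypothetical, while the `SystemUtils`/`assume` combination mirrors what the diff at the end of this message adds to `RemoteSparkSession`.
    
    ```
    import org.apache.commons.lang3.{JavaVersion, SystemUtils}
    import org.scalactic.source.Position
    import org.scalatest.Tag
    import org.scalatest.funsuite.AnyFunSuite

    // Hypothetical suite name; the real override lives in RemoteSparkSession.
    class ExampleSuite extends AnyFunSuite {
      override protected def test(testName: String, testTags: Tag*)(testFun: => Any)(implicit
          pos: Position): Unit = {
        super.test(testName, testTags: _*) {
          // `assume` cancels (rather than fails) the test when the condition is false,
          // so on Java 18+ these tests are reported as "canceled", not "failed".
          assume(SystemUtils.isJavaVersionAtMost(JavaVersion.JAVA_17))
          testFun
        }
      }
    }
    ```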
    
    ### Why are the changes needed?
    This change lets the Java 21 daily test monitor the other, non-Arrow-based tests.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    - Pass GitHub Actions
    - Manually tested with Java 21:
    
    ```
    java -version
    openjdk version "21-ea" 2023-09-19
    OpenJDK Runtime Environment Zulu21+65-CA (build 21-ea+26)
    OpenJDK 64-Bit Server VM Zulu21+65-CA (build 21-ea+26, mixed mode, sharing)
    ```
    
    ```
    build/sbt "connect-client-jvm/test" -Phive
    ```
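    
    For convenience, the two steps above can be combined roughly as follows. This is a sketch under the assumption that a JDK 21 build is installed locally; the `JAVA_HOME` path is illustrative.
    
    ```
    # Assumption: point JAVA_HOME at your local JDK 21 install (path is illustrative).
    export JAVA_HOME=/opt/zulu21
    export PATH="$JAVA_HOME/bin:$PATH"
    java -version                               # should report a 21 build
    build/sbt "connect-client-jvm/test" -Phive  # run the connect client tests
    ```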
    
    ```
    [info] Run completed in 4 seconds, 640 milliseconds.
    [info] Total number of tests run: 846
    [info] Suites: completed 22, aborted 0
    [info] Tests: succeeded 846, failed 0, canceled 167, ignored 1, pending 0
    [info] All tests passed.
    ```
    
    Closes #41805 from LuciferYang/SPARK-44259.
    
    Authored-by: yangjie01 <yangji...@baidu.com>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 .../connect/client/util/RemoteSparkSession.scala   | 86 +++++++++++++---------
 1 file changed, 52 insertions(+), 34 deletions(-)

diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/util/RemoteSparkSession.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/util/RemoteSparkSession.scala
index e05828606d09..8d84dffc9d5b 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/util/RemoteSparkSession.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/util/RemoteSparkSession.scala
@@ -21,7 +21,9 @@ import java.util.concurrent.TimeUnit
 
 import scala.io.Source
 
-import org.scalatest.BeforeAndAfterAll
+import org.apache.commons.lang3.{JavaVersion, SystemUtils}
+import org.scalactic.source.Position
+import org.scalatest.{BeforeAndAfterAll, Tag}
 import sys.process._
 
 import org.apache.spark.sql.SparkSession
@@ -170,41 +172,44 @@ trait RemoteSparkSession extends ConnectFunSuite with BeforeAndAfterAll {
   protected lazy val serverPort: Int = port
 
   override def beforeAll(): Unit = {
-    super.beforeAll()
-    SparkConnectServerUtils.start()
-    spark = SparkSession
-      .builder()
-      .client(SparkConnectClient.builder().port(serverPort).build())
-      .create()
-
-    // Retry and wait for the server to start
-    val stop = System.nanoTime() + TimeUnit.MINUTES.toNanos(1) // ~1 min
-    var sleepInternalMs = TimeUnit.SECONDS.toMillis(1) // 1s with * 2 backoff
-    var success = false
-    val error = new RuntimeException(s"Failed to start the test server on port $serverPort.")
-
-    while (!success && System.nanoTime() < stop) {
-      try {
-        // Run a simple query to verify the server is really up and ready
-        val result = spark
-          .sql("select val from (values ('Hello'), ('World')) as t(val)")
-          .collect()
-        assert(result.length == 2)
-        success = true
-        debug("Spark Connect Server is up.")
-      } catch {
-        // ignored the error
-        case e: Throwable =>
-          error.addSuppressed(e)
-          Thread.sleep(sleepInternalMs)
-          sleepInternalMs *= 2
+    // TODO(SPARK-44121) Remove this check condition
+    if (SystemUtils.isJavaVersionAtMost(JavaVersion.JAVA_17)) {
+      super.beforeAll()
+      SparkConnectServerUtils.start()
+      spark = SparkSession
+        .builder()
+        .client(SparkConnectClient.builder().port(serverPort).build())
+        .create()
+
+      // Retry and wait for the server to start
+      val stop = System.nanoTime() + TimeUnit.MINUTES.toNanos(1) // ~1 min
+      var sleepInternalMs = TimeUnit.SECONDS.toMillis(1) // 1s with * 2 backoff
+      var success = false
+      val error = new RuntimeException(s"Failed to start the test server on port $serverPort.")
+
+      while (!success && System.nanoTime() < stop) {
+        try {
+          // Run a simple query to verify the server is really up and ready
+          val result = spark
+            .sql("select val from (values ('Hello'), ('World')) as t(val)")
+            .collect()
+          assert(result.length == 2)
+          success = true
+          debug("Spark Connect Server is up.")
+        } catch {
+          // ignored the error
+          case e: Throwable =>
+            error.addSuppressed(e)
+            Thread.sleep(sleepInternalMs)
+            sleepInternalMs *= 2
+        }
       }
-    }
 
-    // Throw error if failed
-    if (!success) {
-      debug(error)
-      throw error
+      // Throw error if failed
+      if (!success) {
+        debug(error)
+        throw error
+      }
     }
   }
 
@@ -217,4 +222,17 @@ trait RemoteSparkSession extends ConnectFunSuite with BeforeAndAfterAll {
     spark = null
     super.afterAll()
   }
+
+  /**
+   * SPARK-44259: override test function to skip `RemoteSparkSession-based` tests as default, we
+   * should delete this function after SPARK-44121 is completed.
+   */
+  override protected def test(testName: String, testTags: Tag*)(testFun: => Any)(implicit
+      pos: Position): Unit = {
+    super.test(testName, testTags: _*) {
+      // TODO(SPARK-44121) Re-enable Arrow-based connect tests in Java 21
+      assume(SystemUtils.isJavaVersionAtMost(JavaVersion.JAVA_17))
+      testFun
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
