This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 8c635a0fa558 [SPARK-44259][CONNECT][TESTS] Make `connect-client-jvm` pass on Java 21 except `RemoteSparkSession`-based tests 8c635a0fa558 is described below commit 8c635a0fa5584b35d6dd2e5fb774a2a8de7201a2 Author: yangjie01 <yangji...@baidu.com> AuthorDate: Fri Jun 30 17:30:20 2023 -0700 [SPARK-44259][CONNECT][TESTS] Make `connect-client-jvm` pass on Java 21 except `RemoteSparkSession`-based tests ### What changes were proposed in this pull request? This PR ignores, by default on Java 21, all tests that inherit `RemoteSparkSession`, by overriding the `test` function in `RemoteSparkSession`; they are all Arrow-based tests, due to the use of the Arrow data format for RPC communication in Connect. ``` 23/06/30 11:45:41 ERROR SparkConnectService: Error during: execute. UserId: . SessionId: e7479b73-d02c-47e9-85c8-40b3e9315561. java.lang.UnsupportedOperationException: sun.misc.Unsafe or java.nio.DirectByteBuffer.<init>(long, int) not available at org.apache.arrow.memory.util.MemoryUtil.directBuffer(MemoryUtil.java:174) at org.apache.arrow.memory.ArrowBuf.getDirectBuffer(ArrowBuf.java:229) at org.apache.arrow.memory.ArrowBuf.nioBuffer(ArrowBuf.java:224) at org.apache.arrow.vector.ipc.WriteChannel.write(WriteChannel.java:133) at org.apache.arrow.vector.ipc.message.MessageSerializer.writeBatchBuffers(MessageSerializer.java:303) at org.apache.arrow.vector.ipc.message.MessageSerializer.serialize(MessageSerializer.java:276) at org.apache.arrow.vector.ipc.message.MessageSerializer.serialize(MessageSerializer.java:237) at org.apache.spark.sql.execution.arrow.ArrowConverters$ArrowBatchWithSchemaIterator.$anonfun$next$3(ArrowConverters.scala:174) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1487) at org.apache.spark.sql.execution.arrow.ArrowConverters$ArrowBatchWithSchemaIterator.next(ArrowConverters.scala:181) at 
org.apache.spark.sql.execution.arrow.ArrowConverters$ArrowBatchWithSchemaIterator.next(ArrowConverters.scala:128) at scala.collection.Iterator$$anon$10.next(Iterator.scala:461) at scala.collection.Iterator.foreach(Iterator.scala:943) at scala.collection.Iterator.foreach$(Iterator.scala:943) at scala.collection.AbstractIterator.foreach(Iterator.scala:1431) at org.apache.spark.sql.connect.service.SparkConnectStreamHandler$.processAsArrowBatches(SparkConnectStreamHandler.scala:178) at org.apache.spark.sql.connect.service.SparkConnectStreamHandler.handlePlan(SparkConnectStreamHandler.scala:104) at org.apache.spark.sql.connect.service.SparkConnectStreamHandler.$anonfun$handle$1(SparkConnectStreamHandler.scala:86) at org.apache.spark.sql.connect.service.SparkConnectStreamHandler.$anonfun$handle$1$adapted(SparkConnectStreamHandler.scala:53) at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$3(SessionHolder.scala:152) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:857) at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$2(SessionHolder.scala:152) at org.apache.spark.JobArtifactSet$.withActive(JobArtifactSet.scala:109) at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withContext$1(SessionHolder.scala:122) at org.apache.spark.util.Utils$.withContextClassLoader(Utils.scala:209) at org.apache.spark.sql.connect.service.SessionHolder.withContext(SessionHolder.scala:121) at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$1(SessionHolder.scala:151) at org.apache.spark.sql.connect.service.SessionHolder.withSessionBasedPythonPaths(SessionHolder.scala:137) at org.apache.spark.sql.connect.service.SessionHolder.withSession(SessionHolder.scala:150) at org.apache.spark.sql.connect.service.SparkConnectStreamHandler.handle(SparkConnectStreamHandler.scala:53) at org.apache.spark.sql.connect.service.SparkConnectService.executePlan(SparkConnectService.scala:166) at 
org.apache.spark.connect.proto.SparkConnectServiceGrpc$MethodHandlers.invoke(SparkConnectServiceGrpc.java:584) at org.sparkproject.connect.grpc.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:182) at org.sparkproject.connect.grpc.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:346) at org.sparkproject.connect.grpc.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:860) at org.sparkproject.connect.grpc.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) at org.sparkproject.connect.grpc.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) at java.base/java.lang.Thread.run(Thread.java:1583) ``` All of the ignored tests relate to https://github.com/apache/arrow/issues/35053, so we should wait for an upgrade to a new Arrow version and then re-enable them for Java 21; the following TODO JIRA has been created for that. - Re-enable Arrow-based Connect tests on Java 21: https://issues.apache.org/jira/browse/SPARK-44121 ### Why are the changes needed? Make the Java 21 daily test able to monitor the other, non-Arrow-based tests. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? - Pass GitHub Actions - manual tests with Java 21: ``` java -version openjdk version "21-ea" 2023-09-19 OpenJDK Runtime Environment Zulu21+65-CA (build 21-ea+26) OpenJDK 64-Bit Server VM Zulu21+65-CA (build 21-ea+26, mixed mode, sharing) ``` ``` build/sbt "connect-client-jvm/test" -Phive ``` ``` [info] Run completed in 4 seconds, 640 milliseconds. [info] Total number of tests run: 846 [info] Suites: completed 22, aborted 0 [info] Tests: succeeded 846, failed 0, canceled 167, ignored 1, pending 0 [info] All tests passed. 
``` Closes #41805 from LuciferYang/SPARK-44259. Authored-by: yangjie01 <yangji...@baidu.com> Signed-off-by: Dongjoon Hyun <dongj...@apache.org> --- .../connect/client/util/RemoteSparkSession.scala | 86 +++++++++++++--------- 1 file changed, 52 insertions(+), 34 deletions(-) diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/util/RemoteSparkSession.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/util/RemoteSparkSession.scala index e05828606d09..8d84dffc9d5b 100644 --- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/util/RemoteSparkSession.scala +++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/util/RemoteSparkSession.scala @@ -21,7 +21,9 @@ import java.util.concurrent.TimeUnit import scala.io.Source -import org.scalatest.BeforeAndAfterAll +import org.apache.commons.lang3.{JavaVersion, SystemUtils} +import org.scalactic.source.Position +import org.scalatest.{BeforeAndAfterAll, Tag} import sys.process._ import org.apache.spark.sql.SparkSession @@ -170,41 +172,44 @@ trait RemoteSparkSession extends ConnectFunSuite with BeforeAndAfterAll { protected lazy val serverPort: Int = port override def beforeAll(): Unit = { - super.beforeAll() - SparkConnectServerUtils.start() - spark = SparkSession - .builder() - .client(SparkConnectClient.builder().port(serverPort).build()) - .create() - - // Retry and wait for the server to start - val stop = System.nanoTime() + TimeUnit.MINUTES.toNanos(1) // ~1 min - var sleepInternalMs = TimeUnit.SECONDS.toMillis(1) // 1s with * 2 backoff - var success = false - val error = new RuntimeException(s"Failed to start the test server on port $serverPort.") - - while (!success && System.nanoTime() < stop) { - try { - // Run a simple query to verify the server is really up and ready - val result = spark - .sql("select val from (values ('Hello'), ('World')) as t(val)") - .collect() - 
assert(result.length == 2) - success = true - debug("Spark Connect Server is up.") - } catch { - // ignored the error - case e: Throwable => - error.addSuppressed(e) - Thread.sleep(sleepInternalMs) - sleepInternalMs *= 2 + // TODO(SPARK-44121) Remove this check condition + if (SystemUtils.isJavaVersionAtMost(JavaVersion.JAVA_17)) { + super.beforeAll() + SparkConnectServerUtils.start() + spark = SparkSession + .builder() + .client(SparkConnectClient.builder().port(serverPort).build()) + .create() + + // Retry and wait for the server to start + val stop = System.nanoTime() + TimeUnit.MINUTES.toNanos(1) // ~1 min + var sleepInternalMs = TimeUnit.SECONDS.toMillis(1) // 1s with * 2 backoff + var success = false + val error = new RuntimeException(s"Failed to start the test server on port $serverPort.") + + while (!success && System.nanoTime() < stop) { + try { + // Run a simple query to verify the server is really up and ready + val result = spark + .sql("select val from (values ('Hello'), ('World')) as t(val)") + .collect() + assert(result.length == 2) + success = true + debug("Spark Connect Server is up.") + } catch { + // ignored the error + case e: Throwable => + error.addSuppressed(e) + Thread.sleep(sleepInternalMs) + sleepInternalMs *= 2 + } } - } - // Throw error if failed - if (!success) { - debug(error) - throw error + // Throw error if failed + if (!success) { + debug(error) + throw error + } } } @@ -217,4 +222,17 @@ trait RemoteSparkSession extends ConnectFunSuite with BeforeAndAfterAll { spark = null super.afterAll() } + + /** + * SPARK-44259: override test function to skip `RemoteSparkSession-based` tests as default, we + * should delete this function after SPARK-44121 is completed. 
+ */ + override protected def test(testName: String, testTags: Tag*)(testFun: => Any)(implicit + pos: Position): Unit = { + super.test(testName, testTags: _*) { + // TODO(SPARK-44121) Re-enable Arrow-based connect tests in Java 21 + assume(SystemUtils.isJavaVersionAtMost(JavaVersion.JAVA_17)) + testFun + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org