LuciferYang commented on code in PR #39541:
URL: https://github.com/apache/spark/pull/39541#discussion_r1082065687


##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/util/RemoteSparkSession.scala:
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connect.client.util
+
+import java.io.{BufferedOutputStream, File}
+
+import scala.io.Source
+
+import org.scalatest.BeforeAndAfterAll
+import sys.process._
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.connect.client.SparkConnectClient
+import org.apache.spark.sql.connect.common.config.ConnectCommon
+
+/**
+ * An util class to start a local spark connect server in a different process for local E2E tests.
+ * It is designed to start the server once but shared by all tests. It is equivalent to use the
+ * following command to start the connect server via command line:
+ *
+ * {{{
+ * bin/spark-shell \
+ * --jars `ls connector/connect/server/target/**/spark-connect*SNAPSHOT.jar | paste -sd ',' -` \
+ * --conf spark.plugins=org.apache.spark.sql.connect.SparkConnectPlugin
+ * }}}
+ *
+ * Set system property `SPARK_HOME` if the test is not executed from the Spark project top folder.
+ * Set system property `DEBUG_SC_JVM_CLIENT=true` to print the server process output in the
+ * console to debug server start stop problems.
+ */
+object SparkConnectServerUtils {
+  // System properties used for testing and debugging
+  private val SPARK_HOME = "SPARK_HOME"
+  private val ENV_DEBUG_SC_JVM_CLIENT = "DEBUG_SC_JVM_CLIENT"
+
+  private val sparkHome = System.getProperty(SPARK_HOME, fileSparkHome())
+  private val isDebug = System.getProperty(ENV_DEBUG_SC_JVM_CLIENT, 
"false").toBoolean
+
+  // Log server start stop debug info into console
+  // scalastyle:off println
+  private[connect] def debug(msg: String): Unit = if (isDebug) println(msg)
+  // scalastyle:on println
+  private[connect] def debug(error: Throwable): Unit = if (isDebug) 
error.printStackTrace()
+
+  // Server port
+  private[connect] val port = ConnectCommon.CONNECT_GRPC_BINDING_PORT + 
util.Random.nextInt(1000)
+
+  @volatile private var stopped = false
+
+  private lazy val sparkConnect: Process = {
+    debug("Starting the Spark Connect Server...")
+    val jar = findSparkConnectJar
+    val builder = Process(
+      new File(sparkHome, "bin/spark-shell").getCanonicalPath,
+      Seq(
+        "--jars",
+        jar,
+        "--driver-class-path",
+        jar,
+        "--conf",
+        "spark.plugins=org.apache.spark.sql.connect.SparkConnectPlugin",
+        "--conf",
+        s"spark.connect.grpc.binding.port=$port"))
+
+    val io = new ProcessIO(
+      // Hold the input channel to the spark console to keep the console open
+      in => new BufferedOutputStream(in),
+      // Only redirect output if debug to avoid channel interruption error on process termination.
+      out => if (isDebug) Source.fromInputStream(out).getLines.foreach(debug),
+      err => if (isDebug) Source.fromInputStream(err).getLines.foreach(debug))
+    val process = builder.run(io)
+
+    // Adding JVM shutdown hook
+    sys.addShutdownHook(kill())
+    process
+  }
+
+  def start(): Unit = {
+    assert(!stopped)
+    sparkConnect
+  }
+
+  def kill(): Int = {
+    stopped = true
+    debug("Stopping the Spark Connect Server...")
+    sparkConnect.destroy()
+    val code = sparkConnect.exitValue()
+    debug(s"Spark Connect Server is stopped with exit code: $code")
+    code
+  }
+
+  private def fileSparkHome(): String = {
+    val path = new File("./").getCanonicalPath
+    if (path.endsWith("connector/connect/client/jvm")) {
+      // the current folder is the client project folder
+      new File("../../../../").getCanonicalPath
+    } else {
+      path
+    }
+  }
+
+  private def findSparkConnectJar: String = {
+    val target = "connector/connect/server/target"
+    val parentDir = new File(sparkHome, target)
+    assert(
+      parentDir.exists(),
+      s"Fail to locate the spark connect target folder: 
'${parentDir.getCanonicalPath}'. " +
+        s"SPARK_HOME='${new File(sparkHome).getCanonicalPath}'. " +
+        "Make sure system property `SPARK_HOME` is set correctly.")
+    val jars = recursiveListFiles(parentDir).filter { f =>
+      // SBT jar
+      (f.getParent.endsWith("scala-2.12") &&

Review Comment:
   +1 for Scala 2.13



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to