This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new 15bb95564b6 [SPARK-44806][CONNECT] Move internal client spark-connect-common to be able to test real in-process server with a real RPC client
15bb95564b6 is described below

commit 15bb95564b6f2a3e76996062155ea029438ab4bb
Author: Juliusz Sompolski <ju...@databricks.com>
AuthorDate: Thu Aug 17 17:05:58 2023 +0200

[SPARK-44806][CONNECT] Move internal client spark-connect-common to be able to test real in-process server with a real RPC client

### What changes were proposed in this pull request?

Currently, Spark Connect has the following types of integration tests:
* Client unit tests with a mock in-process server (DummySparkConnectService with GRPC InProcessServerBuilder)
* Client unit tests with a mock RPC server (DummySparkConnectService with GRPC NettyServerBuilder)
* Server unit tests with an in-process server and various mocks
* E2E tests of a real client with a server started in another process (using RemoteSparkSession)

What is lacking are E2E tests with an in-process server (so that server state can be inspected and asserted) and a real RPC client. This is currently impossible, because the `spark-connect-client-jvm` module includes the client API, which duplicates the Spark SQL APIs of SparkSession, Dataset etc. When a real client is pulled into the server module for testing, these classes clash.

Move the `org.apache.spark.sql.connect.client` code into the `spark-connect-common` module, so that the internal SparkConnectClient code is separated from the client public API and can be pulled into the server's tests. The only class kept in `spark-connect-client-jvm` is `AmmoniteClassFinder`, to avoid pulling the Ammonite dependency into common.

Tried an alternative approach in https://github.com/apache/spark/pull/42465. That doesn't work, because it also reorders the Maven build so that the client is built before the server, but the client actually requires the server to be built first in order to run tests with `RemoteSparkSession`.

Tried an alternative approach of depending on a shaded/relocated version of the client in https://github.com/apache/spark/pull/42461, but that is not possible in either Maven or sbt.

Tried an alternative approach of creating a client-jvm-internal module in https://github.com/apache/spark/pull/42441; reviewers preferred moving things to connect-common over introducing a new module.

Moved it together with tests in https://github.com/apache/spark/pull/42501, but moving the tests isn't really needed.

### Why are the changes needed?

To be able to use the internal client for testing an in-process server with an in-process client that communicates over real RPC.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

All modules test and build.

Closes #42523 from juliuszsompolski/sc-client-common-mainonly.

Authored-by: Juliusz Sompolski <ju...@databricks.com>
Signed-off-by: Herman van Hovell <her...@databricks.com>
(cherry picked from commit 72a466835a4490257eec0c9af2bbc9291c46de1e)
Signed-off-by: Herman van Hovell <her...@databricks.com>
---
 ...ClassFinder.scala => AmmoniteClassFinder.scala} | 41 ++--------------------
 connector/connect/common/pom.xml                   | 12 +++++++
 .../client/arrow/ScalaCollectionUtils.scala        |  0
 .../client/arrow/ScalaCollectionUtils.scala        |  0
 .../spark/sql/connect/client/ArtifactManager.scala |  0
 .../spark/sql/connect/client/ClassFinder.scala     | 27 +-------------
 .../sql/connect/client/CloseableIterator.scala     |  0
 .../client/CustomSparkConnectBlockingStub.scala    |  0
 .../connect/client/CustomSparkConnectStub.scala    |  0
 .../ExecutePlanResponseReattachableIterator.scala  |  0
 .../connect/client/GrpcExceptionConverter.scala    |  2 +-
 .../sql/connect/client/GrpcRetryHandler.scala      |  0
 .../sql/connect/client/SparkConnectClient.scala    |  0
 .../connect/client/SparkConnectClientParser.scala  |  0
 .../spark/sql/connect/client/SparkResult.scala     |  0
 .../connect/client/arrow/ArrowDeserializer.scala   |  0
 .../connect/client/arrow/ArrowEncoderUtils.scala   |  0
 .../sql/connect/client/arrow/ArrowSerializer.scala |  0
 .../connect/client/arrow/ArrowVectorReader.scala   |  0
 .../arrow/ConcatenatingArrowStreamReader.scala     |  0
 .../apache/spark/sql/connect/client/package.scala  |  0
 .../spark/sql/connect/client/util/Cleaner.scala    |  0
 22 files changed, 17 insertions(+), 65 deletions(-)

diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ClassFinder.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/AmmoniteClassFinder.scala
similarity index 58%
copy from connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ClassFinder.scala
copy to connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/AmmoniteClassFinder.scala
index 0371d42f2d6..4ebc22202b0 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ClassFinder.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/AmmoniteClassFinder.scala
@@ -14,52 +14,17 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.spark.sql.connect.client
 
 import java.net.URL
-import java.nio.file.{Files, LinkOption, Path, Paths}
-
-import scala.collection.JavaConverters._
+import java.nio.file.Paths
 
 import ammonite.repl.api.Session
 import ammonite.runtime.SpecialClassLoader
 
-import org.apache.spark.sql.connect.client.Artifact.{InMemory, LocalFile}
-
-trait ClassFinder {
-  def findClasses(): Iterator[Artifact]
-}
-
-/**
- * A generic [[ClassFinder]] implementation that traverses a specific REPL output directory.
- * @param _rootDir
- */
-class REPLClassDirMonitor(_rootDir: String) extends ClassFinder {
-  private val rootDir = Paths.get(_rootDir)
-  require(rootDir.isAbsolute)
-  require(Files.isDirectory(rootDir))
-
-  override def findClasses(): Iterator[Artifact] = {
-    Files
-      .walk(rootDir)
-      // Ignore symbolic links
-      .filter(path => Files.isRegularFile(path, LinkOption.NOFOLLOW_LINKS) && isClass(path))
-      .map[Artifact](path => toArtifact(path))
-      .iterator()
-      .asScala
-  }
-
-  private def toArtifact(path: Path): Artifact = {
-    // Persist the relative path of the classfile
-    Artifact.newClassArtifact(rootDir.relativize(path), new LocalFile(path))
-  }
-
-  private def isClass(path: Path): Boolean = path.toString.endsWith(".class")
-}
-
 /**
  * A special [[ClassFinder]] for the Ammonite REPL to handle in-memory class files.
+ *
  * @param session
  */
 class AmmoniteClassFinder(session: Session) extends ClassFinder {
@@ -73,7 +38,7 @@ class AmmoniteClassFinder(session: Session) extends ClassFinder {
         parts(parts.length - 1) += ".class"
         val path = Paths.get(parts.head, parts.tail: _*)
         val bytes = classloader.newFileDict(name)
-        Artifact.newClassArtifact(path, new InMemory(bytes))
+        Artifact.newClassArtifact(path, new Artifact.InMemory(bytes))
       }
     }
   }
diff --git a/connector/connect/common/pom.xml b/connector/connect/common/pom.xml
index df8972b832d..fb261c284d9 100644
--- a/connector/connect/common/pom.xml
+++ b/connector/connect/common/pom.xml
@@ -49,6 +49,18 @@
       <artifactId>protobuf-java</artifactId>
       <scope>compile</scope>
     </dependency>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+      <version>${connect.guava.version}</version>
+      <scope>compile</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>failureaccess</artifactId>
+      <version>${guava.failureaccess.version}</version>
+      <scope>compile</scope>
+    </dependency>
     <dependency>
       <groupId>io.grpc</groupId>
       <artifactId>grpc-netty</artifactId>
diff --git a/connector/connect/client/jvm/src/main/scala-2.12/org/apache/spark/sql/connect/client/arrow/ScalaCollectionUtils.scala b/connector/connect/common/src/main/scala-2.12/org/apache/spark/sql/connect/client/arrow/ScalaCollectionUtils.scala
similarity index 100%
rename from connector/connect/client/jvm/src/main/scala-2.12/org/apache/spark/sql/connect/client/arrow/ScalaCollectionUtils.scala
rename to connector/connect/common/src/main/scala-2.12/org/apache/spark/sql/connect/client/arrow/ScalaCollectionUtils.scala
diff --git a/connector/connect/client/jvm/src/main/scala-2.13/org/apache/spark/sql/connect/client/arrow/ScalaCollectionUtils.scala b/connector/connect/common/src/main/scala-2.13/org/apache/spark/sql/connect/client/arrow/ScalaCollectionUtils.scala
similarity index 100%
rename from connector/connect/client/jvm/src/main/scala-2.13/org/apache/spark/sql/connect/client/arrow/ScalaCollectionUtils.scala
rename to connector/connect/common/src/main/scala-2.13/org/apache/spark/sql/connect/client/arrow/ScalaCollectionUtils.scala
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala
similarity index 100%
rename from connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala
rename to connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ClassFinder.scala b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ClassFinder.scala
similarity index 66%
rename from connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ClassFinder.scala
rename to connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ClassFinder.scala
index 0371d42f2d6..ff6473bfcb1 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ClassFinder.scala
+++ b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ClassFinder.scala
@@ -17,15 +17,11 @@
 
 package org.apache.spark.sql.connect.client
 
-import java.net.URL
 import java.nio.file.{Files, LinkOption, Path, Paths}
 
 import scala.collection.JavaConverters._
 
-import ammonite.repl.api.Session
-import ammonite.runtime.SpecialClassLoader
-
-import org.apache.spark.sql.connect.client.Artifact.{InMemory, LocalFile}
+import org.apache.spark.sql.connect.client.Artifact.LocalFile
 
 trait ClassFinder {
   def findClasses(): Iterator[Artifact]
@@ -57,24 +53,3 @@ class REPLClassDirMonitor(_rootDir: String) extends ClassFinder {
 
   private def isClass(path: Path): Boolean = path.toString.endsWith(".class")
 }
-
-/**
- * A special [[ClassFinder]] for the Ammonite REPL to handle in-memory class files.
- * @param session
- */
-class AmmoniteClassFinder(session: Session) extends ClassFinder {
-
-  override def findClasses(): Iterator[Artifact] = {
-    session.frames.iterator.flatMap { frame =>
-      val classloader = frame.classloader.asInstanceOf[SpecialClassLoader]
-      val signatures: Seq[(Either[String, URL], Long)] = classloader.classpathSignature
-      signatures.iterator.collect { case (Left(name), _) =>
-        val parts = name.split('.')
-        parts(parts.length - 1) += ".class"
-        val path = Paths.get(parts.head, parts.tail: _*)
-        val bytes = classloader.newFileDict(name)
-        Artifact.newClassArtifact(path, new InMemory(bytes))
-      }
-    }
-  }
-}
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/CloseableIterator.scala b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/CloseableIterator.scala
similarity index 100%
rename from connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/CloseableIterator.scala
rename to connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/CloseableIterator.scala
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/CustomSparkConnectBlockingStub.scala b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/CustomSparkConnectBlockingStub.scala
similarity index 100%
rename from connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/CustomSparkConnectBlockingStub.scala
rename to connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/CustomSparkConnectBlockingStub.scala
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/CustomSparkConnectStub.scala b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/CustomSparkConnectStub.scala
similarity index 100%
rename from connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/CustomSparkConnectStub.scala
rename to connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/CustomSparkConnectStub.scala
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala
similarity index 100%
rename from connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala
rename to connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala
similarity index 98%
rename from connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala
rename to connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala
index 64d1e5c488a..9b8181cb88f 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala
+++ b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.spark.sql.connect.client
 
-import scala.jdk.CollectionConverters._
+import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
 
 import com.google.rpc.ErrorInfo
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala
similarity index 100%
rename from connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala
rename to connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
similarity index 100%
rename from connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
rename to connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClientParser.scala b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClientParser.scala
similarity index 100%
rename from connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClientParser.scala
rename to connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClientParser.scala
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkResult.scala b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/SparkResult.scala
similarity index 100%
rename from connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkResult.scala
rename to connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/SparkResult.scala
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/arrow/ArrowDeserializer.scala b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/arrow/ArrowDeserializer.scala
similarity index 100%
rename from connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/arrow/ArrowDeserializer.scala
rename to connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/arrow/ArrowDeserializer.scala
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/arrow/ArrowEncoderUtils.scala b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/arrow/ArrowEncoderUtils.scala
similarity index 100%
rename from connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/arrow/ArrowEncoderUtils.scala
rename to connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/arrow/ArrowEncoderUtils.scala
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/arrow/ArrowSerializer.scala b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/arrow/ArrowSerializer.scala
similarity index 100%
rename from connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/arrow/ArrowSerializer.scala
rename to connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/arrow/ArrowSerializer.scala
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/arrow/ArrowVectorReader.scala b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/arrow/ArrowVectorReader.scala
similarity index 100%
rename from connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/arrow/ArrowVectorReader.scala
rename to connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/arrow/ArrowVectorReader.scala
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/arrow/ConcatenatingArrowStreamReader.scala b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/arrow/ConcatenatingArrowStreamReader.scala
similarity index 100%
rename from connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/arrow/ConcatenatingArrowStreamReader.scala
rename to connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/arrow/ConcatenatingArrowStreamReader.scala
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/package.scala b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/package.scala
similarity index 100%
rename from connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/package.scala
rename to connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/package.scala
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/util/Cleaner.scala b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/util/Cleaner.scala
similarity index 100%
rename from connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/util/Cleaner.scala
rename to connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/util/Cleaner.scala