This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new dcbebce9eacb [SPARK-46202][CONNECT] Expose new ArtifactManager APIs to support custom target directories
dcbebce9eacb is described below

commit dcbebce9eacb201cc8dfac918318be04ada842a8
Author: vicennial <venkata.gud...@databricks.com>
AuthorDate: Tue Dec 12 14:10:41 2023 -0800

    [SPARK-46202][CONNECT] Expose new ArtifactManager APIs to support custom target directories
    
    ### What changes were proposed in this pull request?
    Adds new client APIs to the Spark Connect Scala Client:
    - `def addArtifact(bytes: Array[Byte], target: String): Unit`
    - `def addArtifact(source: String, target: String): Unit`
    
    ### Why are the changes needed?
    Currently, without the use of a REPL/class finder, there is no API to add artifacts (file-based or in-memory) with a custom target directory structure to the remote Spark Connect session.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes.
    
    Users can do the following for class files and jars:
    ```scala
    addArtifact("/Users/dummyUser/files/foo/bar.class", "sub/directory/foo.class")
    addArtifact(bytesBar, "bar.class")
    ```
    
    This preserves the directory structure on the remote server. In this case, the files would be stored at:
    `$ROOT_SESSION_ARTIFACT_DIRECTORY/classes/bar.class`
    `$ROOT_SESSION_ARTIFACT_DIRECTORY/classes/sub/directory/foo.class`
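
    For completeness, a minimal end-to-end sketch of the new overloads is shown below (the connection string and local file paths are illustrative, not taken from this patch):
    ```scala
    import java.nio.file.{Files, Paths}

    import org.apache.spark.sql.SparkSession

    // Hypothetical Spark Connect endpoint; substitute a real connection string.
    val spark = SparkSession.builder().remote("sc://localhost").getOrCreate()

    // File-based artifact: the target path controls the layout under the
    // session's working directory for class files.
    spark.addArtifact("/Users/dummyUser/files/foo/bar.class", "sub/directory/foo.class")

    // In-memory artifact: read the bytes yourself, then supply a target path.
    val bytesBar = Files.readAllBytes(Paths.get("/Users/dummyUser/files/bar.class"))
    spark.addArtifact(bytesBar, "bar.class")
    ```
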
    ### How was this patch tested?
    
    Unit tests added to `ArtifactSuite`.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #44109 from vicennial/SPARK-46202.
    
    Authored-by: vicennial <venkata.gud...@databricks.com>
    Signed-off-by: Herman van Hovell <her...@databricks.com>
---
 .../scala/org/apache/spark/sql/SparkSession.scala  |  41 ++++++++
 .../spark/sql/connect/client/ArtifactSuite.scala   |  50 ++++++++++
 .../spark/sql/connect/client/ArtifactManager.scala | 108 +++++++++++++++++----
 .../sql/connect/client/SparkConnectClient.scala    |  37 +++++++
 4 files changed, 219 insertions(+), 17 deletions(-)

diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
index daa172e215ad..81c2ca11a7fb 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -590,6 +590,47 @@ class SparkSession private[sql] (
   @Experimental
   def addArtifact(uri: URI): Unit = client.addArtifact(uri)
 
+  /**
+   * Add a single in-memory artifact to the session while preserving the directory structure
+   * specified by `target` under the session's working directory of that particular file
+   * extension.
+   *
+   * Supported target file extensions are .jar and .class.
+   *
+   * ==Example==
+   * {{{
+   *  addArtifact(bytesBar, "foo/bar.class")
+   *  addArtifact(bytesFlat, "flat.class")
+   *  // Directory structure of the session's working directory for class files would look like:
+   *  // ${WORKING_DIR_FOR_CLASS_FILES}/flat.class
+   *  // ${WORKING_DIR_FOR_CLASS_FILES}/foo/bar.class
+   * }}}
+   *
+   * @since 4.0.0
+   */
+  @Experimental
+  def addArtifact(bytes: Array[Byte], target: String): Unit = client.addArtifact(bytes, target)
+
+  /**
+   * Add a single artifact to the session while preserving the directory structure specified by
+   * `target` under the session's working directory of that particular file extension.
+   *
+   * Supported target file extensions are .jar and .class.
+   *
+   * ==Example==
+   * {{{
+   *  addArtifact("/Users/dummyUser/files/foo/bar.class", "foo/bar.class")
+   *  addArtifact("/Users/dummyUser/files/flat.class", "flat.class")
+   *  // Directory structure of the session's working directory for class files would look like:
+   *  // ${WORKING_DIR_FOR_CLASS_FILES}/flat.class
+   *  // ${WORKING_DIR_FOR_CLASS_FILES}/foo/bar.class
+   * }}}
+   *
+   * @since 4.0.0
+   */
+  @Experimental
+  def addArtifact(source: String, target: String): Unit = client.addArtifact(source, target)
+
   /**
    * Add one or more artifacts to the session.
    *
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/ArtifactSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/ArtifactSuite.scala
index f945313d2427..0c8ef8e599fb 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/ArtifactSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/ArtifactSuite.scala
@@ -284,4 +284,54 @@ class ArtifactSuite extends ConnectFunSuite with BeforeAndAfterEach {
     }
 
   }
+
+  test("artifact with custom target") {
+    val artifactPath = artifactFilePath.resolve("smallClassFile.class")
+    val target = "sub/package/smallClassFile.class"
+    artifactManager.addArtifact(artifactPath.toString, target)
+    val receivedRequests = service.getAndClearLatestAddArtifactRequests()
+    // Single `AddArtifactRequest`
+    assert(receivedRequests.size == 1)
+
+    val request = receivedRequests.head
+    assert(request.hasBatch)
+
+    val batch = request.getBatch
+    // Single artifact in batch
+    assert(batch.getArtifactsList.size() == 1)
+
+    val singleChunkArtifact = batch.getArtifacts(0)
+    assert(singleChunkArtifact.getName.equals(s"classes/$target"))
+    assertFileDataEquality(singleChunkArtifact.getData, artifactPath)
+  }
+
+  test("in-memory artifact with custom target") {
+    val artifactPath = artifactFilePath.resolve("smallClassFile.class")
+    val artifactBytes = Files.readAllBytes(artifactPath)
+    val target = "sub/package/smallClassFile.class"
+    artifactManager.addArtifact(artifactBytes, target)
+    val receivedRequests = service.getAndClearLatestAddArtifactRequests()
+    // Single `AddArtifactRequest`
+    assert(receivedRequests.size == 1)
+
+    val request = receivedRequests.head
+    assert(request.hasBatch)
+
+    val batch = request.getBatch
+    // Single artifact in batch
+    assert(batch.getArtifactsList.size() == 1)
+
+    val singleChunkArtifact = batch.getArtifacts(0)
+    assert(singleChunkArtifact.getName.equals(s"classes/$target"))
+    assert(singleChunkArtifact.getData.getData == ByteString.copyFrom(artifactBytes))
+  }
+
+  test(
+    "When both source and target paths are given, extension conditions are 
checked " +
+      "on target path") {
+    val artifactPath = artifactFilePath.resolve("smallClassFile.class")
+    assertThrows[UnsupportedOperationException] {
+      artifactManager.addArtifact(artifactPath.toString, "dummy.extension")
+    }
+  }
 }
diff --git a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala
index 3cd35803d1ec..36bc60c7d63a 100644
--- a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala
+++ b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.spark.sql.connect.client
 
-import java.io.{ByteArrayInputStream, InputStream, PrintStream}
+import java.io.{ByteArrayInputStream, File, InputStream, PrintStream}
 import java.net.URI
 import java.nio.file.{Files, Path, Paths}
 import java.util.Arrays
@@ -83,14 +83,10 @@ class ArtifactManager(
     uri.getScheme match {
       case "file" =>
         val path = Paths.get(uri)
-        val artifact = path.getFileName.toString match {
-          case jar if jar.endsWith(".jar") =>
-            newJarArtifact(path.getFileName, new LocalFile(path))
-          case cf if cf.endsWith(".class") =>
-            newClassArtifact(path.getFileName, new LocalFile(path))
-          case other =>
-            throw new UnsupportedOperationException(s"Unsupported file format: $other")
-        }
+        val artifact = Artifact.newArtifactFromExtension(
+          path.getFileName.toString,
+          path.getFileName,
+          new LocalFile(path))
         Seq[Artifact](artifact)
 
       case "ivy" =>
@@ -108,6 +104,55 @@ class ArtifactManager(
    */
   def addArtifact(uri: URI): Unit = addArtifacts(parseArtifacts(uri))
 
+  /**
+   * Add a single in-memory artifact to the session while preserving the directory structure
+   * specified by `target` under the session's working directory of that particular file
+   * extension.
+   *
+   * Supported target file extensions are .jar and .class.
+   *
+   * ==Example==
+   * {{{
+   *  addArtifact(bytesBar, "foo/bar.class")
+   *  addArtifact(bytesFlat, "flat.class")
+   *  // Directory structure of the session's working directory for class files would look like:
+   *  // ${WORKING_DIR_FOR_CLASS_FILES}/flat.class
+   *  // ${WORKING_DIR_FOR_CLASS_FILES}/foo/bar.class
+   * }}}
+   */
+  def addArtifact(bytes: Array[Byte], target: String): Unit = {
+    val targetPath = Paths.get(target)
+    val artifact = Artifact.newArtifactFromExtension(
+      targetPath.getFileName.toString,
+      targetPath,
+      new InMemory(bytes))
+    addArtifacts(artifact :: Nil)
+  }
+
+  /**
+   * Add a single artifact to the session while preserving the directory structure specified by
+   * `target` under the session's working directory of that particular file extension.
+   *
+   * Supported target file extensions are .jar and .class.
+   *
+   * ==Example==
+   * {{{
+   *  addArtifact("/Users/dummyUser/files/foo/bar.class", "foo/bar.class")
+   *  addArtifact("/Users/dummyUser/files/flat.class", "flat.class")
+   *  // Directory structure of the session's working directory for class files would look like:
+   *  // ${WORKING_DIR_FOR_CLASS_FILES}/flat.class
+   *  // ${WORKING_DIR_FOR_CLASS_FILES}/foo/bar.class
+   * }}}
+   */
+  def addArtifact(source: String, target: String): Unit = {
+    val targetPath = Paths.get(target)
+    val artifact = Artifact.newArtifactFromExtension(
+      targetPath.getFileName.toString,
+      targetPath,
+      new LocalFile(Paths.get(source)))
+    addArtifacts(artifact :: Nil)
+  }
+
   /**
    * Add multiple artifacts to the session.
    *
@@ -366,12 +411,26 @@ object Artifact {
   val JAR_PREFIX: Path = Paths.get("jars")
   val CACHE_PREFIX: Path = Paths.get("cache")
 
-  def newJarArtifact(fileName: Path, storage: LocalData): Artifact = {
-    newArtifact(JAR_PREFIX, ".jar", fileName, storage)
+  def newArtifactFromExtension(
+      fileName: String,
+      targetFilePath: Path,
+      storage: LocalData): Artifact = {
+    fileName match {
+      case jar if jar.endsWith(".jar") =>
+        newJarArtifact(targetFilePath, storage)
+      case cf if cf.endsWith(".class") =>
+        newClassArtifact(targetFilePath, storage)
+      case other =>
+        throw new UnsupportedOperationException(s"Unsupported file format: $other")
+    }
+  }
+
+  def newJarArtifact(targetFilePath: Path, storage: LocalData): Artifact = {
+    newArtifact(JAR_PREFIX, ".jar", targetFilePath, storage)
   }
 
-  def newClassArtifact(fileName: Path, storage: LocalData): Artifact = {
-    newArtifact(CLASS_PREFIX, ".class", fileName, storage)
+  def newClassArtifact(targetFilePath: Path, storage: LocalData): Artifact = {
+    newArtifact(CLASS_PREFIX, ".class", targetFilePath, storage)
   }
 
   def newCacheArtifact(id: String, storage: LocalData): Artifact = {
@@ -412,14 +471,29 @@ object Artifact {
     jars.map(p => Paths.get(p)).map(path => newJarArtifact(path.getFileName, new LocalFile(path)))
   }
 
+  private def concatenatePaths(basePath: Path, otherPath: Path): Path = {
+    // We avoid using the `.resolve()` method here to ensure that we're concatenating the two
+    // paths even if `otherPath` is absolute.
+    val concatenatedPath = Paths.get(basePath.toString, otherPath.toString)
+    // Note: The normalized resulting path may still reference parent directories if the
+    // `otherPath` contains a sufficient number of parent operators (i.e. "..").
+    // Example: `basePath` = "/base", `otherPath` = "subdir/../../file.txt"
+    // Then, `concatenatedPath` = "/base/subdir/../../file.txt"
+    // and `normalizedPath` = "/base/file.txt".
+    val normalizedPath = concatenatedPath.normalize()
+    // Verify that the prefix of the `normalizedPath` starts with `basePath/`.
+    require(
+      normalizedPath != basePath && normalizedPath.startsWith(s"$basePath${File.separator}"))
+    normalizedPath
+  }
+
   private def newArtifact(
       prefix: Path,
       requiredSuffix: String,
-      fileName: Path,
+      targetFilePath: Path,
       storage: LocalData): Artifact = {
-    require(!fileName.isAbsolute)
-    require(fileName.toString.endsWith(requiredSuffix))
-    new Artifact(prefix.resolve(fileName), storage)
+    require(targetFilePath.toString.endsWith(requiredSuffix))
+    new Artifact(concatenatePaths(prefix, targetFilePath), storage)
   }
 
   /**
diff --git a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
index c2776e65392f..cd1dfbd2e734 100644
--- a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
+++ b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
@@ -301,6 +301,43 @@ private[sql] class SparkConnectClient(
    */
   def addArtifact(uri: URI): Unit = artifactManager.addArtifact(uri)
 
+  /**
+   * Add a single in-memory artifact to the session while preserving the directory structure
+   * specified by `target` under the session's working directory of that particular file
+   * extension.
+   *
+   * Supported target file extensions are .jar and .class.
+   *
+   * ==Example==
+   * {{{
+   *  addArtifact(bytesBar, "foo/bar.class")
+   *  addArtifact(bytesFlat, "flat.class")
+   *  // Directory structure of the session's working directory for class files would look like:
+   *  // ${WORKING_DIR_FOR_CLASS_FILES}/flat.class
+   *  // ${WORKING_DIR_FOR_CLASS_FILES}/foo/bar.class
+   * }}}
+   */
+  def addArtifact(bytes: Array[Byte], target: String): Unit =
+    artifactManager.addArtifact(bytes, target)
+
+  /**
+   * Add a single artifact to the session while preserving the directory structure specified by
+   * `target` under the session's working directory of that particular file extension.
+   *
+   * Supported target file extensions are .jar and .class.
+   *
+   * ==Example==
+   * {{{
+   *  addArtifact("/Users/dummyUser/files/foo/bar.class", "foo/bar.class")
+   *  addArtifact("/Users/dummyUser/files/flat.class", "flat.class")
+   *  // Directory structure of the session's working directory for class files would look like:
+   *  // ${WORKING_DIR_FOR_CLASS_FILES}/flat.class
+   *  // ${WORKING_DIR_FOR_CLASS_FILES}/foo/bar.class
+   * }}}
+   */
+  def addArtifact(source: String, target: String): Unit =
+    artifactManager.addArtifact(source, target)
+
   /**
    * Add multiple artifacts to the session.
    *

