This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new fead8a7962a [SPARK-43993][SQL][TESTS] Add tests for cache artifacts
fead8a7962a is described below

commit fead8a7962a717aae5cab9eef51eed2ac684f070
Author: Max Gekk <max.g...@gmail.com>
AuthorDate: Wed Jun 7 16:00:49 2023 +0300

    [SPARK-43993][SQL][TESTS] Add tests for cache artifacts

    ### What changes were proposed in this pull request?
    In the PR, I propose to add a test to check two methods of the artifact manager:
    - `isCachedArtifact()`
    - `cacheArtifact()`

    ### Why are the changes needed?
    To improve test coverage of the Artifacts API.

    ### Does this PR introduce _any_ user-facing change?
    No.

    ### How was this patch tested?
    By running the new test:
    ```
    $ build/sbt "test:testOnly *.ArtifactSuite"
    ```

    Closes #41493 from MaxGekk/test-cache-artifact.

    Authored-by: Max Gekk <max.g...@gmail.com>
    Signed-off-by: Max Gekk <max.g...@gmail.com>
---
 .../spark/sql/connect/client/ArtifactManager.scala |  2 +-
 .../spark/sql/connect/client/ArtifactSuite.scala   | 14 ++++++++++++
 .../connect/client/SparkConnectClientSuite.scala   | 25 +++++++++++++++++++++-
 3 files changed, 39 insertions(+), 2 deletions(-)

diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala
index acd9f279c6d..6d0d16df946 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala
@@ -108,7 +108,7 @@ class ArtifactManager(
    */
   def addArtifacts(uris: Seq[URI]): Unit = addArtifacts(uris.flatMap(parseArtifacts))
 
-  private def isCachedArtifact(hash: String): Boolean = {
+  private[client] def isCachedArtifact(hash: String): Boolean = {
     val artifactName = CACHE_PREFIX + "/" + hash
     val request = proto.ArtifactStatusesRequest
       .newBuilder()
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/ArtifactSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/ArtifactSuite.scala
index 506ad3625b0..39ab0eef412 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/ArtifactSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/ArtifactSuite.scala
@@ -25,6 +25,7 @@ import scala.collection.JavaConverters._
 import com.google.protobuf.ByteString
 import io.grpc.{ManagedChannel, Server}
 import io.grpc.inprocess.{InProcessChannelBuilder, InProcessServerBuilder}
+import org.apache.commons.codec.digest.DigestUtils.sha256Hex
 import org.scalatest.BeforeAndAfterEach
 
 import org.apache.spark.connect.proto
@@ -248,4 +249,17 @@ class ArtifactSuite extends ConnectFunSuite with BeforeAndAfterEach {
     assertFileDataEquality(remainingArtifacts.get(0).getData, Paths.get(file3))
     assertFileDataEquality(remainingArtifacts.get(1).getData, Paths.get(file4))
   }
+
+  test("cache an artifact and check its presence") {
+    val s = "Hello, World!"
+    val blob = s.getBytes("UTF-8")
+    val expectedHash = sha256Hex(blob)
+    assert(artifactManager.isCachedArtifact(expectedHash) === false)
+    val actualHash = artifactManager.cacheArtifact(blob)
+    assert(actualHash === expectedHash)
+    assert(artifactManager.isCachedArtifact(expectedHash) === true)
+
+    val receivedRequests = service.getAndClearLatestAddArtifactRequests()
+    assert(receivedRequests.size == 1)
+  }
 }
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala
index 7a0ad1a9e2a..7e0b687054d 100755
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala
@@ -18,6 +18,7 @@ package org.apache.spark.sql.connect.client
 
 import java.util.concurrent.TimeUnit
 
+import scala.collection.JavaConverters._
 import scala.collection.mutable
 
 import io.grpc.{Server, StatusRuntimeException}
@@ -26,7 +27,7 @@ import io.grpc.stub.StreamObserver
 import org.scalatest.BeforeAndAfterEach
 
 import org.apache.spark.connect.proto
-import org.apache.spark.connect.proto.{AddArtifactsRequest, AddArtifactsResponse, AnalyzePlanRequest, AnalyzePlanResponse, ExecutePlanRequest, ExecutePlanResponse, SparkConnectServiceGrpc}
+import org.apache.spark.connect.proto.{AddArtifactsRequest, AddArtifactsResponse, AnalyzePlanRequest, AnalyzePlanResponse, ArtifactStatusesRequest, ArtifactStatusesResponse, ExecutePlanRequest, ExecutePlanResponse, SparkConnectServiceGrpc}
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.connect.client.util.ConnectFunSuite
 import org.apache.spark.sql.connect.common.config.ConnectCommon
@@ -251,4 +252,26 @@ class DummySparkConnectService() extends SparkConnectServiceGrpc.SparkConnectSer
       responseObserver.onCompleted()
     }
   }
+
+  override def artifactStatus(
+      request: ArtifactStatusesRequest,
+      responseObserver: StreamObserver[ArtifactStatusesResponse]): Unit = {
+    val builder = proto.ArtifactStatusesResponse.newBuilder()
+    request.getNamesList().iterator().asScala.foreach { name =>
+      val status = proto.ArtifactStatusesResponse.ArtifactStatus.newBuilder()
+      val exists = if (name.startsWith("cache/")) {
+        inputArtifactRequests.exists { artifactReq =>
+          if (artifactReq.hasBatch) {
+            val batch = artifactReq.getBatch
+            batch.getArtifactsList.asScala.exists { singleArtifact =>
+              singleArtifact.getName == name
+            }
+          } else false
+        }
+      } else false
+      builder.putStatuses(name, status.setExists(exists).build())
+    }
+    responseObserver.onNext(builder.build())
+    responseObserver.onCompleted()
+  }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
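[Editor's note, not part of the commit] As a minimal standalone sketch of the cache-key derivation the new test exercises: the client hashes the payload with `sha256Hex` and queries the status of the name `"cache/" + hash`, as shown in `ArtifactManager.isCachedArtifact()` and in the dummy service's `artifactStatus` above. The object name `CacheKeySketch` is an illustrative placeholder.

```scala
// Illustrative only: derive the artifact name for a cached blob the same way the
// diff does (SHA-256 hex digest of the bytes under the "cache/" prefix).
// Assumes commons-codec is on the classpath.
import org.apache.commons.codec.digest.DigestUtils.sha256Hex

object CacheKeySketch {
  def main(args: Array[String]): Unit = {
    val blob = "Hello, World!".getBytes("UTF-8")
    val hash = sha256Hex(blob)          // hex-encoded SHA-256 digest of the payload
    val artifactName = "cache/" + hash  // name queried via ArtifactStatusesRequest
    println(artifactName)
  }
}
```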