This is an automated email from the ASF dual-hosted git repository.
Yicong-Huang pushed a commit to branch release/v1.2
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/release/v1.2 by this push:
new 7780a5610d fix(file-service): apply LakeFS error handling to all call sites (#5607)
7780a5610d is described below
commit 7780a5610deb333710022adf7a47b217f447958e
Author: Suyash Jain <[email protected]>
AuthorDate: Fri Jun 12 16:59:39 2026 +0000
fix(file-service): apply LakeFS error handling to all call sites (#5607)
### What changes were proposed in this PR?
#4177 introduced `LakeFSExceptionHandler.withLakeFSErrorHandling`, but
only the multipart-upload and dataset-version paths used it. The
remaining LakeFS call sites in `DatasetResource` either leaked raw
`io.lakefs.clients.sdk.ApiException` to Dropwizard (an opaque 500 for
the frontend) or caught `Exception` and rewrapped it as a generic 500,
discarding the real LakeFS status code (401/403/404/409/...).
```
Before: LakeFS 404 -> raw ApiException / catch(Exception) -> 500 "Failed to ..."
After:  LakeFS 404 -> withLakeFSErrorHandling -> 404 "Error while deleting file 'a.csv' ...: LakeFS resource not found. ..."
```
Changes:
| Change | Where |
| --- | --- |
| New overload `withLakeFSErrorHandling(operation: String)(call)` that prefixes the user-visible message with the failed operation | `LakeFSExceptionHandler.scala` |
| 8 bare LakeFS calls now wrapped (size lookup, version listing, zip download, presigned URLs, cover image) | `DatasetResource.scala` |
| 5 `catch Exception -> generic 500` blocks now use the handler; compensation logic (DB rollback on failed repo init, multipart abort) is preserved, and the abort-on-failure cleanup no longer masks the original error | `DatasetResource.scala` |
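
The curried overload in the first row can be illustrated with a minimal, dependency-free sketch. `FakeApiException`, `HttpError`, `ErrorHandlingSketch`, and `withErrorHandling` are hypothetical stand-ins for `io.lakefs.clients.sdk.ApiException`, the JAX-RS exceptions, and the real handler; the fallback messages are illustrative apart from the "not found" and "Permission denied" wording that the tests check. It assumes only what this PR states: 4xx codes keep their status, everything else becomes 500, and the message is prefixed with `Error while <operation>: `.

```scala
// Hypothetical stand-ins so the sketch compiles without the LakeFS SDK or JAX-RS.
final class FakeApiException(val code: Int) extends RuntimeException(s"lakefs returned $code")
final case class HttpError(status: Int, message: String) extends RuntimeException(message)

object ErrorHandlingSketch {
  // Illustrative fallback messages; the real ones live in LakeFSExceptionHandler.scala.
  private def fallbackMessage(code: Int): String =
    code match {
      case 401                      => "Authentication with LakeFS failed."
      case 403                      => "Permission denied."
      case 404                      => "LakeFS resource not found."
      case c if c >= 400 && c < 500 => "LakeFS rejected the request."
      case _                        => "LakeFS encountered an internal error."
    }

  // Client errors keep their status code; server-side and unknown codes collapse to 500.
  private def toStatus(code: Int): Int =
    if (code >= 400 && code < 500) code else 500

  // Curried so call sites read: withErrorHandling("deleting file 'a.csv'") { ... }
  def withErrorHandling[T](operation: String)(call: => T): T =
    try call
    catch {
      // Only the LakeFS stand-in is translated; any other exception propagates untouched.
      case e: FakeApiException =>
        throw HttpError(toStatus(e.code), s"Error while $operation: ${fallbackMessage(e.code)}")
    }
}
```

Because the second parameter list is by-name (`call: => T`), the wrapped expression is only evaluated inside the `try`, which is what lets the handler intercept exceptions thrown by the call itself.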
Intentionally unchanged: best-effort cleanup sites that deliberately
swallow errors, the per-dataset skip in `listDatasets`, and the
`FileService` startup health check (failing fast at boot is correct
there).
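
The compensation behaviour kept in the third table row (clean up first, then surface the original error) can be sketched as follows. `MappedError`, `CleanupSketch`, and `runWithCleanup` are hypothetical names used only for illustration; in the real code the already-mapped type is `WebApplicationException` and the cleanup is the DB rollback or the multipart abort.

```scala
object CleanupSketch {
  // Hypothetical stand-in for an exception that already carries an HTTP status.
  final case class MappedError(status: Int, message: String) extends RuntimeException(message)

  def runWithCleanup[T](action: => T)(cleanup: () => Unit): T =
    try action
    catch {
      case e: Exception =>
        // Best-effort cleanup: a failure here must never replace the original error.
        // Catching Throwable mirrors the pattern used in this diff.
        try cleanup()
        catch { case _: Throwable => () }
        e match {
          // Already mapped to a status code: rethrow unchanged.
          case mapped: MappedError => throw mapped
          // Anything else is wrapped as a generic 500 that keeps the original message.
          case other =>
            throw MappedError(500, s"Failed to upload file to dataset: ${other.getMessage}")
        }
    }
}
```

The inner `try` around `cleanup()` is the part that changed in spirit: without it, an exception from the abort call would escape the `catch` block and hide the error that triggered the cleanup.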
### Any related issues, documentation, discussions?
Closes #4176
### How was this PR tested?
New `LakeFSExceptionHandlerSpec` (7 unit cases): status-code mapping
(400/401/403/404/409/4xx/5xx/unknown), operation context included in the
frontend-visible message, success passthrough, and non-LakeFS exceptions
propagating untouched.
New integration case in `DatasetResourceSpec`: deleting a dataset whose
LakeFS repository does not exist now yields `NotFoundException` (404)
instead of a generic 500.
```
sbt "FileService/testOnly org.apache.texera.service.util.LakeFSExceptionHandlerSpec"
# Tests: succeeded 7, failed 0
sbt "FileService/testOnly org.apache.texera.service.resource.DatasetResourceSpec"
# Tests: succeeded 94, failed 0 (Testcontainers: LakeFS 1.51 + MinIO + Postgres)
```
`sbt FileService/scalafixAll` and `sbt FileService/scalafmtAll` produce
no further diff.
### Was this PR authored or co-authored using generative AI tooling?
Yes, partially. I (Suyash Jain) worked on this PR together with Claude
Code as a pair-programming assistant. I reviewed the final diff and ran
the unit and Testcontainers-based integration suites locally before
opening the PR.
(backported from commit 7b1c8dc7abca17465039aa5c043a302d3580b419)
Generated-by: Claude Code (Claude Opus 4.7)
---
.../texera/service/resource/DatasetResource.scala | 150 ++++++++++++---------
.../service/util/LakeFSExceptionHandler.scala | 25 +++-
.../org/apache/texera/service/MockLakeFS.scala | 21 ++-
.../service/resource/DatasetResourceSpec.scala | 123 ++++++++++++++++-
.../service/util/LakeFSExceptionHandlerSpec.scala | 115 ++++++++++++++++
5 files changed, 364 insertions(+), 70 deletions(-)
diff --git a/file-service/src/main/scala/org/apache/texera/service/resource/DatasetResource.scala b/file-service/src/main/scala/org/apache/texera/service/resource/DatasetResource.scala
index 6da19a924f..1f8f28a85c 100644
--- a/file-service/src/main/scala/org/apache/texera/service/resource/DatasetResource.scala
+++ b/file-service/src/main/scala/org/apache/texera/service/resource/DatasetResource.scala
@@ -251,7 +251,9 @@ class DatasetResource extends LazyLogging {
getOwner(ctx, did).getEmail,
userAccessPrivilege,
isOwner,
- LakeFSStorageClient.retrieveRepositorySize(targetDataset.getRepositoryName)
+ withLakeFSErrorHandling(s"retrieving the size of dataset '${targetDataset.getName}'") {
+ LakeFSStorageClient.retrieveRepositorySize(targetDataset.getRepositoryName)
+ }
)
}
@@ -309,16 +311,23 @@ class DatasetResource extends LazyLogging {
// Initialize the repository in LakeFS
val repositoryName = s"dataset-${createdDataset.getDid}"
try {
- LakeFSStorageClient.initRepo(repositoryName)
+ withLakeFSErrorHandling(s"creating the repository of dataset '${dataset.getName}'") {
+ LakeFSStorageClient.initRepo(repositoryName)
+ }
} catch {
case e: Exception =>
+ // roll back the dataset record so a failed LakeFS init leaves no orphan row
ctx
.deleteFrom(DATASET)
.where(DATASET.DID.eq(createdDataset.getDid))
.execute()
- throw new WebApplicationException(
- s"Failed to create the dataset: ${e.getMessage}"
- )
+ e match {
+ case web: WebApplicationException => throw web
+ case other =>
+ throw new WebApplicationException(
+ s"Failed to create the dataset: ${other.getMessage}"
+ )
+ }
}
// update repository name of the created dataset
@@ -444,14 +453,8 @@ class DatasetResource extends LazyLogging {
// throw the exception that user has no access to certain dataset
throw new ForbiddenException(ERR_USER_HAS_NO_ACCESS_TO_DATASET_MESSAGE)
}
- try {
+ withLakeFSErrorHandling(s"deleting the repository of dataset '${dataset.getName}'") {
LakeFSStorageClient.deleteRepo(dataset.getRepositoryName)
- } catch {
- case e: Exception =>
- throw new WebApplicationException(
- s"Failed to delete a repository in LakeFS: ${e.getMessage}",
- e
- )
}
// delete the directory on S3
if (
@@ -600,30 +603,39 @@ class DatasetResource extends LazyLogging {
flush()
// ---------- complete upload ----------
- LakeFSStorageClient.completePresignedMultipartUploads(
- repoName,
- filePath,
- uploadId,
- completedParts.toList,
- physicalAddress
- )
+ withLakeFSErrorHandling(s"completing the multipart upload of file '$filePath'") {
+ LakeFSStorageClient.completePresignedMultipartUploads(
+ repoName,
+ filePath,
+ uploadId,
+ completedParts.toList,
+ physicalAddress
+ )
+ }
Response.ok(Map("message" -> s"Uploaded $filePath in ${completedParts.size} parts")).build()
}
} catch {
case e: Exception =>
if (repoName != null && filePath != null && uploadId != null && physicalAddress != null) {
- LakeFSStorageClient.abortPresignedMultipartUploads(
- repoName,
- filePath,
- uploadId,
- physicalAddress
- )
+ // best-effort cleanup; never let an abort failure mask the original error
+ try {
+ LakeFSStorageClient.abortPresignedMultipartUploads(
+ repoName,
+ filePath,
+ uploadId,
+ physicalAddress
+ )
+ } catch { case _: Throwable => () }
+ }
+ e match {
+ case web: WebApplicationException => throw web
+ case other =>
+ throw new WebApplicationException(
+ s"Failed to upload file to dataset: ${other.getMessage}",
+ other
+ )
}
- throw new WebApplicationException(
- s"Failed to upload file to dataset: ${e.getMessage}",
- e
- )
}
}
@@ -693,14 +705,8 @@ class DatasetResource extends LazyLogging {
// Decode the file path
val filePath = URLDecoder.decode(encodedFilePath, StandardCharsets.UTF_8.name())
- // Try to initialize the repository in LakeFS
- try {
+ withLakeFSErrorHandling(s"deleting file '$filePath' from the dataset repository") {
LakeFSStorageClient.deleteObject(repositoryName, filePath)
- } catch {
- case e: Exception =>
- throw new WebApplicationException(
- s"Failed to delete the file from repo in LakeFS: ${e.getMessage}"
- )
}
Response.ok().build()
@@ -1042,14 +1048,8 @@ class DatasetResource extends LazyLogging {
// Decode the file path
val filePath = URLDecoder.decode(encodedFilePath, StandardCharsets.UTF_8.name())
- // Try to reset the file change in LakeFS
- try {
+ withLakeFSErrorHandling(s"resetting uncommitted changes of file '$filePath'") {
LakeFSStorageClient.resetObjectUploadOrDeletion(repositoryName, filePath)
- } catch {
- case e: Exception =>
- throw new WebApplicationException(
- s"Failed to reset the changes from repo in LakeFS: ${e.getMessage}"
- )
}
Response.ok().build()
}
@@ -1255,7 +1255,11 @@ class DatasetResource extends LazyLogging {
val datasetName = dataset.getName
val repositoryName = dataset.getRepositoryName
val versionHash = datasetVersion.getVersionHash
- val objects = LakeFSStorageClient.retrieveObjectsOfVersion(repositoryName, versionHash)
+ val objects = withLakeFSErrorHandling(
+ s"listing files of version '$versionHash' of dataset '$datasetName'"
+ ) {
+ LakeFSStorageClient.retrieveObjectsOfVersion(repositoryName, versionHash)
+ }
if (objects.isEmpty) {
return Response
@@ -1271,7 +1275,9 @@ class DatasetResource extends LazyLogging {
try {
objects.foreach { obj =>
val filePath = obj.getPath
- val file = LakeFSStorageClient.getFileFromRepo(repositoryName, versionHash, filePath)
+ val file = withLakeFSErrorHandling(s"downloading file '$filePath' for the zip") {
+ LakeFSStorageClient.getFileFromRepo(repositoryName, versionHash, filePath)
+ }
zipOut.putNextEntry(new ZipEntry(filePath))
Files.copy(Paths.get(file.toURI), zipOut)
@@ -1432,11 +1438,15 @@ class DatasetResource extends LazyLogging {
errorResponse
case Right((resolvedRepositoryName, resolvedCommitHash, resolvedFilePath)) =>
- val url = LakeFSStorageClient.getFilePresignedUrl(
- resolvedRepositoryName,
- resolvedCommitHash,
- resolvedFilePath
- )
+ val url = withLakeFSErrorHandling(
+ s"generating a presigned URL for file '$resolvedFilePath'"
+ ) {
+ LakeFSStorageClient.getFilePresignedUrl(
+ resolvedRepositoryName,
+ resolvedCommitHash,
+ resolvedFilePath
+ )
+ }
Response.ok(Map("presignedUrl" -> url)).build()
}
@@ -2125,11 +2135,13 @@ class DatasetResource extends LazyLogging {
)
.asInstanceOf[OnDataset]
- val fileSize = LakeFSStorageClient.getFileSize(
- document.getRepositoryName(),
- document.getVersionHash(),
- document.getFileRelativePath()
- )
+ val fileSize = withLakeFSErrorHandling(s"reading the size of cover image '$normalized'") {
+ LakeFSStorageClient.getFileSize(
+ document.getRepositoryName(),
+ document.getVersionHash(),
+ document.getFileRelativePath()
+ )
+ }
if (fileSize > COVER_IMAGE_SIZE_LIMIT_BYTES) {
throw new BadRequestException(
@@ -2179,11 +2191,15 @@ class DatasetResource extends LazyLogging {
.openReadonlyDocument(FileResolver.resolve(fullPath))
.asInstanceOf[OnDataset]
- val presignedUrl = LakeFSStorageClient.getFilePresignedUrl(
- document.getRepositoryName(),
- document.getVersionHash(),
- document.getFileRelativePath()
- )
+ val presignedUrl = withLakeFSErrorHandling(
+ s"generating a presigned URL for cover image '$coverImage'"
+ ) {
+ LakeFSStorageClient.getFilePresignedUrl(
+ document.getRepositoryName(),
+ document.getVersionHash(),
+ document.getFileRelativePath()
+ )
+ }
Response.temporaryRedirect(new URI(presignedUrl)).build()
}
@@ -2224,11 +2240,15 @@ class DatasetResource extends LazyLogging {
.openReadonlyDocument(FileResolver.resolve(fullPath))
.asInstanceOf[OnDataset]
- val presignedUrl = LakeFSStorageClient.getFilePresignedUrl(
- document.getRepositoryName(),
- document.getVersionHash(),
- document.getFileRelativePath()
- )
+ val presignedUrl = withLakeFSErrorHandling(
+ s"generating a presigned URL for cover image '$coverImage'"
+ ) {
+ LakeFSStorageClient.getFilePresignedUrl(
+ document.getRepositoryName(),
+ document.getVersionHash(),
+ document.getFileRelativePath()
+ )
+ }
Response.ok(Map("url" -> presignedUrl)).build()
}
diff --git a/file-service/src/main/scala/org/apache/texera/service/util/LakeFSExceptionHandler.scala b/file-service/src/main/scala/org/apache/texera/service/util/LakeFSExceptionHandler.scala
index c1997fb647..1894817c43 100644
--- a/file-service/src/main/scala/org/apache/texera/service/util/LakeFSExceptionHandler.scala
+++ b/file-service/src/main/scala/org/apache/texera/service/util/LakeFSExceptionHandler.scala
@@ -46,17 +46,36 @@ object LakeFSExceptionHandler {
try {
call
} catch {
- case e: io.lakefs.clients.sdk.ApiException => handleException(e)
+ case e: io.lakefs.clients.sdk.ApiException => handleException(e, None)
+ }
+ }
+
+ /**
+ * Wraps a LakeFS call with centralized error handling, prefixing the
+ * user-visible message with the operation that failed
+ * (e.g. "deleting file 'a.csv' from dataset 'd1'").
+ */
+ def withLakeFSErrorHandling[T](operation: String)(call: => T): T = {
+ try {
+ call
+ } catch {
+ case e: io.lakefs.clients.sdk.ApiException => handleException(e, Some(operation))
}
}
/**
* Converts LakeFS ApiException to appropriate HTTP exception
*/
- private def handleException(e: io.lakefs.clients.sdk.ApiException): Nothing = {
+ private def handleException(
+ e: io.lakefs.clients.sdk.ApiException,
+ operation: Option[String]
+ ): Nothing = {
val code = e.getCode
val rawBody = Option(e.getResponseBody).filter(_.nonEmpty)
- val message = s"${fallbackMessages(code)}"
+ val message = operation match {
+ case Some(op) => s"Error while $op: ${fallbackMessages(code)}"
+ case None => fallbackMessages(code)
+ }
logger.warn(s"LakeFS error $code, ${e.getMessage}, body: ${rawBody.getOrElse("N/A")}")
diff --git a/file-service/src/test/scala/org/apache/texera/service/MockLakeFS.scala b/file-service/src/test/scala/org/apache/texera/service/MockLakeFS.scala
index 10c68bd085..62e60bd267 100644
--- a/file-service/src/test/scala/org/apache/texera/service/MockLakeFS.scala
+++ b/file-service/src/test/scala/org/apache/texera/service/MockLakeFS.scala
@@ -52,6 +52,16 @@ trait MockLakeFS extends ForAllTestContainer with BeforeAndAfterAll { self: Suit
.createContainer()
postgres.container.withNetwork(network)
+ // LakeFS bakes the pre-signed endpoint into its env before containers start,
+ // so MinIO cannot use a dynamically mapped host port: presigned URLs must be
+ // reachable from the host at an address known ahead of time. Reserve a free
+ // host port and pin MinIO's 9000 to it.
+ val minioHostPort: Int = {
+ val socket = new java.net.ServerSocket(0)
+ try socket.getLocalPort
+ finally socket.close()
+ }
+
// MinIO for object storage
val minio = MinIOContainer(
dockerImageName = DockerImageName.parse("minio/minio:RELEASE.2025-02-28T09-55-16Z"),
@@ -59,6 +69,15 @@ trait MockLakeFS extends ForAllTestContainer with BeforeAndAfterAll { self: Suit
password = "password"
)
minio.container.withNetwork(network)
+ minio.container.withCreateContainerCmdModifier { cmd =>
+ import com.github.dockerjava.api.model.{ExposedPort, PortBinding, Ports}
+ // setting explicit bindings replaces them all, so 9001 (console) must keep
+ // a dynamic binding or the container readiness check never passes
+ cmd.getHostConfig.withPortBindings(
+ new PortBinding(Ports.Binding.bindPort(minioHostPort), ExposedPort.tcp(9000)),
+ new PortBinding(Ports.Binding.empty(), ExposedPort.tcp(9001))
+ )
+ }
// LakeFS
val lakefsDatabaseURL: String =
@@ -80,7 +99,7 @@ trait MockLakeFS extends ForAllTestContainer with BeforeAndAfterAll { self: Suit
"LAKEFS_BLOCKSTORE_TYPE" -> "s3",
"LAKEFS_BLOCKSTORE_S3_FORCE_PATH_STYLE" -> "true",
"LAKEFS_BLOCKSTORE_S3_ENDPOINT" -> s"http://${minio.container.getNetworkAliases.get(0)}:9000",
- "LAKEFS_BLOCKSTORE_S3_PRE_SIGNED_ENDPOINT" -> "http://localhost:9000",
+ "LAKEFS_BLOCKSTORE_S3_PRE_SIGNED_ENDPOINT" -> s"http://localhost:$minioHostPort",
"LAKEFS_BLOCKSTORE_S3_CREDENTIALS_ACCESS_KEY_ID" -> "texera_minio",
"LAKEFS_BLOCKSTORE_S3_CREDENTIALS_SECRET_ACCESS_KEY" -> "password",
"LAKEFS_AUTH_ENCRYPT_SECRET_KEY" -> "random_string_for_lakefs",
diff --git a/file-service/src/test/scala/org/apache/texera/service/resource/DatasetResourceSpec.scala b/file-service/src/test/scala/org/apache/texera/service/resource/DatasetResourceSpec.scala
index eec7628a0b..1730d12a0a 100644
--- a/file-service/src/test/scala/org/apache/texera/service/resource/DatasetResourceSpec.scala
+++ b/file-service/src/test/scala/org/apache/texera/service/resource/DatasetResourceSpec.scala
@@ -51,7 +51,7 @@ import org.scalatest.tagobjects.Slow
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, Tag}
import org.slf4j.LoggerFactory
-import java.io.{ByteArrayInputStream, IOException, InputStream}
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, IOException, InputStream}
import java.net.URLEncoder
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}
@@ -380,6 +380,127 @@ class DatasetResourceSpec
datasetDao.fetchOneByDid(dataset.getDid) should not be null
}
+ it should "surface a LakeFS 404 as NotFoundException when deleting a dataset whose repo is missing" in {
+ val dataset = new Dataset
+ dataset.setName("delete-ds-no-repo")
+ dataset.setRepositoryName("delete-ds-no-repo")
+ dataset.setDescription("for lakefs 404 mapping test")
+ dataset.setOwnerUid(ownerUser.getUid)
+ dataset.setIsPublic(true)
+ dataset.setIsDownloadable(true)
+ datasetDao.insert(dataset)
+ // intentionally no LakeFSStorageClient.initRepo: the repository does not exist in LakeFS
+
+ val ex = intercept[NotFoundException] {
+ datasetResource.deleteDataset(dataset.getDid, sessionUser)
+ }
+ assertStatus(ex, 404)
+ }
+
+ "getDataset" should "return the dashboard dataset including its LakeFS repository size" in {
+ testDatasetVersion // ensures the LakeFS repo for baseDataset exists
+ val dashboardDataset = datasetResource.getDataset(baseDataset.getDid, sessionUser)
+ dashboardDataset.dataset.getDid shouldEqual baseDataset.getDid
+ dashboardDataset.size should be >= 0L
+ }
+
+ it should "surface a LakeFS 404 as NotFoundException when the dataset repo is missing" in {
+ val dataset = new Dataset
+ dataset.setName("get-ds-no-repo")
+ dataset.setRepositoryName("get-ds-no-repo")
+ dataset.setDescription("for lakefs 404 mapping test on getDataset")
+ dataset.setOwnerUid(ownerUser.getUid)
+ dataset.setIsPublic(true)
+ dataset.setIsDownloadable(true)
+ datasetDao.insert(dataset)
+ // intentionally no LakeFSStorageClient.initRepo: the repository does not exist in LakeFS
+
+ val ex = intercept[NotFoundException] {
+ datasetResource.getDataset(dataset.getDid, sessionUser)
+ }
+ assertStatus(ex, 404)
+ }
+
+ "uploadOneFileToDataset" should "stream a small file and complete the multipart upload" in {
+ testDatasetVersion // ensures the LakeFS repo for baseDataset exists
+ val payload = tinyBytes(0x5a, 2048)
+ val resp = datasetResource.uploadOneFileToDataset(
+ baseDataset.getDid,
+ urlEnc("upload-one-shot/sample.bin"),
+ "upload via single-file endpoint",
+ new ByteArrayInputStream(payload),
+ mkHeaders(payload.length.toLong),
+ sessionUser
+ )
+ resp.getStatus shouldEqual 200
+ }
+
+ it should "abort and wrap a mid-stream failure in a WebApplicationException" in {
+ testDatasetVersion
+ val payload = tinyBytes(0x33, 4096)
+ val ex = intercept[WebApplicationException] {
+ datasetResource.uploadOneFileToDataset(
+ baseDataset.getDid,
+ urlEnc("upload-one-shot/flaky.bin"),
+ "should fail mid-stream",
+ flakyStream(payload, failAfterBytes = 16),
+ mkHeaders(payload.length.toLong),
+ sessionUser
+ )
+ }
+ assertStatus(ex, 500)
+ ex.getMessage should include("Failed to upload file to dataset")
+ }
+
+ it should "rethrow WebApplicationExceptions unchanged when the user has no write access" in {
+ val ex = intercept[ForbiddenException] {
+ datasetResource.uploadOneFileToDataset(
+ multipartDataset.getDid,
+ urlEnc("upload-one-shot/forbidden.bin"),
+ "no write access",
+ new ByteArrayInputStream(tinyBytes(0x01)),
+ mkHeaders(1L),
+ multipartNoWriteSessionUser
+ )
+ }
+ assertStatus(ex, 403)
+ }
+
+ "getDatasetVersionZip" should "zip all files of a dataset version" in {
+ val version = testDatasetVersion
+ val resp =
+ datasetResource.getDatasetVersionZip(baseDataset.getDid, version.getDvid, null, sessionUser)
+ resp.getStatus shouldEqual 200
+ val out = new ByteArrayOutputStream()
+ resp.getEntity.asInstanceOf[StreamingOutput].write(out)
+ out.size() should be > 0
+ }
+
+ "getPresignedUrl" should "generate a presigned URL for an existing file" in {
+ testDatasetVersion
+ val resp = datasetResource.getPresignedUrl(
+ urlEnc("test-cover.jpg"),
+ baseDataset.getRepositoryName,
+ "main",
+ sessionUser
+ )
+ resp.getStatus shouldEqual 200
+ entityAsScalaMap(resp).get("presignedUrl") should not be None
+ }
+
+ it should "surface a LakeFS 404 as NotFoundException for a nonexistent file" in {
+ testDatasetVersion
+ val ex = intercept[NotFoundException] {
+ datasetResource.getPresignedUrl(
+ urlEnc("does-not-exist.bin"),
+ baseDataset.getRepositoryName,
+ "main",
+ sessionUser
+ )
+ }
+ assertStatus(ex, 404)
+ }
+
"listDatasets" should "include a dataset whose LakeFS repo exists" in {
val repoName = s"list-ok-${System.nanoTime()}"
val dataset = new Dataset
diff --git a/file-service/src/test/scala/org/apache/texera/service/util/LakeFSExceptionHandlerSpec.scala b/file-service/src/test/scala/org/apache/texera/service/util/LakeFSExceptionHandlerSpec.scala
new file mode 100644
index 0000000000..7f3b27cd60
--- /dev/null
+++ b/file-service/src/test/scala/org/apache/texera/service/util/LakeFSExceptionHandlerSpec.scala
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.service.util
+
+import io.lakefs.clients.sdk.ApiException
+import jakarta.ws.rs._
+import org.apache.texera.service.util.LakeFSExceptionHandler.withLakeFSErrorHandling
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should.Matchers
+
+class LakeFSExceptionHandlerSpec extends AnyFlatSpec with Matchers {
+
+ private def lakeFSError(code: Int): ApiException =
+ new ApiException(code, s"lakefs returned $code")
+
+ // typed as Unit (not Nothing) so the no-context overload resolves unambiguously
+ private def failingCall(code: Int): Unit = throw lakeFSError(code)
+
+ private def entityMessage(e: WebApplicationException): String =
+ e.getResponse.getEntity
+ .asInstanceOf[java.util.Map[String, String]]
+ .get("message")
+
+ "withLakeFSErrorHandling" should "return the call's result when no exception is thrown" in {
+ withLakeFSErrorHandling("reading a file")(42) shouldEqual 42
+ }
+
+ it should "map each LakeFS client error code to the matching JAX-RS exception" in {
+ val expected = Map(
+ 400 -> classOf[BadRequestException],
+ 401 -> classOf[NotAuthorizedException],
+ 403 -> classOf[ForbiddenException],
+ 404 -> classOf[NotFoundException]
+ )
+ expected.foreach {
+ case (code, exceptionClass) =>
+ val thrown = intercept[WebApplicationException] {
+ withLakeFSErrorHandling(failingCall(code))
+ }
+ thrown.getClass shouldEqual exceptionClass
+ thrown.getResponse.getStatus shouldEqual code
+ }
+ }
+
+ it should "keep the original status for other 4xx codes (e.g. 409 conflict)" in {
+ val thrown = intercept[WebApplicationException] {
+ withLakeFSErrorHandling(failingCall(409))
+ }
+ thrown.getResponse.getStatus shouldEqual 409
+ }
+
+ it should "map server-side and unknown codes to a 500 response" in {
+ Seq(500, 502, 503, 0).foreach { code =>
+ val thrown = intercept[InternalServerErrorException] {
+ withLakeFSErrorHandling(failingCall(code))
+ }
+ thrown.getResponse.getStatus shouldEqual 500
+ }
+ }
+
+ it should "include the operation context in the message visible to the frontend" in {
+ val thrown = intercept[NotFoundException] {
+ withLakeFSErrorHandling("deleting file 'a.csv' from dataset 'd1'") {
+ throw lakeFSError(404)
+ }
+ }
+ val message = entityMessage(thrown)
+ message should include("deleting file 'a.csv' from dataset 'd1'")
+ message should include("not found")
+ }
+
+ it should "produce a frontend-readable message without operation context too" in {
+ val thrown = intercept[ForbiddenException] {
+ withLakeFSErrorHandling(failingCall(403))
+ }
+ entityMessage(thrown) should include("Permission denied")
+ }
+
+ it should "let non-LakeFS exceptions propagate unchanged" in {
+ val original = new IllegalStateException("not a lakefs error")
+ val thrown = intercept[IllegalStateException] {
+ withLakeFSErrorHandling("any operation")(throw original)
+ }
+ thrown should be theSameInstanceAs original
+ }
+
+ it should "still map the status code when LakeFS provides a response body" in {
+ val withBody = new ApiException(
+ 404,
+ java.util.Collections.emptyMap[String, java.util.List[String]](),
+ """{"message":"object not found"}"""
+ )
+ val thrown = intercept[NotFoundException] {
+ withLakeFSErrorHandling("reading file 'a.csv'")(throw withBody)
+ }
+ entityMessage(thrown) should include("LakeFS resource not found")
+ }
+}