This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/spark-connect-swift.git
The following commit(s) were added to refs/heads/main by this push:
new 19b2143 [SPARK-52319] Add `(Catalog|Schema|TableOrView)NotFound` to
`SparkConnectError`
19b2143 is described below
commit 19b214320dfe6442eedb6be16e532156eea52723
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Mon May 26 20:27:30 2025 -0700
[SPARK-52319] Add `(Catalog|Schema|TableOrView)NotFound` to
`SparkConnectError`
### What changes were proposed in this pull request?
This PR aims to add `(Catalog|Schema|TableOrView)NotFound` to
`SparkConnectError`.
### Why are the changes needed?
To provide a user can catch these exceptions easily instead of matching
`internalError` with string patterns.
### Does this PR introduce _any_ user-facing change?
Yes, but these are more specific exceptions than before.
### How was this patch tested?
Pass the CIs.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #178 from dongjoon-hyun/SPARK-52319.
Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
Sources/SparkConnect/DataFrame.swift | 15 ++++++++++++++-
Sources/SparkConnect/SparkConnectError.swift | 3 +++
Tests/SparkConnectTests/CatalogTests.swift | 22 ++++++++++++++--------
3 files changed, 31 insertions(+), 9 deletions(-)
diff --git a/Sources/SparkConnect/DataFrame.swift
b/Sources/SparkConnect/DataFrame.swift
index 393168e..eda8bda 100644
--- a/Sources/SparkConnect/DataFrame.swift
+++ b/Sources/SparkConnect/DataFrame.swift
@@ -299,7 +299,20 @@ public actor DataFrame: Sendable {
),
interceptors: spark.client.getIntercepters()
) { client in
- return try await f(client)
+ do {
+ return try await f(client)
+ } catch let error as RPCError where error.code == .internalError {
+ switch error.message {
+ case let m where m.contains("CATALOG_NOT_FOUND"):
+ throw SparkConnectError.CatalogNotFound
+ case let m where m.contains("SCHEMA_NOT_FOUND"):
+ throw SparkConnectError.SchemaNotFound
+ case let m where m.contains("TABLE_OR_VIEW_NOT_FOUND"):
+ throw SparkConnectError.TableOrViewNotFound
+ default:
+ throw error
+ }
+ }
}
}
diff --git a/Sources/SparkConnect/SparkConnectError.swift
b/Sources/SparkConnect/SparkConnectError.swift
index cff40d7..c300075 100644
--- a/Sources/SparkConnect/SparkConnectError.swift
+++ b/Sources/SparkConnect/SparkConnectError.swift
@@ -19,8 +19,11 @@
/// A enum for ``SparkConnect`` package errors
public enum SparkConnectError: Error {
+ case CatalogNotFound
case InvalidArgument
case InvalidSessionID
case InvalidType
+ case SchemaNotFound
+ case TableOrViewNotFound
case UnsupportedOperation
}
diff --git a/Tests/SparkConnectTests/CatalogTests.swift
b/Tests/SparkConnectTests/CatalogTests.swift
index 053daf9..30f91b0 100644
--- a/Tests/SparkConnectTests/CatalogTests.swift
+++ b/Tests/SparkConnectTests/CatalogTests.swift
@@ -37,8 +37,14 @@ struct CatalogTests {
func setCurrentCatalog() async throws {
let spark = try await SparkSession.builder.getOrCreate()
try await spark.catalog.setCurrentCatalog("spark_catalog")
- try await #require(throws: Error.self) {
- try await spark.catalog.setCurrentCatalog("not_exist_catalog")
+ if await spark.version >= "4.0.0" {
+ try await #require(throws: SparkConnectError.CatalogNotFound) {
+ try await spark.catalog.setCurrentCatalog("not_exist_catalog")
+ }
+ } else {
+ try await #require(throws: Error.self) {
+ try await spark.catalog.setCurrentCatalog("not_exist_catalog")
+ }
}
await spark.stop()
}
@@ -63,7 +69,7 @@ struct CatalogTests {
func setCurrentDatabase() async throws {
let spark = try await SparkSession.builder.getOrCreate()
try await spark.catalog.setCurrentDatabase("default")
- try await #require(throws: Error.self) {
+ try await #require(throws: SparkConnectError.SchemaNotFound) {
try await spark.catalog.setCurrentDatabase("not_exist_database")
}
await spark.stop()
@@ -91,7 +97,7 @@ struct CatalogTests {
#expect(db.catalog == "spark_catalog")
#expect(db.description == "default database")
#expect(db.locationUri.hasSuffix("spark-warehouse"))
- try await #require(throws: Error.self) {
+ try await #require(throws: SparkConnectError.SchemaNotFound) {
try await spark.catalog.getDatabase("not_exist_database")
}
await spark.stop()
@@ -313,7 +319,7 @@ struct CatalogTests {
try await spark.catalog.cacheTable(tableName, StorageLevel.MEMORY_ONLY)
})
- try await #require(throws: Error.self) {
+ try await #require(throws: SparkConnectError.TableOrViewNotFound) {
try await spark.catalog.cacheTable("not_exist_table")
}
await spark.stop()
@@ -330,7 +336,7 @@ struct CatalogTests {
#expect(try await spark.catalog.isCached(tableName))
})
- try await #require(throws: Error.self) {
+ try await #require(throws: SparkConnectError.TableOrViewNotFound) {
try await spark.catalog.isCached("not_exist_table")
}
await spark.stop()
@@ -351,7 +357,7 @@ struct CatalogTests {
#expect(try await spark.catalog.isCached(tableName))
})
- try await #require(throws: Error.self) {
+ try await #require(throws: SparkConnectError.TableOrViewNotFound) {
try await spark.catalog.refreshTable("not_exist_table")
}
await spark.stop()
@@ -386,7 +392,7 @@ struct CatalogTests {
#expect(try await spark.catalog.isCached(tableName) == false)
})
- try await #require(throws: Error.self) {
+ try await #require(throws: SparkConnectError.TableOrViewNotFound) {
try await spark.catalog.uncacheTable("not_exist_table")
}
await spark.stop()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]