This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/spark-connect-swift.git
The following commit(s) were added to refs/heads/main by this push:
new b0ba4a0 [SPARK-52320] Add
`ColumnNotFound/InvalidViewName/TableOrViewAlreadyExists` to `SparkConnectError`
b0ba4a0 is described below
commit b0ba4a0cd1c67ffc6b5f7b30560fcbd08ac3ae23
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Mon May 26 22:15:00 2025 -0700
[SPARK-52320] Add `ColumnNotFound/InvalidViewName/TableOrViewAlreadyExists`
to `SparkConnectError`
### What changes were proposed in this pull request?
This PR aims to add `ColumnNotFound`, `InvalidViewName`,
`TableOrViewAlreadyExists` to `SparkConnectError`.
### Why are the changes needed?
To provide a user can catch these exceptions easily instead of matching
`internalError` with string patterns.
### Does this PR introduce _any_ user-facing change?
Yes, but these are more specific exceptions than before.
### How was this patch tested?
Pass the CIs.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #179 from dongjoon-hyun/SPARK-52320.
Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
Sources/SparkConnect/DataFrame.swift | 2 +
Sources/SparkConnect/SparkConnectClient.swift | 15 +++++-
Sources/SparkConnect/SparkConnectError.swift | 3 ++
Tests/SparkConnectTests/CatalogTests.swift | 12 ++---
Tests/SparkConnectTests/DataFrameTests.swift | 4 +-
Tests/SparkConnectTests/DataFrameWriterTests.swift | 5 +-
.../SparkConnectTests/DataFrameWriterV2Tests.swift | 2 +-
Tests/SparkConnectTests/MergeIntoWriterTests.swift | 57 ++++++++++++++++------
8 files changed, 72 insertions(+), 28 deletions(-)
diff --git a/Sources/SparkConnect/DataFrame.swift
b/Sources/SparkConnect/DataFrame.swift
index eda8bda..fa1eb48 100644
--- a/Sources/SparkConnect/DataFrame.swift
+++ b/Sources/SparkConnect/DataFrame.swift
@@ -309,6 +309,8 @@ public actor DataFrame: Sendable {
throw SparkConnectError.SchemaNotFound
case let m where m.contains("TABLE_OR_VIEW_NOT_FOUND"):
throw SparkConnectError.TableOrViewNotFound
+ case let m where m.contains("UNRESOLVED_COLUMN.WITH_SUGGESTION"):
+ throw SparkConnectError.ColumnNotFound
default:
throw error
}
diff --git a/Sources/SparkConnect/SparkConnectClient.swift
b/Sources/SparkConnect/SparkConnectClient.swift
index 93889ce..c9fc5b0 100644
--- a/Sources/SparkConnect/SparkConnectClient.swift
+++ b/Sources/SparkConnect/SparkConnectClient.swift
@@ -127,7 +127,20 @@ public actor SparkConnectClient {
),
interceptors: self.intercepters
) { client in
- return try await f(client)
+ do {
+ return try await f(client)
+ } catch let error as RPCError where error.code == .internalError {
+ switch error.message {
+ case let m where m.contains("TABLE_OR_VIEW_ALREADY_EXISTS"):
+ throw SparkConnectError.TableOrViewAlreadyExists
+ case let m where m.contains("TABLE_OR_VIEW_NOT_FOUND"):
+ throw SparkConnectError.TableOrViewNotFound
+ case let m where m.contains("Invalid view name:"):
+ throw SparkConnectError.InvalidViewName
+ default:
+ throw error
+ }
+ }
}
}
diff --git a/Sources/SparkConnect/SparkConnectError.swift
b/Sources/SparkConnect/SparkConnectError.swift
index c300075..f235859 100644
--- a/Sources/SparkConnect/SparkConnectError.swift
+++ b/Sources/SparkConnect/SparkConnectError.swift
@@ -20,10 +20,13 @@
/// A enum for ``SparkConnect`` package errors
public enum SparkConnectError: Error {
case CatalogNotFound
+ case ColumnNotFound
case InvalidArgument
case InvalidSessionID
case InvalidType
+ case InvalidViewName
case SchemaNotFound
+ case TableOrViewAlreadyExists
case TableOrViewNotFound
case UnsupportedOperation
}
diff --git a/Tests/SparkConnectTests/CatalogTests.swift
b/Tests/SparkConnectTests/CatalogTests.swift
index 30f91b0..b63fd35 100644
--- a/Tests/SparkConnectTests/CatalogTests.swift
+++ b/Tests/SparkConnectTests/CatalogTests.swift
@@ -205,12 +205,12 @@ struct CatalogTests {
try await spark.range(1).createTempView(viewName)
#expect(try await spark.catalog.tableExists(viewName))
- try await #require(throws: Error.self) {
+ try await #require(throws: SparkConnectError.TableOrViewAlreadyExists) {
try await spark.range(1).createTempView(viewName)
}
})
- try await #require(throws: Error.self) {
+ try await #require(throws: SparkConnectError.InvalidViewName) {
try await spark.range(1).createTempView("invalid view name")
}
@@ -228,7 +228,7 @@ struct CatalogTests {
try await spark.range(1).createOrReplaceTempView(viewName)
})
- try await #require(throws: Error.self) {
+ try await #require(throws: SparkConnectError.InvalidViewName) {
try await spark.range(1).createOrReplaceTempView("invalid view name")
}
@@ -244,13 +244,13 @@ struct CatalogTests {
try await spark.range(1).createGlobalTempView(viewName)
#expect(try await spark.catalog.tableExists("global_temp.\(viewName)"))
- try await #require(throws: Error.self) {
+ try await #require(throws: SparkConnectError.TableOrViewAlreadyExists) {
try await spark.range(1).createGlobalTempView(viewName)
}
})
#expect(try await spark.catalog.tableExists("global_temp.\(viewName)") ==
false)
- try await #require(throws: Error.self) {
+ try await #require(throws: SparkConnectError.InvalidViewName) {
try await spark.range(1).createGlobalTempView("invalid view name")
}
@@ -269,7 +269,7 @@ struct CatalogTests {
})
#expect(try await spark.catalog.tableExists("global_temp.\(viewName)") ==
false)
- try await #require(throws: Error.self) {
+ try await #require(throws: SparkConnectError.InvalidViewName) {
try await spark.range(1).createOrReplaceGlobalTempView("invalid view
name")
}
diff --git a/Tests/SparkConnectTests/DataFrameTests.swift
b/Tests/SparkConnectTests/DataFrameTests.swift
index 15cc6d6..f5c6eeb 100644
--- a/Tests/SparkConnectTests/DataFrameTests.swift
+++ b/Tests/SparkConnectTests/DataFrameTests.swift
@@ -235,10 +235,10 @@ struct DataFrameTests {
@Test
func selectInvalidColumn() async throws {
let spark = try await SparkSession.builder.getOrCreate()
- try await #require(throws: Error.self) {
+ try await #require(throws: SparkConnectError.ColumnNotFound) {
try await spark.range(1).select("invalid").schema
}
- try await #require(throws: Error.self) {
+ try await #require(throws: SparkConnectError.ColumnNotFound) {
try await spark.range(1).select("id + 1").schema
}
await spark.stop()
diff --git a/Tests/SparkConnectTests/DataFrameWriterTests.swift
b/Tests/SparkConnectTests/DataFrameWriterTests.swift
index 3ad1234..5228667 100644
--- a/Tests/SparkConnectTests/DataFrameWriterTests.swift
+++ b/Tests/SparkConnectTests/DataFrameWriterTests.swift
@@ -112,7 +112,7 @@ struct DataFrameWriterTests {
try await spark.range(1).write.saveAsTable(tableName)
#expect(try await spark.read.table(tableName).count() == 1)
- try await #require(throws: Error.self) {
+ try await #require(throws: SparkConnectError.TableOrViewAlreadyExists) {
try await spark.range(1).write.saveAsTable(tableName)
}
@@ -130,8 +130,7 @@ struct DataFrameWriterTests {
let spark = try await SparkSession.builder.getOrCreate()
let tableName = "TABLE_" + UUID().uuidString.replacingOccurrences(of: "-",
with: "")
try await SQLHelper.withTable(spark, tableName)({
- // Table doesn't exist.
- try await #require(throws: Error.self) {
+ try await #require(throws: SparkConnectError.TableOrViewNotFound) {
try await spark.range(1).write.insertInto(tableName)
}
diff --git a/Tests/SparkConnectTests/DataFrameWriterV2Tests.swift
b/Tests/SparkConnectTests/DataFrameWriterV2Tests.swift
index 938caa8..60f47c2 100644
--- a/Tests/SparkConnectTests/DataFrameWriterV2Tests.swift
+++ b/Tests/SparkConnectTests/DataFrameWriterV2Tests.swift
@@ -34,7 +34,7 @@ struct DataFrameWriterV2Tests {
let write = try await spark.range(2).writeTo(tableName).using("orc")
try await write.create()
#expect(try await spark.table(tableName).count() == 2)
- try await #require(throws: Error.self) {
+ try await #require(throws: SparkConnectError.TableOrViewAlreadyExists) {
try await write.create()
}
})
diff --git a/Tests/SparkConnectTests/MergeIntoWriterTests.swift
b/Tests/SparkConnectTests/MergeIntoWriterTests.swift
index d7d693d..9881653 100644
--- a/Tests/SparkConnectTests/MergeIntoWriterTests.swift
+++ b/Tests/SparkConnectTests/MergeIntoWriterTests.swift
@@ -31,11 +31,20 @@ struct MergeIntoWriterTests {
let tableName = "TABLE_" + UUID().uuidString.replacingOccurrences(of: "-",
with: "")
try await SQLHelper.withTable(spark, tableName)({
let mergeInto = try await spark.range(1).mergeInto(tableName, "true")
- try await #require(throws: Error.self) {
- try await mergeInto.whenMatched().delete().merge()
- }
- try await #require(throws: Error.self) {
- try await mergeInto.whenMatched("true").delete().merge()
+ if await spark.version >= "4.0.0" {
+ try await #require(throws: SparkConnectError.TableOrViewNotFound) {
+ try await mergeInto.whenMatched().delete().merge()
+ }
+ try await #require(throws: SparkConnectError.TableOrViewNotFound) {
+ try await mergeInto.whenMatched("true").delete().merge()
+ }
+ } else {
+ try await #require(throws: Error.self) {
+ try await mergeInto.whenMatched().delete().merge()
+ }
+ try await #require(throws: Error.self) {
+ try await mergeInto.whenMatched("true").delete().merge()
+ }
}
})
await spark.stop()
@@ -47,11 +56,20 @@ struct MergeIntoWriterTests {
let tableName = "TABLE_" + UUID().uuidString.replacingOccurrences(of: "-",
with: "")
try await SQLHelper.withTable(spark, tableName)({
let mergeInto = try await spark.range(1).mergeInto(tableName, "true")
- try await #require(throws: Error.self) {
- try await mergeInto.whenNotMatched().insertAll().merge()
- }
- try await #require(throws: Error.self) {
- try await mergeInto.whenNotMatched("true").insertAll().merge()
+ if await spark.version >= "4.0.0" {
+ try await #require(throws: SparkConnectError.TableOrViewNotFound) {
+ try await mergeInto.whenNotMatched().insertAll().merge()
+ }
+ try await #require(throws: SparkConnectError.TableOrViewNotFound) {
+ try await mergeInto.whenNotMatched("true").insertAll().merge()
+ }
+ } else {
+ try await #require(throws: Error.self) {
+ try await mergeInto.whenNotMatched().insertAll().merge()
+ }
+ try await #require(throws: Error.self) {
+ try await mergeInto.whenNotMatched("true").insertAll().merge()
+ }
}
})
await spark.stop()
@@ -63,11 +81,20 @@ struct MergeIntoWriterTests {
let tableName = "TABLE_" + UUID().uuidString.replacingOccurrences(of: "-",
with: "")
try await SQLHelper.withTable(spark, tableName)({
let mergeInto = try await spark.range(1).mergeInto(tableName, "true")
- try await #require(throws: Error.self) {
- try await mergeInto.whenNotMatchedBySource().delete().merge()
- }
- try await #require(throws: Error.self) {
- try await mergeInto.whenNotMatchedBySource("true").delete().merge()
+ if await spark.version >= "4.0.0" {
+ try await #require(throws: SparkConnectError.TableOrViewNotFound) {
+ try await mergeInto.whenNotMatchedBySource().delete().merge()
+ }
+ try await #require(throws: SparkConnectError.TableOrViewNotFound) {
+ try await mergeInto.whenNotMatchedBySource("true").delete().merge()
+ }
+ } else {
+ try await #require(throws: Error.self) {
+ try await mergeInto.whenNotMatchedBySource().delete().merge()
+ }
+ try await #require(throws: Error.self) {
+ try await mergeInto.whenNotMatchedBySource("true").delete().merge()
+ }
}
})
await spark.stop()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]