This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/spark-connect-swift.git
The following commit(s) were added to refs/heads/main by this push:
new 5c3c00f [SPARK-52068] Add `StreamingQuery` actor
5c3c00f is described below
commit 5c3c00fa8a4bd23f13b6d4a535f01e5d8040270c
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Sat May 10 17:20:29 2025 -0700
[SPARK-52068] Add `StreamingQuery` actor
### What changes were proposed in this pull request?
This PR aims to add `StreamingQuery` actor.
### Why are the changes needed?
To implement `StreamingQueryManager`, we need this first.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Pass the CIs.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #125 from dongjoon-hyun/SPARK-52068.
Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
Sources/SparkConnect/SparkConnectClient.swift | 16 ++
Sources/SparkConnect/StreamingQuery.swift | 176 ++++++++++++++++++++++
Sources/SparkConnect/TypeAliases.swift | 1 +
Tests/SparkConnectTests/StreamingQueryTests.swift | 57 +++++++
4 files changed, 250 insertions(+)
diff --git a/Sources/SparkConnect/SparkConnectClient.swift b/Sources/SparkConnect/SparkConnectClient.swift
index d74528b..de019b8 100644
--- a/Sources/SparkConnect/SparkConnectClient.swift
+++ b/Sources/SparkConnect/SparkConnectClient.swift
@@ -981,6 +981,22 @@ public actor SparkConnectClient {
try await execute(self.sessionID!, command)
}
+ func executeStreamingQueryCommand(
+ _ id: String,
+ _ runID: String,
+ _ command: StreamingQueryCommand.OneOf_Command,
+ ) async throws -> [ExecutePlanResponse] {
+ var queryID = Spark_Connect_StreamingQueryInstanceId()
+ queryID.id = id
+ queryID.runID = runID
+ var streamingQueryCommand = Spark_Connect_StreamingQueryCommand()
+ streamingQueryCommand.queryID = queryID
+ streamingQueryCommand.command = command
+ var command = Spark_Connect_Command()
+ command.streamingQueryCommand = streamingQueryCommand
+ return try await execute(self.sessionID!, command)
+ }
+
private enum URIParams {
static let PARAM_GRPC_MAX_MESSAGE_SIZE = "grpc_max_message_size"
static let PARAM_SESSION_ID = "session_id"
diff --git a/Sources/SparkConnect/StreamingQuery.swift b/Sources/SparkConnect/StreamingQuery.swift
new file mode 100644
index 0000000..6d62f2e
--- /dev/null
+++ b/Sources/SparkConnect/StreamingQuery.swift
@@ -0,0 +1,176 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+import Foundation
+
+public struct StreamingQueryException: Sendable {
+ let exceptionMessage: String
+ let errorClass: String
+ let stackTrace: String
+}
+
+public struct StreamingQueryStatus: Sendable {
+ let statusMessage: String
+ let isDataAvailable: Bool
+ let isTriggerActive: Bool
+ let isActive: Bool
+}
+
+/// A handle to a query that is executing continuously in the background as new data arrives.
+public actor StreamingQuery: Sendable {
+ /// Returns the unique id of this query that persists across restarts from checkpoint data. That
+ /// is, this id is generated when a query is started for the first time, and will be the same
+ /// every time it is restarted from checkpoint data. Also see ``runId``.
+ public let id: UUID
+
+ /// Returns the unique id of this run of the query. That is, every start/restart of a query will
+ /// generate a unique runId. Therefore, every time a query is restarted from checkpoint, it will
+ /// have the same ``id`` but different ``runId``s.
+ public let runId: UUID
+
+ /// Returns the user-specified name of the query, or null if not specified. This name can be
+ /// specified in the `org.apache.spark.sql.streaming.DataStreamWriter` as
+ /// `dataframe.writeStream.queryName("query").start()`. This name, if set, must be unique across
+ /// all active queries.
+ public let name: String
+
+ /// Returns the `SparkSession` associated with `this`.
+ public let sparkSession: SparkSession
+
+ init(_ id: UUID, _ runId: UUID, _ name: String, _ sparkSession: SparkSession) {
+ self.id = id
+ self.runId = runId
+ self.name = name
+ self.sparkSession = sparkSession
+ }
+
+ @discardableResult
+ private func executeCommand(
+ _ command: StreamingQueryCommand.OneOf_Command
+ ) async throws -> [ExecutePlanResponse] {
+ return try await self.sparkSession.client.executeStreamingQueryCommand(
+ self.id.uuidString,
+ self.runId.uuidString,
+ command
+ )
+ }
+
+ /// Returns `true` if this query is actively running.
+ public var isActive: Bool {
+ get async throws {
+ let response = try await executeCommand(StreamingQueryCommand.OneOf_Command.status(true))
+ return response.first!.streamingQueryCommandResult.status.isActive
+ }
+ }
+
+ /// Returns the ``StreamingQueryException`` if the query was terminated by an exception.
+ /// - Returns: A ``StreamingQueryException``.
+ public func exception() async throws -> StreamingQueryException? {
+ let response = try await executeCommand(StreamingQueryCommand.OneOf_Command.exception(true))
+ let result = response.first!.streamingQueryCommandResult.exception
+ return StreamingQueryException(
+ exceptionMessage: result.exceptionMessage,
+ errorClass: result.errorClass,
+ stackTrace: result.stackTrace,
+ )
+ }
+
+ /// Returns the current status of the query.
+ /// - Returns:
+ public func status() async throws -> StreamingQueryStatus {
+ let response = try await executeCommand(StreamingQueryCommand.OneOf_Command.status(true))
+ let result = response.first!.streamingQueryCommandResult.status
+ return StreamingQueryStatus(
+ statusMessage: result.statusMessage,
+ isDataAvailable: result.isDataAvailable,
+ isTriggerActive: result.isTriggerActive,
+ isActive: result.isActive,
+ )
+ }
+
+ /// Returns an array of the most recent ``StreamingQueryProgress`` updates for this query.
+ /// The number of progress updates retained for each stream is configured by Spark session
+ /// configuration `spark.sql.streaming.numRecentProgressUpdates`.
+ public var recentProgress: [String] {
+ get async throws {
+ let response = try await executeCommand(
+ StreamingQueryCommand.OneOf_Command.recentProgress(true))
+ let result = response.first!.streamingQueryCommandResult.recentProgress
+ return result.recentProgressJson
+ }
+ }
+
+ /// Returns the most recent ``StreamingQueryProgress`` update of this streaming query.
+ public var lastProgress: String? {
+ get async throws {
+ let response = try await executeCommand(
+ StreamingQueryCommand.OneOf_Command.lastProgress(true))
+ let result = response.first!.streamingQueryCommandResult.recentProgress
+ return result.recentProgressJson.first
+ }
+ }
+
+ /// Waits for the termination of `this` query, either by `query.stop()` or by an exception.
+ /// If the query has terminated with an exception, then the exception will be thrown.
+ ///
+ /// If the query has terminated, then all subsequent calls to this method will either return
+ /// immediately (if the query was terminated by `stop()`), or throw the exception immediately
+ /// (if the query has terminated with exception).
+ /// - Parameter timeout: A timeout in milliseconds.
+ /// - Returns: True on termination.
+ public func awaitTermination(_ timeoutMs: Int64? = nil) async throws -> Bool? {
+ var command = Spark_Connect_StreamingQueryCommand.AwaitTerminationCommand()
+ if let timeoutMs {
+ command.timeoutMs = timeoutMs
+ }
+ let response = try await executeCommand(
+ StreamingQueryCommand.OneOf_Command.awaitTermination(command))
+ return response.first!.streamingQueryCommandResult.awaitTermination.terminated
+ }
+
+ /// Blocks until all available data in the source has been processed and committed to the sink.
+ ///
+ /// This method is intended for testing. Note that in the case of continually arriving data, this
+ /// method may block forever. Additionally, this method is only guaranteed to block until data
+ /// that has been synchronously appended data to a
+ /// `org.apache.spark.sql.execution.streaming.Source` prior to invocation.
+ /// (i.e. `getOffset` must immediately reflect the addition).
+ public func processAllAvailable() async throws {
+ try await executeCommand(StreamingQueryCommand.OneOf_Command.processAllAvailable(true))
+ }
+
+ /// Stops the execution of this query if it is running. This waits until the termination of the
+ /// query execution threads or until a timeout is hit.
+ ///
+ /// By default stop will block indefinitely. You can configure a timeout by the configuration
+ /// `spark.sql.streaming.stopTimeout`. A timeout of 0 (or negative) milliseconds will block
+ /// indefinitely. If a `TimeoutException` is thrown, users can retry stopping the stream. If the
+ /// issue persists, it is advisable to kill the Spark application.
+ public func stop() async throws {
+ try await executeCommand(StreamingQueryCommand.OneOf_Command.stop(true))
+ }
+
+ /// Prints the physical plan to the console for debugging purposes.
+ /// - Parameter extended: Whether to do extended explain or not.
+ public func explain(_ extended: Bool = false) async throws {
+ var command = Spark_Connect_StreamingQueryCommand.ExplainCommand()
+ command.extended = extended
+ let response = try await executeCommand(StreamingQueryCommand.OneOf_Command.explain(command))
+ print(response.first!.streamingQueryCommandResult.explain.result)
+ }
+}
diff --git a/Sources/SparkConnect/TypeAliases.swift b/Sources/SparkConnect/TypeAliases.swift
index 41547f8..cb7864b 100644
--- a/Sources/SparkConnect/TypeAliases.swift
+++ b/Sources/SparkConnect/TypeAliases.swift
@@ -55,6 +55,7 @@ typealias SetOpType = SetOperation.SetOpType
typealias ShowString = Spark_Connect_ShowString
typealias SparkConnectService = Spark_Connect_SparkConnectService
typealias Sort = Spark_Connect_Sort
+typealias StreamingQueryCommand = Spark_Connect_StreamingQueryCommand
typealias StructType = Spark_Connect_DataType.Struct
typealias Tail = Spark_Connect_Tail
typealias UserContext = Spark_Connect_UserContext
diff --git a/Tests/SparkConnectTests/StreamingQueryTests.swift b/Tests/SparkConnectTests/StreamingQueryTests.swift
new file mode 100644
index 0000000..209a435
--- /dev/null
+++ b/Tests/SparkConnectTests/StreamingQueryTests.swift
@@ -0,0 +1,57 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+import Foundation
+import Testing
+
+@testable import SparkConnect
+
+/// A test suite for `StreamingQuery`
+@Suite(.serialized)
+struct StreamingQueryTests {
+
+ @Test
+ func create() async throws {
+ let spark = try await SparkSession.builder.getOrCreate()
+ let id = UUID()
+ let runId = UUID()
+ let query = StreamingQuery(id, runId, "name", spark)
+ #expect(await query.id == id)
+ #expect(await query.runId == runId)
+ #expect(await query.name == "name")
+
+ // Streaming query xxx is not found
+ try await #require(throws: Error.self) {
+ try await query.isActive
+ }
+ try await #require(throws: Error.self) {
+ try await query.recentProgress
+ }
+ try await #require(throws: Error.self) {
+ try await query.lastProgress
+ }
+ try await #require(throws: Error.self) {
+ try await query.awaitTermination()
+ }
+ try await #require(throws: Error.self) {
+ try await query.awaitTermination(1000)
+ }
+ await spark.stop()
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]