This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/spark-connect-swift.git


The following commit(s) were added to refs/heads/main by this push:
     new 5c3c00f  [SPARK-52068] Add `StreamingQuery` actor
5c3c00f is described below

commit 5c3c00fa8a4bd23f13b6d4a535f01e5d8040270c
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Sat May 10 17:20:29 2025 -0700

    [SPARK-52068] Add `StreamingQuery` actor
    
    ### What changes were proposed in this pull request?
    
    This PR aims to add `StreamingQuery` actor.
    
    ### Why are the changes needed?
    
    To implement `StreamingQueryManager`, we need this first.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Pass the CIs.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #125 from dongjoon-hyun/SPARK-52068.
    
    Authored-by: Dongjoon Hyun <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 Sources/SparkConnect/SparkConnectClient.swift     |  16 ++
 Sources/SparkConnect/StreamingQuery.swift         | 176 ++++++++++++++++++++++
 Sources/SparkConnect/TypeAliases.swift            |   1 +
 Tests/SparkConnectTests/StreamingQueryTests.swift |  57 +++++++
 4 files changed, 250 insertions(+)

diff --git a/Sources/SparkConnect/SparkConnectClient.swift 
b/Sources/SparkConnect/SparkConnectClient.swift
index d74528b..de019b8 100644
--- a/Sources/SparkConnect/SparkConnectClient.swift
+++ b/Sources/SparkConnect/SparkConnectClient.swift
@@ -981,6 +981,22 @@ public actor SparkConnectClient {
     try await execute(self.sessionID!, command)
   }
 
+  func executeStreamingQueryCommand(
+    _ id: String,
+    _ runID: String,
+    _ command: StreamingQueryCommand.OneOf_Command,
+  ) async throws -> [ExecutePlanResponse] {
+    var queryID = Spark_Connect_StreamingQueryInstanceId()
+    queryID.id = id
+    queryID.runID = runID
+    var streamingQueryCommand = Spark_Connect_StreamingQueryCommand()
+    streamingQueryCommand.queryID = queryID
+    streamingQueryCommand.command = command
+    var command = Spark_Connect_Command()
+    command.streamingQueryCommand = streamingQueryCommand
+    return try await execute(self.sessionID!, command)
+  }
+
   private enum URIParams {
     static let PARAM_GRPC_MAX_MESSAGE_SIZE = "grpc_max_message_size"
     static let PARAM_SESSION_ID = "session_id"
diff --git a/Sources/SparkConnect/StreamingQuery.swift 
b/Sources/SparkConnect/StreamingQuery.swift
new file mode 100644
index 0000000..6d62f2e
--- /dev/null
+++ b/Sources/SparkConnect/StreamingQuery.swift
@@ -0,0 +1,176 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+import Foundation
+
/// Information about a ``StreamingQuery`` that was terminated by an exception.
public struct StreamingQueryException: Sendable {
  /// The message of the exception.
  public let exceptionMessage: String
  /// The error class of the exception.
  public let errorClass: String
  /// The full stack trace of the exception.
  public let stackTrace: String
}
+
/// A snapshot of the status of a ``StreamingQuery``.
public struct StreamingQueryStatus: Sendable {
  /// A human-readable description of what the query is currently doing.
  public let statusMessage: String
  /// Whether the source has data available to process.
  public let isDataAvailable: Bool
  /// Whether a trigger is currently firing.
  public let isTriggerActive: Bool
  /// Whether the query is actively running.
  public let isActive: Bool
}
+
/// A handle to a query that is executing continuously in the background as new data arrives.
public actor StreamingQuery: Sendable {
  /// Returns the unique id of this query that persists across restarts from checkpoint data. That
  /// is, this id is generated when a query is started for the first time, and will be the same
  /// every time it is restarted from checkpoint data. Also see ``runId``.
  public let id: UUID

  /// Returns the unique id of this run of the query. That is, every start/restart of a query will
  /// generate a unique runId. Therefore, every time a query is restarted from checkpoint, it will
  /// have the same ``id`` but different ``runId``s.
  public let runId: UUID

  /// Returns the user-specified name of the query, or null if not specified. This name can be
  /// specified in the `org.apache.spark.sql.streaming.DataStreamWriter` as
  /// `dataframe.writeStream.queryName("query").start()`. This name, if set, must be unique across
  /// all active queries.
  public let name: String

  /// Returns the `SparkSession` associated with `this`.
  public let sparkSession: SparkSession

  /// Creates a handle for an existing streaming query.
  /// - Parameters:
  ///   - id: The persistent query ID.
  ///   - runId: The ID of this run of the query.
  ///   - name: The user-specified query name (may be empty).
  ///   - sparkSession: The owning session used to reach the server.
  init(_ id: UUID, _ runId: UUID, _ name: String, _ sparkSession: SparkSession) {
    self.id = id
    self.runId = runId
    self.name = name
    self.sparkSession = sparkSession
  }

  /// Sends `command` for this query to the server and returns the raw responses.
  @discardableResult
  private func executeCommand(
    _ command: StreamingQueryCommand.OneOf_Command
  ) async throws -> [ExecutePlanResponse] {
    return try await self.sparkSession.client.executeStreamingQueryCommand(
      self.id.uuidString,
      self.runId.uuidString,
      command
    )
  }

  /// Returns `true` if this query is actively running.
  public var isActive: Bool {
    get async throws {
      let response = try await executeCommand(StreamingQueryCommand.OneOf_Command.status(true))
      return response.first!.streamingQueryCommandResult.status.isActive
    }
  }

  /// Returns the ``StreamingQueryException`` if the query was terminated by an exception.
  /// - Returns: A ``StreamingQueryException``, or `nil` if the query has not failed.
  public func exception() async throws -> StreamingQueryException? {
    let response = try await executeCommand(StreamingQueryCommand.OneOf_Command.exception(true))
    let result = response.first!.streamingQueryCommandResult.exception
    // The return type is Optional: report `nil` when the server sent no exception
    // instead of fabricating one whose fields are all empty strings.
    guard result.hasExceptionMessage else {
      return nil
    }
    return StreamingQueryException(
      exceptionMessage: result.exceptionMessage,
      errorClass: result.errorClass,
      stackTrace: result.stackTrace
    )
  }

  /// Returns the current status of the query.
  /// - Returns: A ``StreamingQueryStatus`` describing what the query is doing right now.
  public func status() async throws -> StreamingQueryStatus {
    let response = try await executeCommand(StreamingQueryCommand.OneOf_Command.status(true))
    let result = response.first!.streamingQueryCommandResult.status
    return StreamingQueryStatus(
      statusMessage: result.statusMessage,
      isDataAvailable: result.isDataAvailable,
      isTriggerActive: result.isTriggerActive,
      isActive: result.isActive
    )
  }

  /// Returns an array of the most recent progress updates for this query, as JSON strings.
  /// The number of progress updates retained for each stream is configured by Spark session
  /// configuration `spark.sql.streaming.numRecentProgressUpdates`.
  public var recentProgress: [String] {
    get async throws {
      let response = try await executeCommand(
        StreamingQueryCommand.OneOf_Command.recentProgress(true))
      let result = response.first!.streamingQueryCommandResult.recentProgress
      return result.recentProgressJson
    }
  }

  /// Returns the most recent progress update of this streaming query as a JSON string,
  /// or `nil` if there is none yet.
  public var lastProgress: String? {
    get async throws {
      let response = try await executeCommand(
        StreamingQueryCommand.OneOf_Command.lastProgress(true))
      // The server delivers the last-progress reply through the same
      // `recentProgress` result field, with at most one element.
      let result = response.first!.streamingQueryCommandResult.recentProgress
      return result.recentProgressJson.first
    }
  }

  /// Waits for the termination of `this` query, either by `query.stop()` or by an exception.
  /// If the query has terminated with an exception, then the exception will be thrown.
  ///
  /// If the query has terminated, then all subsequent calls to this method will either return
  /// immediately (if the query was terminated by `stop()`), or throw the exception immediately
  /// (if the query has terminated with exception).
  /// - Parameter timeoutMs: An optional timeout in milliseconds; when `nil`, no timeout is set.
  /// - Returns: True on termination.
  public func awaitTermination(_ timeoutMs: Int64? = nil) async throws -> Bool? {
    var command = Spark_Connect_StreamingQueryCommand.AwaitTerminationCommand()
    if let timeoutMs {
      command.timeoutMs = timeoutMs
    }
    let response = try await executeCommand(
      StreamingQueryCommand.OneOf_Command.awaitTermination(command))
    return response.first!.streamingQueryCommandResult.awaitTermination.terminated
  }

  /// Blocks until all available data in the source has been processed and committed to the sink.
  ///
  /// This method is intended for testing. Note that in the case of continually arriving data, this
  /// method may block forever. Additionally, this method is only guaranteed to block until data
  /// that has been synchronously appended to a
  /// `org.apache.spark.sql.execution.streaming.Source` prior to invocation.
  /// (i.e. `getOffset` must immediately reflect the addition).
  public func processAllAvailable() async throws {
    try await executeCommand(StreamingQueryCommand.OneOf_Command.processAllAvailable(true))
  }

  /// Stops the execution of this query if it is running. This waits until the termination of the
  /// query execution threads or until a timeout is hit.
  ///
  /// By default stop will block indefinitely. You can configure a timeout by the configuration
  /// `spark.sql.streaming.stopTimeout`. A timeout of 0 (or negative) milliseconds will block
  /// indefinitely. If a `TimeoutException` is thrown, users can retry stopping the stream. If the
  /// issue persists, it is advisable to kill the Spark application.
  public func stop() async throws {
    try await executeCommand(StreamingQueryCommand.OneOf_Command.stop(true))
  }

  /// Prints the physical plan to the console for debugging purposes.
  /// - Parameter extended: Whether to do extended explain or not.
  public func explain(_ extended: Bool = false) async throws {
    var command = Spark_Connect_StreamingQueryCommand.ExplainCommand()
    command.extended = extended
    let response = try await executeCommand(StreamingQueryCommand.OneOf_Command.explain(command))
    print(response.first!.streamingQueryCommandResult.explain.result)
  }
}
diff --git a/Sources/SparkConnect/TypeAliases.swift 
b/Sources/SparkConnect/TypeAliases.swift
index 41547f8..cb7864b 100644
--- a/Sources/SparkConnect/TypeAliases.swift
+++ b/Sources/SparkConnect/TypeAliases.swift
@@ -55,6 +55,7 @@ typealias SetOpType = SetOperation.SetOpType
 typealias ShowString = Spark_Connect_ShowString
 typealias SparkConnectService = Spark_Connect_SparkConnectService
 typealias Sort = Spark_Connect_Sort
+typealias StreamingQueryCommand = Spark_Connect_StreamingQueryCommand
 typealias StructType = Spark_Connect_DataType.Struct
 typealias Tail = Spark_Connect_Tail
 typealias UserContext = Spark_Connect_UserContext
diff --git a/Tests/SparkConnectTests/StreamingQueryTests.swift 
b/Tests/SparkConnectTests/StreamingQueryTests.swift
new file mode 100644
index 0000000..209a435
--- /dev/null
+++ b/Tests/SparkConnectTests/StreamingQueryTests.swift
@@ -0,0 +1,57 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+import Foundation
+import Testing
+
+@testable import SparkConnect
+
/// A test suite for `StreamingQuery`
@Suite(.serialized)
struct StreamingQueryTests {

  @Test
  func create() async throws {
    let spark = try await SparkSession.builder.getOrCreate()
    let queryID = UUID()
    let queryRunID = UUID()
    let query = StreamingQuery(queryID, queryRunID, "name", spark)

    // The handle reports back exactly what it was constructed with.
    #expect(await query.id == queryID)
    #expect(await query.runId == queryRunID)
    #expect(await query.name == "name")

    // Every server round-trip fails because no query with this ID exists:
    // "Streaming query xxx is not found".
    try await #require(throws: Error.self) { try await query.isActive }
    try await #require(throws: Error.self) { try await query.recentProgress }
    try await #require(throws: Error.self) { try await query.lastProgress }
    try await #require(throws: Error.self) { try await query.awaitTermination() }
    try await #require(throws: Error.self) { try await query.awaitTermination(1000) }

    await spark.stop()
  }
}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to