This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/spark-connect-swift.git
The following commit(s) were added to refs/heads/main by this push:
new c5f1ff7 [SPARK-52069] Support `DataStreamReader` and
`DataStreamWriter`
c5f1ff7 is described below
commit c5f1ff77e12b3b2626b0860b7e6335b1de93da09
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Sun May 11 11:43:05 2025 -0700
[SPARK-52069] Support `DataStreamReader` and `DataStreamWriter`
### What changes were proposed in this pull request?
This PR aims to support `DataStreamReader` and `DataStreamWriter`.
### Why are the changes needed?
For feature parity.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Pass the CIs with the newly added test case.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #126 from dongjoon-hyun/SPARK-52069.
Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
Sources/SparkConnect/DataFrame.swift | 12 +-
Sources/SparkConnect/DataStreamReader.swift | 221 ++++++++++++++++++++++++++
Sources/SparkConnect/DataStreamWriter.swift | 209 ++++++++++++++++++++++++
Sources/SparkConnect/SparkSession.swift | 23 ++-
Sources/SparkConnect/StreamingQuery.swift | 4 +-
Sources/SparkConnect/TypeAliases.swift | 1 +
Tests/SparkConnectTests/DataStreamTests.swift | 69 ++++++++
7 files changed, 533 insertions(+), 6 deletions(-)
diff --git a/Sources/SparkConnect/DataFrame.swift
b/Sources/SparkConnect/DataFrame.swift
index f984017..1c7b08d 100644
--- a/Sources/SparkConnect/DataFrame.swift
+++ b/Sources/SparkConnect/DataFrame.swift
@@ -113,6 +113,8 @@ import Synchronization
/// - ``unpivot(_:_:_:_:)``
/// - ``melt(_:_:_:)``
/// - ``melt(_:_:_:_:)``
+/// - ``transpose()``
+/// - ``transpose(_:)``
///
/// ### Join Operations
/// - ``join(_:)``
@@ -166,6 +168,7 @@ import Synchronization
/// ### Write Operations
/// - ``write``
/// - ``writeTo(_:)``
+/// - ``writeStream``
///
/// ### Sampling
/// - ``sample(_:_:_:)``
@@ -1381,7 +1384,7 @@ public actor DataFrame: Sendable {
/// Returns a ``DataFrameWriter`` that can be used to write non-streaming
data.
public var write: DataFrameWriter {
get {
- return DataFrameWriter(df: self)
+ DataFrameWriter(df: self)
}
}
@@ -1391,4 +1394,11 @@ public actor DataFrame: Sendable {
public func writeTo(_ table: String) -> DataFrameWriterV2 {
return DataFrameWriterV2(table, self)
}
+
+ /// Returns a ``DataStreamWriter`` that can be used to write streaming data.
+ public var writeStream: DataStreamWriter {
+ get {
+ DataStreamWriter(df: self)
+ }
+ }
}
diff --git a/Sources/SparkConnect/DataStreamReader.swift
b/Sources/SparkConnect/DataStreamReader.swift
new file mode 100644
index 0000000..41763e7
--- /dev/null
+++ b/Sources/SparkConnect/DataStreamReader.swift
@@ -0,0 +1,221 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+import Foundation
+
+/// An actor to load a streaming `Dataset` from external storage systems
+/// (e.g. file systems, key-value stores, etc). Use `SparkSession.readStream`
to access this.
+public actor DataStreamReader: Sendable {
+ var source: String = ""
+
+ var paths: [String] = []
+
+ var extraOptions: CaseInsensitiveDictionary = CaseInsensitiveDictionary([:])
+
+ var userSpecifiedSchemaDDL: String? = nil
+
+ let sparkSession: SparkSession
+
+ init(sparkSession: SparkSession) {
+ self.sparkSession = sparkSession
+ }
+
+ /// Specifies the input data source format.
+ /// - Parameter source: A string.
+ /// - Returns: A ``DataStreamReader``.
+ public func format(_ source: String) -> DataStreamReader {
+ self.source = source
+ return self
+ }
+
+ /// Specifies the input schema. Some data sources (e.g. JSON) can infer the
input schema
+ /// automatically from data. By specifying the schema here, the underlying
data source can skip
+ /// the schema inference step, and thus speed up data loading.
+ /// - Parameter schema: A DDL schema string.
+ /// - Returns: A `DataStreamReader`.
+ @discardableResult
+ public func schema(_ schema: String) async throws -> DataStreamReader {
+ // Validate by parsing.
+ do {
+ try await sparkSession.client.ddlParse(schema)
+ } catch {
+ throw SparkConnectError.InvalidTypeException
+ }
+ self.userSpecifiedSchemaDDL = schema
+ return self
+ }
+
+ /// Adds an input option for the underlying data source.
+ /// - Parameters:
+ /// - key: A key string.
+ /// - value: A value string.
+ /// - Returns: A `DataStreamReader`.
+ public func option(_ key: String, _ value: String) -> DataStreamReader {
+ self.extraOptions[key] = value
+ return self
+ }
+
+ /// Adds an input option for the underlying data source.
+ /// - Parameters:
+ /// - key: A key string.
+ /// - value: A `Bool` value.
+ /// - Returns: A `DataStreamReader`.
+ public func option(_ key: String, _ value: Bool) -> DataStreamReader {
+ self.extraOptions[key] = String(value)
+ return self
+ }
+
+ /// Adds an input option for the underlying data source.
+ /// - Parameters:
+ /// - key: A key string.
+ /// - value: A `Int64` value.
+ /// - Returns: A `DataStreamReader`.
+ public func option(_ key: String, _ value: Int64) -> DataStreamReader {
+ self.extraOptions[key] = String(value)
+ return self
+ }
+
+ /// Adds an input option for the underlying data source.
+ /// - Parameters:
+ /// - key: A key string.
+ /// - value: A `Double` value.
+ /// - Returns: A `DataStreamReader`.
+ public func option(_ key: String, _ value: Double) -> DataStreamReader {
+ self.extraOptions[key] = String(value)
+ return self
+ }
+
+ /// Adds input options for the underlying data source.
+ /// - Parameter options: A string-string dictionary.
+ /// - Returns: A `DataStreamReader`.
+ public func options(_ options: [String: String]) -> DataStreamReader {
+ for (key, value) in options {
+ self.extraOptions[key] = value
+ }
+ return self
+ }
+
+ /// Loads input data stream in as a `DataFrame`, for data streams that don't
require a path
+ /// (e.g. external key-value stores).
+ /// - Returns: A `DataFrame`.
+ public func load() -> DataFrame {
+ return load([])
+ }
+
+ /// Loads input data stream in as a `DataFrame`, for data streams that
require a path
+ /// (e.g. data backed by a local or distributed file system).
+ /// - Parameter path: A path string.
+ /// - Returns: A `DataFrame`.
+ public func load(_ path: String) -> DataFrame {
+ return load([path])
+ }
+
+ func load(_ paths: [String]) -> DataFrame {
+ self.paths = paths
+
+ var dataSource = DataSource()
+ dataSource.format = self.source
+ dataSource.paths = self.paths
+ dataSource.options = self.extraOptions.toStringDictionary()
+ if let userSpecifiedSchemaDDL = self.userSpecifiedSchemaDDL {
+ dataSource.schema = userSpecifiedSchemaDDL
+ }
+
+ var read = Read()
+ read.dataSource = dataSource
+ read.isStreaming = true
+
+ var relation = Relation()
+ relation.read = read
+
+ var plan = Plan()
+ plan.opType = .root(relation)
+
+ return DataFrame(spark: sparkSession, plan: plan)
+ }
+
+ /// Define a Streaming DataFrame on a Table. The DataSource corresponding to
the table should
+ /// support streaming mode.
+ /// - Parameter tableName: The name of the table.
+ /// - Returns: A ``DataFrame``.
+ public func table(_ tableName: String) -> DataFrame {
+ var namedTable = NamedTable()
+ namedTable.unparsedIdentifier = tableName
+ namedTable.options = self.extraOptions.toStringDictionary()
+
+ var read = Read()
+ read.namedTable = namedTable
+ read.isStreaming = true
+
+ var relation = Relation()
+ relation.read = read
+
+ var plan = Plan()
+ plan.opType = .root(relation)
+
+ return DataFrame(spark: sparkSession, plan: plan)
+ }
+
+ /// Loads a text file stream and returns the result as a `DataFrame`.
+ /// - Parameter path: A path string
+ /// - Returns: A `DataFrame`.
+ public func text(_ path: String) -> DataFrame {
+ self.source = "text"
+ return load(path)
+ }
+
+ /// Loads a CSV file stream and returns the result as a `DataFrame`.
+ /// - Parameter path: A path string
+ /// - Returns: A `DataFrame`.
+ public func csv(_ path: String) -> DataFrame {
+ self.source = "csv"
+ return load(path)
+ }
+
+ /// Loads a JSON file stream and returns the result as a `DataFrame`.
+ /// - Parameter path: A path string
+ /// - Returns: A `DataFrame`.
+ public func json(_ path: String) -> DataFrame {
+ self.source = "json"
+ return load(path)
+ }
+
+ /// Loads an XML file stream and returns the result as a `DataFrame`.
+ /// - Parameter path: A path string
+ /// - Returns: A `DataFrame`.
+ public func xml(_ path: String) -> DataFrame {
+ self.source = "xml"
+ return load(path)
+ }
+
+ /// Loads an ORC file stream and returns the result as a `DataFrame`.
+ /// - Parameter path: A path string
+ /// - Returns: A `DataFrame`.
+ public func orc(_ path: String) -> DataFrame {
+ self.source = "orc"
+ return load(path)
+ }
+
+ /// Loads a Parquet file stream and returns the result as a `DataFrame`.
+ /// - Parameter path: A path string
+ /// - Returns: A `DataFrame`.
+ public func parquet(_ path: String) -> DataFrame {
+ self.source = "parquet"
+ return load(path)
+ }
+}
diff --git a/Sources/SparkConnect/DataStreamWriter.swift
b/Sources/SparkConnect/DataStreamWriter.swift
new file mode 100644
index 0000000..0af04ff
--- /dev/null
+++ b/Sources/SparkConnect/DataStreamWriter.swift
@@ -0,0 +1,209 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+import Foundation
+
+public enum Trigger {
+ case OneTimeTrigger
+ case AvailableNowTrigger
+ case ProcessingTimeTrigger(intervalMs: Int64)
+ case ContinuousTrigger(intervalMs: Int64)
+}
+
+/// An actor used to write a streaming `DataFrame` to external storage systems
+/// (e.g. file systems, key-value stores, etc). Use `DataFrame.writeStream` to
access this.
+public actor DataStreamWriter: Sendable {
+ var queryName: String? = nil
+
+ var source: String? = nil
+
+ var trigger: Trigger? = nil
+
+ var path: String? = nil
+
+ var tableName: String? = nil
+
+ var outputMode: String? = nil
+
+ var extraOptions: CaseInsensitiveDictionary = CaseInsensitiveDictionary()
+
+ var partitioningColumns: [String]? = nil
+
+ var clusteringColumns: [String]? = nil
+
+ let df: DataFrame
+
+ init(df: DataFrame) {
+ self.df = df
+ }
+
+ /// Specifies the name of the ``StreamingQuery`` that can be
+ /// started with `start()`. This name must be unique among all the currently
active queries in
+ /// the associated SparkSession.
+ /// - Parameter queryName: A string name.
+ /// - Returns: A ``DataStreamWriter``.
+ public func queryName(_ queryName: String) -> DataStreamWriter {
+ self.queryName = queryName
+ return self
+ }
+
+ /// Specifies the underlying output data source.
+ /// - Parameter source: A string.
+ /// - Returns: A `DataStreamWriter`.
+ public func format(_ source: String) -> DataStreamWriter {
+ self.source = source
+ return self
+ }
+
+ /// Specifies how data of a streaming ``DataFrame`` is written to a
streaming sink.
+ ///
+ /// - `append`: only the new rows in the streaming ``DataFrame`` will be
written to the sink.
+ /// - `complete`: all the rows in the streaming ``DataFrame`` will be
written to the sink
+ /// every time there are some updates.
+ /// - `update`: only the rows that were updated in the streaming
``DataFrame`` will be
+ /// written to the sink every time there are some updates. If the query
doesn't contain
+ /// aggregations, it will be equivalent to `append` mode.
+ ///
+ /// - Parameter outputMode: A string for outputMode.
+ /// - Returns: A ``DataStreamWriter``.
+ public func outputMode(_ outputMode: String) -> DataStreamWriter {
+ self.outputMode = outputMode
+ return self
+ }
+
+ /// Adds an output option for the underlying data source.
+ /// - Parameters:
+ /// - key: A key string.
+ /// - value: A value string.
+ /// - Returns: A `DataStreamWriter`.
+ public func option(_ key: String, _ value: String) -> DataStreamWriter {
+ self.extraOptions[key] = value
+ return self
+ }
+
+ /// Partitions the output by the given columns on the file system. If
specified, the output is
+ /// laid out on the file system similar to Hive's partitioning scheme.
+ /// - Parameter colNames: Column names to partition.
+ /// - Returns: A ``DataStreamWriter``.
+ public func partitionBy(_ colNames: String...) -> DataStreamWriter {
+ self.partitioningColumns = colNames
+ return self
+ }
+
+ /// Clusters the output by the given columns. If specified, the output is
laid out such that
+ /// records with similar values on the clustering column are grouped
together in the same file.
+ /// - Parameter colNames: Column names to cluster.
+ /// - Returns: A ``DataStreamWriter``.
+ public func clusterBy(_ colNames: String...) -> DataStreamWriter {
+ self.clusteringColumns = colNames
+ return self
+ }
+
+ /// Loads input in as a `DataFrame`, for data sources that don't require a
path (e.g. external
+ /// key-value stores).
+ public func trigger(_ trigger: Trigger) async throws -> DataStreamWriter {
+ self.trigger = trigger
+ return self
+ }
+
+ /// Starts the execution of the streaming query, which will continually
output results to the
+ /// given path as new data arrives. The returned ``StreamingQuery`` object
can be used to interact
+ /// with the stream.
+ /// - Parameter path: A path to write.
+ /// - Returns: A ``StreamingQuery``.
+ public func start(_ path: String) async throws -> StreamingQuery {
+ self.path = path
+ return try await start()
+ }
+
+ /// Starts the execution of the streaming query, which will continually
output results to the
+ /// given path as new data arrives. The returned ``StreamingQuery`` object
can be used to interact
+ /// with the stream. Throws exceptions if the following conditions are met:
+ /// - Another run of the same streaming query, that is a streaming query
sharing the same
+ /// checkpoint location, is already active on the same Spark Driver
+ /// - The SQL configuration `spark.sql.streaming.stopActiveRunOnRestart` is
enabled
+ /// - The active run cannot be stopped within the timeout controlled by the
SQL configuration `spark.sql.streaming.stopTimeout`
+ ///
+ /// - Returns: A ``StreamingQuery``.
+ public func start() async throws -> StreamingQuery {
+ var writeStreamOperationStart = WriteStreamOperationStart()
+ writeStreamOperationStart.input = (await df.getPlan() as! Plan).root
+ if let source = self.source {
+ writeStreamOperationStart.format = source
+ }
+ writeStreamOperationStart.options = self.extraOptions.toStringDictionary()
+ if let partitioningColumns = self.partitioningColumns {
+ writeStreamOperationStart.partitioningColumnNames = partitioningColumns
+ }
+ if let clusteringColumns = self.clusteringColumns {
+ writeStreamOperationStart.clusteringColumnNames = clusteringColumns
+ }
+ writeStreamOperationStart.trigger =
+ switch self.trigger {
+ case .ProcessingTimeTrigger(let intervalMs):
+ .processingTimeInterval("INTERVAL \(intervalMs) MILLISECOND")
+ case .OneTimeTrigger:
+ .once(true)
+ case .AvailableNowTrigger:
+ .availableNow(true)
+ case .ContinuousTrigger(let intervalMs):
+ .continuousCheckpointInterval("INTERVAL \(intervalMs) MILLISECOND")
+ default: .once(true)
+ }
+ if let outputMode = self.outputMode {
+ writeStreamOperationStart.outputMode = outputMode
+ }
+ if let queryName = self.queryName {
+ writeStreamOperationStart.queryName = queryName
+ }
+ if let path = self.path {
+ writeStreamOperationStart.sinkDestination = .path(path)
+ }
+ if let tableName = self.tableName {
+ writeStreamOperationStart.sinkDestination = .tableName(tableName)
+ }
+
+ var command = Spark_Connect_Command()
+ command.writeStreamOperationStart = writeStreamOperationStart
+
+ let response = try await df.spark.client.execute(df.spark.sessionID,
command)
+ let result = response.first!.writeStreamOperationStartResult
+ if result.hasQueryStartedEventJson {
+ // TODO: post
+ }
+
+ let query = try await StreamingQuery(
+ UUID(uuidString: result.queryID.id)!,
+ UUID(uuidString: result.queryID.runID)!,
+ result.name,
+ self.df.sparkSession,
+ )
+
+ return query
+ }
+
+ /// Starts the execution of the streaming query, which will continually
output results to the
+ /// given table as new data arrives. The returned ``StreamingQuery`` object
can be used to interact
+ /// with the stream.
+ /// - Parameter tableName: A table name.
+ /// - Returns: A ``StreamingQuery``.
+ public func toTable(tableName: String) async throws -> StreamingQuery {
+ self.tableName = tableName
+ return try await start()
+ }
+}
diff --git a/Sources/SparkConnect/SparkSession.swift
b/Sources/SparkConnect/SparkSession.swift
index e588ace..392f387 100644
--- a/Sources/SparkConnect/SparkSession.swift
+++ b/Sources/SparkConnect/SparkSession.swift
@@ -194,7 +194,7 @@ public actor SparkSession {
/// Returns a DataFrameReader for reading data in various formats.
///
/// The DataFrameReader provides methods to load data from external storage
systems
- /// such as file systems, databases, and streaming sources.
+ /// such as file systems and databases.
///
/// ```swift
/// // Read a CSV file
@@ -208,14 +208,31 @@ public actor SparkSession {
/// .json("path/to/file.json")
///
/// // Read an ORC file
- /// let parquetData = spark.read
+ /// let orcData = spark.read
/// .orc("path/to/file.orc")
/// ```
///
/// - Returns: A DataFrameReader instance configured for this session
public var read: DataFrameReader {
get {
- return DataFrameReader(sparkSession: self)
+ DataFrameReader(sparkSession: self)
+ }
+ }
+
+ /// Returns a `DataStreamReader` that can be used to read streaming data in
as a `DataFrame`.
+ ///
+ /// The DataFrameReader provides methods to load data from external storage
systems
+ /// such as file systems, databases, and streaming sources.
+ ///
+ /// ```swift
+ /// // Read an ORC file
+ /// let orcData = spark.readStream.orc("path/to/file.orc")
+ /// ```
+ ///
+ /// - Returns: A DataFrameReader instance configured for this session
+ public var readStream: DataStreamReader {
+ get {
+ DataStreamReader(sparkSession: self)
}
}
diff --git a/Sources/SparkConnect/StreamingQuery.swift
b/Sources/SparkConnect/StreamingQuery.swift
index 2758a5e..70f81e1 100644
--- a/Sources/SparkConnect/StreamingQuery.swift
+++ b/Sources/SparkConnect/StreamingQuery.swift
@@ -64,8 +64,8 @@ public actor StreamingQuery: Sendable {
_ command: StreamingQueryCommand.OneOf_Command
) async throws -> [ExecutePlanResponse] {
return try await self.sparkSession.client.executeStreamingQueryCommand(
- self.id.uuidString,
- self.runId.uuidString,
+ self.id.uuidString.lowercased(),
+ self.runId.uuidString.lowercased(),
command
)
}
diff --git a/Sources/SparkConnect/TypeAliases.swift
b/Sources/SparkConnect/TypeAliases.swift
index cb7864b..e0543b6 100644
--- a/Sources/SparkConnect/TypeAliases.swift
+++ b/Sources/SparkConnect/TypeAliases.swift
@@ -63,4 +63,5 @@ typealias UnresolvedAttribute =
Spark_Connect_Expression.UnresolvedAttribute
typealias WithColumnsRenamed = Spark_Connect_WithColumnsRenamed
typealias WriteOperation = Spark_Connect_WriteOperation
typealias WriteOperationV2 = Spark_Connect_WriteOperationV2
+typealias WriteStreamOperationStart = Spark_Connect_WriteStreamOperationStart
typealias YearMonthInterval = Spark_Connect_DataType.YearMonthInterval
diff --git a/Tests/SparkConnectTests/DataStreamTests.swift
b/Tests/SparkConnectTests/DataStreamTests.swift
new file mode 100644
index 0000000..00af6ae
--- /dev/null
+++ b/Tests/SparkConnectTests/DataStreamTests.swift
@@ -0,0 +1,69 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+import Foundation
+import SparkConnect
+import Testing
+
+/// A test suite for `DataStreamReader` and `DataStreamWriter`
+@Suite(.serialized)
+struct DataStreamTests {
+ @Test
+ func query() async throws {
+ let spark = try await SparkSession.builder.getOrCreate()
+
+ // Prepare directories
+ let input = "/tmp/input-" + UUID().uuidString
+ let checkpoint = "/tmp/checkpoint-" + UUID().uuidString
+ let output = "/tmp/output-" + UUID().uuidString
+ try await spark.range(2025).write.orc(input)
+
+ // Create a streaming dataframe.
+ let df =
+ try await spark
+ .readStream
+ .schema("id LONG")
+ .orc(input)
+ #expect(try await df.isStreaming())
+
+ // Processing
+ let df2 = await df.selectExpr("id", "id * 10 as value")
+
+ // Start a streaming query
+ let query =
+ try await df2
+ .writeStream
+ .option("checkpointLocation", checkpoint)
+ .outputMode("append")
+ .format("orc")
+ .trigger(Trigger.ProcessingTimeTrigger(intervalMs: 1000))
+ .start(output)
+ #expect(try await query.isActive)
+ // Wait for processing
+ try await Task.sleep(nanoseconds: 2_000_000_000)
+
+ try await query.stop()
+ #expect(try await query.isActive == false)
+
+ let df3 = await spark.read.orc(output)
+ #expect(try await df3.dtypes.count == 2)
+ #expect(try await df3.count() == 2025)
+ await spark.stop()
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]