This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/spark-connect-swift.git


The following commit(s) were added to refs/heads/main by this push:
     new c5f1ff7  [SPARK-52069] Support `DataStreamReader` and 
`DataStreamWriter`
c5f1ff7 is described below

commit c5f1ff77e12b3b2626b0860b7e6335b1de93da09
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Sun May 11 11:43:05 2025 -0700

    [SPARK-52069] Support `DataStreamReader` and `DataStreamWriter`
    
    ### What changes were proposed in this pull request?
    
    This PR aims to support `DataStreamReader` and `DataStreamWriter`.
    
    ### Why are the changes needed?
    
    For feature parity.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Pass the CIs with the newly added test case.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #126 from dongjoon-hyun/SPARK-52069.
    
    Authored-by: Dongjoon Hyun <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 Sources/SparkConnect/DataFrame.swift          |  12 +-
 Sources/SparkConnect/DataStreamReader.swift   | 221 ++++++++++++++++++++++++++
 Sources/SparkConnect/DataStreamWriter.swift   | 209 ++++++++++++++++++++++++
 Sources/SparkConnect/SparkSession.swift       |  23 ++-
 Sources/SparkConnect/StreamingQuery.swift     |   4 +-
 Sources/SparkConnect/TypeAliases.swift        |   1 +
 Tests/SparkConnectTests/DataStreamTests.swift |  69 ++++++++
 7 files changed, 533 insertions(+), 6 deletions(-)

diff --git a/Sources/SparkConnect/DataFrame.swift 
b/Sources/SparkConnect/DataFrame.swift
index f984017..1c7b08d 100644
--- a/Sources/SparkConnect/DataFrame.swift
+++ b/Sources/SparkConnect/DataFrame.swift
@@ -113,6 +113,8 @@ import Synchronization
 /// - ``unpivot(_:_:_:_:)``
 /// - ``melt(_:_:_:)``
 /// - ``melt(_:_:_:_:)``
+/// - ``transpose()``
+/// - ``transpose(_:)``
 ///
 /// ### Join Operations
 /// - ``join(_:)``
@@ -166,6 +168,7 @@ import Synchronization
 /// ### Write Operations
 /// - ``write``
 /// - ``writeTo(_:)``
+/// - ``writeStream``
 ///
 /// ### Sampling
 /// - ``sample(_:_:_:)``
@@ -1381,7 +1384,7 @@ public actor DataFrame: Sendable {
   /// Returns a ``DataFrameWriter`` that can be used to write non-streaming 
data.
   public var write: DataFrameWriter {
     get {
-      return DataFrameWriter(df: self)
+      DataFrameWriter(df: self)
     }
   }
 
@@ -1391,4 +1394,11 @@ public actor DataFrame: Sendable {
   public func writeTo(_ table: String) -> DataFrameWriterV2 {
     return DataFrameWriterV2(table, self)
   }
+  
+  /// Returns a new ``DataStreamWriter`` bound to this ``DataFrame`` for writing streaming data.
+  public var writeStream: DataStreamWriter {
+    get {
+      DataStreamWriter(df: self)
+    }
+  }
 }
diff --git a/Sources/SparkConnect/DataStreamReader.swift 
b/Sources/SparkConnect/DataStreamReader.swift
new file mode 100644
index 0000000..41763e7
--- /dev/null
+++ b/Sources/SparkConnect/DataStreamReader.swift
@@ -0,0 +1,221 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+import Foundation
+
+/// An actor to load a streaming ``DataFrame`` from external storage systems
+/// (e.g. file systems, key-value stores, etc). Use `SparkSession.readStream` to access this.
+public actor DataStreamReader: Sendable {
+  /// The input data source format; empty until `format(_:)` or a format-specific loader sets it.
+  var source: String = ""
+
+  /// The paths passed to the most recent `load` call.
+  var paths: [String] = []
+
+  /// The options that are sent to the underlying data source.
+  var extraOptions: CaseInsensitiveDictionary = CaseInsensitiveDictionary([:])
+
+  /// The user-specified schema as a DDL string, or `nil` when no schema was given.
+  var userSpecifiedSchemaDDL: String? = nil
+
+  /// The session used to validate schemas and to create the returned ``DataFrame`` instances.
+  let sparkSession: SparkSession
+
+  init(sparkSession: SparkSession) {
+    self.sparkSession = sparkSession
+  }
+
+  /// Specifies the input data source format.
+  /// - Parameter source: A string.
+  /// - Returns: A ``DataStreamReader``.
+  public func format(_ source: String) -> DataStreamReader {
+    self.source = source
+    return self
+  }
+
+  /// Specifies the input schema. Some data sources (e.g. JSON) can infer the input schema
+  /// automatically from data. By specifying the schema here, the underlying data source can skip
+  /// the schema inference step, and thus speed up data loading.
+  /// - Parameter schema: A DDL schema string.
+  /// - Returns: A `DataStreamReader`.
+  /// - Throws: `SparkConnectError.InvalidTypeException` if parsing the schema fails.
+  @discardableResult
+  public func schema(_ schema: String) async throws -> DataStreamReader {
+    // Validate by parsing. Any failure of the parse request is reported as an invalid type.
+    do {
+      try await sparkSession.client.ddlParse(schema)
+    } catch {
+      throw SparkConnectError.InvalidTypeException
+    }
+    self.userSpecifiedSchemaDDL = schema
+    return self
+  }
+
+  /// Adds an input option for the underlying data source.
+  /// - Parameters:
+  ///   - key: A key string.
+  ///   - value: A value string.
+  /// - Returns: A `DataStreamReader`.
+  public func option(_ key: String, _ value: String) -> DataStreamReader {
+    self.extraOptions[key] = value
+    return self
+  }
+
+  /// Adds an input option for the underlying data source.
+  /// - Parameters:
+  ///   - key: A key string.
+  ///   - value: A `Bool` value.
+  /// - Returns: A `DataStreamReader`.
+  public func option(_ key: String, _ value: Bool) -> DataStreamReader {
+    self.extraOptions[key] = String(value)
+    return self
+  }
+
+  /// Adds an input option for the underlying data source.
+  /// - Parameters:
+  ///   - key: A key string.
+  ///   - value: A `Int64` value.
+  /// - Returns: A `DataStreamReader`.
+  public func option(_ key: String, _ value: Int64) -> DataStreamReader {
+    self.extraOptions[key] = String(value)
+    return self
+  }
+
+  /// Adds an input option for the underlying data source.
+  /// - Parameters:
+  ///   - key: A key string.
+  ///   - value: A `Double` value.
+  /// - Returns: A `DataStreamReader`.
+  public func option(_ key: String, _ value: Double) -> DataStreamReader {
+    self.extraOptions[key] = String(value)
+    return self
+  }
+
+  /// Adds input options for the underlying data source.
+  /// - Parameter options: A string-string dictionary.
+  /// - Returns: A `DataStreamReader`.
+  public func options(_ options: [String: String]) -> DataStreamReader {
+    for (key, value) in options {
+      self.extraOptions[key] = value
+    }
+    return self
+  }
+
+  /// Loads input data stream in as a `DataFrame`, for data streams that don't require a path
+  /// (e.g. external key-value stores).
+  /// - Returns: A `DataFrame`.
+  public func load() -> DataFrame {
+    return load([])
+  }
+
+  /// Loads input data stream in as a `DataFrame`, for data streams that require a path
+  /// (e.g. data backed by a local or distributed file system).
+  /// - Parameter path: A path string.
+  /// - Returns: A `DataFrame`.
+  public func load(_ path: String) -> DataFrame {
+    return load([path])
+  }
+
+  /// Builds a streaming data source read from the current format, options, and optional schema.
+  /// - Parameter paths: The input paths; replaces any previously stored paths.
+  /// - Returns: A `DataFrame`.
+  func load(_ paths: [String]) -> DataFrame {
+    self.paths = paths
+
+    var dataSource = DataSource()
+    dataSource.format = self.source
+    dataSource.paths = self.paths
+    dataSource.options = self.extraOptions.toStringDictionary()
+    if let userSpecifiedSchemaDDL = self.userSpecifiedSchemaDDL {
+      dataSource.schema = userSpecifiedSchemaDDL
+    }
+
+    var read = Read()
+    read.dataSource = dataSource
+    return makeStreamingDataFrame(read)
+  }
+
+  /// Define a Streaming DataFrame on a Table. The DataSource corresponding to the table should
+  /// support streaming mode.
+  /// - Parameter tableName: The name of the table.
+  /// - Returns: A ``DataFrame``.
+  public func table(_ tableName: String) -> DataFrame {
+    var namedTable = NamedTable()
+    namedTable.unparsedIdentifier = tableName
+    namedTable.options = self.extraOptions.toStringDictionary()
+
+    var read = Read()
+    read.namedTable = namedTable
+    return makeStreamingDataFrame(read)
+  }
+
+  /// Marks the given read as streaming, wraps it in a root relation plan, and returns the
+  /// resulting `DataFrame`. Shared by `load(_:)` and `table(_:)`.
+  /// - Parameter read: A read relation whose source is already populated.
+  /// - Returns: A `DataFrame`.
+  private func makeStreamingDataFrame(_ read: Read) -> DataFrame {
+    var streamingRead = read
+    streamingRead.isStreaming = true
+
+    var relation = Relation()
+    relation.read = streamingRead
+
+    var plan = Plan()
+    plan.opType = .root(relation)
+
+    return DataFrame(spark: sparkSession, plan: plan)
+  }
+
+  /// Loads a text file stream and returns the result as a `DataFrame`.
+  /// - Parameter path: A path string
+  /// - Returns: A `DataFrame`.
+  public func text(_ path: String) -> DataFrame {
+    self.source = "text"
+    return load(path)
+  }
+
+  /// Loads a CSV file stream and returns the result as a `DataFrame`.
+  /// - Parameter path: A path string
+  /// - Returns: A `DataFrame`.
+  public func csv(_ path: String) -> DataFrame {
+    self.source = "csv"
+    return load(path)
+  }
+
+  /// Loads a JSON file stream and returns the result as a `DataFrame`.
+  /// - Parameter path: A path string
+  /// - Returns: A `DataFrame`.
+  public func json(_ path: String) -> DataFrame {
+    self.source = "json"
+    return load(path)
+  }
+
+  /// Loads an XML file stream and returns the result as a `DataFrame`.
+  /// - Parameter path: A path string
+  /// - Returns: A `DataFrame`.
+  public func xml(_ path: String) -> DataFrame {
+    self.source = "xml"
+    return load(path)
+  }
+
+  /// Loads an ORC file stream and returns the result as a `DataFrame`.
+  /// - Parameter path: A path string
+  /// - Returns: A `DataFrame`.
+  public func orc(_ path: String) -> DataFrame {
+    self.source = "orc"
+    return load(path)
+  }
+
+  /// Loads a Parquet file stream and returns the result as a `DataFrame`.
+  /// - Parameter path: A path string
+  /// - Returns: A `DataFrame`.
+  public func parquet(_ path: String) -> DataFrame {
+    self.source = "parquet"
+    return load(path)
+  }
+}
diff --git a/Sources/SparkConnect/DataStreamWriter.swift 
b/Sources/SparkConnect/DataStreamWriter.swift
new file mode 100644
index 0000000..0af04ff
--- /dev/null
+++ b/Sources/SparkConnect/DataStreamWriter.swift
@@ -0,0 +1,209 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+import Foundation
+
+/// The trigger policy of a streaming query, set via `DataStreamWriter.trigger(_:)`.
+///
+/// Declared `Sendable` so that values can be passed into the `DataStreamWriter` actor.
+public enum Trigger: Sendable {
+  /// Sent to the server as the `once` trigger.
+  case OneTimeTrigger
+  /// Sent to the server as the `availableNow` trigger.
+  case AvailableNowTrigger
+  /// Sent to the server as a processing-time interval of `intervalMs` milliseconds.
+  case ProcessingTimeTrigger(intervalMs: Int64)
+  /// Sent to the server as a continuous checkpoint interval of `intervalMs` milliseconds.
+  case ContinuousTrigger(intervalMs: Int64)
+}
+
+/// An actor used to write a streaming `DataFrame` to external storage systems
+/// (e.g. file systems, key-value stores, etc). Use `DataFrame.writeStream` to access this.
+public actor DataStreamWriter: Sendable {
+  /// The name of the streaming query, if one was given.
+  var queryName: String? = nil
+
+  /// The output data source format, if one was given.
+  var source: String? = nil
+
+  /// The trigger chosen by the caller, or `nil` to leave the trigger unset in the request.
+  var trigger: Trigger? = nil
+
+  /// The output path set by `start(_:)`.
+  var path: String? = nil
+
+  /// The output table name set by `toTable(tableName:)`.
+  var tableName: String? = nil
+
+  /// The output mode string (e.g. `append`), if one was given.
+  var outputMode: String? = nil
+
+  /// The options that are sent to the underlying data source.
+  var extraOptions: CaseInsensitiveDictionary = CaseInsensitiveDictionary()
+
+  /// The column names given to `partitionBy(_:)`.
+  var partitioningColumns: [String]? = nil
+
+  /// The column names given to `clusterBy(_:)`.
+  var clusteringColumns: [String]? = nil
+
+  /// The streaming ``DataFrame`` to write.
+  let df: DataFrame
+
+  init(df: DataFrame) {
+    self.df = df
+  }
+
+  /// Specifies the name of the ``StreamingQuery`` that can be
+  /// started with `start()`. This name must be unique among all the currently active queries in
+  /// the associated SparkSession.
+  /// - Parameter queryName: A string name.
+  /// - Returns: A ``DataStreamWriter``.
+  public func queryName(_ queryName: String) -> DataStreamWriter {
+    self.queryName = queryName
+    return self
+  }
+
+  /// Specifies the underlying output data source.
+  /// - Parameter source: A string.
+  /// - Returns: A `DataStreamWriter`.
+  public func format(_ source: String) -> DataStreamWriter {
+    self.source = source
+    return self
+  }
+
+  /// Specifies how data of a streaming ``DataFrame`` is written to a streaming sink.
+  ///
+  /// - `append`: only the new rows in the streaming ``DataFrame`` will be written to the sink.
+  /// - `complete`: all the rows in the streaming ``DataFrame`` will be written to the sink
+  /// every time there are some updates.
+  /// - `update`: only the rows that were updated in the streaming ``DataFrame`` will be
+  /// written to the sink every time there are some updates. If the query doesn't contain
+  /// aggregations, it will be equivalent to `append` mode.
+  ///
+  /// - Parameter outputMode: A string for outputMode.
+  /// - Returns: A ``DataStreamWriter``.
+  public func outputMode(_ outputMode: String) -> DataStreamWriter {
+    self.outputMode = outputMode
+    return self
+  }
+
+  /// Adds an output option for the underlying data source.
+  /// - Parameters:
+  ///   - key: A key string.
+  ///   - value: A value string.
+  /// - Returns: A `DataStreamWriter`.
+  public func option(_ key: String, _ value: String) -> DataStreamWriter {
+    self.extraOptions[key] = value
+    return self
+  }
+
+  /// Partitions the output by the given columns on the file system. If specified, the output is
+  /// laid out on the file system similar to Hive's partitioning scheme.
+  /// - Parameter colNames: Column names to partition.
+  /// - Returns: A ``DataStreamWriter``.
+  public func partitionBy(_ colNames: String...) -> DataStreamWriter {
+    self.partitioningColumns = colNames
+    return self
+  }
+
+  /// Clusters the output by the given columns. If specified, the output is laid out such that
+  /// records with similar values on the clustering column are grouped together in the same file.
+  /// - Parameter colNames: Column names to cluster.
+  /// - Returns: A ``DataStreamWriter``.
+  public func clusterBy(_ colNames: String...) -> DataStreamWriter {
+    self.clusteringColumns = colNames
+    return self
+  }
+
+  /// Sets the trigger for the streaming query.
+  /// - Parameter trigger: A ``Trigger`` value.
+  /// - Returns: A ``DataStreamWriter``.
+  public func trigger(_ trigger: Trigger) async throws -> DataStreamWriter {
+    self.trigger = trigger
+    return self
+  }
+
+  /// Starts the execution of the streaming query, which will continually output results to the
+  /// given path as new data arrives. The returned ``StreamingQuery`` object can be used to interact
+  /// with the stream.
+  /// - Parameter path: A path to write.
+  /// - Returns: A ``StreamingQuery``.
+  public func start(_ path: String) async throws -> StreamingQuery {
+    self.path = path
+    return try await start()
+  }
+
+  /// Starts the execution of the streaming query, which will continually output results to the
+  /// configured sink as new data arrives. The returned ``StreamingQuery`` object can be used to
+  /// interact with the stream. Throws exceptions if the following conditions are met:
+  /// - Another run of the same streaming query, that is a streaming query sharing the same
+  ///   checkpoint location, is already active on the same Spark Driver
+  /// - The SQL configuration `spark.sql.streaming.stopActiveRunOnRestart` is enabled
+  /// - The active run cannot be stopped within the timeout controlled by the SQL configuration
+  ///   `spark.sql.streaming.stopTimeout`
+  ///
+  /// If both a path and a table name have been set, the table name is used as the sink
+  /// destination because it is assigned last.
+  ///
+  /// - Returns: A ``StreamingQuery``.
+  public func start() async throws -> StreamingQuery {
+    var writeStreamOperationStart = WriteStreamOperationStart()
+    // NOTE(review): `as!` traps if `getPlan()` ever returns something other than a `Plan`.
+    writeStreamOperationStart.input = (await df.getPlan() as! Plan).root
+    if let source = self.source {
+      writeStreamOperationStart.format = source
+    }
+    writeStreamOperationStart.options = self.extraOptions.toStringDictionary()
+    if let partitioningColumns = self.partitioningColumns {
+      writeStreamOperationStart.partitioningColumnNames = partitioningColumns
+    }
+    if let clusteringColumns = self.clusteringColumns {
+      writeStreamOperationStart.clusteringColumnNames = clusteringColumns
+    }
+    // Leave the trigger unset when the caller did not choose one, instead of silently forcing
+    // a one-time trigger. The switch is exhaustive so that a new `Trigger` case fails to compile
+    // here rather than falling into a default.
+    if let trigger = self.trigger {
+      writeStreamOperationStart.trigger =
+        switch trigger {
+        case .ProcessingTimeTrigger(let intervalMs):
+          .processingTimeInterval("INTERVAL \(intervalMs) MILLISECOND")
+        case .OneTimeTrigger:
+          .once(true)
+        case .AvailableNowTrigger:
+          .availableNow(true)
+        case .ContinuousTrigger(let intervalMs):
+          .continuousCheckpointInterval("INTERVAL \(intervalMs) MILLISECOND")
+        }
+    }
+    if let outputMode = self.outputMode {
+      writeStreamOperationStart.outputMode = outputMode
+    }
+    if let queryName = self.queryName {
+      writeStreamOperationStart.queryName = queryName
+    }
+    if let path = self.path {
+      writeStreamOperationStart.sinkDestination = .path(path)
+    }
+    if let tableName = self.tableName {
+      writeStreamOperationStart.sinkDestination = .tableName(tableName)
+    }
+
+    var command = Spark_Connect_Command()
+    command.writeStreamOperationStart = writeStreamOperationStart
+
+    let response = try await df.spark.client.execute(df.spark.sessionID, command)
+    // NOTE(review): the force unwraps below trap on an empty response or a malformed query ID;
+    // consider throwing a `SparkConnectError` instead once a suitable case exists.
+    let result = response.first!.writeStreamOperationStartResult
+    if result.hasQueryStartedEventJson {
+      // TODO: post
+    }
+
+    return try await StreamingQuery(
+      UUID(uuidString: result.queryID.id)!,
+      UUID(uuidString: result.queryID.runID)!,
+      result.name,
+      self.df.sparkSession
+    )
+  }
+
+  /// Starts the execution of the streaming query, which will continually output results to the
+  /// given table as new data arrives. The returned ``StreamingQuery`` object can be used to interact
+  /// with the stream.
+  /// - Parameter tableName: A table name.
+  /// - Returns: A ``StreamingQuery``.
+  public func toTable(tableName: String) async throws -> StreamingQuery {
+    self.tableName = tableName
+    return try await start()
+  }
+}
diff --git a/Sources/SparkConnect/SparkSession.swift 
b/Sources/SparkConnect/SparkSession.swift
index e588ace..392f387 100644
--- a/Sources/SparkConnect/SparkSession.swift
+++ b/Sources/SparkConnect/SparkSession.swift
@@ -194,7 +194,7 @@ public actor SparkSession {
   /// Returns a DataFrameReader for reading data in various formats.
   ///
   /// The DataFrameReader provides methods to load data from external storage 
systems
-  /// such as file systems, databases, and streaming sources.
+  /// such as file systems and databases.
   ///
   /// ```swift
   /// // Read a CSV file
@@ -208,14 +208,31 @@ public actor SparkSession {
   ///     .json("path/to/file.json")
   ///
   /// // Read an ORC file
-  /// let parquetData = spark.read
+  /// let orcData = spark.read
   ///     .orc("path/to/file.orc")
   /// ```
   ///
   /// - Returns: A DataFrameReader instance configured for this session
   public var read: DataFrameReader {
     get {
-      return DataFrameReader(sparkSession: self)
+      DataFrameReader(sparkSession: self)
+    }
+  }
+
+  /// Returns a `DataStreamReader` that can be used to read streaming data in as a `DataFrame`.
+  ///
+  /// The DataStreamReader provides methods to load streaming data from external storage systems
+  /// such as file systems, databases, and streaming sources.
+  ///
+  /// ```swift
+  /// // Read an ORC file stream
+  /// let orcData = spark.readStream.orc("path/to/file.orc")
+  /// ```
+  ///
+  /// - Returns: A DataStreamReader instance configured for this session
+  public var readStream: DataStreamReader {
+    get {
+      DataStreamReader(sparkSession: self)
     }
   }
 
diff --git a/Sources/SparkConnect/StreamingQuery.swift 
b/Sources/SparkConnect/StreamingQuery.swift
index 2758a5e..70f81e1 100644
--- a/Sources/SparkConnect/StreamingQuery.swift
+++ b/Sources/SparkConnect/StreamingQuery.swift
@@ -64,8 +64,8 @@ public actor StreamingQuery: Sendable {
     _ command: StreamingQueryCommand.OneOf_Command
   ) async throws -> [ExecutePlanResponse] {
     return try await self.sparkSession.client.executeStreamingQueryCommand(
-      self.id.uuidString,
-      self.runId.uuidString,
+      self.id.uuidString.lowercased(),
+      self.runId.uuidString.lowercased(),
       command
     )
   }
diff --git a/Sources/SparkConnect/TypeAliases.swift 
b/Sources/SparkConnect/TypeAliases.swift
index cb7864b..e0543b6 100644
--- a/Sources/SparkConnect/TypeAliases.swift
+++ b/Sources/SparkConnect/TypeAliases.swift
@@ -63,4 +63,5 @@ typealias UnresolvedAttribute = 
Spark_Connect_Expression.UnresolvedAttribute
 typealias WithColumnsRenamed = Spark_Connect_WithColumnsRenamed
 typealias WriteOperation = Spark_Connect_WriteOperation
 typealias WriteOperationV2 = Spark_Connect_WriteOperationV2
+typealias WriteStreamOperationStart = Spark_Connect_WriteStreamOperationStart
 typealias YearMonthInterval = Spark_Connect_DataType.YearMonthInterval
diff --git a/Tests/SparkConnectTests/DataStreamTests.swift 
b/Tests/SparkConnectTests/DataStreamTests.swift
new file mode 100644
index 0000000..00af6ae
--- /dev/null
+++ b/Tests/SparkConnectTests/DataStreamTests.swift
@@ -0,0 +1,69 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+import Foundation
+import SparkConnect
+import Testing
+
+/// An end-to-end test suite for `DataStreamReader` and `DataStreamWriter`.
+@Suite(.serialized)
+struct DataStreamTests {
+  @Test
+  func query() async throws {
+    let spark = try await SparkSession.builder.getOrCreate()
+
+    // Prepare unique /tmp locations and write 2025 rows of ORC data as the streaming input.
+    let input = "/tmp/input-" + UUID().uuidString
+    let checkpoint = "/tmp/checkpoint-" + UUID().uuidString
+    let output = "/tmp/output-" + UUID().uuidString
+    try await spark.range(2025).write.orc(input)
+
+    // Create a streaming DataFrame over the ORC input with an explicit schema.
+    let df =
+      try await spark
+      .readStream
+      .schema("id LONG")
+      .orc(input)
+    #expect(try await df.isStreaming())
+
+    // Derive a second column so that the output has two columns (`id` and `value`).
+    let df2 = await df.selectExpr("id", "id * 10 as value")
+
+    // Start a streaming query that writes ORC output with a 1000 ms processing-time trigger.
+    let query =
+      try await df2
+      .writeStream
+      .option("checkpointLocation", checkpoint)
+      .outputMode("append")
+      .format("orc")
+      .trigger(Trigger.ProcessingTimeTrigger(intervalMs: 1000))
+      .start(output)
+    #expect(try await query.isActive)
+    // Sleep for 2 seconds to give the query time to process the input before stopping it.
+    try await Task.sleep(nanoseconds: 2_000_000_000)
+
+    try await query.stop()
+    #expect(try await query.isActive == false)
+
+    let df3 = await spark.read.orc(output)
+    #expect(try await df3.dtypes.count == 2)
+    #expect(try await df3.count() == 2025)
+    await spark.stop()
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to