This is an automated email from the ASF dual-hosted git repository. bce pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/beam-swift.git
commit f58a38eaddfa86210fe438d572c6804fd57d0f4a Author: Byron Ellis <[email protected]> AuthorDate: Thu Oct 5 13:40:10 2023 -0700 More documentation. Realized that flatMap could just operate on Sequence rather than specifically on arrays so added that. --- Sources/ApacheBeam/Transforms/Basic.swift | 75 ++++++++++++++++++++++++++----- 1 file changed, 64 insertions(+), 11 deletions(-) diff --git a/Sources/ApacheBeam/Transforms/Basic.swift b/Sources/ApacheBeam/Transforms/Basic.swift index ed9b452..71d70a3 100644 --- a/Sources/ApacheBeam/Transforms/Basic.swift +++ b/Sources/ApacheBeam/Transforms/Basic.swift @@ -18,9 +18,22 @@ import Foundation -/// Creating Static Values +// Utility Transforms public extension PCollection { - /// Each time the input fires output all of the values in this list. + /// For each item from the input ``PCollection`` emit all of the items in a static list of values. + /// + /// This transform is most commonly used with ``impulse()`` to create an initial set of inputs for the pipeline. The values passed to + /// this transform are encoded in the pipeline submission making it suitable for passing information from input parameters, such as a set + /// of files to process. + /// + /// Note that because these values are encoded in the pipeline itself, they are subject to the maximum size of a protocol buffer (2GB) + /// and large inputs can affect submitting the pipeline to the runner. + /// + /// - Parameters: + /// - values: An array of ``Codable`` values to be emitted for each input record + /// - name: An optional name for this transform. By default it will use the source file and line number for easy debugging + /// + /// - Returns: A ``PCollection`` containing items of type `Value` func create<Value: Codable>(_ values: [Value], name: String? = nil, _file: String = #fileID, _line: Int = #line) -> PCollection<Value> { pstream(name: name ?? 
"\(_file):\(_line)", type: .bounded, values) { values, input, output in for try await (_, ts, w) in input { @@ -32,7 +45,7 @@ public extension PCollection { } } -/// Convenience logging mappers +// Logging Transforms public extension PCollection { @discardableResult func log(prefix: String, _ name: String? = nil, _file: String = #fileID, _line: Int = #line) -> PCollection<Of> where Of == String { @@ -58,16 +71,37 @@ public extension PCollection { } } -/// Modifying Values +// Mapping Operations public extension PCollection { - /// Modify a value without changing its window or timestamp + /// Perform a scalar transformation of input values from a ``PCollection`` to another type without modifying the window or the timestamp. + /// + /// For example, the following code would return a ``PCollection<String>`` if applied to an input ``PCollection`` whose value has + /// a ``String`` property named `name` + /// ```swift + /// input.map { $0.name } + /// ``` + /// Note: The return type of the mapping function is treated as a scalar even if it is an iterable type. To return multiple values from a single input + /// use ``flatMap(name:_file:_line:_:)`` instead. + /// + /// - Parameters: + /// - name: An optional name for this transform. By default it will use the file name and line number for easy debugging. + /// - fn: A trailing closure specifying the mapping function + /// + /// - Returns: A ``PCollection<Out>`` of scalar transformations. func map<Out>(name: String? = nil, _file: String = #fileID, _line: Int = #line, _ fn: @Sendable @escaping (Of) -> Out) -> PCollection<Out> { pardo(name: name ?? "\(_file):\(_line)") { input, output in output.emit(fn(input.value)) } } - /// Map function to convert a tuple pair into an encodable key-value pair + /// Map a scalar input into a tuple representing a key-value pair. This is most commonly used in conjunction with ``groupByKey()`` transformations but can + /// be used as a scalar transform to Beam's native key-value coding type. 
+ /// + /// - Parameters: + /// - name: An optional name for this transform. By default it will use the file name and line number for easy debugging. + /// - fn: A trailing closure specifying the mapping function + /// + /// - Returns: A ``PCollection<KV<K,V>>`` which is encoded using Beam's native key value coding. func map<K, V>(name: String? = nil, _file: String = #fileID, _line: Int = #line, _ fn: @Sendable @escaping (Of) -> (K, V)) -> PCollection<KV<K, V>> { pardo(name: name ?? "\(_file):\(_line)") { input, output in let (key, value) = fn(input.value) @@ -75,9 +109,16 @@ public extension PCollection { } } - /// Produce multiple outputs as a single value without modifying window or timestamp - func flatMap<Out>(name: String? = nil, _file: String = #fileID, _line: Int = #line, _ fn: @Sendable @escaping (Of) -> [Out]) -> PCollection<Out> { - pardo(name: name ?? "\(_file):\(_line)") { input, output in + /// Map a ``PCollection`` to zero or more outputs by returning a ``Sequence`` + /// + /// - Parameters: + /// - name: An optional name for this transform. By default it will use the source filename and line number for easy debugging. + /// - fn: A trailing closure that returns a ``Sequence``. + /// + /// - Returns: A ``PCollection`` of values with the type ``Sequence.Element`` + /// + func flatMap<S:Sequence>(name: String? = nil,_file: String = #fileID,_line: Int = #line,_ fn: @Sendable @escaping (Of) -> S) -> PCollection<S.Element> { + pardo(name: name ?? "\(_file):\(_line)") { input,output in for i in fn(input.value) { output.emit(i) } @@ -85,8 +126,15 @@ public extension PCollection { } } -/// Modifying Timestamps +// Timestamp Operations public extension PCollection { + /// Modifies the timestamps of values in the input ``PCollection`` according to user specified logic + /// - Parameters: + /// - name: An optional name for this transform. By default it will use the source filename and line number for easy debugging. 
+ /// - fn: A trailing closure that returns the new timestamp for the input value. + /// + /// - Returns: A ``PCollection`` with modified timestamps. + /// func timestamp(name: String? = nil, _file: String = #fileID, _line: Int = #line, _ fn: @Sendable @escaping (Of) -> Date) -> PCollection<Of> { pstream(name: name ?? "\(_file):\(_line)") { input, output in for try await (value, _, w) in input { @@ -97,7 +145,12 @@ public extension PCollection { } public extension PCollection<Never> { - /// Convenience function to add an impulse when we are at the root of the pipeline + /// A convenience implementation of ``create(_:name:_file:_line:)-38du`` that prepends an ``impulse()`` transform to a pipeline root. + /// - Parameters: + /// - values: The values to emit when the impulse item is received + /// - name: An optional name for this transform. By default it will use the source filename and line number for easy debugging. + /// + /// - Returns: A ``PCollection`` of values from the static list func create<Value: Codable>(_ values: [Value], name: String? = nil, _file: String = #fileID, _line: Int = #line) -> PCollection<Value> { impulse().create(values, name: name, _file: _file, _line: _line) }
