This is an automated email from the ASF dual-hosted git repository. bce pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/beam-swift.git
commit ab6de147a79b2ed796b91e9a09460b17d02211c6 Author: Byron Ellis <[email protected]> AuthorDate: Wed Oct 18 08:29:42 2023 -0700 Documentation updates --- .../ApacheBeam/Client/ApiServiceDescriptor.swift | 10 ++++++++++ Sources/ApacheBeam/Internal/Array+Helpers.swift | 2 ++ Sources/ApacheBeam/Internal/Data+Decoding.swift | 22 ++++++++++++++++++---- Sources/ApacheBeam/Internal/Data+Encoding.swift | 14 +++++++++++--- Sources/ApacheBeam/Internal/Date+Timestamp.swift | 5 ++++- .../ApacheBeam/Internal/Dictionary+Helpers.swift | 4 +++- .../ApacheBeam/Internal/PipelineProto+Coder.swift | 3 +++ .../Internal/PipelineProto+Environment.swift | 3 +++ .../Internal/PipelineProto+Initializers.swift | 20 ++++++++++++++++---- .../ApacheBeam/Transforms/BuiltIn+Elements.swift | 9 +++++++++ Sources/ApacheBeam/Transforms/BuiltIn.swift | 4 +++- Sources/ApacheBeam/Transforms/IO/FileIO.swift | 21 +++++++++++++++++++++ .../Transforms/IO/GoogleCloud/GoogleStorage.swift | 8 +++++++- .../Transforms/IO/Local/LocalStorage.swift | 1 - 14 files changed, 110 insertions(+), 16 deletions(-) diff --git a/Sources/ApacheBeam/Client/ApiServiceDescriptor.swift b/Sources/ApacheBeam/Client/ApiServiceDescriptor.swift index 66aeaea..359e3e0 100644 --- a/Sources/ApacheBeam/Client/ApiServiceDescriptor.swift +++ b/Sources/ApacheBeam/Client/ApiServiceDescriptor.swift @@ -37,6 +37,8 @@ public struct ApiServiceDescriptor { } extension ApiServiceDescriptor { + /// Populate the ``ApiServiceDescriptor`` from the Beam protobuf representation + /// - Parameter proto: A valid protocol buffer init(proto: Org_Apache_Beam_Model_Pipeline_V1_ApiServiceDescriptor) { url = proto.url } @@ -51,6 +53,14 @@ extension ApiServiceDescriptor: ProtoConversion { extension ApiServiceDescriptor: Hashable {} public extension ApiServiceDescriptor { + /// Convenience function to decode an ApiServiceDescriptor from a text or JSON encoded protocol buffer passed as environment variable. 
+ /// This technique is used by Runners when launching containers. + /// + /// - Parameters: + /// - env: The name of the environment variable to check + /// - format: The variable's encoding. + /// - Returns: An ``ApiServiceDescriptor`` for the specified service. + /// static func from(env: String, format: EncodedAs = .textproto) throws -> ApiServiceDescriptor { switch format { case .textproto: diff --git a/Sources/ApacheBeam/Internal/Array+Helpers.swift b/Sources/ApacheBeam/Internal/Array+Helpers.swift index 9718f32..12d8411 100644 --- a/Sources/ApacheBeam/Internal/Array+Helpers.swift +++ b/Sources/ApacheBeam/Internal/Array+Helpers.swift @@ -17,6 +17,8 @@ */ extension [(String, String)] { + /// Convenience function that converts an array of (String,String) elements into a dictionary type. + /// - Returns: A dictionary of `[String:String]`. If a key appears more than once, the value from the last pair wins. func dict() -> [String: String] { reduce(into: [:]) { $0[$1.0] = $1.1 } } diff --git a/Sources/ApacheBeam/Internal/Data+Decoding.swift b/Sources/ApacheBeam/Internal/Data+Decoding.swift index f6cd290..35ddb34 100644 --- a/Sources/ApacheBeam/Internal/Data+Decoding.swift +++ b/Sources/ApacheBeam/Internal/Data+Decoding.swift @@ -19,8 +19,10 @@ import Foundation extension Data { - /// advanced(by:) has issues on non-macOS Foundation implementations @inlinable + /// A safer version of `advanced(by:)`. On non-macOS Foundation implementations ``advanced(by:)`` segfaults on ``Data`` elements with size 0. + /// - Parameter by: Number of bytes to advance + /// - Returns: A possibly different ``Data`` element. func safeAdvance(by: Int) -> Data { #if os(macOS) return advanced(by: by) @@ -36,6 +38,7 @@ extension Data { } /// Read a variable length integer from the current data + /// - Returns: An integer of up to 64 bits.
mutating func varint() throws -> Int { var advance = 0 let result = try withUnsafeBytes { @@ -68,20 +71,27 @@ extension Data { return result } + + /// Decodes a Beam timestamp: a Java-style Instant in milliseconds, stored as a big-endian Int64 offset by `Int64.min` so that encoded values sort correctly. + /// - Returns: A Date value mutating func instant() throws -> Date { let millis = try next(Int64.self) &+ Int64(-9_223_372_036_854_775_808) return Date(millisecondsSince1970: millis) } - /// Read a length prefixed chunk of data + + /// Reads a Beam length-prefixed byte blob: a varint length followed by that many bytes. + /// - Returns: A ``Data`` with a size defined by the length encoding mutating func subdata() throws -> Data { let length = try varint() let result = subdata(in: 0 ..< length) self = safeAdvance(by: length) return result } - - /// Extracts a fixed width type. Coming off the wire this will always be bigendian + + /// Read a fixed length integer of the specified type. In Beam the wire encoding is always a Java-style big-endian value. + /// - Parameter _: The integer type to read + /// - Returns: The integer value read. mutating func next<T: FixedWidthInteger>(_: T.Type) throws -> T { let size = MemoryLayout<T>.size let bigEndian = withUnsafeBytes { @@ -91,6 +101,10 @@ extension Data { return T(bigEndian: bigEndian) } + + /// Read a fixed length floating point value of the specified type. The bytes are loaded as-is, i.e. in host byte order.
+ /// - Parameter _: The floating point type to read + /// - Returns: The floating point value mutating func next<T: FloatingPoint>(_: T.Type) throws -> T { let result = withUnsafeBytes { $0.load(as: T.self) diff --git a/Sources/ApacheBeam/Internal/Data+Encoding.swift b/Sources/ApacheBeam/Internal/Data+Encoding.swift index 3b0d6ae..82c2b19 100644 --- a/Sources/ApacheBeam/Internal/Data+Encoding.swift +++ b/Sources/ApacheBeam/Internal/Data+Encoding.swift @@ -19,6 +19,8 @@ import Foundation extension Data { + /// Append a value as a variable length integer of up to 64 bits. The value must be non-negative: `UInt64(value)` traps on negative input. + /// - Parameter value: The value to encode mutating func varint(_ value: Int) { var current = UInt64(value) while current >= 0x80 { @@ -28,21 +30,27 @@ extension Data { append(UInt8(current)) } + + /// Append a Date onto the data as a modified Java Instant (big-endian Int64 milliseconds offset by `Int64.min`). Resolution is milliseconds. + /// - Parameter value: the date/time value to encode. mutating func instant(_ value: Date) { let be = (Int64(value.millisecondsSince1970) &- Int64(-9_223_372_036_854_775_808)).bigEndian Swift.withUnsafeBytes(of: be) { self.append(contentsOf: $0) } } - - // In Beam integers go on and off the wire in bigEndian format. This matches the original Java lineage. + + /// Append a fixed-width integer in big-endian (Java) byte order, as Beam expects. + /// - Parameter value: the value to encode mutating func next(_ value: some FixedWidthInteger) { let be = value.bigEndian Swift.withUnsafeBytes(of: be) { self.append(contentsOf: $0) } } - + + /// Append a fixed-width floating point value in IEEE format, using the host's native byte order (no big-endian conversion is applied).
+ /// - Parameter value: floating point value to append mutating func next(_ value: some FloatingPoint) { Swift.withUnsafeBytes(of: value) { self.append(contentsOf: $0) diff --git a/Sources/ApacheBeam/Internal/Date+Timestamp.swift b/Sources/ApacheBeam/Internal/Date+Timestamp.swift index 63dd57b..6a54cb2 100644 --- a/Sources/ApacheBeam/Internal/Date+Timestamp.swift +++ b/Sources/ApacheBeam/Internal/Date+Timestamp.swift @@ -19,10 +19,13 @@ import Foundation extension Date { + /// Convenience property to extract Java-style milliseconds since the UNIX epoch var millisecondsSince1970: Int64 { Int64((timeIntervalSince1970 * 1000.0).rounded()) } - + + /// Create a ``Date`` from UNIX epoch milliseconds + /// - Parameter millisecondsSince1970: Milliseconds to convert init(millisecondsSince1970: Int64) { self = Date(timeIntervalSince1970: Double(millisecondsSince1970) / 1000.0) } diff --git a/Sources/ApacheBeam/Internal/Dictionary+Helpers.swift b/Sources/ApacheBeam/Internal/Dictionary+Helpers.swift index feedd19..52814ea 100644 --- a/Sources/ApacheBeam/Internal/Dictionary+Helpers.swift +++ b/Sources/ApacheBeam/Internal/Dictionary+Helpers.swift @@ -17,7 +17,9 @@ */ public extension Dictionary where Key: Comparable { - /// Return key-value pairs sorted by key. + + /// Return the elements of a dictionary in a stable ordering + /// - Returns: An array of pairs sorted by key func sorted() -> [(Key, Value)] { map { ($0, $1) }.sorted(by: { $0.0 < $1.0 }) } diff --git a/Sources/ApacheBeam/Internal/PipelineProto+Coder.swift b/Sources/ApacheBeam/Internal/PipelineProto+Coder.swift index a8c2603..7479e93 100644 --- a/Sources/ApacheBeam/Internal/PipelineProto+Coder.swift +++ b/Sources/ApacheBeam/Internal/PipelineProto+Coder.swift @@ -17,6 +17,9 @@ */ extension PipelineProto { + /// Construct an appropriate coder representation from the current pipeline, perhaps also encoding component coders. + /// - Parameter from: The ``Coder`` to convert. 
+ /// - Returns: ``PipelineComponent`` representing this coder mutating func coder(from: Coder) -> PipelineComponent { let componentCoders: [String] = switch from { case let .keyvalue(keyCoder, valueCoder): diff --git a/Sources/ApacheBeam/Internal/PipelineProto+Environment.swift b/Sources/ApacheBeam/Internal/PipelineProto+Environment.swift index ba130bd..f8f0034 100644 --- a/Sources/ApacheBeam/Internal/PipelineProto+Environment.swift +++ b/Sources/ApacheBeam/Internal/PipelineProto+Environment.swift @@ -17,6 +17,9 @@ */ extension PipelineProto { + /// Encode a native ``Environment`` into its protobuf form as a ``PipelineComponent`` + /// - Parameter from: the ``Environment`` to encode + /// - Returns: A ``PipelineComponent`` representation mutating func environment(from: Environment) throws -> PipelineComponent { try environment { _ in try .with { diff --git a/Sources/ApacheBeam/Internal/PipelineProto+Initializers.swift b/Sources/ApacheBeam/Internal/PipelineProto+Initializers.swift index d17ef35..502ed64 100644 --- a/Sources/ApacheBeam/Internal/PipelineProto+Initializers.swift +++ b/Sources/ApacheBeam/Internal/PipelineProto+Initializers.swift @@ -16,6 +16,8 @@ * limitations under the License. */ +/// An enumeration for representing various elements of a pipeline protocol buffer. This enum is used +/// when constructing the pipeline for submission to a portable runner. enum PipelineComponent { case none case transform(String, PTransformProto) @@ -23,7 +25,8 @@ enum PipelineComponent { case coder(String, CoderProto) case windowingStrategy(String, WindowingStrategyProto) case environment(String, EnvironmentProto) - + + /// The name of the pipeline component. Most components in the protocol buffer have a name used to reference the component throughout the pipeline structure. var name: String { switch self { case .none: @@ -41,6 +44,8 @@ enum PipelineComponent { } } + + /// The `PTransform` protocol buffer for this component.
Returns ``nil`` for everything except a ``transform`` type. var transform: PTransformProto? { if case let .transform(_, pTransformProto) = self { return pTransformProto @@ -50,16 +55,23 @@ enum PipelineComponent { } } -/// Convenience function for creating new pipeline elements. Note that these shouldn't be accessed concurrently -/// but this isn't a problem itself since trying to access the proto concurrently throws an error. +// Convenience function for creating new pipeline elements. Note that these shouldn't be accessed concurrently +// but this isn't a problem itself since trying to access the proto concurrently throws an error. extension PipelineProto { + + /// Adds a new `PTransform` protocol buffer to the pipeline along with an internal reference name to be used by other parts of the pipeline. + /// - Parameter mapper: A closure that returns the `PTransform` protocol buffer. Includes the reference id as a parameter in case it is needed. + /// - Returns: A ``PipelineComponent`` representing this `PTransform` mutating func transform(_ mapper: @escaping (String) throws -> PTransformProto) throws -> PipelineComponent { let name = "ref_PTransform_\(components.transforms.count + 1)" let proto = try mapper(name) components.transforms[name] = proto return .transform(name, proto) } - + + /// Adds a new `PCollection` protocol buffer to the pipeline along with an internal reference name to be used elsewhere (e.g. `PTransform`s) + /// - Parameter mapper: A closure that returns the `PCollection` protocol buffer. 
It is passed the reference name in case that is useful. + /// - Returns: A ``PipelineComponent`` representing the `PCollection` mutating func collection(_ mapper: @escaping (String) throws -> PCollectionProto) throws -> PipelineComponent { let name = "ref_PCollection_\(components.pcollections.count + 1)" let proto = try mapper(name) diff --git a/Sources/ApacheBeam/Transforms/BuiltIn+Elements.swift b/Sources/ApacheBeam/Transforms/BuiltIn+Elements.swift index 5c10295..0743bd5 100644 --- a/Sources/ApacheBeam/Transforms/BuiltIn+Elements.swift +++ b/Sources/ApacheBeam/Transforms/BuiltIn+Elements.swift @@ -20,6 +20,10 @@ import Foundation public extension PCollection { // No Output + /// Apply a closure-based ``DoFn`` to an input ``PCollection`` that returns no output. + /// - Parameters: + /// - name: A name for this transform + /// - fn: A closure that implements a DoFn by taking a single ``PInput`` and emitting no outputs func pardo(name: String, _ fn: @Sendable @escaping (PInput<Of>) async throws -> Void) { pstream(name: name) { input in for try await element in input { @@ -29,6 +33,11 @@ public extension PCollection { } // One Output + /// Apply a closure-based ``DoFn`` to an input ``PCollection`` that emits to a single output ``PCollection`` + /// - Parameters: + /// - name: A name for this transform + /// - fn: A closure that implements a DoFn by taking a single ``PInput`` and a ``POutput`` to which it emits results + /// - Returns: The output ``PCollection`` of `O0` elements emitted by `fn` func pardo<O0>(name: String, _ fn: @Sendable @escaping (PInput<Of>, POutput<O0>) async throws -> Void) -> PCollection<O0> { pstream(name: name) { input, output in for try await element in input { diff --git a/Sources/ApacheBeam/Transforms/BuiltIn.swift b/Sources/ApacheBeam/Transforms/BuiltIn.swift index cf0d8b2..5ee71d5 100644 --- a/Sources/ApacheBeam/Transforms/BuiltIn.swift +++ b/Sources/ApacheBeam/Transforms/BuiltIn.swift @@ -203,7 +203,9 @@ public extension PCollection { } public extension PCollection { - /// Core GroupByKey transform. Requires a pair input + /// The built-in Beam GroupByKey transform.
Requires that the input be a Beam key-value type. + /// + /// - Returns: A ``PCollection`` of type `KV<K,V>` where the value of the key-value pair is a `Sequence` of all values associated with the key `K` func groupByKey<K, V>() -> PCollection<KV<K, V>> where Of == KV<K, V> { // Adjust the coder for the pcollection to reflect GBK semantcs let output = PCollection<KV<K, V>>(coder: .keyvalue(.of(type: K.self)!, .of(type: [V].self)!), type: streamType) diff --git a/Sources/ApacheBeam/Transforms/IO/FileIO.swift b/Sources/ApacheBeam/Transforms/IO/FileIO.swift index 5bd2079..e7b6e15 100644 --- a/Sources/ApacheBeam/Transforms/IO/FileIO.swift +++ b/Sources/ApacheBeam/Transforms/IO/FileIO.swift @@ -18,12 +18,33 @@ import Foundation +/// Defines a protocol for transforms that know how to read from external file-like systems given an input +/// collection of paths. public protocol FileIOSource { + /// A static function definition that takes `(path,matched_filename)` pairs output by ``listFiles(matching:)`` or some other source + /// and reads their contents into a single output collection. + /// + /// - Parameter matching: An input ``PCollection`` of pairs representing a `(path,matched_filename)` + /// where `path` and `matched_filename` are dependent on the specific source. + /// - Returns: A ``PCollection`` of binary `Data` elements. Note that the original path and filename are not carried with the data. static func readFiles(matching: PCollection<KV<String, String>>) -> PCollection<Data> + + /// Static function definition for listing the files that match some pattern at a given path. The definition of "path" largely depends on the + /// source type. For example, for remote object stores that might be a bucket or a bucket prefix while for a local filesystem it would be a + /// standard file path. Check the specific source's documentation to verify the expected path and pattern format. + /// + /// + /// - Parameter matching: A ``PCollection`` of pairs, each giving the path to check and the pattern to match against.
+ /// + /// - Returns: A ``PCollection`` of pairs of the form `(path,matched_filename)` static func listFiles(matching: PCollection<KV<String, String>>) -> PCollection<KV<String, String>> } public extension PCollection<KV<String, String>> { + /// A transform for reading files from a concrete ``FileIOSource``. + /// - Parameter `_`: The type of ``FileIOSource`` to use. For example, `LocalStorage.self`. + /// + /// - Returns: The data elements from the ``readFiles(matching:)`` implementation for this ``FileIOSource`` func readFiles<Source: FileIOSource>(in _: Source.Type) -> PCollection<Data> { Source.readFiles(matching: self) } diff --git a/Sources/ApacheBeam/Transforms/IO/GoogleCloud/GoogleStorage.swift b/Sources/ApacheBeam/Transforms/IO/GoogleCloud/GoogleStorage.swift index b289f94..0481f7e 100644 --- a/Sources/ApacheBeam/Transforms/IO/GoogleCloud/GoogleStorage.swift +++ b/Sources/ApacheBeam/Transforms/IO/GoogleCloud/GoogleStorage.swift @@ -69,7 +69,13 @@ public struct GoogleStorage: FileIOSource { } } } - + + /// List the files in Google Cloud Storage buckets that match the prefixes given by `(bucket,prefix)` string pairs. + /// + /// Note: Currently only the DefaultTokenProvider is supported. + /// + /// - Parameter matching: A ``PCollection`` of `(bucket,prefix)` pairs. + /// - Returns: A ``PCollection`` of `(bucket,file)` pairs.
public static func listFiles(matching: PCollection<KV<String, String>>) -> PCollection<KV<String, String>> { matching.pstream(type: .bounded) { matching, output in diff --git a/Sources/ApacheBeam/Transforms/IO/Local/LocalStorage.swift b/Sources/ApacheBeam/Transforms/IO/Local/LocalStorage.swift index 51a04a5..aea4820 100644 --- a/Sources/ApacheBeam/Transforms/IO/Local/LocalStorage.swift +++ b/Sources/ApacheBeam/Transforms/IO/Local/LocalStorage.swift @@ -32,7 +32,6 @@ public struct LocalStorage: FileIOSource { public static func listFiles(matching: PCollection<KV<String, String>>) -> PCollection<KV<String, String>> { matching.pstream(type: .bounded) { input, output in - let fm = FileManager.default for try await (pathAndPattern, ts, w) in input { let path = pathAndPattern.key let patterns = try pathAndPattern.values.map { try Regex($0) }
