This is an automated email from the ASF dual-hosted git repository. lostluck pushed a commit to branch swift-sdk in repository https://gitbox.apache.org/repos/asf/beam.git
commit 739ea5a644e22ca61f69cb7c23c2c67fffba80b8 Author: Byron Ellis <byronel...@google.com> AuthorDate: Thu Aug 24 17:45:18 2023 -0700 Rename previous stream-based pardo implementation to pstream and start to introduce an element-wise pardo. This better matches other SDK conventions (i.e. processElement) and allows us to implicitly transfer the timestamp and window from the input to an output. Repurposes the PInput and POutput protocols to handle this functionality. Also update the map and flatmap implementations to use pardo rather than pstream to get a sense of how well it works (well). --- sdks/swift/Package.swift | 1 + .../ApacheBeam/Core/DynamicProperties.swift | 16 --- .../Sources/ApacheBeam/Core/Fn/ClosureFn.swift | 10 +- .../Core/PCollection/AnyPCollection.swift | 3 +- .../Core/PCollection/AnyPCollectionStream.swift | 2 - .../ApacheBeam/Core/PCollection/PCollection.swift | 11 ++- .../Core/PCollection/PCollectionStream.swift | 5 +- .../Runtime/Bundle/BundleProcessor.swift | 2 + .../Runtime/Metrics/MetricReporter.swift | 4 + .../ApacheBeam/Testing/PCollection+Testing.swift | 19 ++++ .../PCollection => Testing}/PCollectionTest.swift | 0 .../Sources/ApacheBeam/Transforms/Basic.swift | 23 ++--- .../ApacheBeam/Transforms/BuiltIn+Elements.swift | 109 +++++++++++++++++++++ .../Sources/ApacheBeam/Transforms/BuiltIn.swift | 96 +++++++----------- .../Sources/ApacheBeam/Transforms/Combining.swift | 2 +- .../Transforms/IO/GoogleCloud/GoogleStorage.swift | 12 +-- .../Sources/Examples/Wordcount/Wordcount.swift | 2 + .../Pipeline/CompositeIntegrationTests.swift | 14 +-- .../ApacheBeamTests/Pipeline/FileIOTests.swift | 6 +- .../Pipeline/IntegrationTests.swift | 14 +-- 20 files changed, 230 insertions(+), 121 deletions(-) diff --git a/sdks/swift/Package.swift b/sdks/swift/Package.swift index b25688b98b6..5d2771992d8 100644 --- a/sdks/swift/Package.swift +++ b/sdks/swift/Package.swift @@ -62,6 +62,7 @@ let package = Package( .product(name: "OAuth2", package:"google-auth-library-swift") ] ), + .target(name:"Wordcount",dependencies: ["ApacheBeam"],path:"Sources/Examples/Wordcount"), .testTarget( name: "ApacheBeamTests", dependencies: ["ApacheBeam"]), diff --git a/sdks/swift/Sources/ApacheBeam/Core/DynamicProperties.swift b/sdks/swift/Sources/ApacheBeam/Core/DynamicProperties.swift index 713661b7d8f..474353fcff9 100644 --- a/sdks/swift/Sources/ApacheBeam/Core/DynamicProperties.swift +++ b/sdks/swift/Sources/ApacheBeam/Core/DynamicProperties.swift @@ -20,22 +20,6 @@ import Logging public protocol DynamicProperty { } -@propertyWrapper -public struct PInput<Of> : DynamicProperty { - public var wrappedValue: PCollectionStream<Of> - - public init(wrappedValue: PCollectionStream<Of> = .init()) { - self.wrappedValue = wrappedValue - } -} - -@propertyWrapper -public struct POutput<Of> : DynamicProperty { - public var wrappedValue: PCollectionStream<Of> - public init(wrappedValue: PCollectionStream<Of> = .init()) { - self.wrappedValue = wrappedValue - } -} @propertyWrapper public struct RemoteLog : DynamicProperty { diff --git a/sdks/swift/Sources/ApacheBeam/Core/Fn/ClosureFn.swift b/sdks/swift/Sources/ApacheBeam/Core/Fn/ClosureFn.swift index fcf43a91215..c3f443f09ea 100644 --- a/sdks/swift/Sources/ApacheBeam/Core/Fn/ClosureFn.swift +++ b/sdks/swift/Sources/ApacheBeam/Core/Fn/ClosureFn.swift @@ -18,11 +18,13 @@ import Foundation + + /// A SerializableFn that holds a reference to a function that takes a single input and produces a variable number of outputs public final class ClosureFn : SerializableFn { let processClosure: (SerializableFnBundleContext,[AnyPCollectionStream],[AnyPCollectionStream]) async throws -> Void - //TODO: Replace this with a parameter pack version once I figure out how to do that + public init<Of>(_ fn: @Sendable @escaping (PCollection<Of>.Stream) async throws -> Void) { self.processClosure = { context,inputs,outputs in @@ -35,6 +37,7 @@ public final class ClosureFn : SerializableFn { try await fn(inputs[0].stream(),outputs[0].stream()) } } + public init<Of,O0,O1>(_ fn: @Sendable @escaping (PCollection<Of>.Stream,PCollection<O0>.Stream,PCollection<O1>.Stream) async throws -> Void) { self.processClosure = { context,inputs,outputs in @@ -42,17 +45,22 @@ public final class ClosureFn : SerializableFn { } } + + public init<Of,O0,O1,O2>(_ fn: @Sendable @escaping (PCollection<Of>.Stream,PCollection<O0>.Stream,PCollection<O1>.Stream,PCollection<O2>.Stream) async throws -> Void) { self.processClosure = { context,inputs,outputs in try await fn(inputs[0].stream(),outputs[0].stream(),outputs[1].stream(),outputs[2].stream()) } } + + public init<Of,O0,O1,O2,O3>(_ fn: @Sendable @escaping (PCollection<Of>.Stream,PCollection<O0>.Stream,PCollection<O1>.Stream,PCollection<O2>.Stream,PCollection<O3>.Stream) async throws -> Void) { self.processClosure = { context,inputs,outputs in try await fn(inputs[0].stream(),outputs[0].stream(),outputs[1].stream(),outputs[2].stream(),outputs[3].stream()) } } + public func process(context: SerializableFnBundleContext, inputs: [AnyPCollectionStream], outputs: [AnyPCollectionStream]) async throws -> (String, String) { try await processClosure(context,inputs,outputs) diff --git a/sdks/swift/Sources/ApacheBeam/Core/PCollection/AnyPCollection.swift b/sdks/swift/Sources/ApacheBeam/Core/PCollection/AnyPCollection.swift index 48c6b39c2b2..e28af2185c2 100644 --- a/sdks/swift/Sources/ApacheBeam/Core/PCollection/AnyPCollection.swift +++ b/sdks/swift/Sources/ApacheBeam/Core/PCollection/AnyPCollection.swift @@ -71,10 +71,11 @@ public struct AnyPCollection : PCollectionProtocol,PipelineMember { fatalError("Do not use `stream` on AnyPCollection. Use `anyStream` instead.") } + public var anyStream: AnyPCollectionStream { streamClosure(collection) } - + var roots: [PCollection<Never>] { rootsClosure(collection) } diff --git a/sdks/swift/Sources/ApacheBeam/Core/PCollection/AnyPCollectionStream.swift b/sdks/swift/Sources/ApacheBeam/Core/PCollection/AnyPCollectionStream.swift index 959d9a04217..9d252834b50 100644 --- a/sdks/swift/Sources/ApacheBeam/Core/PCollection/AnyPCollectionStream.swift +++ b/sdks/swift/Sources/ApacheBeam/Core/PCollection/AnyPCollectionStream.swift @@ -53,8 +53,6 @@ public struct AnyPCollectionStream : AsyncSequence { try stream.emit(beamValue) } else if let element = $1 as? Element { stream.emit((element.0 as! Of,element.1,element.2)) - } else if let element = $1 as? Of { - stream.emit(element) } else { throw ApacheBeamError.runtimeError("Unable to send \($1) to \(stream)") } diff --git a/sdks/swift/Sources/ApacheBeam/Core/PCollection/PCollection.swift b/sdks/swift/Sources/ApacheBeam/Core/PCollection/PCollection.swift index 9918ed69ace..dd9a2334923 100644 --- a/sdks/swift/Sources/ApacheBeam/Core/PCollection/PCollection.swift +++ b/sdks/swift/Sources/ApacheBeam/Core/PCollection/PCollection.swift @@ -36,15 +36,22 @@ public final class PCollection<Of> : PCollectionProtocol { public let coder: Coder public var consumers: [PipelineTransform] public private(set) var parent: PipelineTransform? + + private let staticStream : PCollectionStream<Of>? - public init(coder: Coder = .of(type: Of.self)!,parent: PipelineTransform? = nil,consumers:[PipelineTransform] = []) { + public init(coder: Coder = .of(type: Of.self)!, + parent: PipelineTransform? = nil, + consumers:[PipelineTransform] = [], + stream:PCollectionStream<Of>? = nil + ) { self.coder = coder self.consumers = consumers self.parent = parent + self.staticStream = stream } public var stream: PCollectionStream<Of> { - return PCollectionStream<Of>() + return staticStream ?? PCollectionStream<Of>() } @discardableResult diff --git a/sdks/swift/Sources/ApacheBeam/Core/PCollection/PCollectionStream.swift b/sdks/swift/Sources/ApacheBeam/Core/PCollection/PCollectionStream.swift index 38b8f795ac1..ba4ad0b26a5 100644 --- a/sdks/swift/Sources/ApacheBeam/Core/PCollection/PCollectionStream.swift +++ b/sdks/swift/Sources/ApacheBeam/Core/PCollection/PCollectionStream.swift @@ -18,6 +18,8 @@ import Foundation + + /// The worker side realization of a PCollection that supports reading and writing public final class PCollectionStream<Of> : AsyncSequence { public typealias Element = (Of,Date,Window) @@ -25,6 +27,7 @@ public final class PCollectionStream<Of> : AsyncSequence { private let stream: AsyncStream<Element> private let emitter: AsyncStream<Element>.Continuation + public init() { (self.stream,self.emitter) = AsyncStream.makeStream(of:Element.self) } @@ -42,7 +45,7 @@ public final class PCollectionStream<Of> : AsyncSequence { emitter.yield(value) } - public func emit(_ value: Of,timestamp: Date = .now,window:Window = .global) { + public func emit(_ value: Of,timestamp: Date,window:Window) { emit((value,timestamp,window)) } diff --git a/sdks/swift/Sources/ApacheBeam/Runtime/Bundle/BundleProcessor.swift b/sdks/swift/Sources/ApacheBeam/Runtime/Bundle/BundleProcessor.swift index 8f1b2cac9de..4e977d6f112 100644 --- a/sdks/swift/Sources/ApacheBeam/Runtime/Bundle/BundleProcessor.swift +++ b/sdks/swift/Sources/ApacheBeam/Runtime/Bundle/BundleProcessor.swift @@ -42,6 +42,7 @@ struct BundleProcessor { var temp: [Step] = [] var coders = BundleCoderContainer(bundle:descriptor) + var streams: [String:AnyPCollectionStream] = [:] // First make streams for everything in this bundle (maybe I could use the pcollection array for this?) for (_,transform) in descriptor.transforms { @@ -64,6 +65,7 @@ struct BundleProcessor { //Map the input and output streams in the correct order let inputs = transform.inputs.sorted().map { streams[$0.1]! } let outputs = transform.outputs.sorted().map { streams[$0.1]! } + if urn == "beam:runner:source:v1" { let remotePort = try RemoteGrpcPort(serializedData: transform.spec.payload) let coder = try Coder.of(name: remotePort.coderID, in: coders) diff --git a/sdks/swift/Sources/ApacheBeam/Runtime/Metrics/MetricReporter.swift b/sdks/swift/Sources/ApacheBeam/Runtime/Metrics/MetricReporter.swift new file mode 100644 index 00000000000..9c2e5974716 --- /dev/null +++ b/sdks/swift/Sources/ApacheBeam/Runtime/Metrics/MetricReporter.swift @@ -0,0 +1,4 @@ +/// Background actor that handles reporting metrics to the backend. Allows the metrics implementation to be asynchronous. +actor MetricReporter { + +} diff --git a/sdks/swift/Sources/ApacheBeam/Testing/PCollection+Testing.swift b/sdks/swift/Sources/ApacheBeam/Testing/PCollection+Testing.swift new file mode 100644 index 00000000000..5acdc111fca --- /dev/null +++ b/sdks/swift/Sources/ApacheBeam/Testing/PCollection+Testing.swift @@ -0,0 +1,19 @@ +import Foundation + +public extension PCollection { + + /// Create a PCollection whose stream has been preloaded with some values for testing + static func testValues<V:Beamable>(_ values:[V]) -> PCollection<V> { + let stream = PCollectionStream<V>() + for v in values { + stream.emit(v,timestamp:.now,window:.global) + } + return PCollection<V>(stream:stream) + } + + /// Convenience function that simulates an impulse + static func testImpulse() -> PCollection<Data> { + testValues([Data()]) + } + +} diff --git a/sdks/swift/Sources/ApacheBeam/Core/PCollection/PCollectionTest.swift b/sdks/swift/Sources/ApacheBeam/Testing/PCollectionTest.swift similarity index 100% rename from sdks/swift/Sources/ApacheBeam/Core/PCollection/PCollectionTest.swift rename to sdks/swift/Sources/ApacheBeam/Testing/PCollectionTest.swift diff --git a/sdks/swift/Sources/ApacheBeam/Transforms/Basic.swift b/sdks/swift/Sources/ApacheBeam/Transforms/Basic.swift index af9e89b18bd..9da48003b6d 100644 --- a/sdks/swift/Sources/ApacheBeam/Transforms/Basic.swift +++ b/sdks/swift/Sources/ApacheBeam/Transforms/Basic.swift @@ -21,7 +21,7 @@ public extension PCollection { /// Each time the input fires output all of the values in this list. func create<Value:Codable>(_ values: [Value],name:String? = nil,_file:String=#fileID,_line:Int=#line) -> PCollection<Value> { - return pardo(name:name ?? "\(_file):\(_line)",values) { values,input,output in + return pstream(name:name ?? "\(_file):\(_line)",values) { values,input,output in for try await (_,ts,w) in input { for v in values { output.emit(v,timestamp:ts,window:w) @@ -36,7 +36,7 @@ public extension PCollection { @discardableResult func log(prefix:String,_ name:String? = nil,_file:String=#fileID,_line:Int=#line) -> PCollection<Of> where Of == String { - pardo(name:name ?? "\(_file):\(_line)",prefix) { prefix,input,output in + pstream(name:name ?? "\(_file):\(_line)",prefix) { prefix,input,output in for await element in input { print("\(prefix): \(element)") output.emit(element) @@ -46,7 +46,7 @@ public extension PCollection { @discardableResult func log<K,V>(prefix:String,_ name:String? = nil,_file:String=#fileID,_line:Int=#line) -> PCollection<KV<K,V>> where Of == KV<K,V> { - pardo(name:name ?? "\(_file):\(_line)",prefix) { prefix,input,output in + pstream(name:name ?? "\(_file):\(_line)",prefix) { prefix,input,output in for await element in input { let kv = element.0 for v in kv.values { @@ -64,28 +64,23 @@ public extension PCollection { /// Modify a value without changing its window or timestamp func map<Out>(name:String? = nil,_file:String=#fileID,_line:Int=#line,_ fn: @Sendable @escaping (Of) -> Out) -> PCollection<Out> { return pardo(name:name ?? "\(_file):\(_line)") { input,output in - for try await (v,ts,w) in input { - output.emit(fn(v),timestamp:ts,window:w) - } + output.emit(fn(input.value)) } } + /// Map function to convert a tuple pair into an encodable key-value pair func map<K,V>(name:String? = nil,_file:String=#fileID,_line:Int=#line,_ fn: @Sendable @escaping (Of) -> (K,V)) -> PCollection<KV<K,V>> { return pardo(name:name ?? "\(_file):\(_line)") { input,output in - for try await (i,ts,w) in input { - let (key,value) = fn(i) - output.emit(KV(key,value),timestamp:ts,window:w) - } + let (key,value) = fn(input.value) + output.emit(KV(key,value)) } } /// Produce multiple outputs as a single value without modifying window or timestamp func flatMap<Out>(name:String? = nil,_file:String=#fileID,_line:Int=#line,_ fn: @Sendable @escaping (Of) -> [Out]) -> PCollection<Out> { return pardo(name:name ?? "\(_file):\(_line)") { input,output in - for try await (v,ts,w) in input { - for i in fn(v) { - output.emit(i,timestamp:ts,window:w) - } + for i in fn(input.value) { + output.emit(i) } } } diff --git a/sdks/swift/Sources/ApacheBeam/Transforms/BuiltIn+Elements.swift b/sdks/swift/Sources/ApacheBeam/Transforms/BuiltIn+Elements.swift new file mode 100644 index 00000000000..e29ea085b7d --- /dev/null +++ b/sdks/swift/Sources/ApacheBeam/Transforms/BuiltIn+Elements.swift @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import Foundation + +public protocol PInput<Of> { + associatedtype Of + + var value: Of { get } + var timestamp: Date { get } + var window: Window { get } +} + +public protocol POutput<Of> { + associatedtype Of + + func emit(_ value: Of,timestamp: Date,window: Window) + func emit(_ value: Of) + func emit(_ value: Of,timestamp: Date) + func emit(_ value: Of,window: Window) +} + + +struct PardoInput<Of> : PInput { + let value: Of + let timestamp: Date + let window: Window + public init(_ value:(Of,Date,Window)) { + self.value = value.0 + self.timestamp = value.1 + self.window = value.2 + } +} + +struct PardoOutput<Of> : POutput { + + + let stream: PCollectionStream<Of> + let timestamp: Date + let window: Window + + func emit(_ value: Of, timestamp: Date, window: Window) { + stream.emit(value,timestamp:timestamp,window:window) + } + func emit(_ value: Of, timestamp: Date) { + stream.emit(value,timestamp:timestamp,window:window) + + } + func emit(_ value: Of, window: Window) { + stream.emit(value,timestamp:timestamp,window:window) + + } + func emit(_ value: Of) { + stream.emit(value,timestamp:timestamp,window:window) + } + + +} + +public extension PCollection { + + // No Output + func pardo(name:String,_ fn: @Sendable @escaping (any PInput<Of>) async throws -> Void) { + pstream(name:name) { input in + for try await element in input { + try await fn(PardoInput(element)) + } + } + } + + // One Output + func pardo<O0>(name:String,_ fn: @Sendable @escaping (any PInput<Of>,any POutput<O0>) async throws -> Void) -> PCollection<O0> { + pstream(name:name) { input,output in + for try await element in input { + try await fn(PardoInput(element),PardoOutput(stream:output,timestamp:element.1,window:element.2)) + } + } + } + + //Two Outputs + func pardo<O0,O1>(name:String,_ fn: @Sendable @escaping (any PInput<Of>,any POutput<O0>,any POutput<O1>) async throws -> Void) -> (PCollection<O0>,PCollection<O1>) { + pstream(name:name) { input,o0,o1 in + for try await element in input { + try await fn(PardoInput(element), + PardoOutput(stream:o0,timestamp:element.1,window:element.2), + PardoOutput(stream:o1,timestamp:element.1,window:element.2) + + ) + } + } + } + + +} diff --git a/sdks/swift/Sources/ApacheBeam/Transforms/BuiltIn.swift b/sdks/swift/Sources/ApacheBeam/Transforms/BuiltIn.swift index d33a576c42c..65bbe10389e 100644 --- a/sdks/swift/Sources/ApacheBeam/Transforms/BuiltIn.swift +++ b/sdks/swift/Sources/ApacheBeam/Transforms/BuiltIn.swift @@ -33,68 +33,64 @@ public extension PCollection { // TODO: Replace with parameter pack version once https://github.com/apple/swift/issues/67192 is resolved // No Output - func pardo<F:SerializableFn>(name:String,_ fn: F) { + func pstream<F:SerializableFn>(name:String,_ fn: F) { self.apply(.pardo(AnyPCollection(self),name, fn, [])) } - func pardo(name:String,_ fn: @Sendable @escaping (PCollection<Of>.Stream) async throws -> Void) { + func pstream(name:String,_ fn: @Sendable @escaping (PCollection<Of>.Stream) async throws -> Void) { self.apply(.pardo(AnyPCollection(self),name, ClosureFn(fn),[])) } - func pardo<Param:Codable>(name:String,_ param:Param,_ fn: @Sendable @escaping (Param,PCollection<Of>.Stream) async throws -> Void) { + func pstream<Param:Codable>(name:String,_ param:Param,_ fn: @Sendable @escaping (Param,PCollection<Of>.Stream) async throws -> Void) { self.apply(.pardo(AnyPCollection(self),name, ParameterizedClosureFn(param,fn), [])) } // No Output Generated Names - func pardo<F:SerializableFn>(_file:String=#fileID,_line:Int=#line,_ fn: F) { - self.apply(.pardo(AnyPCollection(self),"\(_file):\(_line)", fn, [])) + func pstream<F:SerializableFn>(_file:String=#fileID,_line:Int=#line,_ fn: F) { + pstream(name:"\(_file):\(_line)",fn) } - func pardo(_file:String=#fileID,_line:Int=#line,_ fn: @Sendable @escaping (PCollection<Of>.Stream) async throws -> Void) { - self.apply(.pardo(AnyPCollection(self),"\(_file):\(_line)", ClosureFn(fn),[])) + func pstream(_file:String=#fileID,_line:Int=#line,_ fn: @Sendable @escaping (PCollection<Of>.Stream) async throws -> Void) { + pstream(name:"\(_file):\(_line)",fn) } - func pardo<Param:Codable>(_file:String=#fileID,_line:Int=#line,_ param:Param,_ fn: @Sendable @escaping (Param,PCollection<Of>.Stream) async throws -> Void) { - self.apply(.pardo(AnyPCollection(self),"\(_file):\(_line)", ParameterizedClosureFn(param,fn), [])) + func pstream<Param:Codable>(_file:String=#fileID,_line:Int=#line,_ param:Param,_ fn: @Sendable @escaping (Param,PCollection<Of>.Stream) async throws -> Void) { + pstream(name:"\(_file):\(_line)",param,fn) } // Single Output - func pardo<F:SerializableFn,O0>(name:String,fn: F, + func pstream<F:SerializableFn,O0>(name:String,_ fn: F, _ o0:PCollection<O0>) { self.apply(.pardo(AnyPCollection(self),name, fn, [AnyPCollection(o0)])) } - func pardo<O0>(name:String,_ fn: @Sendable @escaping (PCollection<Of>.Stream,PCollection<O0>.Stream) async throws -> Void) -> (PCollection<O0>) { + func pstream<O0>(name:String,_ fn: @Sendable @escaping (PCollection<Of>.Stream,PCollection<O0>.Stream) async throws -> Void) -> (PCollection<O0>) { let output = PCollection<O0>() self.apply(.pardo(AnyPCollection(self),name,ClosureFn(fn),[AnyPCollection(output)])) return output } - func pardo<Param:Codable,O0>(name:String,_ param: Param,_ fn: @Sendable @escaping (Param,PCollection<Of>.Stream,PCollection<O0>.Stream) async throws -> Void) -> (PCollection<O0>) { + func pstream<Param:Codable,O0>(name:String,_ param: Param,_ fn: @Sendable @escaping (Param,PCollection<Of>.Stream,PCollection<O0>.Stream) async throws -> Void) -> (PCollection<O0>) { let output = PCollection<O0>() self.apply(.pardo(AnyPCollection(self),name,ParameterizedClosureFn(param,fn),[AnyPCollection(output)])) return output } // Single Output Generated Names - func pardo<F:SerializableFn,O0>(_file:String=#fileID,_line:Int=#line,fn: F, + func pstream<F:SerializableFn,O0>(_file:String=#fileID,_line:Int=#line,fn: F, _ o0:PCollection<O0>) { - self.apply(.pardo(AnyPCollection(self),"\(_file):\(_line)", fn, [AnyPCollection(o0)])) + pstream(name:"\(_file):\(_line)",fn,o0) } - func pardo<O0>(_file:String=#fileID,_line:Int=#line,_ fn: @Sendable @escaping (PCollection<Of>.Stream,PCollection<O0>.Stream) async throws -> Void) -> (PCollection<O0>) { - let output = PCollection<O0>() - output.parent(self.apply(.pardo(AnyPCollection(self),"\(_file):\(_line)",ClosureFn(fn),[AnyPCollection(output)]))) - return output + func pstream<O0>(_file:String=#fileID,_line:Int=#line,_ fn: @Sendable @escaping (PCollection<Of>.Stream,PCollection<O0>.Stream) async throws -> Void) -> (PCollection<O0>) { + pstream(name:"\(_file):\(_line)",fn) } - func pardo<Param:Codable,O0>(_file:String=#fileID,_line:Int=#line,_ param: Param,_ fn: @Sendable @escaping (Param,PCollection<Of>.Stream,PCollection<O0>.Stream) async throws -> Void) -> (PCollection<O0>) { - let output = PCollection<O0>() - output.parent(self.apply(.pardo(AnyPCollection(self),"\(_file):\(_line)",ParameterizedClosureFn(param,fn),[AnyPCollection(output)]))) - return output + func pstream<Param:Codable,O0>(_file:String=#fileID,_line:Int=#line,_ param: Param,_ fn: @Sendable @escaping (Param,PCollection<Of>.Stream,PCollection<O0>.Stream) async throws -> Void) -> (PCollection<O0>) { + pstream(name:"\(_file):\(_line)",param,fn) } // Two Outputs - func pardo<F:SerializableFn,O0,O1>(name:String,_ fn: F, + func pstream<F:SerializableFn,O0,O1>(name:String,_ fn: F, _ o0:PCollection<O0>,_ o1:PCollection<O1>) { self.apply(.pardo(AnyPCollection(self),name, fn, [AnyPCollection(o0),AnyPCollection(o1)])) } - func pardo<O0,O1>(name:String, + func pstream<O0,O1>(name:String, _ fn: @Sendable @escaping (PCollection<Of>.Stream,PCollection<O0>.Stream,PCollection<O1>.Stream) async throws -> Void) -> (PCollection<O0>,PCollection<O1>) { let output = (PCollection<O0>(),PCollection<O1>()) let parent = self.apply(.pardo(AnyPCollection(self),name,ClosureFn(fn),[AnyPCollection(output.0),AnyPCollection(output.1)])) @@ -102,7 +98,7 @@ public extension PCollection { output.1.parent(parent) return output } - func pardo<Param:Codable,O0,O1>(name:String,_ param: Param, + func pstream<Param:Codable,O0,O1>(name:String,_ param: Param, _ fn: @Sendable @escaping (Param,PCollection<Of>.Stream,PCollection<O0>.Stream,PCollection<O1>.Stream) async throws -> Void) -> (PCollection<O0>,PCollection<O1>) { let output = (PCollection<O0>(),PCollection<O1>()) let parent = self.apply(.pardo(AnyPCollection(self),name,ParameterizedClosureFn(param,fn),[AnyPCollection(output.0),AnyPCollection(output.1)])) @@ -112,37 +108,27 @@ public extension PCollection { } // Two Outputs Generated Names - func pardo<F:SerializableFn,O0,O1>(_file:String=#fileID,_line:Int=#line,_ fn: F, + func pstream<F:SerializableFn,O0,O1>(_file:String=#fileID,_line:Int=#line,_ fn: F, _ o0:PCollection<O0>,_ o1:PCollection<O1>) { - let parent = self.apply(.pardo(AnyPCollection(self),"\(_file):\(_line)", fn, [AnyPCollection(o0),AnyPCollection(o1)])) - o0.parent(parent) - o1.parent(parent) + pstream(name:"\(_file):\(_line)",fn,o0,o1) } - func pardo<O0,O1>(_file:String=#fileID,_line:Int=#line, + func pstream<O0,O1>(_file:String=#fileID,_line:Int=#line, _ fn: @Sendable @escaping (PCollection<Of>.Stream,PCollection<O0>.Stream,PCollection<O1>.Stream) async throws -> Void) -> (PCollection<O0>,PCollection<O1>) { - let output = (PCollection<O0>(),PCollection<O1>()) - let parent = self.apply(.pardo(AnyPCollection(self),"\(_file):\(_line)",ClosureFn(fn),[AnyPCollection(output.0),AnyPCollection(output.1)])) - output.0.parent(parent) - output.1.parent(parent) - return output + pstream(name:"\(_file):\(_line)",fn) } - func pardo<Param:Codable,O0,O1>(_file:String=#fileID,_line:Int=#line,_ param: Param, + func pstream<Param:Codable,O0,O1>(_file:String=#fileID,_line:Int=#line,_ param: Param, _ fn: @Sendable @escaping (Param,PCollection<Of>.Stream,PCollection<O0>.Stream,PCollection<O1>.Stream) async throws -> Void) -> (PCollection<O0>,PCollection<O1>) { - let output = (PCollection<O0>(),PCollection<O1>()) - let parent = self.apply(.pardo(AnyPCollection(self),"\(_file):\(_line)",ParameterizedClosureFn(param,fn),[AnyPCollection(output.0),AnyPCollection(output.1)])) - output.0.parent(parent) - output.1.parent(parent) - return output + pstream(name:"\(_file):\(_line)",param,fn) } // Three Outputs - func pardo<F:SerializableFn,O0,O1,O2>(name:String,_ fn: F, + func pstream<F:SerializableFn,O0,O1,O2>(name:String,_ fn: F, _ o0:PCollection<O0>,_ o1:PCollection<O1>,_ o2:PCollection<O2>) { self.apply(.pardo(AnyPCollection(self),name, fn, [AnyPCollection(o0),AnyPCollection(o1),AnyPCollection(o2)])) } - func pardo<O0,O1,O2>(name:String, + func pstream<O0,O1,O2>(name:String, _ fn: @Sendable @escaping (PCollection<Of>.Stream,PCollection<O0>.Stream,PCollection<O1>.Stream,PCollection<O2>.Stream) async throws -> Void) -> (PCollection<O0>,PCollection<O1>,PCollection<O2>) { let output = (PCollection<O0>(),PCollection<O1>(),PCollection<O2>()) let parent = self.apply(.pardo(AnyPCollection(self),name,ClosureFn(fn),[AnyPCollection(output.0),AnyPCollection(output.1),AnyPCollection(output.2)])) @@ -151,7 +137,7 @@ public extension PCollection { output.2.parent(parent) return output } - func pardo<Param:Codable,O0,O1,O2>(name:String,_ param: Param, + func pstream<Param:Codable,O0,O1,O2>(name:String,_ param: Param, _ fn: @Sendable @escaping (Param,PCollection<Of>.Stream,PCollection<O0>.Stream,PCollection<O1>.Stream,PCollection<O2>.Stream) async throws -> Void) -> (PCollection<O0>,PCollection<O1>,PCollection<O2>) { let output = (PCollection<O0>(),PCollection<O1>(),PCollection<O2>()) let parent = self.apply(.pardo(AnyPCollection(self),name,ParameterizedClosureFn(param,fn),[AnyPCollection(output.0),AnyPCollection(output.1),AnyPCollection(output.2)])) @@ -162,27 +148,17 @@ public extension PCollection { } // Three Outputs Generated Names - func pardo<F:SerializableFn,O0,O1,O2>(_file:String=#fileID,_line:Int=#line,_ fn: F, + func pstream<F:SerializableFn,O0,O1,O2>(_file:String=#fileID,_line:Int=#line,_ fn: F, _ o0:PCollection<O0>,_ o1:PCollection<O1>,_ o2:PCollection<O2>) { - self.apply(.pardo(AnyPCollection(self),"\(_file):\(_line)", fn, [AnyPCollection(o0),AnyPCollection(o1),AnyPCollection(o2)])) + pstream(name:"\(_file):\(_line)",fn,o0,o1,o2) } - func pardo<O0,O1,O2>(_file:String=#fileID,_line:Int=#line, + func pstream<O0,O1,O2>(_file:String=#fileID,_line:Int=#line, _ fn: @Sendable @escaping (PCollection<Of>.Stream,PCollection<O0>.Stream,PCollection<O1>.Stream,PCollection<O2>.Stream) async throws -> Void) -> (PCollection<O0>,PCollection<O1>,PCollection<O2>) { - let output = (PCollection<O0>(),PCollection<O1>(),PCollection<O2>()) - let parent = self.apply(.pardo(AnyPCollection(self),"\(_file):\(_line)",ClosureFn(fn),[AnyPCollection(output.0),AnyPCollection(output.1),AnyPCollection(output.2)])) - output.0.parent(parent) - output.1.parent(parent) - output.2.parent(parent) - return output + pstream(name:"\(_file):\(_line)",fn) } - func pardo<Param:Codable,O0,O1,O2>(_file:String=#fileID,_line:Int=#line,_ param: Param, + func pstream<Param:Codable,O0,O1,O2>(_file:String=#fileID,_line:Int=#line,_ param: Param, _ fn: @Sendable @escaping (Param,PCollection<Of>.Stream,PCollection<O0>.Stream,PCollection<O1>.Stream,PCollection<O2>.Stream) async throws -> Void) -> (PCollection<O0>,PCollection<O1>,PCollection<O2>) { - let output = (PCollection<O0>(),PCollection<O1>(),PCollection<O2>()) - let parent = self.apply(.pardo(AnyPCollection(self),"\(_file):\(_line)",ParameterizedClosureFn(param,fn),[AnyPCollection(output.0),AnyPCollection(output.1),AnyPCollection(output.2)])) - output.0.parent(parent) - output.1.parent(parent) - output.2.parent(parent) - return output + pstream(name:"\(_file):\(_line)",param,fn) } diff --git a/sdks/swift/Sources/ApacheBeam/Transforms/Combining.swift b/sdks/swift/Sources/ApacheBeam/Transforms/Combining.swift index fc2aee794ec..7abd847b8b5 100644 --- a/sdks/swift/Sources/ApacheBeam/Transforms/Combining.swift +++ b/sdks/swift/Sources/ApacheBeam/Transforms/Combining.swift @@ -19,7 +19,7 @@ /// Basic reducers public extension PCollection { func reduce<Result:Codable,K,V>(name:String? = nil,_file:String=#fileID,_line:Int=#line,into:Result,_ accumulator: @Sendable @escaping (V,inout Result) -> Void) -> PCollection<KV<K,Result>> where Of == KV<K,V> { - return pardo(name:name ?? "\(_file):\(_line)",into) { initialValue,input,output in + return pstream(name:name ?? "\(_file):\(_line)",into) { initialValue,input,output in for await (kv,ts,w) in input { var result = initialValue for v in kv.values { diff --git a/sdks/swift/Sources/ApacheBeam/Transforms/IO/GoogleCloud/GoogleStorage.swift b/sdks/swift/Sources/ApacheBeam/Transforms/IO/GoogleCloud/GoogleStorage.swift index 444f8e40320..3ce56a709b2 100644 --- a/sdks/swift/Sources/ApacheBeam/Transforms/IO/GoogleCloud/GoogleStorage.swift +++ b/sdks/swift/Sources/ApacheBeam/Transforms/IO/GoogleCloud/GoogleStorage.swift @@ -36,12 +36,12 @@ struct ListFilesResponse: Codable { public struct GoogleStorage : FileIOSource { public static func readFiles(matching: PCollection<KV<String,String>>) -> PCollection<Data> { - matching.pardo { matching,output in + matching.pstream { matching,output in guard let tokenProvider = DefaultTokenProvider(scopes: ["storage.objects.get"]) else { throw ApacheBeamError.runtimeError("Unable to get OAuth2 token.") } let connection = Connection(provider: tokenProvider) - for await (file,_,_) in matching { + for await (file,ts,w) in matching { let bucket = file.key for name in file.values { let url = "https://storage.googleapis.com/storage/v1/b/\(bucket)/o/\(name.addingPercentEncoding(withAllowedCharacters: .urlHostAllowed)!)" @@ -62,7 +62,7 @@ public struct GoogleStorage : FileIOSource { } } if let d = response { - output.emit(d) + output.emit(d,timestamp:ts,window:w) } } } @@ -70,13 +70,13 @@ public struct GoogleStorage : FileIOSource { } public static func listFiles(matching: PCollection<KV<String,String>>) -> PCollection<KV<String,String>> { - matching.pardo { matching,output in + matching.pstream { matching,output in guard let tokenProvider = DefaultTokenProvider(scopes: ["storage.objects.list"]) else { throw ApacheBeamError.runtimeError("Unable to get OAuth2 token.") } let connection = Connection(provider: tokenProvider) - for await (match,_,_) in matching { + for await (match,ts,w) in matching { let bucket = match.key for prefix in match.values { let response:Data? = try await withCheckedThrowingContinuation { continuation in @@ -100,7 +100,7 @@ public struct GoogleStorage : FileIOSource { let listfiles = try JSONDecoder().decode(ListFilesResponse.self, from: data) for item in listfiles.items { if item.size != "0" { - output.emit(KV(item.bucket,item.name)) + output.emit(KV(item.bucket,item.name),timestamp: ts,window: w) } } } diff --git a/sdks/swift/Sources/Examples/Wordcount/Wordcount.swift b/sdks/swift/Sources/Examples/Wordcount/Wordcount.swift new file mode 100644 index 00000000000..3778545046f --- /dev/null +++ b/sdks/swift/Sources/Examples/Wordcount/Wordcount.swift @@ -0,0 +1,2 @@ +import ApacheBeam + diff --git a/sdks/swift/Tests/ApacheBeamTests/Pipeline/CompositeIntegrationTests.swift b/sdks/swift/Tests/ApacheBeamTests/Pipeline/CompositeIntegrationTests.swift index bddf3abc47d..dd2d184ac71 100644 --- a/sdks/swift/Tests/ApacheBeamTests/Pipeline/CompositeIntegrationTests.swift +++ b/sdks/swift/Tests/ApacheBeamTests/Pipeline/CompositeIntegrationTests.swift @@ -19,20 +19,20 @@ public struct FixtureWordCount : PTransform { public var expand: some PTransform { let (contents,errors) = create(self.fixtures) - .pardo(name:"Read Files") { (filenames,output:PCollectionStream<String>,errors:PCollectionStream<String>) in - for await (filename,_,_) in filenames { + .pstream(name:"Read Files") { (filenames,output:PCollectionStream<String>,errors:PCollectionStream<String>) in + for await (filename,ts,w) in filenames { do { - output.emit(String(decoding:try fixtureData(filename),as:UTF8.self)) + output.emit(String(decoding:try fixtureData(filename),as:UTF8.self),timestamp:ts,window:w) } catch { - errors.emit("Unable to read \(filename): \(error)") + errors.emit("Unable to read \(filename): \(error)",timestamp:ts,window:w) } } } - let baseCount = contents.pardo { (contents,lines:PCollectionStream<String>) in - for await (content,_,_) in contents { + let baseCount = contents.pstream { (contents,lines:PCollectionStream<String>) in + for await (content,ts,w) in contents { content.enumerateLines { line,_ in - lines.emit(line) + lines.emit(line,timestamp:ts,window:w) } } } diff --git a/sdks/swift/Tests/ApacheBeamTests/Pipeline/FileIOTests.swift b/sdks/swift/Tests/ApacheBeamTests/Pipeline/FileIOTests.swift index 1806de4c098..b4ffa125b08 100644 --- a/sdks/swift/Tests/ApacheBeamTests/Pipeline/FileIOTests.swift +++ b/sdks/swift/Tests/ApacheBeamTests/Pipeline/FileIOTests.swift @@ -54,10 +54,10 @@ final class FileIOTests: XCTestCase { .readFiles(in: GoogleStorage.self) // Simple ParDo that takes advantage of enumerateLines. No name to test name generation of pardos - let lines = contents.pardo { contents,lines in - for await (content,_,_) in contents { + let lines = contents.pstream { contents,lines in + for await (content,ts,w) in contents { String(data:content,encoding:.utf8)!.enumerateLines { line,_ in - lines.emit(line) + lines.emit(line,timestamp:ts,window:w) } } } diff --git a/sdks/swift/Tests/ApacheBeamTests/Pipeline/IntegrationTests.swift b/sdks/swift/Tests/ApacheBeamTests/Pipeline/IntegrationTests.swift index b86b941cd7f..27e5972c1e7 100644 --- a/sdks/swift/Tests/ApacheBeamTests/Pipeline/IntegrationTests.swift +++ b/sdks/swift/Tests/ApacheBeamTests/Pipeline/IntegrationTests.swift @@ -49,21 +49,21 @@ final class IntegrationTests: XCTestCase { try await Pipeline { pipeline in let (contents,errors) = pipeline .create(["file1.txt","file2.txt","missing.txt"]) - .pardo(name:"Read Files") { filenames,output,errors in - for await (filename,_,_) in filenames { + .pstream(name:"Read Files") { filenames,output,errors in + for await (filename,ts,w) in filenames { do { - output.emit(String(decoding:try fixtureData(filename),as:UTF8.self)) + output.emit(String(decoding:try fixtureData(filename),as:UTF8.self),timestamp:ts,window:w) } catch { - errors.emit("Unable to read \(filename): \(error)") + errors.emit("Unable to read \(filename): \(error)",timestamp:ts,window:w) } } } // Simple ParDo that takes advantage of enumerateLines. No name to test name generation of pardos - let lines = contents.pardo { contents,lines in - for await (content,_,_) in contents { + let lines = contents.pstream { contents,lines in + for await (content,ts,w) in contents { content.enumerateLines { line,_ in - lines.emit(line) + lines.emit(line,timestamp:ts,window:w) } } }