This is an automated email from the ASF dual-hosted git repository. lostluck pushed a commit to branch swift-sdk in repository https://gitbox.apache.org/repos/asf/beam.git
commit c5edeac128926a905dbd8abe0461d8aad172b7d9 Author: Byron Ellis <byronel...@google.com> AuthorDate: Mon Aug 21 13:20:18 2023 -0700 Start stubbing out more of the composite transform implementation. Start stubbing out External Transforms and Schema support. Improve automatic naming of pardos and make explicit name implementation cleaner. Add S3 and GCS dependencies to implement basic internal IOs. --- sdks/swift/Package.resolved | 63 +++++++++++ sdks/swift/Package.swift | 5 +- sdks/swift/Sources/ApacheBeam/Coders/Coder.swift | 2 +- .../Core/PTransform/OutputPTransform.swift | 10 ++ .../Core/PTransform/PTransformBuilder.swift | 20 ++++ .../Core/PTransform/TuplePTransform.swift | 20 ++++ .../ApacheBeam/Core/Pipeline/Pipeline.swift | 2 + .../Core/Pipeline/PipelineTransform.swift | 1 + .../DynamicRow.swift} | 18 +--- .../Sources/ApacheBeam/Schema/RowProtocol.swift | 3 + sdks/swift/Sources/ApacheBeam/Schema/Schema.swift | 21 ++++ .../Sources/ApacheBeam/Transforms/Basic.swift | 21 ++-- .../Sources/ApacheBeam/Transforms/BuiltIn.swift | 118 ++++++++++++++++----- .../Sources/ApacheBeam/Transforms/Combining.swift | 2 +- .../Composite.swift} | 15 +-- .../IO/ListFiles.swift} | 22 ++-- .../Pipeline/IntegrationTests.swift | 4 +- 17 files changed, 265 insertions(+), 82 deletions(-) diff --git a/sdks/swift/Package.resolved b/sdks/swift/Package.resolved index cc4a106ff4a..7bf7847cda6 100644 --- a/sdks/swift/Package.resolved +++ b/sdks/swift/Package.resolved @@ -1,5 +1,50 @@ { "pins" : [ + { + "identity" : "aws-crt-swift", + "kind" : "remoteSourceControl", + "location" : "https://github.com/awslabs/aws-crt-swift", + "state" : { + "revision" : "997904873945e074aaf5c51ea968d9a84684525a", + "version" : "0.13.0" + } + }, + { + "identity" : "aws-sdk-swift", + "kind" : "remoteSourceControl", + "location" : "https://github.com/awslabs/aws-sdk-swift.git", + "state" : { + "revision" : "14cfdb636d918e589f1b053e43acc118e75af5ce", + "version" : "0.23.0" + } + }, + { + "identity" : "bigint", + "kind" : "remoteSourceControl", + "location" : "https://github.com/attaswift/BigInt", + "state" : { + "revision" : "0ed110f7555c34ff468e72e1686e59721f2b0da6", + "version" : "5.3.0" + } + }, + { + "identity" : "cryptoswift", + "kind" : "remoteSourceControl", + "location" : "https://github.com/krzyzanowskim/CryptoSwift.git", + "state" : { + "revision" : "af1b58fc569bfde777462349b9f7314b61762be0", + "version" : "1.3.2" + } + }, + { + "identity" : "google-auth-library-swift", + "kind" : "remoteSourceControl", + "location" : "https://github.com/googleapis/google-auth-library-swift", + "state" : { + "revision" : "4b510d91fc74f1415eae6dabc9836b8c3e1f44f6", + "version" : "0.5.3" + } + }, { "identity" : "grpc-swift", "kind" : "remoteSourceControl", @@ -9,6 +54,15 @@ "version" : "1.19.0" } }, + { + "identity" : "smithy-swift", + "kind" : "remoteSourceControl", + "location" : "https://github.com/awslabs/smithy-swift", + "state" : { + "revision" : "f277bd61b9a869a6a4a4b4a0e808240147713f7f", + "version" : "0.27.0" + } + }, { "identity" : "swift-atomics", "kind" : "remoteSourceControl", @@ -107,6 +161,15 @@ "revision" : "ce20dc083ee485524b802669890291c0d8090170", "version" : "1.22.1" } + }, + { + "identity" : "xmlcoder", + "kind" : "remoteSourceControl", + "location" : "https://github.com/MaxDesiatov/XMLCoder.git", + "state" : { + "revision" : "80b4a1646399b8e4e0ce80711653476a85bd5e37", + "version" : "0.17.0" + } } ], "version" : 2 diff --git a/sdks/swift/Package.swift b/sdks/swift/Package.swift index e6b67948631..c5ca4db14eb 100644 --- a/sdks/swift/Package.swift +++ b/sdks/swift/Package.swift @@ -27,6 +27,8 @@ let dependencies: [Package.Dependency] = [ .package(url: "https://github.com/apple/swift-log.git", from: "1.0.0"), // Additional Transform Dependencies + .package(url: "https://github.com/awslabs/aws-sdk-swift.git", from: "0.23.0"), + .package(url: "https://github.com/googleapis/google-auth-library-swift",from:"0.0.0"), // Swift Package Manager Plugins @@ -52,7 +54,8 @@ let package = Package( name: "ApacheBeam", dependencies: [ .product(name:"GRPC",package:"grpc-swift"), - .product(name: "Logging",package:"swift-log") + .product(name: "Logging",package:"swift-log"), + .product(name: "AWSS3",package:"aws-sdk-swift") ] ), .testTarget( diff --git a/sdks/swift/Sources/ApacheBeam/Coders/Coder.swift b/sdks/swift/Sources/ApacheBeam/Coders/Coder.swift index bf1ead4f6d8..13ed10ee6fa 100644 --- a/sdks/swift/Sources/ApacheBeam/Coders/Coder.swift +++ b/sdks/swift/Sources/ApacheBeam/Coders/Coder.swift @@ -32,8 +32,8 @@ public indirect enum Coder { case lengthprefix(Coder) case windowedvalue(Coder,Coder) - // TODO: Row Coder + } diff --git a/sdks/swift/Sources/ApacheBeam/Core/PTransform/OutputPTransform.swift b/sdks/swift/Sources/ApacheBeam/Core/PTransform/OutputPTransform.swift new file mode 100644 index 00000000000..c9cb0b5c228 --- /dev/null +++ b/sdks/swift/Sources/ApacheBeam/Core/PTransform/OutputPTransform.swift @@ -0,0 +1,10 @@ +public struct OutputPTransform<Of> : _PrimitivePTransform { + let name: String + let applyClosure: (Any) -> PCollection<Of> + + public init<In>(_ name:String,_ fn: @escaping (PCollection<In>) -> PCollection<Of>) { + self.name = name + self.applyClosure = { fn($0 as! PCollection<In>) } + } + +} diff --git a/sdks/swift/Sources/ApacheBeam/Core/PTransform/PTransformBuilder.swift b/sdks/swift/Sources/ApacheBeam/Core/PTransform/PTransformBuilder.swift new file mode 100644 index 00000000000..a1e2185c849 --- /dev/null +++ b/sdks/swift/Sources/ApacheBeam/Core/PTransform/PTransformBuilder.swift @@ -0,0 +1,20 @@ +public struct EmptyPTransform : _PrimitivePTransform { + +} + + + + +@resultBuilder +public struct PTransformBuilder { + + public static func buildBlock() -> EmptyPTransform { + EmptyPTransform() + } + + public static func buildBlock<Transform>(_ transform: Transform) -> Transform where Transform: PTransform { + transform + } + + +} diff --git a/sdks/swift/Sources/ApacheBeam/Core/PTransform/TuplePTransform.swift b/sdks/swift/Sources/ApacheBeam/Core/PTransform/TuplePTransform.swift new file mode 100644 index 00000000000..49a26eefbbd --- /dev/null +++ b/sdks/swift/Sources/ApacheBeam/Core/PTransform/TuplePTransform.swift @@ -0,0 +1,20 @@ +public struct TuplePTransform<T>: _PrimitivePTransform { + public let value: T + let _children: [AnyPTransform] + + public init(value: T){ + self.value = value + _children = [] + } + + public init(value: T,children: [AnyPTransform]){ + self.value = value + _children = children + } + + init<T0:PTransform,T1:PTransform>(_ t0: T0,_ t1:T1) where T == (T0,T1) { + value = (t0,t1) + _children = [AnyPTransform(t0),AnyPTransform(t1)] + } + +} diff --git a/sdks/swift/Sources/ApacheBeam/Core/Pipeline/Pipeline.swift b/sdks/swift/Sources/ApacheBeam/Core/Pipeline/Pipeline.swift index b89025b7ec1..180710aae73 100644 --- a/sdks/swift/Sources/ApacheBeam/Core/Pipeline/Pipeline.swift +++ b/sdks/swift/Sources/ApacheBeam/Core/Pipeline/Pipeline.swift @@ -205,6 +205,8 @@ public final class Pipeline { toVisit.append(.collection(o)) case .flatten(_, _): throw ApacheBeamError.runtimeError("flatten not implemented yet") + case .external: + throw ApacheBeamError.runtimeError("External Transforms not implemented yet") case .groupByKey(let o): let outputs = [o].enumerated().map { ("\($0)",collection(from: $1).name) diff --git a/sdks/swift/Sources/ApacheBeam/Core/Pipeline/PipelineTransform.swift b/sdks/swift/Sources/ApacheBeam/Core/Pipeline/PipelineTransform.swift index 550df2fb860..ce164566db1 100644 --- a/sdks/swift/Sources/ApacheBeam/Core/Pipeline/PipelineTransform.swift +++ b/sdks/swift/Sources/ApacheBeam/Core/Pipeline/PipelineTransform.swift @@ -27,6 +27,7 @@ public enum PipelineTransform { case groupByKey(AnyPCollection) case custom(String,Data,Environment?,[AnyPCollection]) case composite(AnyPTransform) + case external } diff --git a/sdks/swift/Sources/ApacheBeam/Core/Pipeline/PipelineTransform.swift b/sdks/swift/Sources/ApacheBeam/Schema/DynamicRow.swift similarity index 63% copy from sdks/swift/Sources/ApacheBeam/Core/Pipeline/PipelineTransform.swift copy to sdks/swift/Sources/ApacheBeam/Schema/DynamicRow.swift index 550df2fb860..b0a4054d2e5 100644 --- a/sdks/swift/Sources/ApacheBeam/Core/Pipeline/PipelineTransform.swift +++ b/sdks/swift/Sources/ApacheBeam/Schema/DynamicRow.swift @@ -16,18 +16,8 @@ * limitations under the License. */ -import Foundation - -/// Enum for pipeline representable transforms as opposed to composite transforms -/// which are a user-side construct represented by PTransform -public enum PipelineTransform { - case pardo(String,SerializableFn,[AnyPCollection]) - case impulse(AnyPCollection) - case flatten([AnyPCollection],AnyPCollection) - case groupByKey(AnyPCollection) - case custom(String,Data,Environment?,[AnyPCollection]) - case composite(AnyPTransform) +/// Dynamic representation of a Row that lets us treat a row value like JSON. Obviously this is less performant and +/// when using schema objects internally, particularly in PCollections we would favor @Row structs +public struct DynamicRow { + } - - - diff --git a/sdks/swift/Sources/ApacheBeam/Schema/RowProtocol.swift b/sdks/swift/Sources/ApacheBeam/Schema/RowProtocol.swift new file mode 100644 index 00000000000..b1315fd09de --- /dev/null +++ b/sdks/swift/Sources/ApacheBeam/Schema/RowProtocol.swift @@ -0,0 +1,3 @@ +public protocol RowProtocol { + +} diff --git a/sdks/swift/Sources/ApacheBeam/Schema/Schema.swift b/sdks/swift/Sources/ApacheBeam/Schema/Schema.swift new file mode 100644 index 00000000000..1404f5e8efd --- /dev/null +++ b/sdks/swift/Sources/ApacheBeam/Schema/Schema.swift @@ -0,0 +1,21 @@ + +public indirect enum FieldType { + case byte,int16,int32,int64,float,double,string,datetime,boolean,bytes + case decimal(Int,Int) + + case logical(Schema) + case row(Schema) + + case array(FieldType) + case repeated(FieldType) + case map(FieldType,FieldType) +} + +public struct Field { + let name: String + let type: FieldType +} + +public struct Schema { + +} diff --git a/sdks/swift/Sources/ApacheBeam/Transforms/Basic.swift b/sdks/swift/Sources/ApacheBeam/Transforms/Basic.swift index 9be08330b46..be7b3318c03 100644 --- a/sdks/swift/Sources/ApacheBeam/Transforms/Basic.swift +++ b/sdks/swift/Sources/ApacheBeam/Transforms/Basic.swift @@ -20,9 +20,8 @@ public extension PCollection { /// Each time the input fires output all of the values in this list. - func create<Value:Codable>(_ values: [Value],_ name:String? = nil,_file:String=#fileID,_line:Int=#line) -> PCollection<Value> { - - return pardo(name,_file:_file,_line:_line,values) { values,input,output in + func create<Value:Codable>(_ values: [Value],name:String? = nil,_file:String=#fileID,_line:Int=#line) -> PCollection<Value> { + return pardo(name:name ?? "\(_file):\(_line)",values) { values,input,output in for try await (_,ts,w) in input { for v in values { output.emit(v,timestamp:ts,window:w) @@ -37,7 +36,7 @@ public extension PCollection { @discardableResult func log(prefix:String,_ name:String? = nil,_file:String=#fileID,_line:Int=#line) -> PCollection<Of> where Of == String { - pardo(name,_file:_file,_line:_line,prefix) { prefix,input,output in + pardo(name:name ?? "\(_file):\(_line)",prefix) { prefix,input,output in for await element in input { print("\(prefix): \(element)") output.emit(element) @@ -47,7 +46,7 @@ public extension PCollection { @discardableResult func log<K,V>(prefix:String,_ name:String? = nil,_file:String=#fileID,_line:Int=#line) -> PCollection<KV<K,V>> where Of == KV<K,V> { - pardo(name,_file:_file,_line:_line,prefix) { prefix,input,output in + pardo(name:name ?? "\(_file):\(_line)",prefix) { prefix,input,output in for await element in input { let kv = element.0 for v in kv.values { @@ -63,8 +62,8 @@ public extension PCollection { public extension PCollection { /// Modify a value without changing its window or timestamp - func map<Out>(_ name:String? = nil,_file:String=#fileID,_line:Int=#line,_ fn: @Sendable @escaping (Of) -> Out) -> PCollection<Out> { - return pardo(name,_file:_file,_line:_line) { input,output in + func map<Out>(name:String? = nil,_file:String=#fileID,_line:Int=#line,_ fn: @Sendable @escaping (Of) -> Out) -> PCollection<Out> { + return pardo(name:name ?? "\(_file):\(_line)") { input,output in for try await (v,ts,w) in input { output.emit(fn(v),timestamp:ts,window:w) } @@ -72,7 +71,7 @@ public extension PCollection { } func map<K,V>(_ name:String? = nil,_file:String=#fileID,_line:Int=#line,_ fn: @Sendable @escaping (Of) -> (K,V)) -> PCollection<KV<K,V>> { - return pardo(name,_file:_file,_line:_line) { input,output in + return pardo(name:name ?? "\(_file):\(_line)") { input,output in for try await (i,ts,w) in input { let (key,value) = fn(i) output.emit(KV(key,value),timestamp:ts,window:w) @@ -82,7 +81,7 @@ public extension PCollection { /// Produce multiple outputs as a single value without modifying window or timestamp func flatMap<Out>(name:String? = nil,_file:String=#fileID,_line:Int=#line,_ fn: @Sendable @escaping (Of) -> [Out]) -> PCollection<Out> { - return pardo(name,_file:_file,_line:_line) { input,output in + return pardo(name:name ?? "\(_file):\(_line)") { input,output in for try await (v,ts,w) in input { for i in fn(v) { output.emit(i,timestamp:ts,window:w) @@ -95,7 +94,7 @@ public extension PCollection { public extension PCollection<Never> { /// Convenience function to add an impulse when we are at the root of the pipeline - func create<Value:Codable>(_ values: [Value],_ name:String? = nil,_file:String=#fileID,_line:Int=#line) -> PCollection<Value> { - return impulse().create(values,name,_file:_file,_line:_line) + func create<Value:Codable>(_ values: [Value],name:String? = nil,_file:String=#fileID,_line:Int=#line) -> PCollection<Value> { + return impulse().create(values,name:name,_file:_file,_line:_line) } } diff --git a/sdks/swift/Sources/ApacheBeam/Transforms/BuiltIn.swift b/sdks/swift/Sources/ApacheBeam/Transforms/BuiltIn.swift index 4cc3532da21..7bea2c4f0c8 100644 --- a/sdks/swift/Sources/ApacheBeam/Transforms/BuiltIn.swift +++ b/sdks/swift/Sources/ApacheBeam/Transforms/BuiltIn.swift @@ -33,71 +33,137 @@ public extension PCollection { // TODO: Replace with parameter pack version once https://github.com/apple/swift/issues/67192 is resolved // No Output - func pardo<F:SerializableFn>(_ name:String? = nil,_file:String=#fileID,_line:Int=#line,_ fn: F) { - self.apply(.pardo(name ?? "\(_file):\(_line)", fn, [])) + func pardo<F:SerializableFn>(name:String,_ fn: F) { + self.apply(.pardo(name, fn, [])) } - func pardo(_ name:String? = nil,_file:String=#fileID,_line:Int=#line,_ fn: @Sendable @escaping (PCollection<Of>.Stream) async throws -> Void) { - self.apply(.pardo(name ?? "\(_file):\(_line)", ClosureFn(fn),[])) + func pardo(name:String,_ fn: @Sendable @escaping (PCollection<Of>.Stream) async throws -> Void) { + self.apply(.pardo(name, ClosureFn(fn),[])) } - func pardo<Param:Codable>(_ name:String? = nil,_file:String=#fileID,_line:Int=#line,_ param:Param,_ fn: @Sendable @escaping (Param,PCollection<Of>.Stream) async throws -> Void) { - self.apply(.pardo(name ?? "\(_file):\(_line)", ParameterizedClosureFn(param,fn), [])) + func pardo<Param:Codable>(name:String,_ param:Param,_ fn: @Sendable @escaping (Param,PCollection<Of>.Stream) async throws -> Void) { + self.apply(.pardo(name, ParameterizedClosureFn(param,fn), [])) + } + + // No Output Generated Names + func pardo<F:SerializableFn>(_file:String=#fileID,_line:Int=#line,_ fn: F) { + self.apply(.pardo("\(_file):\(_line)", fn, [])) + } + func pardo(_file:String=#fileID,_line:Int=#line,_ fn: @Sendable @escaping (PCollection<Of>.Stream) async throws -> Void) { + self.apply(.pardo("\(_file):\(_line)", ClosureFn(fn),[])) + } + func pardo<Param:Codable>(_file:String=#fileID,_line:Int=#line,_ param:Param,_ fn: @Sendable @escaping (Param,PCollection<Of>.Stream) async throws -> Void) { + self.apply(.pardo("\(_file):\(_line)", ParameterizedClosureFn(param,fn), [])) } // Single Output - func pardo<F:SerializableFn,O0>(_ name:String? = nil,_file:String=#fileID,_line:Int=#line,_ fn: F, + func pardo<F:SerializableFn,O0>(name:String,fn: F, + _ o0:PCollection<O0>) { + self.apply(.pardo(name, fn, [AnyPCollection(o0)])) + } + func pardo<O0>(name:String,_ fn: @Sendable @escaping (PCollection<Of>.Stream,PCollection<O0>.Stream) async throws -> Void) -> (PCollection<O0>) { + let output = PCollection<O0>() + self.apply(.pardo(name,ClosureFn(fn),[AnyPCollection(output)])) + return output + } + func pardo<Param:Codable,O0>(name:String,_ param: Param,_ fn: @Sendable @escaping (Param,PCollection<Of>.Stream,PCollection<O0>.Stream) async throws -> Void) -> (PCollection<O0>) { + let output = PCollection<O0>() + self.apply(.pardo(name,ParameterizedClosureFn(param,fn),[AnyPCollection(output)])) + return output + } + + // Single Output Generated Names + func pardo<F:SerializableFn,O0>(_file:String=#fileID,_line:Int=#line,fn: F, _ o0:PCollection<O0>) { - self.apply(.pardo(name ?? "\(_file):\(_line)", fn, [AnyPCollection(o0)])) + self.apply(.pardo("\(_file):\(_line)", fn, [AnyPCollection(o0)])) } - func pardo<O0>(_ name:String? = nil,_file:String=#fileID,_line:Int=#line, - _ fn: @Sendable @escaping (PCollection<Of>.Stream,PCollection<O0>.Stream) async throws -> Void) -> (PCollection<O0>) { + func pardo<O0>(_file:String=#fileID,_line:Int=#line,_ fn: @Sendable @escaping (PCollection<Of>.Stream,PCollection<O0>.Stream) async throws -> Void) -> (PCollection<O0>) { let output = PCollection<O0>() - self.apply(.pardo(name ?? "\(_file):\(_line)",ClosureFn(fn),[AnyPCollection(output)])) + self.apply(.pardo("\(_file):\(_line)",ClosureFn(fn),[AnyPCollection(output)])) return output } - func pardo<Param:Codable,O0>(_ name:String? = nil,_file:String=#fileID,_line:Int=#line,_ param: Param, - _ fn: @Sendable @escaping (Param,PCollection<Of>.Stream,PCollection<O0>.Stream) async throws -> Void) -> (PCollection<O0>) { + func pardo<Param:Codable,O0>(_file:String=#fileID,_line:Int=#line,_ param: Param,_ fn: @Sendable @escaping (Param,PCollection<Of>.Stream,PCollection<O0>.Stream) async throws -> Void) -> (PCollection<O0>) { let output = PCollection<O0>() - self.apply(.pardo(name ?? "\(_file):\(_line)",ParameterizedClosureFn(param,fn),[AnyPCollection(output)])) + self.apply(.pardo("\(_file):\(_line)",ParameterizedClosureFn(param,fn),[AnyPCollection(output)])) return output } + + // Two Outputs - func pardo<F:SerializableFn,O0,O1>(_ name:String? = nil,_file:String=#fileID,_line:Int=#line,_ fn: F, + func pardo<F:SerializableFn,O0,O1>(name:String,_ fn: F, + _ o0:PCollection<O0>,_ o1:PCollection<O1>) { + self.apply(.pardo(name, fn, [AnyPCollection(o0),AnyPCollection(o1)])) + } + func pardo<O0,O1>(name:String, + _ fn: @Sendable @escaping (PCollection<Of>.Stream,PCollection<O0>.Stream,PCollection<O1>.Stream) async throws -> Void) -> (PCollection<O0>,PCollection<O1>) { + let output = (PCollection<O0>(),PCollection<O1>()) + self.apply(.pardo(name,ClosureFn(fn),[AnyPCollection(output.0),AnyPCollection(output.1)])) + return output + } + func pardo<Param:Codable,O0,O1>(name:String,_ param: Param, + _ fn: @Sendable @escaping (Param,PCollection<Of>.Stream,PCollection<O0>.Stream,PCollection<O1>.Stream) async throws -> Void) -> (PCollection<O0>,PCollection<O1>) { + let output = (PCollection<O0>(),PCollection<O1>()) + self.apply(.pardo(name,ParameterizedClosureFn(param,fn),[AnyPCollection(output.0),AnyPCollection(output.1)])) + return output + } + + // Two Outputs Generated Names + func pardo<F:SerializableFn,O0,O1>(_file:String=#fileID,_line:Int=#line,_ fn: F, _ o0:PCollection<O0>,_ o1:PCollection<O1>) { - self.apply(.pardo(name ?? "\(_file):\(_line)", fn, [AnyPCollection(o0),AnyPCollection(o1)])) + self.apply(.pardo("\(_file):\(_line)", fn, [AnyPCollection(o0),AnyPCollection(o1)])) } - func pardo<O0,O1>(_ name:String? = nil,_file:String=#fileID,_line:Int=#line, + func pardo<O0,O1>(_file:String=#fileID,_line:Int=#line, _ fn: @Sendable @escaping (PCollection<Of>.Stream,PCollection<O0>.Stream,PCollection<O1>.Stream) async throws -> Void) -> (PCollection<O0>,PCollection<O1>) { let output = (PCollection<O0>(),PCollection<O1>()) - self.apply(.pardo(name ?? "\(_file):\(_line)",ClosureFn(fn),[AnyPCollection(output.0),AnyPCollection(output.1)])) + self.apply(.pardo("\(_file):\(_line)",ClosureFn(fn),[AnyPCollection(output.0),AnyPCollection(output.1)])) return output } - func pardo<Param:Codable,O0,O1>(_ name:String? = nil,_file:String=#fileID,_line:Int=#line,_ param: Param, + func pardo<Param:Codable,O0,O1>(_file:String=#fileID,_line:Int=#line,_ param: Param, _ fn: @Sendable @escaping (Param,PCollection<Of>.Stream,PCollection<O0>.Stream,PCollection<O1>.Stream) async throws -> Void) -> (PCollection<O0>,PCollection<O1>) { let output = (PCollection<O0>(),PCollection<O1>()) - self.apply(.pardo(name ?? "\(_file):\(_line)",ParameterizedClosureFn(param,fn),[AnyPCollection(output.0),AnyPCollection(output.1)])) + self.apply(.pardo("\(_file):\(_line)",ParameterizedClosureFn(param,fn),[AnyPCollection(output.0),AnyPCollection(output.1)])) return output } + + // Three Outputs - func pardo<F:SerializableFn,O0,O1,O2>(_ name:String? = nil,_file:String=#fileID,_line:Int=#line,_ fn: F, + func pardo<F:SerializableFn,O0,O1,O2>(name:String,_ fn: F, + _ o0:PCollection<O0>,_ o1:PCollection<O1>,_ o2:PCollection<O2>) { + self.apply(.pardo(name, fn, [AnyPCollection(o0),AnyPCollection(o1),AnyPCollection(o2)])) + } + func pardo<O0,O1,O2>(name:String, + _ fn: @Sendable @escaping (PCollection<Of>.Stream,PCollection<O0>.Stream,PCollection<O1>.Stream,PCollection<O2>.Stream) async throws -> Void) -> (PCollection<O0>,PCollection<O1>,PCollection<O2>) { + let output = (PCollection<O0>(),PCollection<O1>(),PCollection<O2>()) + self.apply(.pardo(name,ClosureFn(fn),[AnyPCollection(output.0),AnyPCollection(output.1),AnyPCollection(output.2)])) + return output + } + func pardo<Param:Codable,O0,O1,O2>(name:String,_ param: Param, + _ fn: @Sendable @escaping (Param,PCollection<Of>.Stream,PCollection<O0>.Stream,PCollection<O1>.Stream,PCollection<O2>.Stream) async throws -> Void) -> (PCollection<O0>,PCollection<O1>,PCollection<O2>) { + let output = (PCollection<O0>(),PCollection<O1>(),PCollection<O2>()) + self.apply(.pardo(name,ParameterizedClosureFn(param,fn),[AnyPCollection(output.0),AnyPCollection(output.1),AnyPCollection(output.2)])) + return output + } + + // Three Outputs Generated Names + func pardo<F:SerializableFn,O0,O1,O2>(_file:String=#fileID,_line:Int=#line,_ fn: F, _ o0:PCollection<O0>,_ o1:PCollection<O1>,_ o2:PCollection<O2>) { - self.apply(.pardo(name ?? "\(_file):\(_line)", fn, [AnyPCollection(o0),AnyPCollection(o1),AnyPCollection(o2)])) + self.apply(.pardo("\(_file):\(_line)", fn, [AnyPCollection(o0),AnyPCollection(o1),AnyPCollection(o2)])) } - func pardo<O0,O1,O2>(_ name:String? = nil,_file:String=#fileID,_line:Int=#line, + func pardo<O0,O1,O2>(_file:String=#fileID,_line:Int=#line, _ fn: @Sendable @escaping (PCollection<Of>.Stream,PCollection<O0>.Stream,PCollection<O1>.Stream,PCollection<O2>.Stream) async throws -> Void) -> (PCollection<O0>,PCollection<O1>,PCollection<O2>) { let output = (PCollection<O0>(),PCollection<O1>(),PCollection<O2>()) - self.apply(.pardo(name ?? "\(_file):\(_line)",ClosureFn(fn),[AnyPCollection(output.0),AnyPCollection(output.1),AnyPCollection(output.2)])) + self.apply(.pardo("\(_file):\(_line)",ClosureFn(fn),[AnyPCollection(output.0),AnyPCollection(output.1),AnyPCollection(output.2)])) return output } - func pardo<Param:Codable,O0,O1,O2>(_ name:String? = nil,_file:String=#fileID,_line:Int=#line,_ param: Param, + func pardo<Param:Codable,O0,O1,O2>(_file:String=#fileID,_line:Int=#line,_ param: Param, _ fn: @Sendable @escaping (Param,PCollection<Of>.Stream,PCollection<O0>.Stream,PCollection<O1>.Stream,PCollection<O2>.Stream) async throws -> Void) -> (PCollection<O0>,PCollection<O1>,PCollection<O2>) { let output = (PCollection<O0>(),PCollection<O1>(),PCollection<O2>()) - self.apply(.pardo(name ?? "\(_file):\(_line)",ParameterizedClosureFn(param,fn),[AnyPCollection(output.0),AnyPCollection(output.1),AnyPCollection(output.2)])) + self.apply(.pardo("\(_file):\(_line)",ParameterizedClosureFn(param,fn),[AnyPCollection(output.0),AnyPCollection(output.1),AnyPCollection(output.2)])) return output } + //TODO: Add more as needed } diff --git a/sdks/swift/Sources/ApacheBeam/Transforms/Combining.swift b/sdks/swift/Sources/ApacheBeam/Transforms/Combining.swift index 3ef57ebfd35..fc2aee794ec 100644 --- a/sdks/swift/Sources/ApacheBeam/Transforms/Combining.swift +++ b/sdks/swift/Sources/ApacheBeam/Transforms/Combining.swift @@ -19,7 +19,7 @@ /// Basic reducers public extension PCollection { func reduce<Result:Codable,K,V>(name:String? = nil,_file:String=#fileID,_line:Int=#line,into:Result,_ accumulator: @Sendable @escaping (V,inout Result) -> Void) -> PCollection<KV<K,Result>> where Of == KV<K,V> { - return pardo(name,_file:_file,_line:_line,into) { initialValue,input,output in + return pardo(name:name ?? "\(_file):\(_line)",into) { initialValue,input,output in for await (kv,ts,w) in input { var result = initialValue for v in kv.values { diff --git a/sdks/swift/Sources/ApacheBeam/Core/Pipeline/PipelineTransform.swift b/sdks/swift/Sources/ApacheBeam/Transforms/Composite.swift similarity index 63% copy from sdks/swift/Sources/ApacheBeam/Core/Pipeline/PipelineTransform.swift copy to sdks/swift/Sources/ApacheBeam/Transforms/Composite.swift index 550df2fb860..9155d6ff9e5 100644 --- a/sdks/swift/Sources/ApacheBeam/Core/Pipeline/PipelineTransform.swift +++ b/sdks/swift/Sources/ApacheBeam/Transforms/Composite.swift @@ -16,18 +16,5 @@ * limitations under the License. */ -import Foundation - -/// Enum for pipeline representable transforms as opposed to composite transforms -/// which are a user-side construct represented by PTransform -public enum PipelineTransform { - case pardo(String,SerializableFn,[AnyPCollection]) - case impulse(AnyPCollection) - case flatten([AnyPCollection],AnyPCollection) - case groupByKey(AnyPCollection) - case custom(String,Data,Environment?,[AnyPCollection]) - case composite(AnyPTransform) +public extension PCollection { } - - - diff --git a/sdks/swift/Sources/ApacheBeam/Core/Pipeline/PipelineTransform.swift b/sdks/swift/Sources/ApacheBeam/Transforms/IO/ListFiles.swift similarity index 63% copy from sdks/swift/Sources/ApacheBeam/Core/Pipeline/PipelineTransform.swift copy to sdks/swift/Sources/ApacheBeam/Transforms/IO/ListFiles.swift index 550df2fb860..2c6b79b6ee1 100644 --- a/sdks/swift/Sources/ApacheBeam/Core/Pipeline/PipelineTransform.swift +++ b/sdks/swift/Sources/ApacheBeam/Transforms/IO/ListFiles.swift @@ -16,18 +16,16 @@ * limitations under the License. */ -import Foundation - -/// Enum for pipeline representable transforms as opposed to composite transforms -/// which are a user-side construct represented by PTransform -public enum PipelineTransform { - case pardo(String,SerializableFn,[AnyPCollection]) - case impulse(AnyPCollection) - case flatten([AnyPCollection],AnyPCollection) - case groupByKey(AnyPCollection) - case custom(String,Data,Environment?,[AnyPCollection]) - case composite(AnyPTransform) +struct S3Bucket : Codable { + let bucket:String } +struct GSBucket : Codable { + let bucket:String +} - +public extension PCollection { + + + +} diff --git a/sdks/swift/Tests/ApacheBeamTests/Pipeline/IntegrationTests.swift b/sdks/swift/Tests/ApacheBeamTests/Pipeline/IntegrationTests.swift index 58c2baa479c..0c34fcb42ae 100644 --- a/sdks/swift/Tests/ApacheBeamTests/Pipeline/IntegrationTests.swift +++ b/sdks/swift/Tests/ApacheBeamTests/Pipeline/IntegrationTests.swift @@ -48,7 +48,7 @@ final class IntegrationTests: XCTestCase { try await Pipeline { pipeline in let (contents,errors) = pipeline .create(["file1.txt","file2.txt","missing.txt"]) - .pardo { filenames,output,errors in + .pardo(name:"Read Files") { filenames,output,errors in for await (filename,_,_) in filenames { do { output.emit(String(decoding:try fixtureData(filename),as:UTF8.self)) @@ -58,7 +58,7 @@ final class IntegrationTests: XCTestCase { } } - // Simple ParDo that takes advantage of enumerateLines + // Simple ParDo that takes advantage of enumerateLines. No name to test name generation of pardos let lines = contents.pardo { contents,lines in for await (content,_,_) in contents { content.enumerateLines { line,_ in