This is an automated email from the ASF dual-hosted git repository. lostluck pushed a commit to branch swift-sdk in repository https://gitbox.apache.org/repos/asf/beam.git
commit 5be3750fd8f5294f98867c44c8339dcd192837d8 Author: Byron Ellis <byronel...@google.com> AuthorDate: Mon Aug 14 07:49:04 2023 -0700 Refactor Unary and Tuple SerializableFn into a single ClosureFn since one is just a special case of the other. Add the wordcount integration test to verify the refactoring does indeed work and start moving the pipeline code into the branch. --- sdks/python/output.json-00000-of-00001 | 2 + sdks/swift/Documentation/INTERNALS.md | 4 - .../Sources/ApacheBeam/Coders/BeamValue.swift | 2 + .../swift/Sources/ApacheBeam/Coders/Beamable.swift | 24 ++++++ .../Sources/ApacheBeam/Coders/Coder+Decoding.swift | 1 + sdks/swift/Sources/ApacheBeam/Coders/Coder.swift | 31 ++++++- .../ApacheBeam/Core/DynamicProperties.swift | 36 ++++++++ .../Sources/ApacheBeam/Core/Fn/ClosureFn.swift | 63 ++++++++++++++ sdks/swift/Sources/ApacheBeam/Core/Fn/DoFn.swift | 15 ++++ sdks/swift/Sources/ApacheBeam/Core/Fn/DoneFn.swift | 63 -------------- .../Core/Fn/ParameterizedClosureFn.swift | 74 ++++++++++++++++ .../{SerialiableFn.swift => SerializableFn.swift} | 4 +- .../Core/PCollection/AnyPCollection.swift | 51 +++++++++++ .../Core/PCollection/AnyPCollectionStream.swift | 17 ++-- .../ApacheBeam/Core/PCollection/PCollection.swift | 32 +++++++ .../Core/PCollection/PCollectionStream.swift | 73 ++++++++++++++-- .../ApacheBeam/Core/PTransform/AnyPTransform.swift | 25 ++++++ .../ApacheBeam/Core/PTransform/PTransform.swift | 33 ++++++++ .../ApacheBeam/Core/Pipeline/Pipeline.swift | 14 ++++ .../Core/Pipeline/PipelineTransform.swift | 16 ++++ .../ApacheBeam/Runtime/Worker/WorkerProvider.swift | 2 +- .../Sources/ApacheBeam/Transforms/Basic.swift | 68 +++++++++++++++ .../Sources/ApacheBeam/Transforms/BuiltIn.swift | 98 ++++++++++++++++++++++ .../Sources/ApacheBeam/Transforms/Combining.swift | 22 +++++ .../Sources/ApacheBeam/Transforms/Grouping.swift | 8 ++ .../Documentation.docc/ApacheBeam.md | 37 ++++++++ .../Tests/ApacheBeamTests/Coders/CoderTests.swift | 9 ++ 
.../Pipeline/IntegrationTests.swift | 76 +++++++++++++++++ 28 files changed, 815 insertions(+), 85 deletions(-) diff --git a/sdks/python/output.json-00000-of-00001 b/sdks/python/output.json-00000-of-00001 new file mode 100644 index 00000000000..8b56471c52f --- /dev/null +++ b/sdks/python/output.json-00000-of-00001 @@ -0,0 +1,2 @@ +{"col1":"bar","col2":2,"col3":200} +{"col1":"baz","col2":3,"col3":300} diff --git a/sdks/swift/Documentation/INTERNALS.md b/sdks/swift/Documentation/INTERNALS.md deleted file mode 100644 index 757e1b44602..00000000000 --- a/sdks/swift/Documentation/INTERNALS.md +++ /dev/null @@ -1,4 +0,0 @@ -# Internals - - - diff --git a/sdks/swift/Sources/ApacheBeam/Coders/BeamValue.swift b/sdks/swift/Sources/ApacheBeam/Coders/BeamValue.swift index 054356ed6f1..b7ed89c4668 100644 --- a/sdks/swift/Sources/ApacheBeam/Coders/BeamValue.swift +++ b/sdks/swift/Sources/ApacheBeam/Coders/BeamValue.swift @@ -37,6 +37,8 @@ public indirect enum BeamValue { /// A window case window(Window) + //TODO: Custom Values and Row Values + // Composite Values /// An iterable diff --git a/sdks/swift/Sources/ApacheBeam/Coders/Beamable.swift b/sdks/swift/Sources/ApacheBeam/Coders/Beamable.swift new file mode 100644 index 00000000000..239c248104c --- /dev/null +++ b/sdks/swift/Sources/ApacheBeam/Coders/Beamable.swift @@ -0,0 +1,24 @@ + +import Foundation + +/// Codable is already taken and besides Beamable is too good to pass up +public protocol Beamable { + static var coder: Coder { get } +} + +extension Data : Beamable { + public static let coder:Coder = .bytes +} + +extension String : Beamable { + public static let coder:Coder = .string +} + +extension Int : Beamable { + public static let coder:Coder = .varint +} + +extension Bool : Beamable { + public static let coder:Coder = .boolean +} + diff --git a/sdks/swift/Sources/ApacheBeam/Coders/Coder+Decoding.swift b/sdks/swift/Sources/ApacheBeam/Coders/Coder+Decoding.swift index 48c1cb9db43..31b97f0d318 100644 --- 
a/sdks/swift/Sources/ApacheBeam/Coders/Coder+Decoding.swift +++ b/sdks/swift/Sources/ApacheBeam/Coders/Coder+Decoding.swift @@ -17,6 +17,7 @@ */ import Foundation +/// This extension contains all of the decoding implementation. File separation is for clarity. public extension Coder { /// Decodes a raw data block into a BeamValue for further processing diff --git a/sdks/swift/Sources/ApacheBeam/Coders/Coder.swift b/sdks/swift/Sources/ApacheBeam/Coders/Coder.swift index e8c23b9d4cf..ceb2649243e 100644 --- a/sdks/swift/Sources/ApacheBeam/Coders/Coder.swift +++ b/sdks/swift/Sources/ApacheBeam/Coders/Coder.swift @@ -18,20 +18,21 @@ import Foundation public indirect enum Coder { - // Special + /// Catch-all for coders we don't understand. Mostly used for error reporting case unknown(String) + // TODO: Actually implement this case custom(Data) - // Scalar standard coders + /// Standard scalar coders. Does not necessarily correspond 1:1 with BeamValue. For example, varint and fixedint both map to integer case double,varint,fixedint,byte,bytes,string,boolean,globalwindow - // Composite standard coders - + /// Composite coders. case keyvalue(Coder,Coder) case iterable(Coder) case lengthprefix(Coder) case windowedvalue(Coder,Coder) + // TODO: Row Coder } @@ -71,6 +72,7 @@ public extension Coder { } } + /// Static list of coders for use in capabilities arrays in environments. static let capabilities:[String] = ["byte","bytes","bool","varint","double","integer","string_utf8","length_prefix","kv","iterable","windowed_value","global_window"] .map({ .coderUrn($0) }) } @@ -187,4 +189,25 @@ extension Coder { } } +extension Coder { + + + + public static func of<Of>(type: Optional<Of>.Type) -> Coder? { + return .lengthprefix(.of(type: Of.self)!) + } + + public static func of<Of>(type: Array<Of>.Type) -> Coder? { + return .iterable(.of(type: Of.self)!) + } + + public static func of<Of>(type: Of.Type) -> Coder? 
{ + // Beamables provider their own default coder implementation + if let beamable = Of.self as? Beamable.Type { + return beamable.coder + } + return nil + } +} + diff --git a/sdks/swift/Sources/ApacheBeam/Core/DynamicProperties.swift b/sdks/swift/Sources/ApacheBeam/Core/DynamicProperties.swift new file mode 100644 index 00000000000..ee3c61a65c5 --- /dev/null +++ b/sdks/swift/Sources/ApacheBeam/Core/DynamicProperties.swift @@ -0,0 +1,36 @@ +import Logging + +public protocol DynamicProperty { } + +@propertyWrapper +public struct PInput<Of> : DynamicProperty { + public var wrappedValue: PCollectionStream<Of> + + public init(wrappedValue: PCollectionStream<Of> = .init()) { + self.wrappedValue = wrappedValue + } +} + +@propertyWrapper +public struct POutput<Of> : DynamicProperty { + public var wrappedValue: PCollectionStream<Of> + public init(wrappedValue: PCollectionStream<Of> = .init()) { + self.wrappedValue = wrappedValue + } +} + +@propertyWrapper +public struct Logger : DynamicProperty { + public var wrappedValue: Logging.Logger + public init(wrappedValue: Logging.Logger = Logging.Logger(label: "TEST")) { + self.wrappedValue = wrappedValue + } +} + +@propertyWrapper +public struct Serialized<Value:Codable> : DynamicProperty { + public var wrappedValue: Value? + public init(wrappedValue: Value? = nil) { + self.wrappedValue = wrappedValue + } +} diff --git a/sdks/swift/Sources/ApacheBeam/Core/Fn/ClosureFn.swift b/sdks/swift/Sources/ApacheBeam/Core/Fn/ClosureFn.swift new file mode 100644 index 00000000000..fcf43a91215 --- /dev/null +++ b/sdks/swift/Sources/ApacheBeam/Core/Fn/ClosureFn.swift @@ -0,0 +1,63 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. 
The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +import Foundation + +/// A SerializableFn that holds a reference to a function that takes a single input and produces a variable number of outputs +public final class ClosureFn : SerializableFn { + let processClosure: (SerializableFnBundleContext,[AnyPCollectionStream],[AnyPCollectionStream]) async throws -> Void + + //TODO: Replace this with a parameter pack version once I figure out how to do that + + public init<Of>(_ fn: @Sendable @escaping (PCollection<Of>.Stream) async throws -> Void) { + self.processClosure = { context,inputs,outputs in + try await fn(inputs[0].stream()) + } + } + + public init<Of,O0>(_ fn: @Sendable @escaping (PCollection<Of>.Stream,PCollection<O0>.Stream) async throws -> Void) { + self.processClosure = { context,inputs,outputs in + try await fn(inputs[0].stream(),outputs[0].stream()) + } + } + + public init<Of,O0,O1>(_ fn: @Sendable @escaping (PCollection<Of>.Stream,PCollection<O0>.Stream,PCollection<O1>.Stream) async throws -> Void) { + self.processClosure = { context,inputs,outputs in + try await fn(inputs[0].stream(),outputs[0].stream(),outputs[1].stream()) + } + } + + public init<Of,O0,O1,O2>(_ fn: @Sendable @escaping (PCollection<Of>.Stream,PCollection<O0>.Stream,PCollection<O1>.Stream,PCollection<O2>.Stream) async throws -> Void) { + self.processClosure = { context,inputs,outputs in + try await 
fn(inputs[0].stream(),outputs[0].stream(),outputs[1].stream(),outputs[2].stream()) + } + } + + public init<Of,O0,O1,O2,O3>(_ fn: @Sendable @escaping (PCollection<Of>.Stream,PCollection<O0>.Stream,PCollection<O1>.Stream,PCollection<O2>.Stream,PCollection<O3>.Stream) async throws -> Void) { + self.processClosure = { context,inputs,outputs in + try await fn(inputs[0].stream(),outputs[0].stream(),outputs[1].stream(),outputs[2].stream(),outputs[3].stream()) + } + } + + public func process(context: SerializableFnBundleContext, inputs: [AnyPCollectionStream], outputs: [AnyPCollectionStream]) async throws -> (String, String) { + try await processClosure(context,inputs,outputs) + outputs.finish() + return (context.instruction,context.transform) + } + +} diff --git a/sdks/swift/Sources/ApacheBeam/Core/Fn/DoFn.swift b/sdks/swift/Sources/ApacheBeam/Core/Fn/DoFn.swift new file mode 100644 index 00000000000..58d1c211c81 --- /dev/null +++ b/sdks/swift/Sources/ApacheBeam/Core/Fn/DoFn.swift @@ -0,0 +1,15 @@ + +import Logging + +/// A higher level interface to SerializableFn using dependency injected dynamic properties in the same +/// way as we define Composite PTransforms +public protocol DoFn { + func process() async throws + func finishBundle() async throws +} + +public extension DoFn { + func finishBundle() async throws { } +} + + diff --git a/sdks/swift/Sources/ApacheBeam/Core/Fn/DoneFn.swift b/sdks/swift/Sources/ApacheBeam/Core/Fn/DoneFn.swift deleted file mode 100644 index 82fd4708046..00000000000 --- a/sdks/swift/Sources/ApacheBeam/Core/Fn/DoneFn.swift +++ /dev/null @@ -1,63 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. 
You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ - -import Foundation - -public final class DoneFn<Of> : SerializableFn { - - private let fn: (PCollectionStream<Of>) async throws -> Void - public init(_ fn: @Sendable @escaping (PCollectionStream<Of>) async throws -> Void) { - self.fn = fn - } - - - public func process(context: SerializableFnBundleContext, inputs: [AnyPCollectionStream], outputs: [AnyPCollectionStream]) async throws -> (String, String) { - try await fn(inputs[0].stream()) - for output in outputs { - output.finish() - } - return (context.instruction,context.transform) - } -} - -public final class ParameterizedDoneFn<Of,Param:Codable> : SerializableFn { - - - private let param: Param - private let fn: (Param,PCollectionStream<Of>) async throws -> Void - - public init(_ param: Param,_ fn: @Sendable @escaping (Param,PCollectionStream<Of>) async throws -> Void){ - self.param = param - self.fn = fn - } - - public var payload: Data { - get throws { - try JSONEncoder().encode(param) - } - } - - public func process(context: SerializableFnBundleContext, inputs: [AnyPCollectionStream], outputs: [AnyPCollectionStream]) async throws -> (String, String) { - try await fn(try JSONDecoder().decode(Param.self, from: context.payload),inputs[0].stream()) - for output in outputs { - output.finish() - } - return (context.instruction,context.transform) - } - -} diff --git a/sdks/swift/Sources/ApacheBeam/Core/Fn/ParameterizedClosureFn.swift b/sdks/swift/Sources/ApacheBeam/Core/Fn/ParameterizedClosureFn.swift new file mode 100644 index 00000000000..575886d9f8c --- /dev/null +++ 
b/sdks/swift/Sources/ApacheBeam/Core/Fn/ParameterizedClosureFn.swift @@ -0,0 +1,74 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +import Foundation + +/// A SerializableFn that holds a reference to a function that takes a single input and produces a variable number of outputs +public final class ParameterizedClosureFn<Param:Codable> : SerializableFn { + let param: Param + let processClosure: (SerializableFnBundleContext,[AnyPCollectionStream],[AnyPCollectionStream]) async throws -> Void + + //TODO: Replace this with a parameter pack version once I figure out how to do that + + public init<Of>(_ param:Param,_ fn: @Sendable @escaping (Param,PCollection<Of>.Stream) async throws -> Void) { + self.param = param + self.processClosure = { context,inputs,outputs in + try await fn(try JSONDecoder().decode(Param.self, from: context.payload),inputs[0].stream()) + } + } + + public init<Of,O0>(_ param:Param,_ fn: @Sendable @escaping (Param,PCollection<Of>.Stream,PCollection<O0>.Stream) async throws -> Void) { + self.param = param + self.processClosure = { context,inputs,outputs in + try await fn(try JSONDecoder().decode(Param.self, from: context.payload),inputs[0].stream(),outputs[0].stream()) + } + } + + public init<Of,O0,O1>(_ 
param:Param,_ fn: @Sendable @escaping (Param,PCollection<Of>.Stream,PCollection<O0>.Stream,PCollection<O1>.Stream) async throws -> Void) { + self.param = param + self.processClosure = { context,inputs,outputs in + try await fn(try JSONDecoder().decode(Param.self, from: context.payload),inputs[0].stream(),outputs[0].stream(),outputs[1].stream()) + } + } + + public init<Of,O0,O1,O2>(_ param:Param,_ fn: @Sendable @escaping (Param,PCollection<Of>.Stream,PCollection<O0>.Stream,PCollection<O1>.Stream,PCollection<O2>.Stream) async throws -> Void) { + self.param = param + self.processClosure = { context,inputs,outputs in + try await fn(try JSONDecoder().decode(Param.self, from: context.payload),inputs[0].stream(),outputs[0].stream(),outputs[1].stream(),outputs[2].stream()) + } + } + + public init<Of,O0,O1,O2,O3>(_ param:Param,_ fn: @Sendable @escaping (Param,PCollection<Of>.Stream,PCollection<O0>.Stream,PCollection<O1>.Stream,PCollection<O2>.Stream,PCollection<O3>.Stream) async throws -> Void) { + self.param = param + self.processClosure = { context,inputs,outputs in + try await fn(try JSONDecoder().decode(Param.self, from: context.payload),inputs[0].stream(),outputs[0].stream(),outputs[1].stream(),outputs[2].stream(),outputs[3].stream()) + } + } + + public func process(context: SerializableFnBundleContext, inputs: [AnyPCollectionStream], outputs: [AnyPCollectionStream]) async throws -> (String, String) { + try await processClosure(context,inputs,outputs) + outputs.finish() + return (context.instruction,context.transform) + } + + public var payload: Data { + get throws { + try JSONEncoder().encode(param) + } + } +} diff --git a/sdks/swift/Sources/ApacheBeam/Core/Fn/SerialiableFn.swift b/sdks/swift/Sources/ApacheBeam/Core/Fn/SerializableFn.swift similarity index 86% rename from sdks/swift/Sources/ApacheBeam/Core/Fn/SerialiableFn.swift rename to sdks/swift/Sources/ApacheBeam/Core/Fn/SerializableFn.swift index 3a30066dc4b..babf64dfe31 100644 --- 
a/sdks/swift/Sources/ApacheBeam/Core/Fn/SerialiableFn.swift +++ b/sdks/swift/Sources/ApacheBeam/Core/Fn/SerializableFn.swift @@ -25,7 +25,8 @@ public struct SerializableFnBundleContext { let log:Logger } -/// SerialiableFn is a protocol for functions that should be parameterized for the pipeline +/// SerialiableFn is a protocol for functions that should be parameterized for the pipeline. This is intended as a fairly low level class and users +/// should interact with the apply() functions defined in the transform section or implement the DoFn protocol which is then wrapped public protocol SerializableFn { var payload: Data { get throws } func process(context:SerializableFnBundleContext,inputs:[AnyPCollectionStream],outputs:[AnyPCollectionStream]) async throws -> (String,String) @@ -35,4 +36,3 @@ public protocol SerializableFn { public extension SerializableFn { var payload : Data { Data() } } - diff --git a/sdks/swift/Sources/ApacheBeam/Core/PCollection/AnyPCollection.swift b/sdks/swift/Sources/ApacheBeam/Core/PCollection/AnyPCollection.swift new file mode 100644 index 00000000000..8f23dccb954 --- /dev/null +++ b/sdks/swift/Sources/ApacheBeam/Core/PCollection/AnyPCollection.swift @@ -0,0 +1,51 @@ +public struct AnyPCollection : PCollectionProtocol { + + + + let type: Any.Type + let ofType: Any.Type + let collection: Any + + let applyClosure: (Any,PipelineTransform) -> Void + let consumersClosure: (Any) -> [PipelineTransform] + let coderClosure: (Any) -> Coder + let streamClosure: (Any) -> AnyPCollectionStream + + public init<C>(_ collection: C) where C : PCollectionProtocol { + if let anyCollection = collection as? AnyPCollection { + self = anyCollection + } else { + self.type = C.self + self.ofType = C.Of.self + self.collection = collection + + self.applyClosure = { ($0 as! C).apply($1) } + self.consumersClosure = { ($0 as! C).consumers } + self.coderClosure = { ($0 as! C).coder } + self.streamClosure = { AnyPCollectionStream(($0 as! 
C).stream) } + } + } + + + public var consumers: [PipelineTransform] { + consumersClosure(collection) + } + + public func apply(_ transform: PipelineTransform) { + applyClosure(collection,transform) + } + + + public var coder: Coder { + coderClosure(collection) + } + + public var stream: PCollectionStream<Never> { + fatalError("Do not use `stream` on AnyPCollection. Use `anyStream` instead.") + } + + public var anyStream: AnyPCollectionStream { + streamClosure(collection) + } + +} diff --git a/sdks/swift/Sources/ApacheBeam/Core/PCollection/AnyPCollectionStream.swift b/sdks/swift/Sources/ApacheBeam/Core/PCollection/AnyPCollectionStream.swift index da15c740771..df11ecd3780 100644 --- a/sdks/swift/Sources/ApacheBeam/Core/PCollection/AnyPCollectionStream.swift +++ b/sdks/swift/Sources/ApacheBeam/Core/PCollection/AnyPCollectionStream.swift @@ -33,7 +33,7 @@ public struct AnyPCollectionStream : AsyncSequence { let value: Any let nextGenerator: (Any) -> (() async throws -> Iterator.Element?) - let emitClosure: (Any,Any) -> Void + let emitClosure: (Any,Any) throws -> Void let finishClosure: (Any) -> Void public func makeAsyncIterator() -> Iterator { @@ -50,12 +50,12 @@ public struct AnyPCollectionStream : AsyncSequence { self.emitClosure = { let stream = ($0 as! PCollectionStream<Of>) if let beamValue = $1 as? BeamValue { - stream.emit(beamValue) + try stream.emit(beamValue) } else if let element = $1 as? Element { stream.emit((element.0 as! Of,element.1,element.2)) } else if let element = $1 as? PCollectionStream<Of>.Element { stream.emit(element) - } + } } self.finishClosure = { @@ -81,6 +81,13 @@ public struct AnyPCollectionStream : AsyncSequence { public func finish() { finishClosure(self.value) } - - +} + +/// Convenience function of an array of AnyPCollectionStream elements to finish processing. 
+public extension Array where Array.Element == AnyPCollectionStream { + func finish() { + for stream in self { + stream.finish() + } + } } diff --git a/sdks/swift/Sources/ApacheBeam/Core/PCollection/PCollection.swift b/sdks/swift/Sources/ApacheBeam/Core/PCollection/PCollection.swift new file mode 100644 index 00000000000..5fb52f7e7ea --- /dev/null +++ b/sdks/swift/Sources/ApacheBeam/Core/PCollection/PCollection.swift @@ -0,0 +1,32 @@ +public protocol PCollectionProtocol { + associatedtype Of + + typealias Stream = PCollectionStream<Of> + + var consumers: [PipelineTransform] { get } + var coder: Coder { get } + var stream: Stream { get } + + func apply(_ transform: PipelineTransform) +} + + +public final class PCollection<Of> : PCollectionProtocol { + + public let coder: Coder + public var consumers: [PipelineTransform] + + public init(coder: Coder = .of(type: Of.self)!,consumers:[PipelineTransform] = []) { + self.coder = coder + self.consumers = consumers + } + + public var stream: PCollectionStream<Of> { + return PCollectionStream<Of>() + } + + public func apply(_ transform: PipelineTransform) { + consumers.append(transform) + } + +} diff --git a/sdks/swift/Sources/ApacheBeam/Core/PCollection/PCollectionStream.swift b/sdks/swift/Sources/ApacheBeam/Core/PCollection/PCollectionStream.swift index d54e2638230..1d2b3cbe151 100644 --- a/sdks/swift/Sources/ApacheBeam/Core/PCollection/PCollectionStream.swift +++ b/sdks/swift/Sources/ApacheBeam/Core/PCollection/PCollectionStream.swift @@ -26,10 +26,7 @@ public final class PCollectionStream<Of> : AsyncSequence { private let emitter: AsyncStream<Element>.Continuation public init() { - //Construct a stream, capturing the emit continuation - var tmp: AsyncStream<Element>.Continuation? - self.stream = AsyncStream<Element> { tmp = $0 } - self.emitter = tmp! 
+ (self.stream,self.emitter) = AsyncStream.makeStream(of:Element.self) } public func makeAsyncIterator() -> AsyncStream<Element>.Iterator { @@ -49,8 +46,72 @@ public final class PCollectionStream<Of> : AsyncSequence { emit((value,timestamp,window)) } - public func emit(_ value: BeamValue) { - + // Implementing key-value pair conversion is a little more complicated because we need + // to convert to a KV<K,V> from what is essentially a KV<Any,Any> which requires us to + // cast the key and the value first and then construct the KV from that. There might be + // a more clever way of doing this, but I don't know what it is. + + func emit<K,V>(key: K,value: [V],timestamp: Date,window:Window) { + emit(KV(key,value) as! Of,timestamp:timestamp,window:window) + } + + func emit<K>(key: K,value: BeamValue,timestamp: Date,window:Window) throws { + //We overload the key value type as both (K,[V]) and (K,V). It may be worth considering + //having an explicit Pair type in addition to KV to simplify this decoding a little bit. + // + // On the other hand, the code is already written and pretty straightforward and there + // won't be much in the way of new scalar values. + if case .array(let array) = value { + switch array.first { + case .boolean(_):emit(key:key,value:array.map({$0.baseValue as! Bool}),timestamp:timestamp,window:window) + case .bytes(_): emit(key:key,value:array.map({$0.baseValue as! Data}),timestamp:timestamp,window:window) + case .double(_): emit(key:key,value:array.map({$0.baseValue as! Double}),timestamp:timestamp,window:window) + case .integer(_):emit(key:key,value:array.map({$0.baseValue as! Int}),timestamp:timestamp,window:window) + case .string(_): emit(key:key,value:array.map({$0.baseValue as! 
String}),timestamp:timestamp,window:window) + default: + throw ApacheBeamError.runtimeError("Can't use \(String(describing:array.first)) as a value in a key value pair") + } + } else { + switch value { + case let .boolean(v):emit(key:key,value:[v],timestamp:timestamp,window:window) + case let .bytes(v): emit(key:key,value:[v],timestamp:timestamp,window:window) + case let .double(v): emit(key:key,value:[v],timestamp:timestamp,window:window) + case let .integer(v):emit(key:key,value:[v],timestamp:timestamp,window:window) + case let .string(v): emit(key:key,value:[v],timestamp:timestamp,window:window) + default: + throw ApacheBeamError.runtimeError("Can't use \(value) as a value in a key value pair") + } + + } + } + + // Unwrap all of the actual value types (not windows or windowed elements) + func emit(_ value: BeamValue,timestamp: Date,window:Window) throws { + if case let .kv(key,value) = value { + // Unwrap the key first + switch key { + case let .boolean(v):try emit(key:v,value:value,timestamp:timestamp,window:window) + case let .bytes(v): try emit(key:v,value:value,timestamp:timestamp,window:window) + case let .double(v): try emit(key:v,value:value,timestamp:timestamp,window:window) + case let .integer(v):try emit(key:v,value:value,timestamp:timestamp,window:window) + case let .string(v): try emit(key:v,value:value,timestamp:timestamp,window:window) + default: + throw ApacheBeamError.runtimeError("Can't use \(value) as a value in a key value pair") + } + } else { + emit(value.baseValue as! Of,timestamp: timestamp,window: window) + } + } + + /// Mostly intended as a convenience function for bundle processing this emit unwraps a windowed value + /// for further conversion in (non-public) versions of the function. + public func emit(_ value: BeamValue) throws { + switch value { + case let .windowed(value, timestamp, _, window): + try emit(value,timestamp:timestamp,window:window.baseValue as! 
Window) + default: + throw ApacheBeamError.runtimeError("Only windowed values can be sent directly to a PCollectionStream, not \(value)") + } } } diff --git a/sdks/swift/Sources/ApacheBeam/Core/PTransform/AnyPTransform.swift b/sdks/swift/Sources/ApacheBeam/Core/PTransform/AnyPTransform.swift new file mode 100644 index 00000000000..0be179c8cdf --- /dev/null +++ b/sdks/swift/Sources/ApacheBeam/Core/PTransform/AnyPTransform.swift @@ -0,0 +1,25 @@ +public struct AnyPTransform : _PrimitivePTransform { + let type: Any.Type + var transform: Any + + let expandClosure: (Any) -> AnyPTransform + let expansionType: Any.Type + + + public init<T>(_ transform: T) where T: PTransform { + if let anyTransform = transform as? AnyPTransform { + self = anyTransform + } else { + self.type = T.self + self.expansionType = T.Expansion.self + self.transform = transform + self.expandClosure = { AnyPTransform(($0 as! T).expand) } + } + } +} + +extension AnyPTransform : ParentPTransform { + public var children: [AnyPTransform] { + (transform as? ParentPTransform)?.children ?? [] + } +} diff --git a/sdks/swift/Sources/ApacheBeam/Core/PTransform/PTransform.swift b/sdks/swift/Sources/ApacheBeam/Core/PTransform/PTransform.swift new file mode 100644 index 00000000000..483a784c420 --- /dev/null +++ b/sdks/swift/Sources/ApacheBeam/Core/PTransform/PTransform.swift @@ -0,0 +1,33 @@ +/// Represents a composite transform +public protocol PTransform { + associatedtype Expansion: PTransform + + var expand: Expansion { get } +} + +public extension Never { + var expand: Never { + fatalError() + } +} + +extension Never : PTransform { } + +/// Represents PTransforms that can't be expanded further. 
When constructing the pipeline the expansion +/// happens until we hit this point +public protocol _PrimitivePTransform : PTransform where Expansion == Never { } +public extension _PrimitivePTransform { + var expand: Never { + neverExpand(String(reflecting: Self.self)) } +} + + +public protocol ParentPTransform { + var children: [AnyPTransform] { get } +} + +protocol GroupPTransform : ParentPTransform { } + +public func neverExpand(_ type: String) -> Never { + fatalError("\(type) is a primitive PTransform and cannot be expanded.") +} diff --git a/sdks/swift/Sources/ApacheBeam/Core/Pipeline/Pipeline.swift b/sdks/swift/Sources/ApacheBeam/Core/Pipeline/Pipeline.swift new file mode 100644 index 00000000000..f918a620ea1 --- /dev/null +++ b/sdks/swift/Sources/ApacheBeam/Core/Pipeline/Pipeline.swift @@ -0,0 +1,14 @@ +import GRPC +import Logging + +public final class Pipeline { + let content: (inout PCollection<Never>) -> Void + let log: Logging.Logger + + public init(log: Logging.Logger = .init(label:"Pipeline"),_ content: @escaping (inout PCollection<Never>) -> Void) { + self.log = log + self.content = content + } + + +} diff --git a/sdks/swift/Sources/ApacheBeam/Core/Pipeline/PipelineTransform.swift b/sdks/swift/Sources/ApacheBeam/Core/Pipeline/PipelineTransform.swift new file mode 100644 index 00000000000..dea7daab738 --- /dev/null +++ b/sdks/swift/Sources/ApacheBeam/Core/Pipeline/PipelineTransform.swift @@ -0,0 +1,16 @@ + +import Foundation + +/// Enum for pipeline representable transforms as opposed to composite transforms +/// which are a user-side construct represented by PTransform +public enum PipelineTransform { + case pardo(String,SerializableFn,[AnyPCollection]) + case impulse(AnyPCollection) + case flatten([AnyPCollection],AnyPCollection) + case groupByKey(AnyPCollection) + case custom(String,Data,[AnyPCollection]) + case composite(AnyPTransform) +} + + + diff --git a/sdks/swift/Sources/ApacheBeam/Runtime/Worker/WorkerProvider.swift 
b/sdks/swift/Sources/ApacheBeam/Runtime/Worker/WorkerProvider.swift index fb5e2e52225..e45c3f7eacb 100644 --- a/sdks/swift/Sources/ApacheBeam/Runtime/Worker/WorkerProvider.swift +++ b/sdks/swift/Sources/ApacheBeam/Runtime/Worker/WorkerProvider.swift @@ -20,7 +20,7 @@ import Logging actor WorkerProvider : Org_Apache_Beam_Model_FnExecution_V1_BeamFnExternalWorkerPoolAsyncProvider { - private let log = Logger(label:"WorkerProvider") + private let log = Logging.Logger(label: "Worker") private var workers: [String:Worker] = [:] private let functions: [String:SerializableFn] diff --git a/sdks/swift/Sources/ApacheBeam/Transforms/Basic.swift b/sdks/swift/Sources/ApacheBeam/Transforms/Basic.swift new file mode 100644 index 00000000000..52f2b28aef1 --- /dev/null +++ b/sdks/swift/Sources/ApacheBeam/Transforms/Basic.swift @@ -0,0 +1,68 @@ + +/// Creating Static Values +public extension PCollection { + + /// Each time the input fires output all of the values in this list. + func create<Value:Codable>(_ values: [Value],_ name:String = "\(#file):\(#line)") -> PCollection<Value> { + return pardo(name,values) { values,input,output in + for try await (_,ts,w) in input { + for v in values { + output.emit(v,timestamp:ts,window:w) + } + } + } + } +} + +/// Convenience logging mappers +public extension PCollection { + func log(prefix:String,name:String = "\(#file):\(#line)") -> PCollection<Of> where Of == String { + pardo(name,prefix) { prefix,input,output in + for await element in input { + print("\(prefix): \(element)") + output.emit(element) + } + } + } +} + +/// Modifying Values +public extension PCollection { + + /// Modify a value without changing its window or timestamp + func map<Out>(name:String = "\(#file):\(#line)",_ fn: @Sendable @escaping (Of) -> Out) -> PCollection<Out> { + return pardo(name) { input,output in + for try await (v,ts,w) in input { + output.emit(fn(v),timestamp:ts,window:w) + } + } + } + + func map<K,V>(name:String = "\(#file):\(#line)",_ fn: @Sendable 
@escaping (Of) -> (K,V)) -> PCollection<KV<K,V>> { + return pardo(name) { input,output in + for try await (i,ts,w) in input { + let (key,value) = fn(i) + output.emit(KV(key,value),timestamp:ts,window:w) + } + } + } + + /// Produce multiple outputs as a single value without modifying window or timestamp + func flatMap<Out>(name:String = "\(#file):\(#line)",_ fn: @Sendable @escaping (Of) -> [Out]) -> PCollection<Out> { + return pardo(name) { input,output in + for try await (v,ts,w) in input { + for i in fn(v) { + output.emit(i,timestamp:ts,window:w) + } + } + } + } + +} + +public extension PCollection<Never> { + /// Convenience function to add an impulse when we are at the root of the pipeline + func create<Value:Codable>(_ values: [Value],_ name:String = "\(#file):\(#line)") -> PCollection<Value> { + return impulse().create(values,name) + } +} diff --git a/sdks/swift/Sources/ApacheBeam/Transforms/BuiltIn.swift b/sdks/swift/Sources/ApacheBeam/Transforms/BuiltIn.swift new file mode 100644 index 00000000000..265fdbd58f1 --- /dev/null +++ b/sdks/swift/Sources/ApacheBeam/Transforms/BuiltIn.swift @@ -0,0 +1,98 @@ + +import Foundation + +public extension PCollection where Of == Never { + /// Impulse the most basic transform. It can only be attached to PCollections of type Never, + /// which is the root transform used by Pipelines. + func impulse() -> PCollection<Data> { + let output = PCollection<Data>() + self.apply(.impulse(AnyPCollection(output))) + return output + } +} + +/// ParDo is the core user operator that pretty much everything else gets built on. 
We provide two versions here +public extension PCollection { + // TODO: Replace with parameter pack version once https://github.com/apple/swift/issues/67192 is resolved + + // No Output + func pardo<F:SerializableFn>(_ name: String = "\(#file):\(#line)",_ fn: F) { + self.apply(.pardo(name, fn, [])) + } + func pardo(_ name: String = "\(#file):\(#line)",_ fn: @Sendable @escaping (PCollection<Of>.Stream) async throws -> Void) { + self.apply(.pardo(name, ClosureFn(fn),[])) + } + func pardo<Param:Codable>(_ name: String = "\(#file):\(#line)",_ param:Param,_ fn: @Sendable @escaping (Param,PCollection<Of>.Stream) async throws -> Void) { + self.apply(.pardo(name, ParameterizedClosureFn(param,fn), [])) + } + + + // Single Output + func pardo<F:SerializableFn,O0>(_ name: String = "\(#file):\(#line)",_ fn: F, + _ o0:PCollection<O0>) { + self.apply(.pardo(name, fn, [AnyPCollection(o0)])) + } + func pardo<O0>(_ name: String = "\(#file):\(#line)", + _ fn: @Sendable @escaping (PCollection<Of>.Stream,PCollection<O0>.Stream) async throws -> Void) -> (PCollection<O0>) { + let output = PCollection<O0>() + self.apply(.pardo(name,ClosureFn(fn),[AnyPCollection(output)])) + return output + } + func pardo<Param:Codable,O0>(_ name: String = "\(#file):\(#line)",_ param: Param, + _ fn: @Sendable @escaping (Param,PCollection<Of>.Stream,PCollection<O0>.Stream) async throws -> Void) -> (PCollection<O0>) { + let output = PCollection<O0>() + self.apply(.pardo(name,ParameterizedClosureFn(param,fn),[AnyPCollection(output)])) + return output + } + + // Two Outputs + func pardo<F:SerializableFn,O0,O1>(_ name: String = "\(#file):\(#line)",_ fn: F, + _ o0:PCollection<O0>,_ o1:PCollection<O1>) { + self.apply(.pardo(name, fn, [AnyPCollection(o0),AnyPCollection(o1)])) + } + func pardo<O0,O1>(_ name: String = "\(#file):\(#line)", + _ fn: @Sendable @escaping (PCollection<Of>.Stream,PCollection<O0>.Stream,PCollection<O1>.Stream) async throws -> Void) -> (PCollection<O0>,PCollection<O1>) { + let output = 
(PCollection<O0>(),PCollection<O1>()) + self.apply(.pardo(name,ClosureFn(fn),[AnyPCollection(output.0),AnyPCollection(output.1)])) + return output + } + func pardo<Param:Codable,O0,O1>(_ name: String = "\(#file):\(#line)",_ param: Param, + _ fn: @Sendable @escaping (Param,PCollection<Of>.Stream,PCollection<O0>.Stream,PCollection<O1>.Stream) async throws -> Void) -> (PCollection<O0>,PCollection<O1>) { + let output = (PCollection<O0>(),PCollection<O1>()) + self.apply(.pardo(name,ParameterizedClosureFn(param,fn),[AnyPCollection(output.0),AnyPCollection(output.1)])) + return output + } + + // Three Outputs + func pardo<F:SerializableFn,O0,O1,O2>(_ name: String = "\(#file):\(#line)",_ fn: F, + _ o0:PCollection<O0>,_ o1:PCollection<O1>,_ o2:PCollection<O2>) { + self.apply(.pardo(name, fn, [AnyPCollection(o0),AnyPCollection(o1),AnyPCollection(o2)])) + } + func pardo<O0,O1,O2>(_ name: String = "\(#file):\(#line)", + _ fn: @Sendable @escaping (PCollection<Of>.Stream,PCollection<O0>.Stream,PCollection<O1>.Stream,PCollection<O2>.Stream) async throws -> Void) -> (PCollection<O0>,PCollection<O1>,PCollection<O2>) { + let output = (PCollection<O0>(),PCollection<O1>(),PCollection<O2>()) + self.apply(.pardo(name,ClosureFn(fn),[AnyPCollection(output.0),AnyPCollection(output.1),AnyPCollection(output.2)])) + return output + } + func pardo<Param:Codable,O0,O1,O2>(_ name: String = "\(#file):\(#line)",_ param: Param, + _ fn: @Sendable @escaping (Param,PCollection<Of>.Stream,PCollection<O0>.Stream,PCollection<O1>.Stream,PCollection<O2>.Stream) async throws -> Void) -> (PCollection<O0>,PCollection<O1>,PCollection<O2>) { + let output = (PCollection<O0>(),PCollection<O1>(),PCollection<O2>()) + self.apply(.pardo(name,ParameterizedClosureFn(param,fn),[AnyPCollection(output.0),AnyPCollection(output.1),AnyPCollection(output.2)])) + return output + } + + //TODO: Add more as needed +} + +public extension PCollection { + /// Core GroupByKey transform. 
Requires a pair input + func groupByKey<K,V>() -> PCollection<KV<K,V>> where Of == KV<K,V> { + // Adjust the coder for the pcollection to reflect GBK semantics + let output = PCollection<KV<K,V>>(coder:.keyvalue(.of(type: K.self)!, .of(type: Array<V>.self)!)) + self.apply(.groupByKey(AnyPCollection(output))) + return output + } + + + +} diff --git a/sdks/swift/Sources/ApacheBeam/Transforms/Combining.swift b/sdks/swift/Sources/ApacheBeam/Transforms/Combining.swift new file mode 100644 index 00000000000..1b60f2a49cb --- /dev/null +++ b/sdks/swift/Sources/ApacheBeam/Transforms/Combining.swift @@ -0,0 +1,22 @@ +/// Basic reducers +public extension PCollection { + func reduce<Result:Codable,K,V>(name:String = "\(#file):\(#line)",into:Result,_ accumulator: @Sendable @escaping (V,inout Result) -> Void) -> PCollection<KV<K,Result>> where Of == KV<K,V> { + return pardo(name,into) { initialValue,input,output in + for await (kv,ts,w) in input { + var result = initialValue + for v in kv.values { + accumulator(v,&result) + } + output.emit(KV(kv.key,result),timestamp:ts,window:w) + } + + } + } +} + +/// Convenience functions +public extension PCollection { + func sum<K,V:Numeric&Codable>() -> PCollection<KV<K,V>> where Of == KV<K,V> { + return reduce(into: 0,{ a,b in b = b + a }) + } +} diff --git a/sdks/swift/Sources/ApacheBeam/Transforms/Grouping.swift b/sdks/swift/Sources/ApacheBeam/Transforms/Grouping.swift new file mode 100644 index 00000000000..e2025394471 --- /dev/null +++ b/sdks/swift/Sources/ApacheBeam/Transforms/Grouping.swift @@ -0,0 +1,8 @@ +/// Basic grouping functionality +/// +public extension PCollection { + func groupBy<K,V>(name: String = "\(#file):\(#line)",_ fn: @Sendable @escaping (Of) -> (K,V)) -> PCollection<KV<K,V>> { + return map(name:name,fn) + .groupByKey() + } +} diff --git a/sdks/swift/Sources/ApacheBeamDocumentation/Documentation.docc/ApacheBeam.md b/sdks/swift/Sources/ApacheBeamDocumentation/Documentation.docc/ApacheBeam.md new file mode 100644
index 00000000000..afbac1a07f1 --- /dev/null +++ b/sdks/swift/Sources/ApacheBeamDocumentation/Documentation.docc/ApacheBeam.md @@ -0,0 +1,37 @@ +# ``ApacheBeam`` + +A Swift SDK implementation for Beam + +@Metadata { + @DisplayName("Apache Beam SDK for Swift") +} + +## Overview + +The Apache Beam SDK for Swift allows Swift developers to create executables that can be submitted to all Beam Portable Runners including Flink and Dataflow. + +To use the Apache Beam SDK for Swift, first add it as a dependency to an executable project: + +```swift +let package = Package( + // name, platforms, products, etc. + dependencies: [ + // other dependencies + .package(url: "https://github.com/apache/beam/sdks/swift", from: "2.51.0"), + ], + targets: [ + // targets + ] +) +``` + +> Note: Swift 5.9 or higher is required in order to use the Swift SDK + +## Topics + +### Getting Started + + + + + diff --git a/sdks/swift/Tests/ApacheBeamTests/Coders/CoderTests.swift b/sdks/swift/Tests/ApacheBeamTests/Coders/CoderTests.swift index 74031c8b545..acd75219f50 100644 --- a/sdks/swift/Tests/ApacheBeamTests/Coders/CoderTests.swift +++ b/sdks/swift/Tests/ApacheBeamTests/Coders/CoderTests.swift @@ -22,6 +22,15 @@ import XCTest @testable import ApacheBeam final class CoderTests: XCTestCase { + + func testSimpleScalarConversions() throws { + XCTAssertTrue(Coder.of(type: Data.self) == .bytes) + XCTAssertTrue(Coder.of(type: String.self) == .string) + XCTAssertTrue(Coder.of(type: Bool.self) == .boolean) + XCTAssertTrue(Coder.of(type: Int.self) == .varint) + } + + func testDefaultImpulseDecode() throws { var impulse = Data([0x7f,0xdf,0x3b,0x64,0x5a,0x1c,0xac,0x09,0x00,0x00,0x00,0x01,0x0f,0x00]) let impulseCoder = Coder.windowedvalue(.bytes, .globalwindow) diff --git a/sdks/swift/Tests/ApacheBeamTests/Pipeline/IntegrationTests.swift b/sdks/swift/Tests/ApacheBeamTests/Pipeline/IntegrationTests.swift new file mode 100644 index 00000000000..3e2d6a3be7d --- /dev/null +++ 
b/sdks/swift/Tests/ApacheBeamTests/Pipeline/IntegrationTests.swift @@ -0,0 +1,76 @@ +// +// IntegrationTests.swift +// +// +// Created by Byron Ellis on 8/11/23. +// + +import XCTest +import ApacheBeam + +func fixtureData(_ fixture: String) throws -> Data { + try Data(contentsOf: fixtureUrl(fixture)) +} + + +func fixtureUrl(_ fixture: String) -> URL { + fixturesDirectory().appendingPathComponent(fixture) +} + + +func fixturesDirectory(path: String = #file) -> URL { + let url = URL(fileURLWithPath: path) + let testsDir = url.deletingLastPathComponent() + let res = testsDir.appendingPathComponent("Fixtures") + return res +} + +final class IntegrationTests: XCTestCase { + + override func setUpWithError() throws { + } + + override func tearDownWithError() throws { + } + + func testPortableWordcount() throws { + _ = Pipeline { pipeline in + let (contents,errors) = pipeline + .create(["file1.txt","file2.txt","missing.txt"]) + .pardo { filenames,output,errors in + for await (filename,_,_) in filenames { + do { + output.emit(String(decoding:try fixtureData(filename),as:UTF8.self)) + } catch { + errors.emit("Unable to read \(filename): \(error)") + } + } + } + + // Simple ParDo that takes advantage of enumerateLines + let lines = contents.pardo { contents,lines in + for await (content,_,_) in contents { + content.enumerateLines { line,_ in + lines.emit(line) + } + } + } + + // Our first group by operation + let baseCount = lines + .flatMap({ $0.components(separatedBy: .whitespaces) }) + .groupBy({ ($0,1) }) + .sum() + + let normalizedCounts = baseCount.groupBy { + ($0.key.lowercased().trimmingCharacters(in: .punctuationCharacters), + $0.value ?? 1) + }.sum() + + + + } + } + + +}