This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch swift-sdk
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 5be3750fd8f5294f98867c44c8339dcd192837d8
Author: Byron Ellis <byronel...@google.com>
AuthorDate: Mon Aug 14 07:49:04 2023 -0700

    Refactor Unary and Tuple SerializableFn into a single ClosureFn since one 
is just a special case of the other. Add the wordcount integration test to 
verify the refactoring does indeed work and start moving the pipeline code into 
the branch.
---
 sdks/python/output.json-00000-of-00001             |  2 +
 sdks/swift/Documentation/INTERNALS.md              |  4 -
 .../Sources/ApacheBeam/Coders/BeamValue.swift      |  2 +
 .../swift/Sources/ApacheBeam/Coders/Beamable.swift | 24 ++++++
 .../Sources/ApacheBeam/Coders/Coder+Decoding.swift |  1 +
 sdks/swift/Sources/ApacheBeam/Coders/Coder.swift   | 31 ++++++-
 .../ApacheBeam/Core/DynamicProperties.swift        | 36 ++++++++
 .../Sources/ApacheBeam/Core/Fn/ClosureFn.swift     | 63 ++++++++++++++
 sdks/swift/Sources/ApacheBeam/Core/Fn/DoFn.swift   | 15 ++++
 sdks/swift/Sources/ApacheBeam/Core/Fn/DoneFn.swift | 63 --------------
 .../Core/Fn/ParameterizedClosureFn.swift           | 74 ++++++++++++++++
 .../{SerialiableFn.swift => SerializableFn.swift}  |  4 +-
 .../Core/PCollection/AnyPCollection.swift          | 51 +++++++++++
 .../Core/PCollection/AnyPCollectionStream.swift    | 17 ++--
 .../ApacheBeam/Core/PCollection/PCollection.swift  | 32 +++++++
 .../Core/PCollection/PCollectionStream.swift       | 73 ++++++++++++++--
 .../ApacheBeam/Core/PTransform/AnyPTransform.swift | 25 ++++++
 .../ApacheBeam/Core/PTransform/PTransform.swift    | 33 ++++++++
 .../ApacheBeam/Core/Pipeline/Pipeline.swift        | 14 ++++
 .../Core/Pipeline/PipelineTransform.swift          | 16 ++++
 .../ApacheBeam/Runtime/Worker/WorkerProvider.swift |  2 +-
 .../Sources/ApacheBeam/Transforms/Basic.swift      | 68 +++++++++++++++
 .../Sources/ApacheBeam/Transforms/BuiltIn.swift    | 98 ++++++++++++++++++++++
 .../Sources/ApacheBeam/Transforms/Combining.swift  | 22 +++++
 .../Sources/ApacheBeam/Transforms/Grouping.swift   |  8 ++
 .../Documentation.docc/ApacheBeam.md               | 37 ++++++++
 .../Tests/ApacheBeamTests/Coders/CoderTests.swift  |  9 ++
 .../Pipeline/IntegrationTests.swift                | 76 +++++++++++++++++
 28 files changed, 815 insertions(+), 85 deletions(-)

diff --git a/sdks/python/output.json-00000-of-00001 
b/sdks/python/output.json-00000-of-00001
new file mode 100644
index 00000000000..8b56471c52f
--- /dev/null
+++ b/sdks/python/output.json-00000-of-00001
@@ -0,0 +1,2 @@
+{"col1":"bar","col2":2,"col3":200}
+{"col1":"baz","col2":3,"col3":300}
diff --git a/sdks/swift/Documentation/INTERNALS.md 
b/sdks/swift/Documentation/INTERNALS.md
deleted file mode 100644
index 757e1b44602..00000000000
--- a/sdks/swift/Documentation/INTERNALS.md
+++ /dev/null
@@ -1,4 +0,0 @@
-#  Internals
-
-
-
diff --git a/sdks/swift/Sources/ApacheBeam/Coders/BeamValue.swift 
b/sdks/swift/Sources/ApacheBeam/Coders/BeamValue.swift
index 054356ed6f1..b7ed89c4668 100644
--- a/sdks/swift/Sources/ApacheBeam/Coders/BeamValue.swift
+++ b/sdks/swift/Sources/ApacheBeam/Coders/BeamValue.swift
@@ -37,6 +37,8 @@ public indirect enum BeamValue {
     /// A window
     case window(Window)
     
+    //TODO: Custom Values and Row Values
+    
     // Composite Values
     
     /// An iterable
diff --git a/sdks/swift/Sources/ApacheBeam/Coders/Beamable.swift 
b/sdks/swift/Sources/ApacheBeam/Coders/Beamable.swift
new file mode 100644
index 00000000000..239c248104c
--- /dev/null
+++ b/sdks/swift/Sources/ApacheBeam/Coders/Beamable.swift
@@ -0,0 +1,24 @@
+
+import Foundation
+
+/// The name Codable is already taken and, besides, Beamable is too good to pass up
+public protocol Beamable {
+    static var coder: Coder { get }
+}
+
+extension Data : Beamable {
+    public static let coder:Coder = .bytes
+}
+
+extension String : Beamable {
+    public static let coder:Coder = .string
+}
+
+extension Int : Beamable {
+    public static let coder:Coder = .varint
+}
+
+extension Bool : Beamable {
+    public static let coder:Coder = .boolean
+}
+
diff --git a/sdks/swift/Sources/ApacheBeam/Coders/Coder+Decoding.swift 
b/sdks/swift/Sources/ApacheBeam/Coders/Coder+Decoding.swift
index 48c1cb9db43..31b97f0d318 100644
--- a/sdks/swift/Sources/ApacheBeam/Coders/Coder+Decoding.swift
+++ b/sdks/swift/Sources/ApacheBeam/Coders/Coder+Decoding.swift
@@ -17,6 +17,7 @@
  */
 import Foundation
 
+/// This extension contains all of the decoding implementation. File 
separation is for clarity.
 public extension Coder {
     
     /// Decodes a raw data block into a BeamValue for further processing
diff --git a/sdks/swift/Sources/ApacheBeam/Coders/Coder.swift 
b/sdks/swift/Sources/ApacheBeam/Coders/Coder.swift
index e8c23b9d4cf..ceb2649243e 100644
--- a/sdks/swift/Sources/ApacheBeam/Coders/Coder.swift
+++ b/sdks/swift/Sources/ApacheBeam/Coders/Coder.swift
@@ -18,20 +18,21 @@
 import Foundation
 
 public indirect enum Coder {
-    // Special
+    /// Catch-all for coders we don't understand. Mostly used for error 
reporting
     case unknown(String)
+    // TODO: Actually implement this
     case custom(Data)
     
-    // Scalar standard coders
+    /// Standard scalar coders. Does not necessarily correspond 1:1 with 
BeamValue. For example, varint and fixedint both map to integer
     case double,varint,fixedint,byte,bytes,string,boolean,globalwindow
     
-    // Composite standard coders
-    
+    /// Composite coders.
     case keyvalue(Coder,Coder)
     case iterable(Coder)
     case lengthprefix(Coder)
     case windowedvalue(Coder,Coder)
     
+
     // TODO: Row Coder
 }
 
@@ -71,6 +72,7 @@ public extension Coder {
         }
     }
     
+    /// Static list of coders for use in capabilities arrays in environments.
     static let capabilities:[String] = 
["byte","bytes","bool","varint","double","integer","string_utf8","length_prefix","kv","iterable","windowed_value","global_window"]
         .map({ .coderUrn($0) })
 }
@@ -187,4 +189,25 @@ extension Coder {
     }
 }
 
+extension Coder {
+    
+    
+    
+    public static func of<Of>(type: Optional<Of>.Type) -> Coder? {
+        return .lengthprefix(.of(type: Of.self)!)
+    }
+    
+    public static func of<Of>(type: Array<Of>.Type) -> Coder? {
+        return .iterable(.of(type: Of.self)!)
+    }
+    
+    public static func of<Of>(type: Of.Type) -> Coder? {
+        // Beamables provide their own default coder implementation
+        if let beamable  = Of.self as? Beamable.Type {
+            return beamable.coder
+        }
+        return nil
+    }
+}
+
 
diff --git a/sdks/swift/Sources/ApacheBeam/Core/DynamicProperties.swift 
b/sdks/swift/Sources/ApacheBeam/Core/DynamicProperties.swift
new file mode 100644
index 00000000000..ee3c61a65c5
--- /dev/null
+++ b/sdks/swift/Sources/ApacheBeam/Core/DynamicProperties.swift
@@ -0,0 +1,36 @@
+import Logging
+
+public protocol DynamicProperty { }
+
+@propertyWrapper
+public struct PInput<Of> : DynamicProperty {
+    public var wrappedValue: PCollectionStream<Of>
+    
+    public init(wrappedValue: PCollectionStream<Of> = .init()) {
+        self.wrappedValue = wrappedValue
+    }
+}
+
+@propertyWrapper
+public struct POutput<Of> : DynamicProperty {
+    public var wrappedValue: PCollectionStream<Of>
+    public init(wrappedValue: PCollectionStream<Of> = .init()) {
+        self.wrappedValue = wrappedValue
+    }
+}
+
+@propertyWrapper
+public struct Logger : DynamicProperty {
+    public var wrappedValue: Logging.Logger
+    public init(wrappedValue: Logging.Logger = Logging.Logger(label: "TEST")) {
+        self.wrappedValue = wrappedValue
+    }
+}
+
+@propertyWrapper
+public struct Serialized<Value:Codable> : DynamicProperty {
+    public var wrappedValue: Value?
+    public init(wrappedValue: Value? = nil) {
+        self.wrappedValue = wrappedValue
+    }
+}
diff --git a/sdks/swift/Sources/ApacheBeam/Core/Fn/ClosureFn.swift 
b/sdks/swift/Sources/ApacheBeam/Core/Fn/ClosureFn.swift
new file mode 100644
index 00000000000..fcf43a91215
--- /dev/null
+++ b/sdks/swift/Sources/ApacheBeam/Core/Fn/ClosureFn.swift
@@ -0,0 +1,63 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+import Foundation
+
+/// A SerializableFn that holds a reference to a function that takes a single 
input and produces a variable number of outputs
+public final class ClosureFn : SerializableFn {
+    let processClosure: 
(SerializableFnBundleContext,[AnyPCollectionStream],[AnyPCollectionStream]) 
async throws -> Void
+
+    //TODO: Replace this with a parameter pack version once I figure out how 
to do that
+
+    public init<Of>(_ fn: @Sendable @escaping (PCollection<Of>.Stream) async 
throws -> Void) {
+        self.processClosure = { context,inputs,outputs in
+            try await fn(inputs[0].stream())
+        }
+    }
+    
+    public init<Of,O0>(_ fn: @Sendable @escaping 
(PCollection<Of>.Stream,PCollection<O0>.Stream) async throws -> Void) {
+        self.processClosure = { context,inputs,outputs in
+            try await fn(inputs[0].stream(),outputs[0].stream())
+        }
+    }
+
+    public init<Of,O0,O1>(_ fn: @Sendable @escaping 
(PCollection<Of>.Stream,PCollection<O0>.Stream,PCollection<O1>.Stream) async 
throws -> Void) {
+        self.processClosure = { context,inputs,outputs in
+            try await 
fn(inputs[0].stream(),outputs[0].stream(),outputs[1].stream())
+        }
+    }
+    
+    public init<Of,O0,O1,O2>(_ fn: @Sendable @escaping 
(PCollection<Of>.Stream,PCollection<O0>.Stream,PCollection<O1>.Stream,PCollection<O2>.Stream)
 async throws -> Void) {
+        self.processClosure = { context,inputs,outputs in
+            try await 
fn(inputs[0].stream(),outputs[0].stream(),outputs[1].stream(),outputs[2].stream())
+        }
+    }
+
+    public init<Of,O0,O1,O2,O3>(_ fn: @Sendable @escaping 
(PCollection<Of>.Stream,PCollection<O0>.Stream,PCollection<O1>.Stream,PCollection<O2>.Stream,PCollection<O3>.Stream)
 async throws -> Void) {
+        self.processClosure = { context,inputs,outputs in
+            try await 
fn(inputs[0].stream(),outputs[0].stream(),outputs[1].stream(),outputs[2].stream(),outputs[3].stream())
+        }
+    }
+    
+    public func process(context: SerializableFnBundleContext, inputs: 
[AnyPCollectionStream], outputs: [AnyPCollectionStream]) async throws -> 
(String, String) {
+        try await processClosure(context,inputs,outputs)
+        outputs.finish()
+        return (context.instruction,context.transform)
+    }
+    
+}
diff --git a/sdks/swift/Sources/ApacheBeam/Core/Fn/DoFn.swift 
b/sdks/swift/Sources/ApacheBeam/Core/Fn/DoFn.swift
new file mode 100644
index 00000000000..58d1c211c81
--- /dev/null
+++ b/sdks/swift/Sources/ApacheBeam/Core/Fn/DoFn.swift
@@ -0,0 +1,15 @@
+
+import Logging
+
+/// A higher level interface to SerializableFn using dependency injected 
dynamic properties in the same
+/// way as we define Composite PTransforms
+public protocol DoFn {
+    func process() async throws
+    func finishBundle() async throws
+}
+
+public extension DoFn {
+    func finishBundle() async throws { }
+}
+
+
diff --git a/sdks/swift/Sources/ApacheBeam/Core/Fn/DoneFn.swift 
b/sdks/swift/Sources/ApacheBeam/Core/Fn/DoneFn.swift
deleted file mode 100644
index 82fd4708046..00000000000
--- a/sdks/swift/Sources/ApacheBeam/Core/Fn/DoneFn.swift
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-import Foundation
-
-public final class DoneFn<Of> : SerializableFn {
-
-    private let fn: (PCollectionStream<Of>) async throws -> Void
-    public init(_ fn: @Sendable @escaping (PCollectionStream<Of>) async throws 
-> Void) {
-        self.fn = fn
-    }
-    
-    
-    public func process(context: SerializableFnBundleContext, inputs: 
[AnyPCollectionStream], outputs: [AnyPCollectionStream]) async throws -> 
(String, String) {
-        try await fn(inputs[0].stream())
-        for output in outputs {
-            output.finish()
-        }
-        return (context.instruction,context.transform)
-    }
-}
-
-public final class ParameterizedDoneFn<Of,Param:Codable> : SerializableFn {
-    
-    
-    private let param: Param
-    private let fn: (Param,PCollectionStream<Of>) async throws -> Void
-    
-    public init(_ param: Param,_ fn: @Sendable @escaping 
(Param,PCollectionStream<Of>) async throws -> Void){
-        self.param = param
-        self.fn = fn
-    }
-    
-    public var payload: Data {
-        get throws {
-            try JSONEncoder().encode(param)
-        }
-    }
-    
-    public func process(context: SerializableFnBundleContext, inputs: 
[AnyPCollectionStream], outputs: [AnyPCollectionStream]) async throws -> 
(String, String) {
-        try await fn(try JSONDecoder().decode(Param.self, from: 
context.payload),inputs[0].stream())
-        for output in outputs {
-            output.finish()
-        }
-        return (context.instruction,context.transform)
-    }
-
-}
diff --git a/sdks/swift/Sources/ApacheBeam/Core/Fn/ParameterizedClosureFn.swift 
b/sdks/swift/Sources/ApacheBeam/Core/Fn/ParameterizedClosureFn.swift
new file mode 100644
index 00000000000..575886d9f8c
--- /dev/null
+++ b/sdks/swift/Sources/ApacheBeam/Core/Fn/ParameterizedClosureFn.swift
@@ -0,0 +1,74 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+import Foundation
+
+/// A SerializableFn that holds a reference to a function that takes a single 
input and produces a variable number of outputs
+public final class ParameterizedClosureFn<Param:Codable> : SerializableFn {
+    let param: Param
+    let processClosure: 
(SerializableFnBundleContext,[AnyPCollectionStream],[AnyPCollectionStream]) 
async throws -> Void
+
+    //TODO: Replace this with a parameter pack version once I figure out how 
to do that
+
+    public init<Of>(_ param:Param,_ fn: @Sendable @escaping 
(Param,PCollection<Of>.Stream) async throws -> Void) {
+        self.param = param
+        self.processClosure = { context,inputs,outputs in
+            try await fn(try JSONDecoder().decode(Param.self, from: 
context.payload),inputs[0].stream())
+        }
+    }
+    
+    public init<Of,O0>(_ param:Param,_ fn: @Sendable @escaping 
(Param,PCollection<Of>.Stream,PCollection<O0>.Stream) async throws -> Void) {
+        self.param = param
+        self.processClosure = { context,inputs,outputs in
+            try await fn(try JSONDecoder().decode(Param.self, from: 
context.payload),inputs[0].stream(),outputs[0].stream())
+        }
+    }
+
+    public init<Of,O0,O1>(_ param:Param,_ fn: @Sendable @escaping 
(Param,PCollection<Of>.Stream,PCollection<O0>.Stream,PCollection<O1>.Stream) 
async throws -> Void) {
+        self.param = param
+        self.processClosure = { context,inputs,outputs in
+            try await fn(try JSONDecoder().decode(Param.self, from: 
context.payload),inputs[0].stream(),outputs[0].stream(),outputs[1].stream())
+        }
+    }
+    
+    public init<Of,O0,O1,O2>(_ param:Param,_ fn: @Sendable @escaping 
(Param,PCollection<Of>.Stream,PCollection<O0>.Stream,PCollection<O1>.Stream,PCollection<O2>.Stream)
 async throws -> Void) {
+        self.param = param
+        self.processClosure = { context,inputs,outputs in
+            try await fn(try JSONDecoder().decode(Param.self, from: 
context.payload),inputs[0].stream(),outputs[0].stream(),outputs[1].stream(),outputs[2].stream())
+        }
+    }
+
+    public init<Of,O0,O1,O2,O3>(_ param:Param,_ fn: @Sendable @escaping 
(Param,PCollection<Of>.Stream,PCollection<O0>.Stream,PCollection<O1>.Stream,PCollection<O2>.Stream,PCollection<O3>.Stream)
 async throws -> Void) {
+        self.param = param
+        self.processClosure = { context,inputs,outputs in
+            try await fn(try JSONDecoder().decode(Param.self, from: 
context.payload),inputs[0].stream(),outputs[0].stream(),outputs[1].stream(),outputs[2].stream(),outputs[3].stream())
+        }
+    }
+    
+    public func process(context: SerializableFnBundleContext, inputs: 
[AnyPCollectionStream], outputs: [AnyPCollectionStream]) async throws -> 
(String, String) {
+        try await processClosure(context,inputs,outputs)
+        outputs.finish()
+        return (context.instruction,context.transform)
+    }
+    
+    public var payload: Data {
+        get throws {
+            try JSONEncoder().encode(param)
+        }
+    }
+}
diff --git a/sdks/swift/Sources/ApacheBeam/Core/Fn/SerialiableFn.swift 
b/sdks/swift/Sources/ApacheBeam/Core/Fn/SerializableFn.swift
similarity index 86%
rename from sdks/swift/Sources/ApacheBeam/Core/Fn/SerialiableFn.swift
rename to sdks/swift/Sources/ApacheBeam/Core/Fn/SerializableFn.swift
index 3a30066dc4b..babf64dfe31 100644
--- a/sdks/swift/Sources/ApacheBeam/Core/Fn/SerialiableFn.swift
+++ b/sdks/swift/Sources/ApacheBeam/Core/Fn/SerializableFn.swift
@@ -25,7 +25,8 @@ public struct SerializableFnBundleContext {
     let log:Logger
 }
 
-/// SerialiableFn is a protocol for functions that should be parameterized for 
the pipeline
+/// SerializableFn is a protocol for functions that should be parameterized 
for the pipeline. This is intended as a fairly low-level protocol and users
+/// should interact with the apply() functions defined in the transform 
section or implement the DoFn protocol, which is then wrapped
 public protocol SerializableFn {
     var  payload: Data { get throws }
     func 
process(context:SerializableFnBundleContext,inputs:[AnyPCollectionStream],outputs:[AnyPCollectionStream])
 async throws -> (String,String)
@@ -35,4 +36,3 @@ public protocol SerializableFn {
 public extension SerializableFn {
     var payload : Data { Data() }
 }
-
diff --git 
a/sdks/swift/Sources/ApacheBeam/Core/PCollection/AnyPCollection.swift 
b/sdks/swift/Sources/ApacheBeam/Core/PCollection/AnyPCollection.swift
new file mode 100644
index 00000000000..8f23dccb954
--- /dev/null
+++ b/sdks/swift/Sources/ApacheBeam/Core/PCollection/AnyPCollection.swift
@@ -0,0 +1,51 @@
+public struct AnyPCollection : PCollectionProtocol {
+    
+    
+    
+    let type: Any.Type
+    let ofType: Any.Type
+    let collection: Any
+    
+    let applyClosure: (Any,PipelineTransform) -> Void
+    let consumersClosure: (Any) -> [PipelineTransform]
+    let coderClosure: (Any) -> Coder
+    let streamClosure: (Any) -> AnyPCollectionStream
+    
+    public init<C>(_ collection: C) where C : PCollectionProtocol {
+        if let anyCollection = collection as? AnyPCollection {
+            self = anyCollection
+        } else {
+            self.type = C.self
+            self.ofType = C.Of.self
+            self.collection = collection
+            
+            self.applyClosure = { ($0 as! C).apply($1) }
+            self.consumersClosure = { ($0 as! C).consumers }
+            self.coderClosure = { ($0 as! C).coder }
+            self.streamClosure = { AnyPCollectionStream(($0 as! C).stream) }
+        }
+    }
+    
+    
+    public var consumers: [PipelineTransform] {
+        consumersClosure(collection)
+    }
+    
+    public func apply(_ transform: PipelineTransform) {
+        applyClosure(collection,transform)
+    }
+
+    
+    public var coder: Coder {
+        coderClosure(collection)
+    }
+
+    public var stream: PCollectionStream<Never> {
+        fatalError("Do not use `stream` on AnyPCollection. Use `anyStream` 
instead.")
+    }
+    
+    public var anyStream: AnyPCollectionStream {
+        streamClosure(collection)
+    }
+    
+}
diff --git 
a/sdks/swift/Sources/ApacheBeam/Core/PCollection/AnyPCollectionStream.swift 
b/sdks/swift/Sources/ApacheBeam/Core/PCollection/AnyPCollectionStream.swift
index da15c740771..df11ecd3780 100644
--- a/sdks/swift/Sources/ApacheBeam/Core/PCollection/AnyPCollectionStream.swift
+++ b/sdks/swift/Sources/ApacheBeam/Core/PCollection/AnyPCollectionStream.swift
@@ -33,7 +33,7 @@ public struct AnyPCollectionStream : AsyncSequence {
     
     let value: Any
     let nextGenerator: (Any) -> (() async throws -> Iterator.Element?)
-    let emitClosure: (Any,Any) -> Void
+    let emitClosure: (Any,Any) throws -> Void
     let finishClosure: (Any) -> Void
     
     public func makeAsyncIterator() -> Iterator {
@@ -50,12 +50,12 @@ public struct AnyPCollectionStream : AsyncSequence {
         self.emitClosure = {
             let stream = ($0 as! PCollectionStream<Of>)
             if let beamValue = $1 as? BeamValue {
-                stream.emit(beamValue)
+                try stream.emit(beamValue)
             } else if let element = $1 as? Element {
                 stream.emit((element.0 as! Of,element.1,element.2))
             } else if let element = $1 as? PCollectionStream<Of>.Element {
                 stream.emit(element)
-            }
+            } 
         }
         
         self.finishClosure = {
@@ -81,6 +81,13 @@ public struct AnyPCollectionStream : AsyncSequence {
     public func finish() {
         finishClosure(self.value)
     }
-    
-    
+}
+
+/// Convenience function to finish processing on every stream in an array 
of AnyPCollectionStream elements.
+public extension Array where Array.Element == AnyPCollectionStream {
+    func finish() {
+        for stream in self {
+            stream.finish()
+        }
+    }
 }
diff --git a/sdks/swift/Sources/ApacheBeam/Core/PCollection/PCollection.swift 
b/sdks/swift/Sources/ApacheBeam/Core/PCollection/PCollection.swift
new file mode 100644
index 00000000000..5fb52f7e7ea
--- /dev/null
+++ b/sdks/swift/Sources/ApacheBeam/Core/PCollection/PCollection.swift
@@ -0,0 +1,32 @@
+public protocol PCollectionProtocol {
+    associatedtype Of
+    
+    typealias Stream = PCollectionStream<Of>
+    
+    var consumers: [PipelineTransform] { get }
+    var coder: Coder { get }
+    var stream: Stream { get }
+    
+    func apply(_ transform: PipelineTransform)
+}
+
+
+public final class PCollection<Of> : PCollectionProtocol {
+
+    public let coder: Coder
+    public var consumers: [PipelineTransform]
+    
+    public init(coder: Coder = .of(type: 
Of.self)!,consumers:[PipelineTransform] = []) {
+        self.coder = coder
+        self.consumers = consumers
+    }
+
+    public var stream: PCollectionStream<Of> {
+        return PCollectionStream<Of>()
+    }
+    
+    public func apply(_ transform: PipelineTransform) {
+        consumers.append(transform)
+    }
+    
+}
diff --git 
a/sdks/swift/Sources/ApacheBeam/Core/PCollection/PCollectionStream.swift 
b/sdks/swift/Sources/ApacheBeam/Core/PCollection/PCollectionStream.swift
index d54e2638230..1d2b3cbe151 100644
--- a/sdks/swift/Sources/ApacheBeam/Core/PCollection/PCollectionStream.swift
+++ b/sdks/swift/Sources/ApacheBeam/Core/PCollection/PCollectionStream.swift
@@ -26,10 +26,7 @@ public final class PCollectionStream<Of> : AsyncSequence {
     private let emitter: AsyncStream<Element>.Continuation
     
     public init() {
-        //Construct a stream, capturing the emit continuation
-        var tmp: AsyncStream<Element>.Continuation?
-        self.stream = AsyncStream<Element> { tmp = $0 }
-        self.emitter = tmp!
+        (self.stream,self.emitter) = AsyncStream.makeStream(of:Element.self)
     }
     
     public func makeAsyncIterator() -> AsyncStream<Element>.Iterator {
@@ -49,8 +46,72 @@ public final class PCollectionStream<Of> : AsyncSequence {
         emit((value,timestamp,window))
     }
     
-    public func emit(_ value: BeamValue) {
-        
+    // Implementing key-value pair conversion is a little more complicated 
because we need
+    // to convert to a KV<K,V> from what is essentially a KV<Any,Any> which 
requires us to
+    // cast the key and the value first and then construct the KV from that. 
There might be
+    // a more clever way of doing this, but I don't know what it is.
+    
+    func emit<K,V>(key: K,value: [V],timestamp: Date,window:Window) {
+        emit(KV(key,value) as! Of,timestamp:timestamp,window:window)
+    }
+    
+    func emit<K>(key: K,value: BeamValue,timestamp: Date,window:Window) throws 
{
+        //We overload the key value type as both (K,[V]) and (K,V). It may be 
worth considering
+        //having an explicit Pair type in addition to KV to simplify this 
decoding a little bit.
+        //
+        // On the other hand, the code is already written and pretty 
straightforward and there
+        // won't be much in the way of new scalar values.
+        if case .array(let array) = value {
+            switch array.first {
+            case .boolean(_):emit(key:key,value:array.map({$0.baseValue as! 
Bool}),timestamp:timestamp,window:window)
+            case .bytes(_):  emit(key:key,value:array.map({$0.baseValue as! 
Data}),timestamp:timestamp,window:window)
+            case .double(_): emit(key:key,value:array.map({$0.baseValue as! 
Double}),timestamp:timestamp,window:window)
+            case .integer(_):emit(key:key,value:array.map({$0.baseValue as! 
Int}),timestamp:timestamp,window:window)
+            case .string(_): emit(key:key,value:array.map({$0.baseValue as! 
String}),timestamp:timestamp,window:window)
+            default:
+                throw ApacheBeamError.runtimeError("Can't use 
\(String(describing:array.first)) as a value in a key value pair")
+            }
+        } else {
+            switch value {
+            case let 
.boolean(v):emit(key:key,value:[v],timestamp:timestamp,window:window)
+            case let .bytes(v):  
emit(key:key,value:[v],timestamp:timestamp,window:window)
+            case let .double(v): 
emit(key:key,value:[v],timestamp:timestamp,window:window)
+            case let 
.integer(v):emit(key:key,value:[v],timestamp:timestamp,window:window)
+            case let .string(v): 
emit(key:key,value:[v],timestamp:timestamp,window:window)
+            default:
+                throw ApacheBeamError.runtimeError("Can't use \(value) as a 
value in a key value pair")
+            }
+
+        }
+    }
+    
+    // Unwrap all of the actual value types (not windows or windowed elements)
+    func emit(_ value: BeamValue,timestamp: Date,window:Window) throws {
+        if case let .kv(key,value) = value {
+            // Unwrap the key first
+            switch key {
+            case let .boolean(v):try 
emit(key:v,value:value,timestamp:timestamp,window:window)
+            case let .bytes(v):  try 
emit(key:v,value:value,timestamp:timestamp,window:window)
+            case let .double(v): try 
emit(key:v,value:value,timestamp:timestamp,window:window)
+            case let .integer(v):try 
emit(key:v,value:value,timestamp:timestamp,window:window)
+            case let .string(v): try 
emit(key:v,value:value,timestamp:timestamp,window:window)
+            default:
+                throw ApacheBeamError.runtimeError("Can't use \(value) as a 
value in a key value pair")
+            }
+        } else {
+            emit(value.baseValue as! Of,timestamp: timestamp,window: window)
+        }
+    }
+    
+    /// Mostly intended as a convenience function for bundle processing, this 
emit unwraps a windowed value
+    /// for further conversion in the (non-public) versions of the function.
+    public func emit(_ value: BeamValue) throws {
+        switch value {
+        case let .windowed(value, timestamp, _, window):
+            try emit(value,timestamp:timestamp,window:window.baseValue as! 
Window)
+        default:
+            throw ApacheBeamError.runtimeError("Only windowed values can be 
sent directly to a PCollectionStream, not \(value)")
+        }
     }
     
 }
diff --git a/sdks/swift/Sources/ApacheBeam/Core/PTransform/AnyPTransform.swift 
b/sdks/swift/Sources/ApacheBeam/Core/PTransform/AnyPTransform.swift
new file mode 100644
index 00000000000..0be179c8cdf
--- /dev/null
+++ b/sdks/swift/Sources/ApacheBeam/Core/PTransform/AnyPTransform.swift
@@ -0,0 +1,25 @@
+public struct AnyPTransform : _PrimitivePTransform {
+    let type: Any.Type
+    var transform: Any
+    
+    let expandClosure: (Any) -> AnyPTransform
+    let expansionType: Any.Type
+    
+    
+    public init<T>(_ transform: T) where T: PTransform {
+        if let anyTransform = transform as? AnyPTransform {
+            self = anyTransform
+        } else {
+            self.type = T.self
+            self.expansionType = T.Expansion.self
+            self.transform = transform
+            self.expandClosure = { AnyPTransform(($0 as! T).expand) }
+        }
+    }
+}
+
+extension AnyPTransform : ParentPTransform {
+    public var children: [AnyPTransform] {
+        (transform as? ParentPTransform)?.children ?? []
+    }
+}
diff --git a/sdks/swift/Sources/ApacheBeam/Core/PTransform/PTransform.swift 
b/sdks/swift/Sources/ApacheBeam/Core/PTransform/PTransform.swift
new file mode 100644
index 00000000000..483a784c420
--- /dev/null
+++ b/sdks/swift/Sources/ApacheBeam/Core/PTransform/PTransform.swift
@@ -0,0 +1,33 @@
+/// Represents a composite transform
+public protocol PTransform {
+    associatedtype Expansion: PTransform
+    
+    var expand: Expansion { get }
+}
+
+public extension Never {
+    var expand: Never {
+        fatalError()
+    }
+}
+
+extension Never : PTransform { }
+
+/// Represents PTransforms that can't be expanded further. When constructing 
the pipeline, expansion
+/// proceeds until we hit this point.
+public protocol _PrimitivePTransform : PTransform where Expansion == Never { }
+public extension _PrimitivePTransform {
+    var expand: Never {
+        neverExpand(String(reflecting: Self.self)) }
+}
+
+
+public protocol ParentPTransform {
+    var children: [AnyPTransform] { get }
+}
+
+protocol GroupPTransform : ParentPTransform { }
+
+public func neverExpand(_ type: String) -> Never {
+    fatalError("\(type) is a primitive PTransform and cannot be expanded.")
+}
diff --git a/sdks/swift/Sources/ApacheBeam/Core/Pipeline/Pipeline.swift 
b/sdks/swift/Sources/ApacheBeam/Core/Pipeline/Pipeline.swift
new file mode 100644
index 00000000000..f918a620ea1
--- /dev/null
+++ b/sdks/swift/Sources/ApacheBeam/Core/Pipeline/Pipeline.swift
@@ -0,0 +1,14 @@
+import GRPC
+import Logging
+
/// A Beam pipeline. The pipeline's structure is described by the `content`
/// closure, which receives the root collection when the pipeline is built.
public final class Pipeline {
    // Closure that defines the pipeline graph starting from the root collection.
    let content: (inout PCollection<Never>) -> Void
    // Logger for pipeline lifecycle messages.
    let log: Logging.Logger

    /// - Parameters:
    ///   - log: Destination for pipeline log output; defaults to a logger labeled "Pipeline".
    ///   - content: Closure that defines the pipeline's transforms.
    public init(log: Logging.Logger = .init(label:"Pipeline"),_ content: @escaping (inout PCollection<Never>) -> Void) {
        self.log = log
        self.content = content
    }


}
diff --git 
a/sdks/swift/Sources/ApacheBeam/Core/Pipeline/PipelineTransform.swift 
b/sdks/swift/Sources/ApacheBeam/Core/Pipeline/PipelineTransform.swift
new file mode 100644
index 00000000000..dea7daab738
--- /dev/null
+++ b/sdks/swift/Sources/ApacheBeam/Core/Pipeline/PipelineTransform.swift
@@ -0,0 +1,16 @@
+
+import Foundation
+
+/// Enum for pipeline representable transforms as opposed to composite 
transforms
+/// which are a user-side construct represented by PTransform
/// Enum for pipeline representable transforms as opposed to composite transforms
/// which are a user-side construct represented by PTransform
public enum PipelineTransform {
    // A user function applied to input elements: (name, function, output collections).
    case pardo(String,SerializableFn,[AnyPCollection])
    // The root transform; carries its single output collection.
    case impulse(AnyPCollection)
    // Merges several collections into one: (inputs, output).
    case flatten([AnyPCollection],AnyPCollection)
    // Groups keyed elements; carries the grouped output collection.
    case groupByKey(AnyPCollection)
    // An opaque transform: (identifier — presumably a URN, TODO confirm —, payload, outputs).
    case custom(String,Data,[AnyPCollection])
    // A composite transform expanded on the user side.
    case composite(AnyPTransform)
}
+
+
+
diff --git a/sdks/swift/Sources/ApacheBeam/Runtime/Worker/WorkerProvider.swift 
b/sdks/swift/Sources/ApacheBeam/Runtime/Worker/WorkerProvider.swift
index fb5e2e52225..e45c3f7eacb 100644
--- a/sdks/swift/Sources/ApacheBeam/Runtime/Worker/WorkerProvider.swift
+++ b/sdks/swift/Sources/ApacheBeam/Runtime/Worker/WorkerProvider.swift
@@ -20,7 +20,7 @@ import Logging
 
 actor WorkerProvider : 
Org_Apache_Beam_Model_FnExecution_V1_BeamFnExternalWorkerPoolAsyncProvider {
 
-    private let log = Logger(label:"WorkerProvider")
+    private let log = Logging.Logger(label: "Worker")
     private var workers: [String:Worker] = [:]
 
     private let functions: [String:SerializableFn]
diff --git a/sdks/swift/Sources/ApacheBeam/Transforms/Basic.swift 
b/sdks/swift/Sources/ApacheBeam/Transforms/Basic.swift
new file mode 100644
index 00000000000..52f2b28aef1
--- /dev/null
+++ b/sdks/swift/Sources/ApacheBeam/Transforms/Basic.swift
@@ -0,0 +1,68 @@
+
+/// Creating Static Values
public extension PCollection {

    /// Each time the input fires output all of the values in this list.
    /// Timestamps and windows of the triggering element are preserved.
    func create<Value:Codable>(_ values: [Value],_ name:String = "\(#file):\(#line)") -> PCollection<Value> {
        pardo(name,values) { constants,input,output in
            for try await (_,timestamp,window) in input {
                for constant in constants {
                    output.emit(constant,timestamp:timestamp,window:window)
                }
            }
        }
    }
}
+
+/// Convenience logging mappers
public extension PCollection {
    /// Prints each element prefixed with `prefix` and re-emits it unchanged.
    /// Only available on PCollections of String.
    // NOTE(review): `element` here is the full stream element (the other
    // transforms in this file destructure it as (value, timestamp, window));
    // emit(element) presumably forwards all of it — confirm against
    // PCollectionStream's emit overloads.
    func log(prefix:String,name:String = "\(#file):\(#line)") -> PCollection<Of> where Of == String {
        pardo(name,prefix) { prefix,input,output in
            for await element in input {
                print("\(prefix): \(element)")
                output.emit(element)
            }
        }
    }
}
+
+/// Modifying Values
public extension PCollection {

    /// Modify a value without changing its window or timestamp
    func map<Out>(name:String = "\(#file):\(#line)",_ fn: @Sendable @escaping (Of) -> Out) -> PCollection<Out> {
        pardo(name) { input,output in
            for try await (element,timestamp,window) in input {
                output.emit(fn(element),timestamp:timestamp,window:window)
            }
        }
    }

    /// Map each element to a key-value pair without changing its window or timestamp
    func map<K,V>(name:String = "\(#file):\(#line)",_ fn: @Sendable @escaping (Of) -> (K,V)) -> PCollection<KV<K,V>> {
        pardo(name) { input,output in
            for try await (element,timestamp,window) in input {
                let (key,value) = fn(element)
                output.emit(KV(key,value),timestamp:timestamp,window:window)
            }
        }
    }

    /// Produce multiple outputs as a single value without modifying window or timestamp
    func flatMap<Out>(name:String = "\(#file):\(#line)",_ fn: @Sendable @escaping (Of) -> [Out]) -> PCollection<Out> {
        pardo(name) { input,output in
            for try await (element,timestamp,window) in input {
                for produced in fn(element) {
                    output.emit(produced,timestamp:timestamp,window:window)
                }
            }
        }
    }

}
+
public extension PCollection<Never> {
    /// Convenience function to add an impulse when we are at the root of the pipeline,
    /// then emit the given static values.
    func create<Value:Codable>(_ values: [Value],_ name:String = "\(#file):\(#line)") -> PCollection<Value> {
        impulse().create(values,name)
    }
}
diff --git a/sdks/swift/Sources/ApacheBeam/Transforms/BuiltIn.swift 
b/sdks/swift/Sources/ApacheBeam/Transforms/BuiltIn.swift
new file mode 100644
index 00000000000..265fdbd58f1
--- /dev/null
+++ b/sdks/swift/Sources/ApacheBeam/Transforms/BuiltIn.swift
@@ -0,0 +1,98 @@
+
+import Foundation
+
public extension PCollection where Of == Never {
    /// Impulse is the most basic transform. It can only be attached to PCollections
    /// of type Never, which is the root collection type used by Pipelines.
    func impulse() -> PCollection<Data> {
        let downstream = PCollection<Data>()
        apply(.impulse(AnyPCollection(downstream)))
        return downstream
    }
}
+
+/// ParDo is the core user operator that pretty much everything else gets built on.
+/// For each output arity we provide overloads taking a SerializableFn, a plain closure, and a parameterized closure.
public extension PCollection {
    // TODO: Replace with parameter pack version once https://github.com/apple/swift/issues/67192 is resolved

    // No Output
    /// Apply a SerializableFn with no output collections.
    func pardo<F:SerializableFn>(_ name: String = "\(#file):\(#line)",_ fn: F) {
        self.apply(.pardo(name, fn, []))
    }
    /// Apply a closure over the input stream with no output collections.
    func pardo(_ name: String = "\(#file):\(#line)",_ fn: @Sendable @escaping (PCollection<Of>.Stream) async throws -> Void) {
        self.apply(.pardo(name, ClosureFn(fn),[]))
    }
    /// Apply a closure that also receives a Codable parameter, with no output collections.
    func pardo<Param:Codable>(_ name: String = "\(#file):\(#line)",_ param:Param,_ fn: @Sendable @escaping (Param,PCollection<Of>.Stream) async throws -> Void) {
        self.apply(.pardo(name, ParameterizedClosureFn(param,fn), []))
    }


    // Single Output
    /// Apply a SerializableFn writing to one caller-provided output collection.
    func pardo<F:SerializableFn,O0>(_ name: String = "\(#file):\(#line)",_ fn: F,
                                    _ o0:PCollection<O0>) {
        self.apply(.pardo(name, fn, [AnyPCollection(o0)]))
    }
    /// Apply a closure producing one output; the output collection is created and returned.
    func pardo<O0>(_ name: String = "\(#file):\(#line)",
                   _ fn: @Sendable @escaping (PCollection<Of>.Stream,PCollection<O0>.Stream) async throws -> Void) -> (PCollection<O0>) {
        let output = PCollection<O0>()
        self.apply(.pardo(name,ClosureFn(fn),[AnyPCollection(output)]))
        return output
    }
    /// Parameterized variant of the single-output closure pardo.
    func pardo<Param:Codable,O0>(_ name: String = "\(#file):\(#line)",_ param: Param,
                   _ fn: @Sendable @escaping (Param,PCollection<Of>.Stream,PCollection<O0>.Stream) async throws -> Void) -> (PCollection<O0>) {
        let output = PCollection<O0>()
        self.apply(.pardo(name,ParameterizedClosureFn(param,fn),[AnyPCollection(output)]))
        return output
    }

    // Two Outputs
    /// Apply a SerializableFn writing to two caller-provided output collections.
    func pardo<F:SerializableFn,O0,O1>(_ name: String = "\(#file):\(#line)",_ fn: F,
                                    _ o0:PCollection<O0>,_ o1:PCollection<O1>) {
        self.apply(.pardo(name, fn, [AnyPCollection(o0),AnyPCollection(o1)]))
    }
    /// Apply a closure producing two outputs; the output collections are created and returned.
    func pardo<O0,O1>(_ name: String = "\(#file):\(#line)",
                      _ fn: @Sendable @escaping (PCollection<Of>.Stream,PCollection<O0>.Stream,PCollection<O1>.Stream) async throws -> Void) -> (PCollection<O0>,PCollection<O1>) {
        let output = (PCollection<O0>(),PCollection<O1>())
        self.apply(.pardo(name,ClosureFn(fn),[AnyPCollection(output.0),AnyPCollection(output.1)]))
        return output
    }
    /// Parameterized variant of the two-output closure pardo.
    func pardo<Param:Codable,O0,O1>(_ name: String = "\(#file):\(#line)",_ param: Param,
                                    _ fn: @Sendable @escaping (Param,PCollection<Of>.Stream,PCollection<O0>.Stream,PCollection<O1>.Stream) async throws -> Void) -> (PCollection<O0>,PCollection<O1>) {
        let output = (PCollection<O0>(),PCollection<O1>())
        self.apply(.pardo(name,ParameterizedClosureFn(param,fn),[AnyPCollection(output.0),AnyPCollection(output.1)]))
        return output
    }

    // Three Outputs
    /// Apply a SerializableFn writing to three caller-provided output collections.
    func pardo<F:SerializableFn,O0,O1,O2>(_ name: String = "\(#file):\(#line)",_ fn: F,
                                          _ o0:PCollection<O0>,_ o1:PCollection<O1>,_ o2:PCollection<O2>) {
        self.apply(.pardo(name, fn, [AnyPCollection(o0),AnyPCollection(o1),AnyPCollection(o2)]))
    }
    /// Apply a closure producing three outputs; the output collections are created and returned.
    func pardo<O0,O1,O2>(_ name: String = "\(#file):\(#line)",
                         _ fn: @Sendable @escaping (PCollection<Of>.Stream,PCollection<O0>.Stream,PCollection<O1>.Stream,PCollection<O2>.Stream) async throws -> Void) -> (PCollection<O0>,PCollection<O1>,PCollection<O2>) {
        let output = (PCollection<O0>(),PCollection<O1>(),PCollection<O2>())
        self.apply(.pardo(name,ClosureFn(fn),[AnyPCollection(output.0),AnyPCollection(output.1),AnyPCollection(output.2)]))
        return output
    }
    /// Parameterized variant of the three-output closure pardo.
    func pardo<Param:Codable,O0,O1,O2>(_ name: String = "\(#file):\(#line)",_ param: Param,
                                       _ fn: @Sendable @escaping (Param,PCollection<Of>.Stream,PCollection<O0>.Stream,PCollection<O1>.Stream,PCollection<O2>.Stream) async throws -> Void) -> (PCollection<O0>,PCollection<O1>,PCollection<O2>) {
        let output = (PCollection<O0>(),PCollection<O1>(),PCollection<O2>())
        self.apply(.pardo(name,ParameterizedClosureFn(param,fn),[AnyPCollection(output.0),AnyPCollection(output.1),AnyPCollection(output.2)]))
        return output
    }

    //TODO: Add more as needed
}
+
public extension PCollection {
    /// Core GroupByKey transform. Requires a pair input
    func groupByKey<K,V>() -> PCollection<KV<K,V>> where Of == KV<K,V> {
        // Adjust the coder for the pcollection to reflect GBK semantics:
        // after grouping, each key maps to an array of its values.
        // NOTE(review): the force unwraps assume Coder.of supports K and
        // Array<V> for any K,V usable here — TODO confirm.
        let output = PCollection<KV<K,V>>(coder:.keyvalue(.of(type: K.self)!, .of(type: Array<V>.self)!))
        self.apply(.groupByKey(AnyPCollection(output)))
        return output
    }



}
diff --git a/sdks/swift/Sources/ApacheBeam/Transforms/Combining.swift 
b/sdks/swift/Sources/ApacheBeam/Transforms/Combining.swift
new file mode 100644
index 00000000000..1b60f2a49cb
--- /dev/null
+++ b/sdks/swift/Sources/ApacheBeam/Transforms/Combining.swift
@@ -0,0 +1,22 @@
+/// Basic reducers
public extension PCollection {
    /// Per-key reduction: folds every grouped value into a fresh copy of
    /// `into` and emits one (key, result) pair per input element, preserving
    /// its timestamp and window.
    func reduce<Result:Codable,K,V>(name:String = "\(#file):\(#line)",into:Result,_ accumulator: @Sendable @escaping (V,inout Result) -> Void) -> PCollection<KV<K,Result>> where Of == KV<K,V> {
        pardo(name,into) { seed,input,output in
            for await (pair,timestamp,window) in input {
                var accumulated = seed
                for value in pair.values {
                    accumulator(value,&accumulated)
                }
                output.emit(KV(pair.key,accumulated),timestamp:timestamp,window:window)
            }
        }
    }
}
+
+/// Convenience functions
public extension PCollection {
    /// Sums the grouped values of each key.
    func sum<K,V:Numeric&Codable>() -> PCollection<KV<K,V>> where Of == KV<K,V> {
        reduce(into: 0) { value, total in total += value }
    }
}
diff --git a/sdks/swift/Sources/ApacheBeam/Transforms/Grouping.swift 
b/sdks/swift/Sources/ApacheBeam/Transforms/Grouping.swift
new file mode 100644
index 00000000000..e2025394471
--- /dev/null
+++ b/sdks/swift/Sources/ApacheBeam/Transforms/Grouping.swift
@@ -0,0 +1,8 @@
+/// Basic grouping functionality
+///
public extension PCollection {
    /// Maps each element to a (key, value) pair and then groups by key.
    func groupBy<K,V>(name: String = "\(#file):\(#line)",_ fn: @Sendable @escaping (Of) -> (K,V)) -> PCollection<KV<K,V>> {
        map(name:name,fn).groupByKey()
    }
}
diff --git 
a/sdks/swift/Sources/ApacheBeamDocumentation/Documentation.docc/ApacheBeam.md 
b/sdks/swift/Sources/ApacheBeamDocumentation/Documentation.docc/ApacheBeam.md
new file mode 100644
index 00000000000..afbac1a07f1
--- /dev/null
+++ 
b/sdks/swift/Sources/ApacheBeamDocumentation/Documentation.docc/ApacheBeam.md
@@ -0,0 +1,37 @@
+# ``ApacheBeam``
+
+A Swift SDK implementation for Beam
+
+@Metadata {
+    @DisplayName("Apache Beam SDK for Swift")
+}
+
+## Overview
+
+The Apache Beam SDK for Swift allows Swift developers to create executables 
that can be submitted to all Beam Portable Runners including Flink and 
Dataflow. 
+
+To use the Apache Beam SDK for Swift, first add it as a dependency to an 
executable project:
+
+```swift
+let package = Package(
+    // name, platforms, products, etc.
+    dependencies: [
+        // other dependencies
+        .package(url: "https://github.com/apache/beam/sdks/swift", from: "2.51.0"),
+    ],
+    targets: [
+        // targets
+    ]
+)
+```
+
+> Note: Swift 5.9 or higher is required in order to use the Swift SDK
+
+## Topics
+
+### Getting Started
+
+
+
+
+
diff --git a/sdks/swift/Tests/ApacheBeamTests/Coders/CoderTests.swift 
b/sdks/swift/Tests/ApacheBeamTests/Coders/CoderTests.swift
index 74031c8b545..acd75219f50 100644
--- a/sdks/swift/Tests/ApacheBeamTests/Coders/CoderTests.swift
+++ b/sdks/swift/Tests/ApacheBeamTests/Coders/CoderTests.swift
@@ -22,6 +22,15 @@ import XCTest
 @testable import ApacheBeam
 
 final class CoderTests: XCTestCase {
+    
+    func testSimpleScalarConversions() throws {
+        XCTAssertTrue(Coder.of(type: Data.self) == .bytes)
+        XCTAssertTrue(Coder.of(type: String.self) == .string)
+        XCTAssertTrue(Coder.of(type: Bool.self) == .boolean)
+        XCTAssertTrue(Coder.of(type: Int.self) == .varint)
+    }
+    
+    
     func testDefaultImpulseDecode() throws {
         var impulse = 
Data([0x7f,0xdf,0x3b,0x64,0x5a,0x1c,0xac,0x09,0x00,0x00,0x00,0x01,0x0f,0x00])
         let impulseCoder = Coder.windowedvalue(.bytes, .globalwindow)
diff --git a/sdks/swift/Tests/ApacheBeamTests/Pipeline/IntegrationTests.swift 
b/sdks/swift/Tests/ApacheBeamTests/Pipeline/IntegrationTests.swift
new file mode 100644
index 00000000000..3e2d6a3be7d
--- /dev/null
+++ b/sdks/swift/Tests/ApacheBeamTests/Pipeline/IntegrationTests.swift
@@ -0,0 +1,76 @@
+//
+//  IntegrationTests.swift
+//  
+//
+//  Created by Byron Ellis on 8/11/23.
+//
+
+import XCTest
+import ApacheBeam
+
/// Loads the named fixture file's raw bytes, throwing if it cannot be read.
func fixtureData(_ fixture: String) throws -> Data {
    let url = fixtureUrl(fixture)
    return try Data(contentsOf: url)
}
+
+
/// Resolves a fixture name to its URL inside the Fixtures directory.
func fixtureUrl(_ fixture: String) -> URL {
    let base = fixturesDirectory()
    return base.appendingPathComponent(fixture)
}
+
+
/// Returns the "Fixtures" directory that lives alongside the given source
/// file (by default, this file).
func fixturesDirectory(path: String = #file) -> URL {
    URL(fileURLWithPath: path)
        .deletingLastPathComponent()
        .appendingPathComponent("Fixtures")
}
+
/// End-to-end wordcount pipeline used to verify the ClosureFn refactoring.
/// Currently only constructs the pipeline graph; it is not submitted to a runner.
final class IntegrationTests: XCTestCase {

    override func setUpWithError() throws {
    }

    override func tearDownWithError() throws {
    }

    func testPortableWordcount() throws {
        // NOTE(review): `errors` and `normalizedCounts` are built but unused,
        // and the Pipeline itself is discarded — the test currently only
        // checks that construction compiles and runs without trapping.
        _ = Pipeline { pipeline in
            // Read each fixture file; readable files emit their contents,
            // unreadable ones emit a message on the error output.
            let (contents,errors) = pipeline
                .create(["file1.txt","file2.txt","missing.txt"])
                .pardo { filenames,output,errors in
                    for await (filename,_,_) in filenames {
                        do {
                            output.emit(String(decoding:try fixtureData(filename),as:UTF8.self))
                        } catch {
                            errors.emit("Unable to read \(filename): \(error)")
                        }
                    }
                }

            // Simple ParDo that takes advantage of enumerateLines
            let lines = contents.pardo { contents,lines in
                for await (content,_,_) in contents {
                    content.enumerateLines { line,_ in
                        lines.emit(line)
                    }
                }
            }

            // Our first group by operation: split into words, pair each word
            // with 1, and sum per word.
            let baseCount = lines
                .flatMap({ $0.components(separatedBy: .whitespaces) })
                .groupBy({ ($0,1) })
                .sum()

            // Re-key on the normalized (lowercased, punctuation-stripped) word
            // and sum again.
            // NOTE(review): `$0.value ?? 1` implies KV.value is optional here —
            // confirm against the KV definition.
            let normalizedCounts = baseCount.groupBy {
                ($0.key.lowercased().trimmingCharacters(in: .punctuationCharacters),
                 $0.value ?? 1)
            }.sum()



        }
    }


}


Reply via email to