This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch swift-sdk
in repository https://gitbox.apache.org/repos/asf/beam.git

commit c5edeac128926a905dbd8abe0461d8aad172b7d9
Author: Byron Ellis <byronel...@google.com>
AuthorDate: Mon Aug 21 13:20:18 2023 -0700

    Start stubbing out more of the composite transform implementation. Start stubbing out External Transforms and Schema support. Improve automatic naming of pardos and make explicit name implementation cleaner. Add S3 and GCS dependencies to implement basic internal IOs.
---
 sdks/swift/Package.resolved                        |  63 +++++++++++
 sdks/swift/Package.swift                           |   5 +-
 sdks/swift/Sources/ApacheBeam/Coders/Coder.swift   |   2 +-
 .../Core/PTransform/OutputPTransform.swift         |  10 ++
 .../Core/PTransform/PTransformBuilder.swift        |  20 ++++
 .../Core/PTransform/TuplePTransform.swift          |  20 ++++
 .../ApacheBeam/Core/Pipeline/Pipeline.swift        |   2 +
 .../Core/Pipeline/PipelineTransform.swift          |   1 +
 .../DynamicRow.swift}                              |  18 +---
 .../Sources/ApacheBeam/Schema/RowProtocol.swift    |   3 +
 sdks/swift/Sources/ApacheBeam/Schema/Schema.swift  |  21 ++++
 .../Sources/ApacheBeam/Transforms/Basic.swift      |  21 ++--
 .../Sources/ApacheBeam/Transforms/BuiltIn.swift    | 118 ++++++++++++++++-----
 .../Sources/ApacheBeam/Transforms/Combining.swift  |   2 +-
 .../Composite.swift}                               |  15 +--
 .../IO/ListFiles.swift}                            |  22 ++--
 .../Pipeline/IntegrationTests.swift                |   4 +-
 17 files changed, 265 insertions(+), 82 deletions(-)

diff --git a/sdks/swift/Package.resolved b/sdks/swift/Package.resolved
index cc4a106ff4a..7bf7847cda6 100644
--- a/sdks/swift/Package.resolved
+++ b/sdks/swift/Package.resolved
@@ -1,5 +1,50 @@
 {
   "pins" : [
+    {
+      "identity" : "aws-crt-swift",
+      "kind" : "remoteSourceControl",
+      "location" : "https://github.com/awslabs/aws-crt-swift",
+      "state" : {
+        "revision" : "997904873945e074aaf5c51ea968d9a84684525a",
+        "version" : "0.13.0"
+      }
+    },
+    {
+      "identity" : "aws-sdk-swift",
+      "kind" : "remoteSourceControl",
+      "location" : "https://github.com/awslabs/aws-sdk-swift.git",
+      "state" : {
+        "revision" : "14cfdb636d918e589f1b053e43acc118e75af5ce",
+        "version" : "0.23.0"
+      }
+    },
+    {
+      "identity" : "bigint",
+      "kind" : "remoteSourceControl",
+      "location" : "https://github.com/attaswift/BigInt",
+      "state" : {
+        "revision" : "0ed110f7555c34ff468e72e1686e59721f2b0da6",
+        "version" : "5.3.0"
+      }
+    },
+    {
+      "identity" : "cryptoswift",
+      "kind" : "remoteSourceControl",
+      "location" : "https://github.com/krzyzanowskim/CryptoSwift.git",
+      "state" : {
+        "revision" : "af1b58fc569bfde777462349b9f7314b61762be0",
+        "version" : "1.3.2"
+      }
+    },
+    {
+      "identity" : "google-auth-library-swift",
+      "kind" : "remoteSourceControl",
+      "location" : "https://github.com/googleapis/google-auth-library-swift",
+      "state" : {
+        "revision" : "4b510d91fc74f1415eae6dabc9836b8c3e1f44f6",
+        "version" : "0.5.3"
+      }
+    },
     {
       "identity" : "grpc-swift",
       "kind" : "remoteSourceControl",
@@ -9,6 +54,15 @@
         "version" : "1.19.0"
       }
     },
+    {
+      "identity" : "smithy-swift",
+      "kind" : "remoteSourceControl",
+      "location" : "https://github.com/awslabs/smithy-swift",
+      "state" : {
+        "revision" : "f277bd61b9a869a6a4a4b4a0e808240147713f7f",
+        "version" : "0.27.0"
+      }
+    },
     {
       "identity" : "swift-atomics",
       "kind" : "remoteSourceControl",
@@ -107,6 +161,15 @@
         "revision" : "ce20dc083ee485524b802669890291c0d8090170",
         "version" : "1.22.1"
       }
+    },
+    {
+      "identity" : "xmlcoder",
+      "kind" : "remoteSourceControl",
+      "location" : "https://github.com/MaxDesiatov/XMLCoder.git",
+      "state" : {
+        "revision" : "80b4a1646399b8e4e0ce80711653476a85bd5e37",
+        "version" : "0.17.0"
+      }
     }
   ],
   "version" : 2
diff --git a/sdks/swift/Package.swift b/sdks/swift/Package.swift
index e6b67948631..c5ca4db14eb 100644
--- a/sdks/swift/Package.swift
+++ b/sdks/swift/Package.swift
@@ -27,6 +27,8 @@ let dependencies: [Package.Dependency] = [
     .package(url: "https://github.com/apple/swift-log.git", from: "1.0.0"),
     
     // Additional Transform Dependencies
+    .package(url: "https://github.com/awslabs/aws-sdk-swift.git", from: "0.23.0"),
+    .package(url: "https://github.com/googleapis/google-auth-library-swift",from:"0.0.0"),
     
     
     // Swift Package Manager Plugins
@@ -52,7 +54,8 @@ let package = Package(
             name: "ApacheBeam",
             dependencies: [
                 .product(name:"GRPC",package:"grpc-swift"),
-                .product(name: "Logging",package:"swift-log")
+                .product(name: "Logging",package:"swift-log"),
+                .product(name: "AWSS3",package:"aws-sdk-swift")
             ]
         ),
         .testTarget(
diff --git a/sdks/swift/Sources/ApacheBeam/Coders/Coder.swift 
b/sdks/swift/Sources/ApacheBeam/Coders/Coder.swift
index bf1ead4f6d8..13ed10ee6fa 100644
--- a/sdks/swift/Sources/ApacheBeam/Coders/Coder.swift
+++ b/sdks/swift/Sources/ApacheBeam/Coders/Coder.swift
@@ -32,8 +32,8 @@ public indirect enum Coder {
     case lengthprefix(Coder)
     case windowedvalue(Coder,Coder)
     
-
     // TODO: Row Coder
+    
 }
 
 
diff --git 
a/sdks/swift/Sources/ApacheBeam/Core/PTransform/OutputPTransform.swift 
b/sdks/swift/Sources/ApacheBeam/Core/PTransform/OutputPTransform.swift
new file mode 100644
index 00000000000..c9cb0b5c228
--- /dev/null
+++ b/sdks/swift/Sources/ApacheBeam/Core/PTransform/OutputPTransform.swift
@@ -0,0 +1,10 @@
+public struct OutputPTransform<Of> : _PrimitivePTransform {
+    let name: String
+    let applyClosure: (Any) -> PCollection<Of>
+
+    public init<In>(_ name:String,_ fn: @escaping (PCollection<In>) -> 
PCollection<Of>) {
+        self.name = name
+        self.applyClosure = { fn($0 as! PCollection<In>) }
+    }
+    
+}
diff --git 
a/sdks/swift/Sources/ApacheBeam/Core/PTransform/PTransformBuilder.swift 
b/sdks/swift/Sources/ApacheBeam/Core/PTransform/PTransformBuilder.swift
new file mode 100644
index 00000000000..a1e2185c849
--- /dev/null
+++ b/sdks/swift/Sources/ApacheBeam/Core/PTransform/PTransformBuilder.swift
@@ -0,0 +1,20 @@
+public struct EmptyPTransform : _PrimitivePTransform {
+    
+}
+
+
+
+
+@resultBuilder
+public struct PTransformBuilder {
+    
+    public static func buildBlock() -> EmptyPTransform {
+        EmptyPTransform()
+    }
+    
+    public static func buildBlock<Transform>(_ transform: Transform) -> 
Transform where Transform: PTransform {
+        transform
+    }
+    
+    
+}
diff --git 
a/sdks/swift/Sources/ApacheBeam/Core/PTransform/TuplePTransform.swift 
b/sdks/swift/Sources/ApacheBeam/Core/PTransform/TuplePTransform.swift
new file mode 100644
index 00000000000..49a26eefbbd
--- /dev/null
+++ b/sdks/swift/Sources/ApacheBeam/Core/PTransform/TuplePTransform.swift
@@ -0,0 +1,20 @@
+public struct TuplePTransform<T>: _PrimitivePTransform {
+    public let value: T
+    let _children: [AnyPTransform]
+    
+    public init(value: T){
+        self.value = value
+        _children = []
+    }
+    
+    public init(value: T,children: [AnyPTransform]){
+        self.value = value
+        _children = children
+    }
+    
+    init<T0:PTransform,T1:PTransform>(_ t0: T0,_ t1:T1) where T == (T0,T1) {
+        value = (t0,t1)
+        _children = [AnyPTransform(t0),AnyPTransform(t1)]
+    }
+    
+}
diff --git a/sdks/swift/Sources/ApacheBeam/Core/Pipeline/Pipeline.swift 
b/sdks/swift/Sources/ApacheBeam/Core/Pipeline/Pipeline.swift
index b89025b7ec1..180710aae73 100644
--- a/sdks/swift/Sources/ApacheBeam/Core/Pipeline/Pipeline.swift
+++ b/sdks/swift/Sources/ApacheBeam/Core/Pipeline/Pipeline.swift
@@ -205,6 +205,8 @@ public final class Pipeline {
                             toVisit.append(.collection(o))
                         case .flatten(_, _):
                             throw ApacheBeamError.runtimeError("flatten not 
implemented yet")
+                        case .external:
+                            throw ApacheBeamError.runtimeError("External 
Transforms not implemented yet")
                         case .groupByKey(let o):
                             let outputs = [o].enumerated().map {
                                 ("\($0)",collection(from: $1).name)
diff --git 
a/sdks/swift/Sources/ApacheBeam/Core/Pipeline/PipelineTransform.swift 
b/sdks/swift/Sources/ApacheBeam/Core/Pipeline/PipelineTransform.swift
index 550df2fb860..ce164566db1 100644
--- a/sdks/swift/Sources/ApacheBeam/Core/Pipeline/PipelineTransform.swift
+++ b/sdks/swift/Sources/ApacheBeam/Core/Pipeline/PipelineTransform.swift
@@ -27,6 +27,7 @@ public enum PipelineTransform {
     case groupByKey(AnyPCollection)
     case custom(String,Data,Environment?,[AnyPCollection])
     case composite(AnyPTransform)
+    case external
 }
 
 
diff --git 
a/sdks/swift/Sources/ApacheBeam/Core/Pipeline/PipelineTransform.swift 
b/sdks/swift/Sources/ApacheBeam/Schema/DynamicRow.swift
similarity index 63%
copy from sdks/swift/Sources/ApacheBeam/Core/Pipeline/PipelineTransform.swift
copy to sdks/swift/Sources/ApacheBeam/Schema/DynamicRow.swift
index 550df2fb860..b0a4054d2e5 100644
--- a/sdks/swift/Sources/ApacheBeam/Core/Pipeline/PipelineTransform.swift
+++ b/sdks/swift/Sources/ApacheBeam/Schema/DynamicRow.swift
@@ -16,18 +16,8 @@
  * limitations under the License.
  */
 
-import Foundation
-
-/// Enum for pipeline representable transforms as opposed to composite 
transforms
-/// which are a user-side construct represented by PTransform
-public enum PipelineTransform {
-    case pardo(String,SerializableFn,[AnyPCollection])
-    case impulse(AnyPCollection)
-    case flatten([AnyPCollection],AnyPCollection)
-    case groupByKey(AnyPCollection)
-    case custom(String,Data,Environment?,[AnyPCollection])
-    case composite(AnyPTransform)
+/// Dynamic representation of a Row that lets us treat a row value like JSON. 
Obviously this is less performant and
+/// when using schema objects internally, particularly in PCollections we 
would favor @Row structs
+public struct DynamicRow {
+    
 }
-
-
-
diff --git a/sdks/swift/Sources/ApacheBeam/Schema/RowProtocol.swift 
b/sdks/swift/Sources/ApacheBeam/Schema/RowProtocol.swift
new file mode 100644
index 00000000000..b1315fd09de
--- /dev/null
+++ b/sdks/swift/Sources/ApacheBeam/Schema/RowProtocol.swift
@@ -0,0 +1,3 @@
+public protocol RowProtocol {
+    
+}
diff --git a/sdks/swift/Sources/ApacheBeam/Schema/Schema.swift 
b/sdks/swift/Sources/ApacheBeam/Schema/Schema.swift
new file mode 100644
index 00000000000..1404f5e8efd
--- /dev/null
+++ b/sdks/swift/Sources/ApacheBeam/Schema/Schema.swift
@@ -0,0 +1,21 @@
+
+public indirect enum FieldType {
+    case byte,int16,int32,int64,float,double,string,datetime,boolean,bytes
+    case decimal(Int,Int)
+    
+    case logical(Schema)
+    case row(Schema)
+    
+    case array(FieldType)
+    case repeated(FieldType)
+    case map(FieldType,FieldType)
+}
+
+public struct Field {
+    let name: String
+    let type: FieldType
+}
+
+public struct Schema {
+    
+}
diff --git a/sdks/swift/Sources/ApacheBeam/Transforms/Basic.swift 
b/sdks/swift/Sources/ApacheBeam/Transforms/Basic.swift
index 9be08330b46..be7b3318c03 100644
--- a/sdks/swift/Sources/ApacheBeam/Transforms/Basic.swift
+++ b/sdks/swift/Sources/ApacheBeam/Transforms/Basic.swift
@@ -20,9 +20,8 @@
 public extension PCollection {
 
     /// Each time the input fires output all of the values in this list.
-    func create<Value:Codable>(_ values: [Value],_ name:String? = 
nil,_file:String=#fileID,_line:Int=#line) -> PCollection<Value> {
-        
-        return pardo(name,_file:_file,_line:_line,values) { 
values,input,output in
+    func create<Value:Codable>(_ values: [Value],name:String? = 
nil,_file:String=#fileID,_line:Int=#line) -> PCollection<Value> {
+        return pardo(name:name ?? "\(_file):\(_line)",values) { 
values,input,output in
             for try await (_,ts,w) in input {
                 for v in values {
                     output.emit(v,timestamp:ts,window:w)
@@ -37,7 +36,7 @@ public extension PCollection {
     
     @discardableResult
     func log(prefix:String,_ name:String? = 
nil,_file:String=#fileID,_line:Int=#line) -> PCollection<Of> where Of == String 
{
-        pardo(name,_file:_file,_line:_line,prefix) { prefix,input,output in
+        pardo(name:name ?? "\(_file):\(_line)",prefix) { prefix,input,output in
             for await element in input {
                 print("\(prefix): \(element)")
                 output.emit(element)
@@ -47,7 +46,7 @@ public extension PCollection {
     
     @discardableResult
     func log<K,V>(prefix:String,_ name:String? = 
nil,_file:String=#fileID,_line:Int=#line) -> PCollection<KV<K,V>> where Of == 
KV<K,V> {
-        pardo(name,_file:_file,_line:_line,prefix) { prefix,input,output in
+        pardo(name:name ?? "\(_file):\(_line)",prefix) { prefix,input,output in
             for await element in input {
                 let kv = element.0
                 for v in kv.values {
@@ -63,8 +62,8 @@ public extension PCollection {
 public extension PCollection {
     
     /// Modify a value without changing its window or timestamp
-    func map<Out>(_ name:String? = nil,_file:String=#fileID,_line:Int=#line,_ 
fn: @Sendable @escaping (Of) -> Out) -> PCollection<Out> {
-        return pardo(name,_file:_file,_line:_line) { input,output in
+    func map<Out>(name:String? = nil,_file:String=#fileID,_line:Int=#line,_ 
fn: @Sendable @escaping (Of) -> Out) -> PCollection<Out> {
+        return pardo(name:name ?? "\(_file):\(_line)") { input,output in
             for try await (v,ts,w) in input {
                 output.emit(fn(v),timestamp:ts,window:w)
             }
@@ -72,7 +71,7 @@ public extension PCollection {
     }
     
     func map<K,V>(_ name:String? = nil,_file:String=#fileID,_line:Int=#line,_ 
fn: @Sendable @escaping (Of) -> (K,V)) -> PCollection<KV<K,V>> {
-        return pardo(name,_file:_file,_line:_line) { input,output in
+        return pardo(name:name ?? "\(_file):\(_line)") { input,output in
             for try await (i,ts,w) in input {
                 let (key,value) = fn(i)
                 output.emit(KV(key,value),timestamp:ts,window:w)
@@ -82,7 +81,7 @@ public extension PCollection {
 
     /// Produce multiple outputs as a single value without modifying window or 
timestamp
     func flatMap<Out>(name:String? = 
nil,_file:String=#fileID,_line:Int=#line,_ fn: @Sendable @escaping (Of) -> 
[Out]) -> PCollection<Out> {
-        return pardo(name,_file:_file,_line:_line) { input,output in
+        return pardo(name:name ?? "\(_file):\(_line)") { input,output in
             for try await (v,ts,w) in input {
                 for i in fn(v) {
                     output.emit(i,timestamp:ts,window:w)
@@ -95,7 +94,7 @@ public extension PCollection {
 
 public extension PCollection<Never> {
     /// Convenience function to add an impulse when we are at the root of the 
pipeline
-    func create<Value:Codable>(_ values: [Value],_ name:String? = 
nil,_file:String=#fileID,_line:Int=#line) -> PCollection<Value> {
-        return impulse().create(values,name,_file:_file,_line:_line)
+    func create<Value:Codable>(_ values: [Value],name:String? = 
nil,_file:String=#fileID,_line:Int=#line) -> PCollection<Value> {
+        return impulse().create(values,name:name,_file:_file,_line:_line)
     }
 }
diff --git a/sdks/swift/Sources/ApacheBeam/Transforms/BuiltIn.swift 
b/sdks/swift/Sources/ApacheBeam/Transforms/BuiltIn.swift
index 4cc3532da21..7bea2c4f0c8 100644
--- a/sdks/swift/Sources/ApacheBeam/Transforms/BuiltIn.swift
+++ b/sdks/swift/Sources/ApacheBeam/Transforms/BuiltIn.swift
@@ -33,71 +33,137 @@ public extension PCollection {
     // TODO: Replace with parameter pack version once 
https://github.com/apple/swift/issues/67192 is resolved
     
     // No Output
-    func pardo<F:SerializableFn>(_ name:String? = 
nil,_file:String=#fileID,_line:Int=#line,_ fn: F) {
-        self.apply(.pardo(name ?? "\(_file):\(_line)", fn, []))
+    func pardo<F:SerializableFn>(name:String,_ fn: F) {
+        self.apply(.pardo(name, fn, []))
     }
-    func pardo(_ name:String? = nil,_file:String=#fileID,_line:Int=#line,_ fn: 
@Sendable @escaping (PCollection<Of>.Stream) async throws -> Void) {
-        self.apply(.pardo(name ?? "\(_file):\(_line)", ClosureFn(fn),[]))
+    func pardo(name:String,_ fn: @Sendable @escaping (PCollection<Of>.Stream) 
async throws -> Void) {
+        self.apply(.pardo(name, ClosureFn(fn),[]))
     }
-    func pardo<Param:Codable>(_ name:String? = 
nil,_file:String=#fileID,_line:Int=#line,_ param:Param,_ fn: @Sendable 
@escaping (Param,PCollection<Of>.Stream) async throws -> Void) {
-        self.apply(.pardo(name ?? "\(_file):\(_line)", 
ParameterizedClosureFn(param,fn), []))
+    func pardo<Param:Codable>(name:String,_ param:Param,_ fn: @Sendable 
@escaping (Param,PCollection<Of>.Stream) async throws -> Void) {
+        self.apply(.pardo(name, ParameterizedClosureFn(param,fn), []))
+    }
+    
+    // No Output Generated Names
+    func pardo<F:SerializableFn>(_file:String=#fileID,_line:Int=#line,_ fn: F) 
{
+        self.apply(.pardo("\(_file):\(_line)", fn, []))
+    }
+    func pardo(_file:String=#fileID,_line:Int=#line,_ fn: @Sendable @escaping 
(PCollection<Of>.Stream) async throws -> Void) {
+        self.apply(.pardo("\(_file):\(_line)", ClosureFn(fn),[]))
+    }
+    func pardo<Param:Codable>(_file:String=#fileID,_line:Int=#line,_ 
param:Param,_ fn: @Sendable @escaping (Param,PCollection<Of>.Stream) async 
throws -> Void) {
+        self.apply(.pardo("\(_file):\(_line)", 
ParameterizedClosureFn(param,fn), []))
     }
 
 
     // Single Output
-    func pardo<F:SerializableFn,O0>(_ name:String? = 
nil,_file:String=#fileID,_line:Int=#line,_ fn: F,
+    func pardo<F:SerializableFn,O0>(name:String,fn: F,
+                                    _ o0:PCollection<O0>) {
+        self.apply(.pardo(name, fn, [AnyPCollection(o0)]))
+    }
+    func pardo<O0>(name:String,_ fn: @Sendable @escaping 
(PCollection<Of>.Stream,PCollection<O0>.Stream) async throws -> Void) -> 
(PCollection<O0>) {
+        let output = PCollection<O0>()
+        self.apply(.pardo(name,ClosureFn(fn),[AnyPCollection(output)]))
+        return output
+    }
+    func pardo<Param:Codable,O0>(name:String,_ param: Param,_ fn: @Sendable 
@escaping (Param,PCollection<Of>.Stream,PCollection<O0>.Stream) async throws -> 
Void) -> (PCollection<O0>) {
+        let output = PCollection<O0>()
+        
self.apply(.pardo(name,ParameterizedClosureFn(param,fn),[AnyPCollection(output)]))
+        return output
+    }
+    
+    // Single Output Generated Names
+    func pardo<F:SerializableFn,O0>(_file:String=#fileID,_line:Int=#line,fn: F,
                                     _ o0:PCollection<O0>) {
-        self.apply(.pardo(name ?? "\(_file):\(_line)", fn, 
[AnyPCollection(o0)]))
+        self.apply(.pardo("\(_file):\(_line)", fn, [AnyPCollection(o0)]))
     }
-    func pardo<O0>(_ name:String? = nil,_file:String=#fileID,_line:Int=#line,
-                   _ fn: @Sendable @escaping 
(PCollection<Of>.Stream,PCollection<O0>.Stream) async throws -> Void) -> 
(PCollection<O0>) {
+    func pardo<O0>(_file:String=#fileID,_line:Int=#line,_ fn: @Sendable 
@escaping (PCollection<Of>.Stream,PCollection<O0>.Stream) async throws -> Void) 
-> (PCollection<O0>) {
         let output = PCollection<O0>()
-        self.apply(.pardo(name ?? 
"\(_file):\(_line)",ClosureFn(fn),[AnyPCollection(output)]))
+        
self.apply(.pardo("\(_file):\(_line)",ClosureFn(fn),[AnyPCollection(output)]))
         return output
     }
-    func pardo<Param:Codable,O0>(_ name:String? = 
nil,_file:String=#fileID,_line:Int=#line,_ param: Param,
-                   _ fn: @Sendable @escaping 
(Param,PCollection<Of>.Stream,PCollection<O0>.Stream) async throws -> Void) -> 
(PCollection<O0>) {
+    func pardo<Param:Codable,O0>(_file:String=#fileID,_line:Int=#line,_ param: 
Param,_ fn: @Sendable @escaping 
(Param,PCollection<Of>.Stream,PCollection<O0>.Stream) async throws -> Void) -> 
(PCollection<O0>) {
         let output = PCollection<O0>()
-        self.apply(.pardo(name ?? 
"\(_file):\(_line)",ParameterizedClosureFn(param,fn),[AnyPCollection(output)]))
+        
self.apply(.pardo("\(_file):\(_line)",ParameterizedClosureFn(param,fn),[AnyPCollection(output)]))
         return output
     }
 
+    
+    
     // Two Outputs
-    func pardo<F:SerializableFn,O0,O1>(_ name:String? = 
nil,_file:String=#fileID,_line:Int=#line,_ fn: F,
+    func pardo<F:SerializableFn,O0,O1>(name:String,_ fn: F,
+                                    _ o0:PCollection<O0>,_ o1:PCollection<O1>) 
{
+        self.apply(.pardo(name, fn, [AnyPCollection(o0),AnyPCollection(o1)]))
+    }
+    func pardo<O0,O1>(name:String,
+                      _ fn: @Sendable @escaping 
(PCollection<Of>.Stream,PCollection<O0>.Stream,PCollection<O1>.Stream) async 
throws -> Void) -> (PCollection<O0>,PCollection<O1>) {
+        let output = (PCollection<O0>(),PCollection<O1>())
+        
self.apply(.pardo(name,ClosureFn(fn),[AnyPCollection(output.0),AnyPCollection(output.1)]))
+        return output
+    }
+    func pardo<Param:Codable,O0,O1>(name:String,_ param: Param,
+                                    _ fn: @Sendable @escaping 
(Param,PCollection<Of>.Stream,PCollection<O0>.Stream,PCollection<O1>.Stream) 
async throws -> Void) -> (PCollection<O0>,PCollection<O1>) {
+        let output = (PCollection<O0>(),PCollection<O1>())
+        
self.apply(.pardo(name,ParameterizedClosureFn(param,fn),[AnyPCollection(output.0),AnyPCollection(output.1)]))
+        return output
+    }
+    
+    // Two Outputs Generated Names
+    func pardo<F:SerializableFn,O0,O1>(_file:String=#fileID,_line:Int=#line,_ 
fn: F,
                                     _ o0:PCollection<O0>,_ o1:PCollection<O1>) 
{
-        self.apply(.pardo(name ?? "\(_file):\(_line)", fn, 
[AnyPCollection(o0),AnyPCollection(o1)]))
+        self.apply(.pardo("\(_file):\(_line)", fn, 
[AnyPCollection(o0),AnyPCollection(o1)]))
     }
-    func pardo<O0,O1>(_ name:String? = 
nil,_file:String=#fileID,_line:Int=#line,
+    func pardo<O0,O1>(_file:String=#fileID,_line:Int=#line,
                       _ fn: @Sendable @escaping 
(PCollection<Of>.Stream,PCollection<O0>.Stream,PCollection<O1>.Stream) async 
throws -> Void) -> (PCollection<O0>,PCollection<O1>) {
         let output = (PCollection<O0>(),PCollection<O1>())
-        self.apply(.pardo(name ?? 
"\(_file):\(_line)",ClosureFn(fn),[AnyPCollection(output.0),AnyPCollection(output.1)]))
+        
self.apply(.pardo("\(_file):\(_line)",ClosureFn(fn),[AnyPCollection(output.0),AnyPCollection(output.1)]))
         return output
     }
-    func pardo<Param:Codable,O0,O1>(_ name:String? = 
nil,_file:String=#fileID,_line:Int=#line,_ param: Param,
+    func pardo<Param:Codable,O0,O1>(_file:String=#fileID,_line:Int=#line,_ 
param: Param,
                                     _ fn: @Sendable @escaping 
(Param,PCollection<Of>.Stream,PCollection<O0>.Stream,PCollection<O1>.Stream) 
async throws -> Void) -> (PCollection<O0>,PCollection<O1>) {
         let output = (PCollection<O0>(),PCollection<O1>())
-        self.apply(.pardo(name ?? 
"\(_file):\(_line)",ParameterizedClosureFn(param,fn),[AnyPCollection(output.0),AnyPCollection(output.1)]))
+        
self.apply(.pardo("\(_file):\(_line)",ParameterizedClosureFn(param,fn),[AnyPCollection(output.0),AnyPCollection(output.1)]))
         return output
     }
 
+    
+
     // Three Outputs
-    func pardo<F:SerializableFn,O0,O1,O2>(_ name:String? = 
nil,_file:String=#fileID,_line:Int=#line,_ fn: F,
+    func pardo<F:SerializableFn,O0,O1,O2>(name:String,_ fn: F,
+                                          _ o0:PCollection<O0>,_ 
o1:PCollection<O1>,_ o2:PCollection<O2>) {
+        self.apply(.pardo(name, fn, 
[AnyPCollection(o0),AnyPCollection(o1),AnyPCollection(o2)]))
+    }
+    func pardo<O0,O1,O2>(name:String,
+                         _ fn: @Sendable @escaping 
(PCollection<Of>.Stream,PCollection<O0>.Stream,PCollection<O1>.Stream,PCollection<O2>.Stream)
 async throws -> Void) -> (PCollection<O0>,PCollection<O1>,PCollection<O2>) {
+        let output = (PCollection<O0>(),PCollection<O1>(),PCollection<O2>())
+        
self.apply(.pardo(name,ClosureFn(fn),[AnyPCollection(output.0),AnyPCollection(output.1),AnyPCollection(output.2)]))
+        return output
+    }
+    func pardo<Param:Codable,O0,O1,O2>(name:String,_ param: Param,
+                                       _ fn: @Sendable @escaping 
(Param,PCollection<Of>.Stream,PCollection<O0>.Stream,PCollection<O1>.Stream,PCollection<O2>.Stream)
 async throws -> Void) -> (PCollection<O0>,PCollection<O1>,PCollection<O2>) {
+        let output = (PCollection<O0>(),PCollection<O1>(),PCollection<O2>())
+        
self.apply(.pardo(name,ParameterizedClosureFn(param,fn),[AnyPCollection(output.0),AnyPCollection(output.1),AnyPCollection(output.2)]))
+        return output
+    }
+    
+    // Three Outputs Generated Names
+    func 
pardo<F:SerializableFn,O0,O1,O2>(_file:String=#fileID,_line:Int=#line,_ fn: F,
                                           _ o0:PCollection<O0>,_ 
o1:PCollection<O1>,_ o2:PCollection<O2>) {
-        self.apply(.pardo(name ?? "\(_file):\(_line)", fn, 
[AnyPCollection(o0),AnyPCollection(o1),AnyPCollection(o2)]))
+        self.apply(.pardo("\(_file):\(_line)", fn, 
[AnyPCollection(o0),AnyPCollection(o1),AnyPCollection(o2)]))
     }
-    func pardo<O0,O1,O2>(_ name:String? = 
nil,_file:String=#fileID,_line:Int=#line,
+    func pardo<O0,O1,O2>(_file:String=#fileID,_line:Int=#line,
                          _ fn: @Sendable @escaping 
(PCollection<Of>.Stream,PCollection<O0>.Stream,PCollection<O1>.Stream,PCollection<O2>.Stream)
 async throws -> Void) -> (PCollection<O0>,PCollection<O1>,PCollection<O2>) {
         let output = (PCollection<O0>(),PCollection<O1>(),PCollection<O2>())
-        self.apply(.pardo(name ?? 
"\(_file):\(_line)",ClosureFn(fn),[AnyPCollection(output.0),AnyPCollection(output.1),AnyPCollection(output.2)]))
+        
self.apply(.pardo("\(_file):\(_line)",ClosureFn(fn),[AnyPCollection(output.0),AnyPCollection(output.1),AnyPCollection(output.2)]))
         return output
     }
-    func pardo<Param:Codable,O0,O1,O2>(_ name:String? = 
nil,_file:String=#fileID,_line:Int=#line,_ param: Param,
+    func pardo<Param:Codable,O0,O1,O2>(_file:String=#fileID,_line:Int=#line,_ 
param: Param,
                                        _ fn: @Sendable @escaping 
(Param,PCollection<Of>.Stream,PCollection<O0>.Stream,PCollection<O1>.Stream,PCollection<O2>.Stream)
 async throws -> Void) -> (PCollection<O0>,PCollection<O1>,PCollection<O2>) {
         let output = (PCollection<O0>(),PCollection<O1>(),PCollection<O2>())
-        self.apply(.pardo(name ?? 
"\(_file):\(_line)",ParameterizedClosureFn(param,fn),[AnyPCollection(output.0),AnyPCollection(output.1),AnyPCollection(output.2)]))
+        
self.apply(.pardo("\(_file):\(_line)",ParameterizedClosureFn(param,fn),[AnyPCollection(output.0),AnyPCollection(output.1),AnyPCollection(output.2)]))
         return output
     }
 
+
     //TODO: Add more as needed
 }
 
diff --git a/sdks/swift/Sources/ApacheBeam/Transforms/Combining.swift 
b/sdks/swift/Sources/ApacheBeam/Transforms/Combining.swift
index 3ef57ebfd35..fc2aee794ec 100644
--- a/sdks/swift/Sources/ApacheBeam/Transforms/Combining.swift
+++ b/sdks/swift/Sources/ApacheBeam/Transforms/Combining.swift
@@ -19,7 +19,7 @@
 /// Basic reducers
 public extension PCollection {
     func reduce<Result:Codable,K,V>(name:String? = 
nil,_file:String=#fileID,_line:Int=#line,into:Result,_ accumulator: @Sendable 
@escaping (V,inout Result) -> Void) -> PCollection<KV<K,Result>> where Of == 
KV<K,V> {
-        return pardo(name,_file:_file,_line:_line,into) { 
initialValue,input,output in
+        return pardo(name:name ?? "\(_file):\(_line)",into) { 
initialValue,input,output in
             for await (kv,ts,w) in input {
                 var result = initialValue
                 for v in kv.values {
diff --git 
a/sdks/swift/Sources/ApacheBeam/Core/Pipeline/PipelineTransform.swift 
b/sdks/swift/Sources/ApacheBeam/Transforms/Composite.swift
similarity index 63%
copy from sdks/swift/Sources/ApacheBeam/Core/Pipeline/PipelineTransform.swift
copy to sdks/swift/Sources/ApacheBeam/Transforms/Composite.swift
index 550df2fb860..9155d6ff9e5 100644
--- a/sdks/swift/Sources/ApacheBeam/Core/Pipeline/PipelineTransform.swift
+++ b/sdks/swift/Sources/ApacheBeam/Transforms/Composite.swift
@@ -16,18 +16,5 @@
  * limitations under the License.
  */
 
-import Foundation
-
-/// Enum for pipeline representable transforms as opposed to composite 
transforms
-/// which are a user-side construct represented by PTransform
-public enum PipelineTransform {
-    case pardo(String,SerializableFn,[AnyPCollection])
-    case impulse(AnyPCollection)
-    case flatten([AnyPCollection],AnyPCollection)
-    case groupByKey(AnyPCollection)
-    case custom(String,Data,Environment?,[AnyPCollection])
-    case composite(AnyPTransform)
+public extension PCollection {
 }
-
-
-
diff --git 
a/sdks/swift/Sources/ApacheBeam/Core/Pipeline/PipelineTransform.swift 
b/sdks/swift/Sources/ApacheBeam/Transforms/IO/ListFiles.swift
similarity index 63%
copy from sdks/swift/Sources/ApacheBeam/Core/Pipeline/PipelineTransform.swift
copy to sdks/swift/Sources/ApacheBeam/Transforms/IO/ListFiles.swift
index 550df2fb860..2c6b79b6ee1 100644
--- a/sdks/swift/Sources/ApacheBeam/Core/Pipeline/PipelineTransform.swift
+++ b/sdks/swift/Sources/ApacheBeam/Transforms/IO/ListFiles.swift
@@ -16,18 +16,16 @@
  * limitations under the License.
  */
 
-import Foundation
-
-/// Enum for pipeline representable transforms as opposed to composite 
transforms
-/// which are a user-side construct represented by PTransform
-public enum PipelineTransform {
-    case pardo(String,SerializableFn,[AnyPCollection])
-    case impulse(AnyPCollection)
-    case flatten([AnyPCollection],AnyPCollection)
-    case groupByKey(AnyPCollection)
-    case custom(String,Data,Environment?,[AnyPCollection])
-    case composite(AnyPTransform)
+struct S3Bucket : Codable {
+    let bucket:String
 }
 
+struct GSBucket : Codable {
+    let bucket:String
+}
 
-
+public extension PCollection {
+    
+    
+    
+}
diff --git a/sdks/swift/Tests/ApacheBeamTests/Pipeline/IntegrationTests.swift 
b/sdks/swift/Tests/ApacheBeamTests/Pipeline/IntegrationTests.swift
index 58c2baa479c..0c34fcb42ae 100644
--- a/sdks/swift/Tests/ApacheBeamTests/Pipeline/IntegrationTests.swift
+++ b/sdks/swift/Tests/ApacheBeamTests/Pipeline/IntegrationTests.swift
@@ -48,7 +48,7 @@ final class IntegrationTests: XCTestCase {
         try await Pipeline { pipeline in
             let (contents,errors) = pipeline
                 .create(["file1.txt","file2.txt","missing.txt"])
-                .pardo { filenames,output,errors in
+                .pardo(name:"Read Files") { filenames,output,errors in
                     for await (filename,_,_) in filenames {
                         do {
                             output.emit(String(decoding:try 
fixtureData(filename),as:UTF8.self))
@@ -58,7 +58,7 @@ final class IntegrationTests: XCTestCase {
                     }
                 }
             
-            // Simple ParDo that takes advantage of enumerateLines
+            // Simple ParDo that takes advantage of enumerateLines. No name to 
test name generation of pardos 
             let lines = contents.pardo { contents,lines in
                 for await (content,_,_) in contents {
                     content.enumerateLines { line,_ in

Reply via email to