This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch swift-sdk
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 739ea5a644e22ca61f69cb7c23c2c67fffba80b8
Author: Byron Ellis <byronel...@google.com>
AuthorDate: Thu Aug 24 17:45:18 2023 -0700

    Rename the previous stream-based pardo implementation to pstream and start
    introducing an element-wise pardo. This better matches other SDK conventions
    (e.g. processElement) and lets us implicitly transfer the timestamp and
    window from the input to the output. Repurposes the PInput and POutput
    protocols to handle this functionality. Also updates the map and flatMap
    implementations to use pardo rather than pstream to get a sense of how well
    it works (well).
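
    As a rough sketch of the difference (the `words` collection and the step
    names here are assumed, not part of the change), the same lowercasing step
    written both ways; the output parameter is annotated to help type
    inference, as the updated tests below do:

        // Stream-based form, now spelled pstream: the closure drives the whole
        // stream and must forward timestamps and windows explicitly.
        let lowered = words.pstream(name:"lower-pstream") { (input,output:PCollectionStream<String>) in
            for await (word,ts,w) in input {
                output.emit(word.lowercased(),timestamp:ts,window:w)
            }
        }

        // Element-wise form, the new pardo: the closure sees one element at a
        // time and emit() reuses that element's timestamp and window unless
        // told otherwise.
        let lowered2 = words.pardo(name:"lower-pardo") { (input,output:any POutput<String>) in
            output.emit(input.value.lowercased())
        }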
---
 sdks/swift/Package.swift                           |   1 +
 .../ApacheBeam/Core/DynamicProperties.swift        |  16 ---
 .../Sources/ApacheBeam/Core/Fn/ClosureFn.swift     |  10 +-
 .../Core/PCollection/AnyPCollection.swift          |   3 +-
 .../Core/PCollection/AnyPCollectionStream.swift    |   2 -
 .../ApacheBeam/Core/PCollection/PCollection.swift  |  11 ++-
 .../Core/PCollection/PCollectionStream.swift       |   5 +-
 .../Runtime/Bundle/BundleProcessor.swift           |   2 +
 .../Runtime/Metrics/MetricReporter.swift           |   4 +
 .../ApacheBeam/Testing/PCollection+Testing.swift   |  19 ++++
 .../PCollection => Testing}/PCollectionTest.swift  |   0
 .../Sources/ApacheBeam/Transforms/Basic.swift      |  23 ++---
 .../ApacheBeam/Transforms/BuiltIn+Elements.swift   | 109 +++++++++++++++++++++
 .../Sources/ApacheBeam/Transforms/BuiltIn.swift    |  96 +++++++-----------
 .../Sources/ApacheBeam/Transforms/Combining.swift  |   2 +-
 .../Transforms/IO/GoogleCloud/GoogleStorage.swift  |  12 +--
 .../Sources/Examples/Wordcount/Wordcount.swift     |   2 +
 .../Pipeline/CompositeIntegrationTests.swift       |  14 +--
 .../ApacheBeamTests/Pipeline/FileIOTests.swift     |   6 +-
 .../Pipeline/IntegrationTests.swift                |  14 +--
 20 files changed, 230 insertions(+), 121 deletions(-)

diff --git a/sdks/swift/Package.swift b/sdks/swift/Package.swift
index b25688b98b6..5d2771992d8 100644
--- a/sdks/swift/Package.swift
+++ b/sdks/swift/Package.swift
@@ -62,6 +62,7 @@ let package = Package(
                 .product(name: "OAuth2", package:"google-auth-library-swift")
             ]
         ),
+        .target(name:"Wordcount",dependencies: 
["ApacheBeam"],path:"Sources/Examples/Wordcount"),
         .testTarget(
             name: "ApacheBeamTests",
             dependencies: ["ApacheBeam"]),
diff --git a/sdks/swift/Sources/ApacheBeam/Core/DynamicProperties.swift 
b/sdks/swift/Sources/ApacheBeam/Core/DynamicProperties.swift
index 713661b7d8f..474353fcff9 100644
--- a/sdks/swift/Sources/ApacheBeam/Core/DynamicProperties.swift
+++ b/sdks/swift/Sources/ApacheBeam/Core/DynamicProperties.swift
@@ -20,22 +20,6 @@ import Logging
 
 public protocol DynamicProperty { }
 
-@propertyWrapper
-public struct PInput<Of> : DynamicProperty {
-    public var wrappedValue: PCollectionStream<Of>
-    
-    public init(wrappedValue: PCollectionStream<Of> = .init()) {
-        self.wrappedValue = wrappedValue
-    }
-}
-
-@propertyWrapper
-public struct POutput<Of> : DynamicProperty {
-    public var wrappedValue: PCollectionStream<Of>
-    public init(wrappedValue: PCollectionStream<Of> = .init()) {
-        self.wrappedValue = wrappedValue
-    }
-}
 
 @propertyWrapper
 public struct RemoteLog : DynamicProperty {
diff --git a/sdks/swift/Sources/ApacheBeam/Core/Fn/ClosureFn.swift 
b/sdks/swift/Sources/ApacheBeam/Core/Fn/ClosureFn.swift
index fcf43a91215..c3f443f09ea 100644
--- a/sdks/swift/Sources/ApacheBeam/Core/Fn/ClosureFn.swift
+++ b/sdks/swift/Sources/ApacheBeam/Core/Fn/ClosureFn.swift
@@ -18,11 +18,13 @@
 
 import Foundation
 
+
+
 /// A SerializableFn that holds a reference to a function that takes a single 
input and produces a variable number of outputs
 public final class ClosureFn : SerializableFn {
     let processClosure: 
(SerializableFnBundleContext,[AnyPCollectionStream],[AnyPCollectionStream]) 
async throws -> Void
 
-    //TODO: Replace this with a parameter pack version once I figure out how 
to do that
+    
 
     public init<Of>(_ fn: @Sendable @escaping (PCollection<Of>.Stream) async 
throws -> Void) {
         self.processClosure = { context,inputs,outputs in
@@ -35,6 +37,7 @@ public final class ClosureFn : SerializableFn {
             try await fn(inputs[0].stream(),outputs[0].stream())
         }
     }
+    
 
     public init<Of,O0,O1>(_ fn: @Sendable @escaping 
(PCollection<Of>.Stream,PCollection<O0>.Stream,PCollection<O1>.Stream) async 
throws -> Void) {
         self.processClosure = { context,inputs,outputs in
@@ -42,17 +45,22 @@ public final class ClosureFn : SerializableFn {
         }
     }
     
+
+    
     public init<Of,O0,O1,O2>(_ fn: @Sendable @escaping 
(PCollection<Of>.Stream,PCollection<O0>.Stream,PCollection<O1>.Stream,PCollection<O2>.Stream)
 async throws -> Void) {
         self.processClosure = { context,inputs,outputs in
             try await 
fn(inputs[0].stream(),outputs[0].stream(),outputs[1].stream(),outputs[2].stream())
         }
     }
+    
+
 
     public init<Of,O0,O1,O2,O3>(_ fn: @Sendable @escaping 
(PCollection<Of>.Stream,PCollection<O0>.Stream,PCollection<O1>.Stream,PCollection<O2>.Stream,PCollection<O3>.Stream)
 async throws -> Void) {
         self.processClosure = { context,inputs,outputs in
             try await 
fn(inputs[0].stream(),outputs[0].stream(),outputs[1].stream(),outputs[2].stream(),outputs[3].stream())
         }
     }
+
     
     public func process(context: SerializableFnBundleContext, inputs: 
[AnyPCollectionStream], outputs: [AnyPCollectionStream]) async throws -> 
(String, String) {
         try await processClosure(context,inputs,outputs)
diff --git 
a/sdks/swift/Sources/ApacheBeam/Core/PCollection/AnyPCollection.swift 
b/sdks/swift/Sources/ApacheBeam/Core/PCollection/AnyPCollection.swift
index 48c6b39c2b2..e28af2185c2 100644
--- a/sdks/swift/Sources/ApacheBeam/Core/PCollection/AnyPCollection.swift
+++ b/sdks/swift/Sources/ApacheBeam/Core/PCollection/AnyPCollection.swift
@@ -71,10 +71,11 @@ public struct AnyPCollection : 
PCollectionProtocol,PipelineMember {
         fatalError("Do not use `stream` on AnyPCollection. Use `anyStream` 
instead.")
     }
     
+    
     public var anyStream: AnyPCollectionStream {
         streamClosure(collection)
     }
-    
+        
     var roots: [PCollection<Never>] {
         rootsClosure(collection)
     }
diff --git 
a/sdks/swift/Sources/ApacheBeam/Core/PCollection/AnyPCollectionStream.swift 
b/sdks/swift/Sources/ApacheBeam/Core/PCollection/AnyPCollectionStream.swift
index 959d9a04217..9d252834b50 100644
--- a/sdks/swift/Sources/ApacheBeam/Core/PCollection/AnyPCollectionStream.swift
+++ b/sdks/swift/Sources/ApacheBeam/Core/PCollection/AnyPCollectionStream.swift
@@ -53,8 +53,6 @@ public struct AnyPCollectionStream : AsyncSequence {
                 try stream.emit(beamValue)
             } else if let element = $1 as? Element {
                 stream.emit((element.0 as! Of,element.1,element.2))
-            } else if let element = $1 as? Of {
-                stream.emit(element)
             } else {
                 throw ApacheBeamError.runtimeError("Unable to send \($1) to 
\(stream)")
             }
diff --git a/sdks/swift/Sources/ApacheBeam/Core/PCollection/PCollection.swift 
b/sdks/swift/Sources/ApacheBeam/Core/PCollection/PCollection.swift
index 9918ed69ace..dd9a2334923 100644
--- a/sdks/swift/Sources/ApacheBeam/Core/PCollection/PCollection.swift
+++ b/sdks/swift/Sources/ApacheBeam/Core/PCollection/PCollection.swift
@@ -36,15 +36,22 @@ public final class PCollection<Of> : PCollectionProtocol {
     public let coder: Coder
     public var consumers: [PipelineTransform]
     public private(set) var parent: PipelineTransform?
+
+    private let staticStream : PCollectionStream<Of>?
     
-    public init(coder: Coder = .of(type: Of.self)!,parent: PipelineTransform? 
= nil,consumers:[PipelineTransform] = []) {
+    public init(coder: Coder = .of(type: Of.self)!,
+                parent: PipelineTransform? = nil,
+                consumers:[PipelineTransform] = [],
+                stream:PCollectionStream<Of>? = nil
+    ) {
         self.coder = coder
         self.consumers = consumers
         self.parent = parent
+        self.staticStream = stream
     }
 
     public var stream: PCollectionStream<Of> {
-        return PCollectionStream<Of>()
+        return staticStream ?? PCollectionStream<Of>()
     }
     
     @discardableResult
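
A minimal sketch of what the new stream: parameter enables (assuming a String
coder is available, as elsewhere in the SDK); this is the mechanism the testing
helpers added later in this diff build on:

    // Feed a stream up front, then hand it to the PCollection so that `stream`
    // returns this populated instance instead of a fresh empty one.
    let preloaded = PCollectionStream<String>()
    preloaded.emit("hello",timestamp:.now,window:.global)
    preloaded.emit("world",timestamp:.now,window:.global)
    let fixed = PCollection<String>(stream:preloaded)
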
diff --git 
a/sdks/swift/Sources/ApacheBeam/Core/PCollection/PCollectionStream.swift 
b/sdks/swift/Sources/ApacheBeam/Core/PCollection/PCollectionStream.swift
index 38b8f795ac1..ba4ad0b26a5 100644
--- a/sdks/swift/Sources/ApacheBeam/Core/PCollection/PCollectionStream.swift
+++ b/sdks/swift/Sources/ApacheBeam/Core/PCollection/PCollectionStream.swift
@@ -18,6 +18,8 @@
 
 import Foundation
 
+
+
 /// The worker side realization of a PCollection that supports reading and 
writing
 public final class PCollectionStream<Of> : AsyncSequence {
     public typealias Element = (Of,Date,Window)
@@ -25,6 +27,7 @@ public final class PCollectionStream<Of> : AsyncSequence {
     private let stream: AsyncStream<Element>
     private let emitter: AsyncStream<Element>.Continuation
     
+    
     public init() {
         (self.stream,self.emitter) = AsyncStream.makeStream(of:Element.self)
     }
@@ -42,7 +45,7 @@ public final class PCollectionStream<Of> : AsyncSequence {
         emitter.yield(value)
     }
     
-    public func emit(_ value: Of,timestamp: Date = .now,window:Window = 
.global) {
+    public func emit(_ value: Of,timestamp: Date,window:Window) {
         emit((value,timestamp,window))
     }
     
diff --git a/sdks/swift/Sources/ApacheBeam/Runtime/Bundle/BundleProcessor.swift 
b/sdks/swift/Sources/ApacheBeam/Runtime/Bundle/BundleProcessor.swift
index 8f1b2cac9de..4e977d6f112 100644
--- a/sdks/swift/Sources/ApacheBeam/Runtime/Bundle/BundleProcessor.swift
+++ b/sdks/swift/Sources/ApacheBeam/Runtime/Bundle/BundleProcessor.swift
@@ -42,6 +42,7 @@ struct BundleProcessor {
         var temp: [Step] = []
         var coders =  BundleCoderContainer(bundle:descriptor)
         
+        
         var streams: [String:AnyPCollectionStream] = [:]
         // First make streams for everything in this bundle (maybe I could use 
the pcollection array for this?)
         for (_,transform) in descriptor.transforms {
@@ -64,6 +65,7 @@ struct BundleProcessor {
             //Map the input and output streams in the correct order
             let inputs = transform.inputs.sorted().map { streams[$0.1]! }
             let outputs = transform.outputs.sorted().map { streams[$0.1]! }
+
             if urn == "beam:runner:source:v1" {
                 let remotePort = try RemoteGrpcPort(serializedData: 
transform.spec.payload)
                 let coder = try Coder.of(name: remotePort.coderID, in: coders)
diff --git a/sdks/swift/Sources/ApacheBeam/Runtime/Metrics/MetricReporter.swift 
b/sdks/swift/Sources/ApacheBeam/Runtime/Metrics/MetricReporter.swift
new file mode 100644
index 00000000000..9c2e5974716
--- /dev/null
+++ b/sdks/swift/Sources/ApacheBeam/Runtime/Metrics/MetricReporter.swift
@@ -0,0 +1,4 @@
+/// Background actor that handles reporting metrics to the backend. Allows the 
metrics implementation to be asynchronous.
+actor MetricReporter {
+    
+}
diff --git a/sdks/swift/Sources/ApacheBeam/Testing/PCollection+Testing.swift 
b/sdks/swift/Sources/ApacheBeam/Testing/PCollection+Testing.swift
new file mode 100644
index 00000000000..5acdc111fca
--- /dev/null
+++ b/sdks/swift/Sources/ApacheBeam/Testing/PCollection+Testing.swift
@@ -0,0 +1,19 @@
+import Foundation
+
+public extension PCollection {
+    
+    /// Create a PCollection whose stream has been preloaded with some values 
for testing
+    static func testValues<V:Beamable>(_ values:[V]) -> PCollection<V> {
+        let stream = PCollectionStream<V>()
+        for v in values {
+            stream.emit(v,timestamp:.now,window:.global)
+        }
+        return PCollection<V>(stream:stream)
+    }
+    
+    /// Convenience function that simulates an impulse
+    static func testImpulse() -> PCollection<Data> {
+        testValues([Data()])
+    }
+    
+}
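
A minimal sketch of how these helpers might be driven from a test, assuming
String conforms to Beamable and that reading the preloaded stream directly is
acceptable for the assertion; the stream is never finished, so the loop has to
break explicitly:

    import XCTest
    import ApacheBeam

    final class TestValuesSketch: XCTestCase {
        func testPreloadedValues() async throws {
            // testValues wraps a PCollectionStream already fed with the values.
            let input = PCollection<String>.testValues(["a","b","c"])
            var seen: [String] = []
            for await (value,_,_) in input.stream {
                seen.append(value)
                if seen.count == 3 { break } // stream stays open; stop by hand
            }
            XCTAssertEqual(seen,["a","b","c"])
        }
    }
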
diff --git 
a/sdks/swift/Sources/ApacheBeam/Core/PCollection/PCollectionTest.swift 
b/sdks/swift/Sources/ApacheBeam/Testing/PCollectionTest.swift
similarity index 100%
rename from sdks/swift/Sources/ApacheBeam/Core/PCollection/PCollectionTest.swift
rename to sdks/swift/Sources/ApacheBeam/Testing/PCollectionTest.swift
diff --git a/sdks/swift/Sources/ApacheBeam/Transforms/Basic.swift 
b/sdks/swift/Sources/ApacheBeam/Transforms/Basic.swift
index af9e89b18bd..9da48003b6d 100644
--- a/sdks/swift/Sources/ApacheBeam/Transforms/Basic.swift
+++ b/sdks/swift/Sources/ApacheBeam/Transforms/Basic.swift
@@ -21,7 +21,7 @@ public extension PCollection {
 
     /// Each time the input fires output all of the values in this list.
     func create<Value:Codable>(_ values: [Value],name:String? = 
nil,_file:String=#fileID,_line:Int=#line) -> PCollection<Value> {
-        return pardo(name:name ?? "\(_file):\(_line)",values) { 
values,input,output in
+        return pstream(name:name ?? "\(_file):\(_line)",values) { 
values,input,output in
             for try await (_,ts,w) in input {
                 for v in values {
                     output.emit(v,timestamp:ts,window:w)
@@ -36,7 +36,7 @@ public extension PCollection {
     
     @discardableResult
     func log(prefix:String,_ name:String? = 
nil,_file:String=#fileID,_line:Int=#line) -> PCollection<Of> where Of == String 
{
-        pardo(name:name ?? "\(_file):\(_line)",prefix) { prefix,input,output in
+        pstream(name:name ?? "\(_file):\(_line)",prefix) { prefix,input,output 
in
             for await element in input {
                 print("\(prefix): \(element)")
                 output.emit(element)
@@ -46,7 +46,7 @@ public extension PCollection {
     
     @discardableResult
     func log<K,V>(prefix:String,_ name:String? = 
nil,_file:String=#fileID,_line:Int=#line) -> PCollection<KV<K,V>> where Of == 
KV<K,V> {
-        pardo(name:name ?? "\(_file):\(_line)",prefix) { prefix,input,output in
+        pstream(name:name ?? "\(_file):\(_line)",prefix) { prefix,input,output 
in
             for await element in input {
                 let kv = element.0
                 for v in kv.values {
@@ -64,28 +64,23 @@ public extension PCollection {
     /// Modify a value without changing its window or timestamp
     func map<Out>(name:String? = nil,_file:String=#fileID,_line:Int=#line,_ 
fn: @Sendable @escaping (Of) -> Out) -> PCollection<Out> {
         return pardo(name:name ?? "\(_file):\(_line)") { input,output in
-            for try await (v,ts,w) in input {
-                output.emit(fn(v),timestamp:ts,window:w)
-            }
+            output.emit(fn(input.value))
         }
     }
     
+    /// Map function to convert a tuple pair into an encodable key-value pair
     func map<K,V>(name:String? = nil,_file:String=#fileID,_line:Int=#line,_ 
fn: @Sendable @escaping (Of) -> (K,V)) -> PCollection<KV<K,V>> {
         return pardo(name:name ?? "\(_file):\(_line)") { input,output in
-            for try await (i,ts,w) in input {
-                let (key,value) = fn(i)
-                output.emit(KV(key,value),timestamp:ts,window:w)
-            }
+            let (key,value) = fn(input.value)
+            output.emit(KV(key,value))
         }
     }
 
     /// Produce multiple outputs as a single value without modifying window or 
timestamp
     func flatMap<Out>(name:String? = 
nil,_file:String=#fileID,_line:Int=#line,_ fn: @Sendable @escaping (Of) -> 
[Out]) -> PCollection<Out> {
         return pardo(name:name ?? "\(_file):\(_line)") { input,output in
-            for try await (v,ts,w) in input {
-                for i in fn(v) {
-                    output.emit(i,timestamp:ts,window:w)
-                }
+            for i in fn(input.value) {
+                output.emit(i)
             }
         }
     }
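
For example, the keyed overload now reads as a single element-wise function
(the `words` collection here is assumed, and KV<String,Int> is assumed to have
a default coder):

    // Key each word by itself, with its length as the value; the result is a
    // PCollection<KV<String,Int>> and each input's timestamp and window carry over.
    let keyed: PCollection<KV<String,Int>> = words.map { ($0,$0.count) }
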
diff --git a/sdks/swift/Sources/ApacheBeam/Transforms/BuiltIn+Elements.swift 
b/sdks/swift/Sources/ApacheBeam/Transforms/BuiltIn+Elements.swift
new file mode 100644
index 00000000000..e29ea085b7d
--- /dev/null
+++ b/sdks/swift/Sources/ApacheBeam/Transforms/BuiltIn+Elements.swift
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import Foundation
+
+public protocol PInput<Of> {
+    associatedtype Of
+    
+    var value: Of { get }
+    var timestamp: Date { get }
+    var window: Window { get }
+}
+
+public protocol POutput<Of> {
+    associatedtype Of
+    
+    func emit(_ value: Of,timestamp: Date,window: Window)
+    func emit(_ value: Of)
+    func emit(_ value: Of,timestamp: Date)
+    func emit(_ value: Of,window: Window)
+}
+
+
+struct PardoInput<Of> : PInput {
+    let value: Of
+    let timestamp: Date
+    let window: Window
+    public init(_ value:(Of,Date,Window)) {
+        self.value = value.0
+        self.timestamp = value.1
+        self.window = value.2
+    }
+}
+
+struct PardoOutput<Of> : POutput {
+    
+    
+    let stream: PCollectionStream<Of>
+    let timestamp: Date
+    let window: Window
+
+    func emit(_ value: Of, timestamp: Date, window: Window) {
+        stream.emit(value,timestamp:timestamp,window:window)
+    }
+    func emit(_ value: Of, timestamp: Date) {
+        stream.emit(value,timestamp:timestamp,window:window)
+
+    }    
+    func emit(_ value: Of, window: Window) {
+        stream.emit(value,timestamp:timestamp,window:window)
+
+    }
+    func emit(_ value: Of) {
+        stream.emit(value,timestamp:timestamp,window:window)
+    }
+
+    
+}
+
+public extension PCollection {
+    
+    // No Output
+    func pardo(name:String,_ fn: @Sendable @escaping (any PInput<Of>) async 
throws -> Void) {
+        pstream(name:name) { input in
+            for try await element in input {
+                try await fn(PardoInput(element))
+            }
+        }
+    }
+    
+    // One Output
+    func pardo<O0>(name:String,_ fn: @Sendable @escaping (any PInput<Of>,any 
POutput<O0>) async throws -> Void) -> PCollection<O0> {
+        pstream(name:name) { input,output in
+            for try await element in input {
+                try await 
fn(PardoInput(element),PardoOutput(stream:output,timestamp:element.1,window:element.2))
+            }
+        }
+    }
+    
+    //Two Outputs
+    func pardo<O0,O1>(name:String,_ fn: @Sendable @escaping (any 
PInput<Of>,any POutput<O0>,any POutput<O1>) async throws -> Void) -> 
(PCollection<O0>,PCollection<O1>) {
+        pstream(name:name) { input,o0,o1 in
+            for try await element in input {
+                try await fn(PardoInput(element),
+                             
PardoOutput(stream:o0,timestamp:element.1,window:element.2),
+                             
PardoOutput(stream:o1,timestamp:element.1,window:element.2)
+                             
+                )
+            }
+        }
+    }
+
+    
+}
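
A sketch of the two-output form under the same conventions, assuming a
`records` PCollection<String> and a default coder for Int; both outputs emit
with the incoming element's timestamp and window unless overridden:

    let (numbers,malformed) = records.pardo(name:"parse") {
        (input,good:any POutput<Int>,errors:any POutput<String>) in
        if let n = Int(input.value) {
            good.emit(n)                                // keeps input timestamp/window
        } else {
            errors.emit("not a number: \(input.value)") // routed to the second output
        }
    }
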
diff --git a/sdks/swift/Sources/ApacheBeam/Transforms/BuiltIn.swift 
b/sdks/swift/Sources/ApacheBeam/Transforms/BuiltIn.swift
index d33a576c42c..65bbe10389e 100644
--- a/sdks/swift/Sources/ApacheBeam/Transforms/BuiltIn.swift
+++ b/sdks/swift/Sources/ApacheBeam/Transforms/BuiltIn.swift
@@ -33,68 +33,64 @@ public extension PCollection {
     // TODO: Replace with parameter pack version once 
https://github.com/apple/swift/issues/67192 is resolved
     
     // No Output
-    func pardo<F:SerializableFn>(name:String,_ fn: F) {
+    func pstream<F:SerializableFn>(name:String,_ fn: F) {
         self.apply(.pardo(AnyPCollection(self),name, fn, []))
     }
-    func pardo(name:String,_ fn: @Sendable @escaping (PCollection<Of>.Stream) 
async throws -> Void) {
+    func pstream(name:String,_ fn: @Sendable @escaping 
(PCollection<Of>.Stream) async throws -> Void) {
         self.apply(.pardo(AnyPCollection(self),name, ClosureFn(fn),[]))
     }
-    func pardo<Param:Codable>(name:String,_ param:Param,_ fn: @Sendable 
@escaping (Param,PCollection<Of>.Stream) async throws -> Void) {
+    func pstream<Param:Codable>(name:String,_ param:Param,_ fn: @Sendable 
@escaping (Param,PCollection<Of>.Stream) async throws -> Void) {
         self.apply(.pardo(AnyPCollection(self),name, 
ParameterizedClosureFn(param,fn), []))
     }
     
     // No Output Generated Names
-    func pardo<F:SerializableFn>(_file:String=#fileID,_line:Int=#line,_ fn: F) 
{
-        self.apply(.pardo(AnyPCollection(self),"\(_file):\(_line)", fn, []))
+    func pstream<F:SerializableFn>(_file:String=#fileID,_line:Int=#line,_ fn: 
F) {
+        pstream(name:"\(_file):\(_line)",fn)
     }
-    func pardo(_file:String=#fileID,_line:Int=#line,_ fn: @Sendable @escaping 
(PCollection<Of>.Stream) async throws -> Void) {
-        self.apply(.pardo(AnyPCollection(self),"\(_file):\(_line)", 
ClosureFn(fn),[]))
+    func pstream(_file:String=#fileID,_line:Int=#line,_ fn: @Sendable 
@escaping (PCollection<Of>.Stream) async throws -> Void) {
+        pstream(name:"\(_file):\(_line)",fn)
     }
-    func pardo<Param:Codable>(_file:String=#fileID,_line:Int=#line,_ 
param:Param,_ fn: @Sendable @escaping (Param,PCollection<Of>.Stream) async 
throws -> Void) {
-        self.apply(.pardo(AnyPCollection(self),"\(_file):\(_line)", 
ParameterizedClosureFn(param,fn), []))
+    func pstream<Param:Codable>(_file:String=#fileID,_line:Int=#line,_ 
param:Param,_ fn: @Sendable @escaping (Param,PCollection<Of>.Stream) async 
throws -> Void) {
+        pstream(name:"\(_file):\(_line)",param,fn)
     }
 
 
     // Single Output
-    func pardo<F:SerializableFn,O0>(name:String,fn: F,
+    func pstream<F:SerializableFn,O0>(name:String,_ fn: F,
                                     _ o0:PCollection<O0>) {
         self.apply(.pardo(AnyPCollection(self),name, fn, [AnyPCollection(o0)]))
     }
-    func pardo<O0>(name:String,_ fn: @Sendable @escaping 
(PCollection<Of>.Stream,PCollection<O0>.Stream) async throws -> Void) -> 
(PCollection<O0>) {
+    func pstream<O0>(name:String,_ fn: @Sendable @escaping 
(PCollection<Of>.Stream,PCollection<O0>.Stream) async throws -> Void) -> 
(PCollection<O0>) {
         let output = PCollection<O0>()
         
self.apply(.pardo(AnyPCollection(self),name,ClosureFn(fn),[AnyPCollection(output)]))
         return output
     }
-    func pardo<Param:Codable,O0>(name:String,_ param: Param,_ fn: @Sendable 
@escaping (Param,PCollection<Of>.Stream,PCollection<O0>.Stream) async throws -> 
Void) -> (PCollection<O0>) {
+    func pstream<Param:Codable,O0>(name:String,_ param: Param,_ fn: @Sendable 
@escaping (Param,PCollection<Of>.Stream,PCollection<O0>.Stream) async throws -> 
Void) -> (PCollection<O0>) {
         let output = PCollection<O0>()
         
self.apply(.pardo(AnyPCollection(self),name,ParameterizedClosureFn(param,fn),[AnyPCollection(output)]))
         return output
     }
     
     // Single Output Generated Names
-    func pardo<F:SerializableFn,O0>(_file:String=#fileID,_line:Int=#line,fn: F,
+    func pstream<F:SerializableFn,O0>(_file:String=#fileID,_line:Int=#line,fn: 
F,
                                     _ o0:PCollection<O0>) {
-        self.apply(.pardo(AnyPCollection(self),"\(_file):\(_line)", fn, 
[AnyPCollection(o0)]))
+        pstream(name:"\(_file):\(_line)",fn,o0)
     }
-    func pardo<O0>(_file:String=#fileID,_line:Int=#line,_ fn: @Sendable 
@escaping (PCollection<Of>.Stream,PCollection<O0>.Stream) async throws -> Void) 
-> (PCollection<O0>) {
-        let output = PCollection<O0>()
-        
output.parent(self.apply(.pardo(AnyPCollection(self),"\(_file):\(_line)",ClosureFn(fn),[AnyPCollection(output)])))
-        return output
+    func pstream<O0>(_file:String=#fileID,_line:Int=#line,_ fn: @Sendable 
@escaping (PCollection<Of>.Stream,PCollection<O0>.Stream) async throws -> Void) 
-> (PCollection<O0>) {
+        pstream(name:"\(_file):\(_line)",fn)
     }
-    func pardo<Param:Codable,O0>(_file:String=#fileID,_line:Int=#line,_ param: 
Param,_ fn: @Sendable @escaping 
(Param,PCollection<Of>.Stream,PCollection<O0>.Stream) async throws -> Void) -> 
(PCollection<O0>) {
-        let output = PCollection<O0>()
-        
output.parent(self.apply(.pardo(AnyPCollection(self),"\(_file):\(_line)",ParameterizedClosureFn(param,fn),[AnyPCollection(output)])))
-        return output
+    func pstream<Param:Codable,O0>(_file:String=#fileID,_line:Int=#line,_ 
param: Param,_ fn: @Sendable @escaping 
(Param,PCollection<Of>.Stream,PCollection<O0>.Stream) async throws -> Void) -> 
(PCollection<O0>) {
+        pstream(name:"\(_file):\(_line)",param,fn)
     }
 
     
     
     // Two Outputs
-    func pardo<F:SerializableFn,O0,O1>(name:String,_ fn: F,
+    func pstream<F:SerializableFn,O0,O1>(name:String,_ fn: F,
                                     _ o0:PCollection<O0>,_ o1:PCollection<O1>) 
{
         self.apply(.pardo(AnyPCollection(self),name, fn, 
[AnyPCollection(o0),AnyPCollection(o1)]))
     }
-    func pardo<O0,O1>(name:String,
+    func pstream<O0,O1>(name:String,
                       _ fn: @Sendable @escaping 
(PCollection<Of>.Stream,PCollection<O0>.Stream,PCollection<O1>.Stream) async 
throws -> Void) -> (PCollection<O0>,PCollection<O1>) {
         let output = (PCollection<O0>(),PCollection<O1>())
         let parent = 
self.apply(.pardo(AnyPCollection(self),name,ClosureFn(fn),[AnyPCollection(output.0),AnyPCollection(output.1)]))
@@ -102,7 +98,7 @@ public extension PCollection {
         output.1.parent(parent)
         return output
     }
-    func pardo<Param:Codable,O0,O1>(name:String,_ param: Param,
+    func pstream<Param:Codable,O0,O1>(name:String,_ param: Param,
                                     _ fn: @Sendable @escaping 
(Param,PCollection<Of>.Stream,PCollection<O0>.Stream,PCollection<O1>.Stream) 
async throws -> Void) -> (PCollection<O0>,PCollection<O1>) {
         let output = (PCollection<O0>(),PCollection<O1>())
         let parent = 
self.apply(.pardo(AnyPCollection(self),name,ParameterizedClosureFn(param,fn),[AnyPCollection(output.0),AnyPCollection(output.1)]))
@@ -112,37 +108,27 @@ public extension PCollection {
     }
     
     // Two Outputs Generated Names
-    func pardo<F:SerializableFn,O0,O1>(_file:String=#fileID,_line:Int=#line,_ 
fn: F,
+    func 
pstream<F:SerializableFn,O0,O1>(_file:String=#fileID,_line:Int=#line,_ fn: F,
                                     _ o0:PCollection<O0>,_ o1:PCollection<O1>) 
{
-        let parent = 
self.apply(.pardo(AnyPCollection(self),"\(_file):\(_line)", fn, 
[AnyPCollection(o0),AnyPCollection(o1)]))
-        o0.parent(parent)
-        o1.parent(parent)
+        pstream(name:"\(_file):\(_line)",fn,o0,o1)
     }
-    func pardo<O0,O1>(_file:String=#fileID,_line:Int=#line,
+    func pstream<O0,O1>(_file:String=#fileID,_line:Int=#line,
                       _ fn: @Sendable @escaping 
(PCollection<Of>.Stream,PCollection<O0>.Stream,PCollection<O1>.Stream) async 
throws -> Void) -> (PCollection<O0>,PCollection<O1>) {
-        let output = (PCollection<O0>(),PCollection<O1>())
-        let parent = 
self.apply(.pardo(AnyPCollection(self),"\(_file):\(_line)",ClosureFn(fn),[AnyPCollection(output.0),AnyPCollection(output.1)]))
-        output.0.parent(parent)
-        output.1.parent(parent)
-        return output
+        pstream(name:"\(_file):\(_line)",fn)
     }
-    func pardo<Param:Codable,O0,O1>(_file:String=#fileID,_line:Int=#line,_ 
param: Param,
+    func pstream<Param:Codable,O0,O1>(_file:String=#fileID,_line:Int=#line,_ 
param: Param,
                                     _ fn: @Sendable @escaping 
(Param,PCollection<Of>.Stream,PCollection<O0>.Stream,PCollection<O1>.Stream) 
async throws -> Void) -> (PCollection<O0>,PCollection<O1>) {
-        let output = (PCollection<O0>(),PCollection<O1>())
-        let parent = 
self.apply(.pardo(AnyPCollection(self),"\(_file):\(_line)",ParameterizedClosureFn(param,fn),[AnyPCollection(output.0),AnyPCollection(output.1)]))
-        output.0.parent(parent)
-        output.1.parent(parent)
-        return output
+        pstream(name:"\(_file):\(_line)",param,fn)
     }
 
     
 
     // Three Outputs
-    func pardo<F:SerializableFn,O0,O1,O2>(name:String,_ fn: F,
+    func pstream<F:SerializableFn,O0,O1,O2>(name:String,_ fn: F,
                                           _ o0:PCollection<O0>,_ 
o1:PCollection<O1>,_ o2:PCollection<O2>) {
         self.apply(.pardo(AnyPCollection(self),name, fn, 
[AnyPCollection(o0),AnyPCollection(o1),AnyPCollection(o2)]))
     }
-    func pardo<O0,O1,O2>(name:String,
+    func pstream<O0,O1,O2>(name:String,
                          _ fn: @Sendable @escaping 
(PCollection<Of>.Stream,PCollection<O0>.Stream,PCollection<O1>.Stream,PCollection<O2>.Stream)
 async throws -> Void) -> (PCollection<O0>,PCollection<O1>,PCollection<O2>) {
         let output = (PCollection<O0>(),PCollection<O1>(),PCollection<O2>())
         let parent = 
self.apply(.pardo(AnyPCollection(self),name,ClosureFn(fn),[AnyPCollection(output.0),AnyPCollection(output.1),AnyPCollection(output.2)]))
@@ -151,7 +137,7 @@ public extension PCollection {
         output.2.parent(parent)
         return output
     }
-    func pardo<Param:Codable,O0,O1,O2>(name:String,_ param: Param,
+    func pstream<Param:Codable,O0,O1,O2>(name:String,_ param: Param,
                                        _ fn: @Sendable @escaping 
(Param,PCollection<Of>.Stream,PCollection<O0>.Stream,PCollection<O1>.Stream,PCollection<O2>.Stream)
 async throws -> Void) -> (PCollection<O0>,PCollection<O1>,PCollection<O2>) {
         let output = (PCollection<O0>(),PCollection<O1>(),PCollection<O2>())
         let parent = 
self.apply(.pardo(AnyPCollection(self),name,ParameterizedClosureFn(param,fn),[AnyPCollection(output.0),AnyPCollection(output.1),AnyPCollection(output.2)]))
@@ -162,27 +148,17 @@ public extension PCollection {
     }
     
     // Three Outputs Generated Names
-    func 
pardo<F:SerializableFn,O0,O1,O2>(_file:String=#fileID,_line:Int=#line,_ fn: F,
+    func 
pstream<F:SerializableFn,O0,O1,O2>(_file:String=#fileID,_line:Int=#line,_ fn: F,
                                           _ o0:PCollection<O0>,_ 
o1:PCollection<O1>,_ o2:PCollection<O2>) {
-        self.apply(.pardo(AnyPCollection(self),"\(_file):\(_line)", fn, 
[AnyPCollection(o0),AnyPCollection(o1),AnyPCollection(o2)]))
+        pstream(name:"\(_file):\(_line)",fn,o0,o1,o2)
     }
-    func pardo<O0,O1,O2>(_file:String=#fileID,_line:Int=#line,
+    func pstream<O0,O1,O2>(_file:String=#fileID,_line:Int=#line,
                          _ fn: @Sendable @escaping 
(PCollection<Of>.Stream,PCollection<O0>.Stream,PCollection<O1>.Stream,PCollection<O2>.Stream)
 async throws -> Void) -> (PCollection<O0>,PCollection<O1>,PCollection<O2>) {
-        let output = (PCollection<O0>(),PCollection<O1>(),PCollection<O2>())
-        let parent = 
self.apply(.pardo(AnyPCollection(self),"\(_file):\(_line)",ClosureFn(fn),[AnyPCollection(output.0),AnyPCollection(output.1),AnyPCollection(output.2)]))
-        output.0.parent(parent)
-        output.1.parent(parent)
-        output.2.parent(parent)
-        return output
+        pstream(name:"\(_file):\(_line)",fn)
     }
-    func pardo<Param:Codable,O0,O1,O2>(_file:String=#fileID,_line:Int=#line,_ 
param: Param,
+    func 
pstream<Param:Codable,O0,O1,O2>(_file:String=#fileID,_line:Int=#line,_ param: 
Param,
                                        _ fn: @Sendable @escaping 
(Param,PCollection<Of>.Stream,PCollection<O0>.Stream,PCollection<O1>.Stream,PCollection<O2>.Stream)
 async throws -> Void) -> (PCollection<O0>,PCollection<O1>,PCollection<O2>) {
-        let output = (PCollection<O0>(),PCollection<O1>(),PCollection<O2>())
-        let parent = 
self.apply(.pardo(AnyPCollection(self),"\(_file):\(_line)",ParameterizedClosureFn(param,fn),[AnyPCollection(output.0),AnyPCollection(output.1),AnyPCollection(output.2)]))
-        output.0.parent(parent)
-        output.1.parent(parent)
-        output.2.parent(parent)
-        return output
+        pstream(name:"\(_file):\(_line)",param,fn)
     }
 
 
diff --git a/sdks/swift/Sources/ApacheBeam/Transforms/Combining.swift 
b/sdks/swift/Sources/ApacheBeam/Transforms/Combining.swift
index fc2aee794ec..7abd847b8b5 100644
--- a/sdks/swift/Sources/ApacheBeam/Transforms/Combining.swift
+++ b/sdks/swift/Sources/ApacheBeam/Transforms/Combining.swift
@@ -19,7 +19,7 @@
 /// Basic reducers
 public extension PCollection {
     func reduce<Result:Codable,K,V>(name:String? = 
nil,_file:String=#fileID,_line:Int=#line,into:Result,_ accumulator: @Sendable 
@escaping (V,inout Result) -> Void) -> PCollection<KV<K,Result>> where Of == 
KV<K,V> {
-        return pardo(name:name ?? "\(_file):\(_line)",into) { 
initialValue,input,output in
+        return pstream(name:name ?? "\(_file):\(_line)",into) { 
initialValue,input,output in
             for await (kv,ts,w) in input {
                 var result = initialValue
                 for v in kv.values {
diff --git 
a/sdks/swift/Sources/ApacheBeam/Transforms/IO/GoogleCloud/GoogleStorage.swift 
b/sdks/swift/Sources/ApacheBeam/Transforms/IO/GoogleCloud/GoogleStorage.swift
index 444f8e40320..3ce56a709b2 100644
--- 
a/sdks/swift/Sources/ApacheBeam/Transforms/IO/GoogleCloud/GoogleStorage.swift
+++ 
b/sdks/swift/Sources/ApacheBeam/Transforms/IO/GoogleCloud/GoogleStorage.swift
@@ -36,12 +36,12 @@ struct ListFilesResponse: Codable {
 
 public struct GoogleStorage : FileIOSource {
     public static func readFiles(matching: PCollection<KV<String,String>>) -> 
PCollection<Data> {
-        matching.pardo { matching,output in
+        matching.pstream { matching,output in
             guard let tokenProvider = DefaultTokenProvider(scopes: 
["storage.objects.get"]) else {
                 throw ApacheBeamError.runtimeError("Unable to get OAuth2 
token.")
             }
             let connection = Connection(provider: tokenProvider)
-            for await (file,_,_) in matching {
+            for await (file,ts,w) in matching {
                 let bucket = file.key
                 for name in file.values {
                     let url = 
"https://storage.googleapis.com/storage/v1/b/\(bucket)/o/\(name.addingPercentEncoding(withAllowedCharacters:
 .urlHostAllowed)!)"
@@ -62,7 +62,7 @@ public struct GoogleStorage : FileIOSource {
                         }
                     }
                     if let d = response {
-                        output.emit(d)
+                        output.emit(d,timestamp:ts,window:w)
                     }
                 }
             }
@@ -70,13 +70,13 @@ public struct GoogleStorage : FileIOSource {
     }
     
     public static func listFiles(matching: PCollection<KV<String,String>>) -> 
PCollection<KV<String,String>> {
-        matching.pardo { matching,output in
+        matching.pstream { matching,output in
             
             guard let tokenProvider = DefaultTokenProvider(scopes: 
["storage.objects.list"]) else {
                 throw ApacheBeamError.runtimeError("Unable to get OAuth2 
token.")
             }
             let connection = Connection(provider: tokenProvider)
-            for await (match,_,_) in matching {
+            for await (match,ts,w) in matching {
                 let bucket = match.key
                 for prefix in match.values {
                     let response:Data? = try await 
withCheckedThrowingContinuation { continuation in
@@ -100,7 +100,7 @@ public struct GoogleStorage : FileIOSource {
                         let listfiles = try 
JSONDecoder().decode(ListFilesResponse.self, from: data)
                         for item in listfiles.items {
                             if item.size != "0" {
-                                output.emit(KV(item.bucket,item.name))
+                                
output.emit(KV(item.bucket,item.name),timestamp: ts,window: w)
                             }
                         }
                     }
diff --git a/sdks/swift/Sources/Examples/Wordcount/Wordcount.swift 
b/sdks/swift/Sources/Examples/Wordcount/Wordcount.swift
new file mode 100644
index 00000000000..3778545046f
--- /dev/null
+++ b/sdks/swift/Sources/Examples/Wordcount/Wordcount.swift
@@ -0,0 +1,2 @@
+import ApacheBeam
+
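
The target is only a stub so far; one possible shape for it, sketched purely
from transforms this commit touches and building on the existing import (the
function and its wiring are assumptions, not part of the change):

    // Count words in an existing PCollection<String> of lines. flatMap and map
    // are now element-wise; reduce still rides on pstream under the hood.
    func countWords(_ lines: PCollection<String>) -> PCollection<KV<String,Int>> {
        let words = lines.flatMap { $0.split(separator:" ").map(String.init) }
        let pairs: PCollection<KV<String,Int>> = words.map { ($0,1) }
        return pairs.reduce(into:0) { value,sum in sum += value }
    }
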
diff --git 
a/sdks/swift/Tests/ApacheBeamTests/Pipeline/CompositeIntegrationTests.swift 
b/sdks/swift/Tests/ApacheBeamTests/Pipeline/CompositeIntegrationTests.swift
index bddf3abc47d..dd2d184ac71 100644
--- a/sdks/swift/Tests/ApacheBeamTests/Pipeline/CompositeIntegrationTests.swift
+++ b/sdks/swift/Tests/ApacheBeamTests/Pipeline/CompositeIntegrationTests.swift
@@ -19,20 +19,20 @@ public struct FixtureWordCount : PTransform {
     public var expand: some PTransform {
         
         let (contents,errors) = create(self.fixtures)
-            .pardo(name:"Read Files") { 
(filenames,output:PCollectionStream<String>,errors:PCollectionStream<String>) in
-            for await (filename,_,_) in filenames {
+            .pstream(name:"Read Files") { 
(filenames,output:PCollectionStream<String>,errors:PCollectionStream<String>) in
+            for await (filename,ts,w) in filenames {
                 do {
-                    output.emit(String(decoding:try 
fixtureData(filename),as:UTF8.self))
+                    output.emit(String(decoding:try 
fixtureData(filename),as:UTF8.self),timestamp:ts,window:w)
                 } catch {
-                    errors.emit("Unable to read \(filename): \(error)")
+                    errors.emit("Unable to read \(filename): 
\(error)",timestamp:ts,window:w)
                 }
             }
         }
         
-        let baseCount = contents.pardo { 
(contents,lines:PCollectionStream<String>) in
-            for await (content,_,_) in contents {
+        let baseCount = contents.pstream { 
(contents,lines:PCollectionStream<String>) in
+            for await (content,ts,w) in contents {
                 content.enumerateLines { line,_ in
-                    lines.emit(line)
+                    lines.emit(line,timestamp:ts,window:w)
                 }
             }
         }
diff --git a/sdks/swift/Tests/ApacheBeamTests/Pipeline/FileIOTests.swift 
b/sdks/swift/Tests/ApacheBeamTests/Pipeline/FileIOTests.swift
index 1806de4c098..b4ffa125b08 100644
--- a/sdks/swift/Tests/ApacheBeamTests/Pipeline/FileIOTests.swift
+++ b/sdks/swift/Tests/ApacheBeamTests/Pipeline/FileIOTests.swift
@@ -54,10 +54,10 @@ final class FileIOTests: XCTestCase {
                 .readFiles(in: GoogleStorage.self)
             
             // Simple ParDo that takes advantage of enumerateLines. No name to 
test name generation of pardos
-            let lines = contents.pardo { contents,lines in
-                for await (content,_,_) in contents {
+            let lines = contents.pstream { contents,lines in
+                for await (content,ts,w) in contents {
                     String(data:content,encoding:.utf8)!.enumerateLines { 
line,_ in
-                        lines.emit(line)
+                        lines.emit(line,timestamp:ts,window:w)
                     }
                 }
             }
diff --git a/sdks/swift/Tests/ApacheBeamTests/Pipeline/IntegrationTests.swift 
b/sdks/swift/Tests/ApacheBeamTests/Pipeline/IntegrationTests.swift
index b86b941cd7f..27e5972c1e7 100644
--- a/sdks/swift/Tests/ApacheBeamTests/Pipeline/IntegrationTests.swift
+++ b/sdks/swift/Tests/ApacheBeamTests/Pipeline/IntegrationTests.swift
@@ -49,21 +49,21 @@ final class IntegrationTests: XCTestCase {
         try await Pipeline { pipeline in
             let (contents,errors) = pipeline
                 .create(["file1.txt","file2.txt","missing.txt"])
-                .pardo(name:"Read Files") { filenames,output,errors in
-                    for await (filename,_,_) in filenames {
+                .pstream(name:"Read Files") { filenames,output,errors in
+                    for await (filename,ts,w) in filenames {
                         do {
-                            output.emit(String(decoding:try 
fixtureData(filename),as:UTF8.self))
+                            output.emit(String(decoding:try 
fixtureData(filename),as:UTF8.self),timestamp:ts,window:w)
                         } catch {
-                            errors.emit("Unable to read \(filename): \(error)")
+                            errors.emit("Unable to read \(filename): 
\(error)",timestamp:ts,window:w)
                         }
                     }
                 }
             
             // Simple ParDo that takes advantage of enumerateLines. No name to 
test name generation of pardos 
-            let lines = contents.pardo { contents,lines in
-                for await (content,_,_) in contents {
+            let lines = contents.pstream { contents,lines in
+                for await (content,ts,w) in contents {
                     content.enumerateLines { line,_ in
-                        lines.emit(line)
+                        lines.emit(line,timestamp:ts,window:w)
                     }
                 }
             }
