This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch swift-sdk
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 0bfcf41f45520f4382314e05c97af123ca549201
Author: Byron Ellis <byronel...@google.com>
AuthorDate: Wed Aug 23 18:55:18 2023 -0700

    Added the primitives for a FileIO implementation with Google Storage
    support to start. Doesn't do anything fancy, but does implement a true
    version of wordcount with the classic Shakespeare input.
---
 sdks/swift/Package.resolved                        |   9 ++
 sdks/swift/Package.swift                           |  10 +-
 .../Core/PCollection/AnyPCollection.swift          |   8 +-
 .../Core/PCollection/AnyPCollectionStream.swift    |   6 +-
 .../ApacheBeam/Core/PCollection/PCollection.swift  |  15 +++
 .../Core/PCollection/PCollectionTest.swift         |  43 ++++++++
 .../PTransform/{Outputs.swift => Output.swift}     |   2 +-
 .../ApacheBeam/Core/PTransform/Transform.swift     |   7 ++
 .../ApacheBeam/Core/Pipeline/Pipeline.swift        |   4 +
 .../Core/Pipeline/PipelineTransform.swift          |  24 ++++-
 .../Schema/{DynamicRow.swift => Row.swift}         |   9 +-
 sdks/swift/Sources/ApacheBeam/Schema/Schema.swift  |   1 +
 .../Sources/ApacheBeam/Transforms/IO/FileIO.swift  |  17 ++++
 .../Transforms/IO/GoogleCloud/GoogleStorage.swift  | 113 +++++++++++++++++++++
 .../ApacheBeam/Transforms/IO/ListFiles.swift       |  43 --------
 .../Pipeline/CompositeIntegrationTests.swift       |   1 +
 .../ApacheBeamTests/Pipeline/FileIOTests.swift     |  83 +++++++++++++++
 .../Pipeline/IntegrationTests.swift                |   1 +
 18 files changed, 342 insertions(+), 54 deletions(-)

diff --git a/sdks/swift/Package.resolved b/sdks/swift/Package.resolved
index 1a12c374c0c..6aa79883fc0 100644
--- a/sdks/swift/Package.resolved
+++ b/sdks/swift/Package.resolved
@@ -63,6 +63,15 @@
         "version" : "1.19.0"
       }
     },
+    {
+      "identity" : "pythonkit",
+      "kind" : "remoteSourceControl",
+      "location" : "https://github.com/pvieito/PythonKit.git",
+      "state" : {
+        "branch" : "master",
+        "revision" : "060e1c8b0d14e4d241b3623fdbe83d0e3c81a993"
+      }
+    },
     {
       "identity" : "smithy-swift",
       "kind" : "remoteSourceControl",
diff --git a/sdks/swift/Package.swift b/sdks/swift/Package.swift
index 54cf8c5d88c..b25688b98b6 100644
--- a/sdks/swift/Package.swift
+++ b/sdks/swift/Package.swift
@@ -30,8 +30,8 @@ let dependencies: [Package.Dependency] = [
     .package(url: "https://github.com/awslabs/aws-sdk-swift.git", from: "0.23.0"),
     .package(url: "https://github.com/googleapis/google-auth-library-swift",from:"0.0.0"),
     .package(url: "https://github.com/duckdb/duckdb-swift", .upToNextMinor(from: .init(0, 8, 0))),
-    
-    
+    .package(url: "https://github.com/pvieito/PythonKit.git", branch: "master"),
+
     // Swift Package Manager Plugins
     .package(url: "https://github.com/apple/swift-docc-plugin", from: "1.0.0")
 ]
@@ -54,10 +54,12 @@ let package = Package(
         .target(
             name: "ApacheBeam",
             dependencies: [
-                .product(name:"GRPC",package:"grpc-swift"),
+                .product(name: "GRPC",package:"grpc-swift"),
                 .product(name: "Logging",package:"swift-log"),
                 .product(name: "AWSS3",package:"aws-sdk-swift"),
-                .product(name: "DuckDB", package: "duckdb-swift")
+                .product(name: "DuckDB", package: "duckdb-swift"),
+                .product(name: "PythonKit", package:"PythonKit"),
+                .product(name: "OAuth2", package:"google-auth-library-swift")
             ]
         ),
         .testTarget(
diff --git 
a/sdks/swift/Sources/ApacheBeam/Core/PCollection/AnyPCollection.swift 
b/sdks/swift/Sources/ApacheBeam/Core/PCollection/AnyPCollection.swift
index 0b67e6068d8..48c6b39c2b2 100644
--- a/sdks/swift/Sources/ApacheBeam/Core/PCollection/AnyPCollection.swift
+++ b/sdks/swift/Sources/ApacheBeam/Core/PCollection/AnyPCollection.swift
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-public struct AnyPCollection : PCollectionProtocol {
+public struct AnyPCollection : PCollectionProtocol,PipelineMember {
     
     
     
@@ -29,6 +29,7 @@ public struct AnyPCollection : PCollectionProtocol {
     let consumersClosure: (Any) -> [PipelineTransform]
     let coderClosure: (Any) -> Coder
     let streamClosure: (Any) -> AnyPCollectionStream
+    let rootsClosure: (Any) -> [PCollection<Never>]
     
     public init<C>(_ collection: C) where C : PCollectionProtocol {
         if let anyCollection = collection as? AnyPCollection {
@@ -43,6 +44,7 @@ public struct AnyPCollection : PCollectionProtocol {
             self.coderClosure = { ($0 as! C).coder }
             self.streamClosure = { AnyPCollectionStream(($0 as! C).stream) }
             self.parentClosure = { ($0 as! C).parent }
+            self.rootsClosure = { ($0 as! PipelineMember).roots }
         }
     }
     
@@ -73,6 +75,10 @@ public struct AnyPCollection : PCollectionProtocol {
         streamClosure(collection)
     }
     
+    var roots: [PCollection<Never>] {
+        rootsClosure(collection)
+    }
+    
 }
 
 extension AnyPCollection : Hashable {
diff --git 
a/sdks/swift/Sources/ApacheBeam/Core/PCollection/AnyPCollectionStream.swift 
b/sdks/swift/Sources/ApacheBeam/Core/PCollection/AnyPCollectionStream.swift
index 4e5ea0bfb0c..959d9a04217 100644
--- a/sdks/swift/Sources/ApacheBeam/Core/PCollection/AnyPCollectionStream.swift
+++ b/sdks/swift/Sources/ApacheBeam/Core/PCollection/AnyPCollectionStream.swift
@@ -53,9 +53,11 @@ public struct AnyPCollectionStream : AsyncSequence {
                 try stream.emit(beamValue)
             } else if let element = $1 as? Element {
                 stream.emit((element.0 as! Of,element.1,element.2))
-            } else if let element = $1 as? PCollectionStream<Of>.Element {
+            } else if let element = $1 as? Of {
                 stream.emit(element)
-            } 
+            } else {
+                throw ApacheBeamError.runtimeError("Unable to send \($1) to \(stream)")
+            }
         }
         
         self.finishClosure = {
diff --git a/sdks/swift/Sources/ApacheBeam/Core/PCollection/PCollection.swift 
b/sdks/swift/Sources/ApacheBeam/Core/PCollection/PCollection.swift
index b8cf712be13..9918ed69ace 100644
--- a/sdks/swift/Sources/ApacheBeam/Core/PCollection/PCollection.swift
+++ b/sdks/swift/Sources/ApacheBeam/Core/PCollection/PCollection.swift
@@ -59,3 +59,18 @@ public final class PCollection<Of> : PCollectionProtocol {
     
     
 }
+
+extension PCollection : PipelineMember {
+    var roots: [PCollection<Never>] { 
+        if let p = parent {
+            return p.roots
+        } else if let p = self as? PCollection<Never> {
+            return [p]
+        } else {
+            return []
+        }
+    }
+    
+    
+}
+
diff --git 
a/sdks/swift/Sources/ApacheBeam/Core/PCollection/PCollectionTest.swift 
b/sdks/swift/Sources/ApacheBeam/Core/PCollection/PCollectionTest.swift
new file mode 100644
index 00000000000..221ea8bdb0a
--- /dev/null
+++ b/sdks/swift/Sources/ApacheBeam/Core/PCollection/PCollectionTest.swift
@@ -0,0 +1,43 @@
+import Logging
+
+/// Test harness for PCollections. Primarily designed for unit testing.
+public struct PCollectionTest {
+    let output: AnyPCollection
+    let fn: (Logger,[AnyPCollectionStream],[AnyPCollectionStream]) async 
throws -> Void
+    
+    public init<Of>(_ collection: PCollection<Of>,_ fn: @escaping 
(Logger,[AnyPCollectionStream],[AnyPCollectionStream]) async throws -> Void) {
+        self.output = AnyPCollection(collection)
+        self.fn = fn
+    }
+    
+    public func run() async throws {
+        let log = Logger(label:"Test")
+        if let transform = output.parent {
+            switch transform {
+            case let .pardo(parent,_,fn,outputs):
+                let context = 
SerializableFnBundleContext(instruction:"1",transform:"test",payload:try 
fn.payload,log: log)
+                let input = parent.anyStream
+                let streams = outputs.map({$0.anyStream})
+                
+                try await withThrowingTaskGroup(of: Void.self) { group in
+                    log.info("Starting process task")
+                    group.addTask {
+                        _ = try await fn.process(context: context, 
inputs:[input], outputs: streams)
+                    }
+                    log.info("Calling Stream")
+                    group.addTask {
+                        try await self.fn(log,[input],streams)
+                    }
+                    for try await _ in group {
+                    }
+                }
+            default:
+                throw ApacheBeamError.runtimeError("Not able to test this type of transform \(transform)")
+            }
+        } else {
+            throw ApacheBeamError.runtimeError("Unable to determine parent transform to test")
+        }
+    }
+    
+    
+}
diff --git a/sdks/swift/Sources/ApacheBeam/Core/PTransform/Outputs.swift 
b/sdks/swift/Sources/ApacheBeam/Core/PTransform/Output.swift
similarity index 85%
rename from sdks/swift/Sources/ApacheBeam/Core/PTransform/Outputs.swift
rename to sdks/swift/Sources/ApacheBeam/Core/PTransform/Output.swift
index a0c246a438a..89c2d439ac9 100644
--- a/sdks/swift/Sources/ApacheBeam/Core/PTransform/Outputs.swift
+++ b/sdks/swift/Sources/ApacheBeam/Core/PTransform/Output.swift
@@ -3,7 +3,7 @@ public struct NamedCollectionPTransform<Of> : 
_PrimitivePTransform {
     let collection: PCollection<Of>
 }
 
-/// Captures a single pcollection and gives it a name
+/// Captures a PCollection and gives it a name so it can be used as an output
 public struct Output<Of> : PTransform {
     let name: String
     let fn: () -> PCollection<Of>
diff --git a/sdks/swift/Sources/ApacheBeam/Core/PTransform/Transform.swift 
b/sdks/swift/Sources/ApacheBeam/Core/PTransform/Transform.swift
new file mode 100644
index 00000000000..f5cbc70570f
--- /dev/null
+++ b/sdks/swift/Sources/ApacheBeam/Core/PTransform/Transform.swift
@@ -0,0 +1,7 @@
+/// Groups transforms together. Does not expose any of its PCollections as accessible outputs.
+public struct Transform<Subtransform> {
+    let subtransform:Subtransform
+    public init(@PTransformBuilder subtransform: () -> Subtransform) {
+        self.subtransform = subtransform()
+    }
+}
diff --git a/sdks/swift/Sources/ApacheBeam/Core/Pipeline/Pipeline.swift 
b/sdks/swift/Sources/ApacheBeam/Core/Pipeline/Pipeline.swift
index b9cab1376a5..825680cfa4c 100644
--- a/sdks/swift/Sources/ApacheBeam/Core/Pipeline/Pipeline.swift
+++ b/sdks/swift/Sources/ApacheBeam/Core/Pipeline/Pipeline.swift
@@ -19,6 +19,10 @@
 import GRPC
 import Logging
 
+protocol PipelineMember {
+    var roots: [PCollection<Never>] { get }
+}
+
 public final class Pipeline {
     let content: (inout PCollection<Never>) -> Void
     let log: Logging.Logger
diff --git 
a/sdks/swift/Sources/ApacheBeam/Core/Pipeline/PipelineTransform.swift 
b/sdks/swift/Sources/ApacheBeam/Core/Pipeline/PipelineTransform.swift
index 1ce5c5f6bdc..d7fca814b68 100644
--- a/sdks/swift/Sources/ApacheBeam/Core/Pipeline/PipelineTransform.swift
+++ b/sdks/swift/Sources/ApacheBeam/Core/Pipeline/PipelineTransform.swift
@@ -30,5 +30,27 @@ public enum PipelineTransform {
     case external(AnyPCollection,[AnyPCollection])
 }
 
-
+extension PipelineTransform : PipelineMember {
+    var roots: [PCollection<Never>] {
+        switch self {
+            
+        case .pardo(let p, _, _, _):
+            return p.roots
+        case .impulse(let p, _):
+            return p.roots
+        case .flatten(let p, _):
+            return p.flatMap({ $0.roots })
+        case .groupByKey(let p, _):
+            return p.roots
+        case .custom(let p, _, _, _, _):
+            return p.roots
+        case .composite(let p, _):
+            return p.roots
+        case .external(let p, _):
+            return p.roots
+        }
+    }
+    
+    
+}
 
diff --git a/sdks/swift/Sources/ApacheBeam/Schema/DynamicRow.swift 
b/sdks/swift/Sources/ApacheBeam/Schema/Row.swift
similarity index 92%
rename from sdks/swift/Sources/ApacheBeam/Schema/DynamicRow.swift
rename to sdks/swift/Sources/ApacheBeam/Schema/Row.swift
index b0a4054d2e5..f0991a94cfe 100644
--- a/sdks/swift/Sources/ApacheBeam/Schema/DynamicRow.swift
+++ b/sdks/swift/Sources/ApacheBeam/Schema/Row.swift
@@ -16,8 +16,13 @@
  * limitations under the License.
  */
 
+import Foundation
+
 /// Dynamic representation of a Row that lets us treat a row value like JSON. Obviously this is less performant and
 /// when using schema objects internally, particularly in PCollections we would favor @Row structs
-public struct DynamicRow {
-    
+public struct Row {
+    let data: Data
+    let schema: Schema
 }
+
+
diff --git a/sdks/swift/Sources/ApacheBeam/Schema/Schema.swift 
b/sdks/swift/Sources/ApacheBeam/Schema/Schema.swift
index 1404f5e8efd..489c8ac98bb 100644
--- a/sdks/swift/Sources/ApacheBeam/Schema/Schema.swift
+++ b/sdks/swift/Sources/ApacheBeam/Schema/Schema.swift
@@ -13,6 +13,7 @@ public indirect enum FieldType {
 
 public struct Field {
     let name: String
+    let description: String?
     let type: FieldType
 }
 
diff --git a/sdks/swift/Sources/ApacheBeam/Transforms/IO/FileIO.swift 
b/sdks/swift/Sources/ApacheBeam/Transforms/IO/FileIO.swift
new file mode 100644
index 00000000000..fba04c58627
--- /dev/null
+++ b/sdks/swift/Sources/ApacheBeam/Transforms/IO/FileIO.swift
@@ -0,0 +1,17 @@
+import Foundation
+
+public protocol FileIOSource {
+    static func readFiles(matching:PCollection<KV<String,String>>) -> 
PCollection<Data>
+    static func listFiles(matching:PCollection<KV<String,String>>) -> 
PCollection<KV<String,String>>
+}
+
+public extension PCollection<KV<String,String>> {
+    func readFiles<Source:FileIOSource>(in source:Source.Type) -> 
PCollection<Data> {
+        Source.readFiles(matching:self)
+    }
+    
+    /// Takes KV pairs of (bucket, prefix) and returns a PCollection of (bucket, filename) pairs for the files found under each prefix
+    func listFiles<Source:FileIOSource>(in source:Source.Type) -> 
PCollection<KV<String,String>> {
+        Source.listFiles(matching:self)
+    }
+}
diff --git 
a/sdks/swift/Sources/ApacheBeam/Transforms/IO/GoogleCloud/GoogleStorage.swift 
b/sdks/swift/Sources/ApacheBeam/Transforms/IO/GoogleCloud/GoogleStorage.swift
new file mode 100644
index 00000000000..444f8e40320
--- /dev/null
+++ 
b/sdks/swift/Sources/ApacheBeam/Transforms/IO/GoogleCloud/GoogleStorage.swift
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import OAuth2
+import Foundation
+import Dispatch
+
+struct ListFilesResponse: Codable {
+    struct Item : Codable {
+        let kind: String
+        let selfLink: String
+        let mediaLink: String
+        let name: String
+        let bucket: String
+        let size: String
+    }
+    
+    let kind: String
+    let items: [Item]
+}
+
+public struct GoogleStorage : FileIOSource {
+    public static func readFiles(matching: PCollection<KV<String,String>>) -> 
PCollection<Data> {
+        matching.pardo { matching,output in
+            guard let tokenProvider = DefaultTokenProvider(scopes: ["storage.objects.get"]) else {
+                throw ApacheBeamError.runtimeError("Unable to get OAuth2 token.")
+            }
+            let connection = Connection(provider: tokenProvider)
+            for await (file,_,_) in matching {
+                let bucket = file.key
+                for name in file.values {
+                    let url = "https://storage.googleapis.com/storage/v1/b/\(bucket)/o/\(name.addingPercentEncoding(withAllowedCharacters: .urlHostAllowed)!)"
+                    let response:Data? = try await 
withCheckedThrowingContinuation { continuation in
+                        do {
+                            try connection.performRequest(method: "GET",
+                                                          urlString: url ,
+                                                          parameters: 
["alt":"media"],body:nil) {
+                                data,response,error in
+                                if let e = error {
+                                    continuation.resume(throwing: e)
+                                } else {
+                                    continuation.resume(returning: data)
+                                }
+                            }
+                        } catch {
+                            continuation.resume(throwing: error)
+                        }
+                    }
+                    if let d = response {
+                        output.emit(d)
+                    }
+                }
+            }
+        }
+    }
+    
+    public static func listFiles(matching: PCollection<KV<String,String>>) -> 
PCollection<KV<String,String>> {
+        matching.pardo { matching,output in
+            
+            guard let tokenProvider = DefaultTokenProvider(scopes: ["storage.objects.list"]) else {
+                throw ApacheBeamError.runtimeError("Unable to get OAuth2 token.")
+            }
+            let connection = Connection(provider: tokenProvider)
+            for await (match,_,_) in matching {
+                let bucket = match.key
+                for prefix in match.values {
+                    let response:Data? = try await 
withCheckedThrowingContinuation { continuation in
+                        do {
+                            try connection.performRequest(
+                                method: "GET",
+                                urlString: 
"https://storage.googleapis.com/storage/v1/b/\(bucket)/o",
+                                parameters:["prefix": prefix],
+                                body:nil) { data,response,error in
+                                    if let e = error {
+                                        continuation.resume(throwing: e)
+                                    } else {
+                                        continuation.resume(returning: data)
+                                    }
+                                }
+                        } catch {
+                            continuation.resume(throwing: error)
+                        }
+                    }
+                    if let data = response {
+                        let listfiles = try 
JSONDecoder().decode(ListFilesResponse.self, from: data)
+                        for item in listfiles.items {
+                            if item.size != "0" {
+                                output.emit(KV(item.bucket,item.name))
+                            }
+                        }
+                    }
+                }
+            }
+        }
+    }
+    
+    
+}
diff --git a/sdks/swift/Sources/ApacheBeam/Transforms/IO/ListFiles.swift 
b/sdks/swift/Sources/ApacheBeam/Transforms/IO/ListFiles.swift
deleted file mode 100644
index 2ebb7f852d3..00000000000
--- a/sdks/swift/Sources/ApacheBeam/Transforms/IO/ListFiles.swift
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-struct S3Bucket : Codable {
-    let bucket:String
-}
-
-struct GSBucket : Codable {
-    let bucket:String
-}
-
-public extension PCollection {
-    
-    func listFiles(s3 bucket:String) {
-        
-    }
-    
-    func listFiles(gs bucket:String) {
-    }
-    
-}
-
-public func listFiles(s3 bucket: String) {
-}
-
-public func listFiles(gs bucket: String) {
-    
-}
diff --git 
a/sdks/swift/Tests/ApacheBeamTests/Pipeline/CompositeIntegrationTests.swift 
b/sdks/swift/Tests/ApacheBeamTests/Pipeline/CompositeIntegrationTests.swift
index f2ee142a966..bddf3abc47d 100644
--- a/sdks/swift/Tests/ApacheBeamTests/Pipeline/CompositeIntegrationTests.swift
+++ b/sdks/swift/Tests/ApacheBeamTests/Pipeline/CompositeIntegrationTests.swift
@@ -66,6 +66,7 @@ final class CompositeIntegrationTests: XCTestCase {
     }
 
     func testCompositeWordCount() async throws {
+        throw XCTSkip()
         try await Pipeline {
             FixtureWordCount(fixtures: ["file1.txt","file2.txt","missing.txt"])
         }.run(PortableRunner(loopback:true))
diff --git a/sdks/swift/Tests/ApacheBeamTests/Pipeline/FileIOTests.swift 
b/sdks/swift/Tests/ApacheBeamTests/Pipeline/FileIOTests.swift
new file mode 100644
index 00000000000..1806de4c098
--- /dev/null
+++ b/sdks/swift/Tests/ApacheBeamTests/Pipeline/FileIOTests.swift
@@ -0,0 +1,83 @@
+//
+//  FileIOTests.swift
+//  
+//
+//  Created by Byron Ellis on 8/22/23.
+//
+import ApacheBeam
+import Logging
+import XCTest
+
+final class FileIOTests: XCTestCase {
+
+    override func setUpWithError() throws {
+    }
+
+    override func tearDownWithError() throws {
+    }
+
+    func testGoogleStorageListFiles() async throws {
+        throw XCTSkip()
+        try await 
PCollectionTest(PCollection<KV<String,String>>().listFiles(in: 
GoogleStorage.self)) { log,inputs,outputs in
+            log.info("Sending value")
+            try inputs[0].emit(value:KV("dataflow-samples","shakespeare"))
+            log.info("Value sent")
+            inputs[0].finish()
+            for try await (output,_,_) in outputs[0] {
+                log.info("Output: \(output)")
+            }
+        }.run()
+    }
+
+    func testGoogleStorageReadFiles() async throws {
+        try await 
PCollectionTest(PCollection<KV<String,String>>().readFiles(in: 
GoogleStorage.self)) { log,inputs,outputs in
+            throw XCTSkip()
+            log.info("Sending value")
+            try 
inputs[0].emit(value:KV("dataflow-samples","shakespeare/asyoulikeit.txt"))
+            log.info("Value sent")
+            inputs[0].finish()
+            for try await (output,_,_) in outputs[0] {
+                log.info("Output: \(String(data:output as! Data,encoding:.utf8)!)")
+            }
+        }.run()
+    }
+
+    func testShakespeareWordcount() async throws {
+        try await Pipeline { pipeline in
+            let contents = pipeline
+                .create(["dataflow-samples/shakespeare"])
+                .map({ value in
+                    let parts = value.split(separator: "/",maxSplits: 1)
+                    return KV(parts[0].lowercased(),parts[1].lowercased())
+                })
+                .listFiles(in: GoogleStorage.self)
+                .readFiles(in: GoogleStorage.self)
+            
+            // Simple ParDo that takes advantage of enumerateLines. Deliberately left unnamed to exercise pardo name generation.
+            let lines = contents.pardo { contents,lines in
+                for await (content,_,_) in contents {
+                    String(data:content,encoding:.utf8)!.enumerateLines { 
line,_ in
+                        lines.emit(line)
+                    }
+                }
+            }
+            
+            // Our first group by operation
+            let baseCount = lines
+                .flatMap({ $0.components(separatedBy: .whitespaces) })
+                .groupBy({ ($0,1) })
+                .sum()
+                .log(prefix:"INTERMEDIATE OUTPUT")
+            
+            let normalizedCounts = baseCount.groupBy {
+                ($0.key.lowercased().trimmingCharacters(in: 
.punctuationCharacters),
+                 $0.value ?? 1)
+            }.sum()
+            
+            normalizedCounts.log(prefix:"COUNT OUTPUT")
+            
+        }.run(PortableRunner(loopback:true))
+    }
+    
+
+}
diff --git a/sdks/swift/Tests/ApacheBeamTests/Pipeline/IntegrationTests.swift 
b/sdks/swift/Tests/ApacheBeamTests/Pipeline/IntegrationTests.swift
index 0c34fcb42ae..b86b941cd7f 100644
--- a/sdks/swift/Tests/ApacheBeamTests/Pipeline/IntegrationTests.swift
+++ b/sdks/swift/Tests/ApacheBeamTests/Pipeline/IntegrationTests.swift
@@ -45,6 +45,7 @@ final class IntegrationTests: XCTestCase {
     }
 
     func testPortableWordcount() async throws {
+        throw XCTSkip()
         try await Pipeline { pipeline in
             let (contents,errors) = pipeline
                 .create(["file1.txt","file2.txt","missing.txt"])

Reply via email to