This is an automated email from the ASF dual-hosted git repository.

damccorm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 9f8a4b2d177 [yaml] Add examples for Spanner IO in YAML (#32288)
9f8a4b2d177 is described below

commit 9f8a4b2d177c6308da112411c976b8ed1425e412
Author: Reeba Qureshi <64488642+reeba...@users.noreply.github.com>
AuthorDate: Tue Sep 17 02:12:51 2024 +0600

    [yaml] Add examples for Spanner IO in YAML (#32288)
    
    * Add example for spanner read
    
    * Add example for spanner write
    
    * move spanner examples
    
    * minor update
    
    * minor changes
    
    1. Add good element in spanner write example to pass checks.
    2. Remove spanner examples from examples_test.py for the time being.
    
    * add license
---
 .../apache_beam/yaml/examples/io/spanner_read.yaml | 80 ++++++++++++++++++++++
 .../yaml/examples/io/spanner_write.yaml            | 53 ++++++++++++++
 2 files changed, 133 insertions(+)

diff --git a/sdks/python/apache_beam/yaml/examples/io/spanner_read.yaml b/sdks/python/apache_beam/yaml/examples/io/spanner_read.yaml
new file mode 100644
index 00000000000..c86d42c1e0c
--- /dev/null
+++ b/sdks/python/apache_beam/yaml/examples/io/spanner_read.yaml
@@ -0,0 +1,80 @@
+# coding=utf-8
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+pipeline:
+  transforms:
+
+    # Read shipment rows from a Spanner database. The table used here has the following columns:
+    # shipment_id (String), customer_id (String), shipment_date (String), shipment_cost (Float64), customer_name (String), customer_email (String)
+    # The ReadFromSpanner transform is configured with project_id, instance_id, database_id and a query.
+    # A table with a list of columns can also be specified instead of a query.
+    - type: ReadFromSpanner
+      name: ReadShipments
+      config:
+        project_id: apache-beam-testing
+        instance_id: shipment-test
+        database_id: shipment
+        query: SELECT * FROM shipments
+
+    # Filter the rows produced by ReadShipments.
+    # The Python expression keeps only the rows where the customer_id is 'C1'.
+    - type: Filter
+      name: FilterShipments
+      input: ReadShipments
+      config:
+        keep: "customer_id == 'C1'"
+        language: python
+
+    # Map the data fields and apply transformations.
+    # A new field 'shipment_cost_category' is added; a Python callable categorizes the shipment cost as Low, Medium or High.
+    # The callable reads shipment_cost by field name, so it does not depend on the column order returned by the query.
+    - type: MapToFields
+      name: MapFieldsForSpanner
+      input: FilterShipments
+      config:
+        language: python
+        fields:
+          shipment_id: shipment_id
+          customer_id: customer_id
+          shipment_date: shipment_date
+          shipment_cost: shipment_cost
+          customer_name: customer_name
+          customer_email: customer_email
+          shipment_cost_category:
+            callable: |
+              def categorize_cost(row):
+                  cost = float(row.shipment_cost)
+                  if cost < 50:
+                      return 'Low Cost'
+                  elif cost < 200:
+                      return 'Medium Cost'
+                  else:
+                      return 'High Cost'
+
+    # Write the rows produced by MapFieldsForSpanner to a CSV file.
+    - type: WriteToCsv
+      name: WriteBig
+      input: MapFieldsForSpanner
+      config:
+        path: shipments.csv
+
+
+  # On executing the above pipeline, a new CSV file is created with the following records:
+
+# Expected:
+#  Row(shipment_id='S1', customer_id='C1', shipment_date='2023-05-01', shipment_cost=150.0, customer_name='Alice', customer_email='al...@example.com', shipment_cost_category='Medium Cost')
+#  Row(shipment_id='S3', customer_id='C1', shipment_date='2023-05-10', shipment_cost=20.0, customer_name='Alice', customer_email='al...@example.com', shipment_cost_category='Low Cost')
diff --git a/sdks/python/apache_beam/yaml/examples/io/spanner_write.yaml b/sdks/python/apache_beam/yaml/examples/io/spanner_write.yaml
new file mode 100644
index 00000000000..74ac35de260
--- /dev/null
+++ b/sdks/python/apache_beam/yaml/examples/io/spanner_write.yaml
@@ -0,0 +1,53 @@
+# coding=utf-8
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+pipeline:
+  transforms:
+
+    # Step 1: Create the rows to be written to Spanner.
+    # The field names of each element correspond to the column names in the Spanner table.
+    - type: Create
+      name: CreateRows
+      config:
+        elements:
+          - shipment_id: 'S5'
+            customer_id: 'C5'
+            shipment_date: '2023-05-09'
+            shipment_cost: 300.0
+            customer_name: 'Erin'
+            customer_email: 'e...@example.com'
+
+    # Step 2: Write the created rows to a Spanner database.
+    # The project ID, instance ID, database ID and table ID are required to connect to Spanner.
+    # Error handling can optionally be specified to ensure any failed operations aren't lost.
+    # The failed data is passed on in the pipeline via the named error output and can be handled by a downstream transform.
+    - type: WriteToSpanner
+      name: WriteSpanner
+      input: CreateRows
+      config:
+        project_id: apache-beam-testing
+        instance_id: shipment-test
+        database_id: shipment
+        table_id: shipments
+        error_handling:
+          output: my_error_output
+
+    # Step 3: Write the failed records from WriteSpanner's error output to a JSON file.
+    - type: WriteToJson
+      input: WriteSpanner.my_error_output
+      config:
+        path: errors.json

Reply via email to