This is an automated email from the ASF dual-hosted git repository.

xqhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 56607ec888b Improve Iceberg Blueprint and documentation and Tests (#35961)
56607ec888b is described below

commit 56607ec888b35907669b53594120e50a152348cd
Author: Tarun Annapareddy <[email protected]>
AuthorDate: Mon Sep 8 09:20:42 2025 -0700

    Improve Iceberg Blueprint and documentation and Tests (#35961)
    
    * Improve Iceberg pubsub Blueprint and documentation
    
    * Fix tests
    
    * fix readme
    
    * Add Iceberg extended test
---
 sdks/python/apache_beam/yaml/examples/README.md    |  7 ++-
 .../transforms/blueprint/pubsub_to_iceberg.yaml    |  6 +--
 .../yaml/extended_tests/databases/iceberg.yaml     | 63 ++++++++++++++++++++++
 3 files changed, 71 insertions(+), 5 deletions(-)

diff --git a/sdks/python/apache_beam/yaml/examples/README.md b/sdks/python/apache_beam/yaml/examples/README.md
index 25765e5d48b..55fd19bd8c4 100644
--- a/sdks/python/apache_beam/yaml/examples/README.md
+++ b/sdks/python/apache_beam/yaml/examples/README.md
@@ -35,7 +35,8 @@
 
 ## Prerequistes
 
-Build this jar for running with the run command in the next stage:
+Build the expansion service jar required for your YAML code.
+IO mapping is available in standard_io.yaml, so use this example run command:
 
 ```
 cd <PATH_TO_BEAM_REPO>/beam; ./gradlew sdks:java:io:google-cloud-platform:expansion-service:shadowJar
@@ -163,7 +164,9 @@ python -m apache_beam.yaml.main \
   --num_workers $NUM_WORKERS \
   --job_name $JOB_NAME \
   --jinja_variables '{ "BOOTSTRAP_SERVERS": "123.45.67.89:9092",
-    "TOPIC": "MY-TOPIC", "USERNAME": "USERNAME", "PASSWORD": "PASSWORD" }'
+    "TOPIC": "MY-TOPIC", "USERNAME": "USERNAME", "PASSWORD": "PASSWORD" }'\
+  --sdk_location container  \
+  --sdk_harness_container_image_overrides ".*java.*,gcr.io/apache-beam-testing/beam-sdk/beam_java11_sdk:latest"
 ```
 
 **_Optional_**: If Kafka cluster is set up with no SASL/PLAINTEXT authentication
diff --git a/sdks/python/apache_beam/yaml/examples/transforms/blueprint/pubsub_to_iceberg.yaml b/sdks/python/apache_beam/yaml/examples/transforms/blueprint/pubsub_to_iceberg.yaml
index 95be4c29a6d..5292d4e0857 100644
--- a/sdks/python/apache_beam/yaml/examples/transforms/blueprint/pubsub_to_iceberg.yaml
+++ b/sdks/python/apache_beam/yaml/examples/transforms/blueprint/pubsub_to_iceberg.yaml
@@ -16,7 +16,7 @@
 # limitations under the License.
 #
 
-# A pipeline that both writes to and reads from the same Kafka topic.
+# A pipeline that reads from pubsub topic and writes to Iceberg table.
 
 pipeline:
   type: chain
@@ -27,7 +27,7 @@ pipeline:
       config:
         topic: "projects/apache-beam-testing/topics/my-topic"
         format: JSON
-        schema: 
+        schema:
           type: object
           properties:
             data: {type: BYTES}
@@ -39,6 +39,7 @@ pipeline:
         # Dynamic destinations
         table: "db.users.{zip}"
         catalog_name: "hadoop_catalog"
+        triggering_frequency_seconds: "20"
         catalog_properties:
           type: "hadoop"
           warehouse: "gs://MY-WAREHOUSE"
@@ -56,4 +57,3 @@ options:
 #  Row(label='37b', rank=4)
 #  Row(label='37c', rank=3)
 #  Row(label='37d', rank=2)
-
diff --git a/sdks/python/apache_beam/yaml/extended_tests/databases/iceberg.yaml b/sdks/python/apache_beam/yaml/extended_tests/databases/iceberg.yaml
new file mode 100644
index 00000000000..d72688774da
--- /dev/null
+++ b/sdks/python/apache_beam/yaml/extended_tests/databases/iceberg.yaml
@@ -0,0 +1,63 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+fixtures:
+  - name: TEMP_DIR
+    type: "tempfile.TemporaryDirectory"
+
+pipelines:
+  - name: write
+    pipeline:
+      type: chain
+      transforms:
+        - type: Create
+          config:
+            elements:
+              - {label: "11a", rank: 0}
+              - {label: "37a", rank: 1}
+              - {label: "389a", rank: 2}
+        - type: WriteToIceberg
+          config:
+            table: db.labels
+            catalog_name: hadoop_catalog
+            catalog_properties:
+              type: hadoop
+              warehouse: "{TEMP_DIR}"
+    options:
+      project: "apache-beam-testing"
+      temp_location: "{TEMP_DIR}"
+
+  - name: read
+    pipeline:
+      type: chain
+      transforms:
+        - type: ReadFromIceberg
+          config:
+            table: db.labels
+            catalog_name: hadoop_catalog
+            catalog_properties:
+              type: hadoop
+              warehouse: "{TEMP_DIR}"
+        - type: AssertEqual
+          config:
+            elements:
+              - {label: "11a", rank: 0}
+              - {label: "37a", rank: 1}
+              - {label: "389a", rank: 2}
+    options:
+      project: "apache-beam-testing"
+      temp_location: "{TEMP_DIR}"
\ No newline at end of file

Reply via email to