This is an automated email from the ASF dual-hosted git repository.
xqhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 56607ec888b Improve Iceberg Blueprint and documentation and Tests
(#35961)
56607ec888b is described below
commit 56607ec888b35907669b53594120e50a152348cd
Author: Tarun Annapareddy <[email protected]>
AuthorDate: Mon Sep 8 09:20:42 2025 -0700
Improve Iceberg Blueprint and documentation and Tests (#35961)
* Improve Iceberg pubsub Blueprint and documentation
* Fix tests
* fix readme
* Add Iceberg extended test
---
sdks/python/apache_beam/yaml/examples/README.md | 7 ++-
.../transforms/blueprint/pubsub_to_iceberg.yaml | 6 +--
.../yaml/extended_tests/databases/iceberg.yaml | 63 ++++++++++++++++++++++
3 files changed, 71 insertions(+), 5 deletions(-)
diff --git a/sdks/python/apache_beam/yaml/examples/README.md
b/sdks/python/apache_beam/yaml/examples/README.md
index 25765e5d48b..55fd19bd8c4 100644
--- a/sdks/python/apache_beam/yaml/examples/README.md
+++ b/sdks/python/apache_beam/yaml/examples/README.md
@@ -35,7 +35,8 @@
## Prerequisites
-Build this jar for running with the run command in the next stage:
+Build the expansion service jar required by the IOs used in your YAML pipeline.
+The IO mapping is available in standard_io.yaml; for example, run this command:
```
cd <PATH_TO_BEAM_REPO>/beam; ./gradlew
sdks:java:io:google-cloud-platform:expansion-service:shadowJar
@@ -163,7 +164,9 @@ python -m apache_beam.yaml.main \
--num_workers $NUM_WORKERS \
--job_name $JOB_NAME \
--jinja_variables '{ "BOOTSTRAP_SERVERS": "123.45.67.89:9092",
- "TOPIC": "MY-TOPIC", "USERNAME": "USERNAME", "PASSWORD": "PASSWORD" }'
+ "TOPIC": "MY-TOPIC", "USERNAME": "USERNAME", "PASSWORD": "PASSWORD" }'\
+ --sdk_location container \
+ --sdk_harness_container_image_overrides
".*java.*,gcr.io/apache-beam-testing/beam-sdk/beam_java11_sdk:latest"
```
**_Optional_**: If Kafka cluster is set up with no SASL/PLAINTEXT
authentication
diff --git
a/sdks/python/apache_beam/yaml/examples/transforms/blueprint/pubsub_to_iceberg.yaml
b/sdks/python/apache_beam/yaml/examples/transforms/blueprint/pubsub_to_iceberg.yaml
index 95be4c29a6d..5292d4e0857 100644
---
a/sdks/python/apache_beam/yaml/examples/transforms/blueprint/pubsub_to_iceberg.yaml
+++
b/sdks/python/apache_beam/yaml/examples/transforms/blueprint/pubsub_to_iceberg.yaml
@@ -16,7 +16,7 @@
# limitations under the License.
#
-# A pipeline that both writes to and reads from the same Kafka topic.
+# A pipeline that reads from a Pub/Sub topic and writes to an Iceberg table.
pipeline:
type: chain
@@ -27,7 +27,7 @@ pipeline:
config:
topic: "projects/apache-beam-testing/topics/my-topic"
format: JSON
- schema:
+ schema:
type: object
properties:
data: {type: BYTES}
@@ -39,6 +39,7 @@ pipeline:
# Dynamic destinations
table: "db.users.{zip}"
catalog_name: "hadoop_catalog"
+ triggering_frequency_seconds: "20"
catalog_properties:
type: "hadoop"
warehouse: "gs://MY-WAREHOUSE"
@@ -56,4 +57,3 @@ options:
# Row(label='37b', rank=4)
# Row(label='37c', rank=3)
# Row(label='37d', rank=2)
-
diff --git a/sdks/python/apache_beam/yaml/extended_tests/databases/iceberg.yaml
b/sdks/python/apache_beam/yaml/extended_tests/databases/iceberg.yaml
new file mode 100644
index 00000000000..d72688774da
--- /dev/null
+++ b/sdks/python/apache_beam/yaml/extended_tests/databases/iceberg.yaml
@@ -0,0 +1,63 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+fixtures:
+ - name: TEMP_DIR
+ type: "tempfile.TemporaryDirectory"
+
+pipelines:
+ - name: write
+ pipeline:
+ type: chain
+ transforms:
+ - type: Create
+ config:
+ elements:
+ - {label: "11a", rank: 0}
+ - {label: "37a", rank: 1}
+ - {label: "389a", rank: 2}
+ - type: WriteToIceberg
+ config:
+ table: db.labels
+ catalog_name: hadoop_catalog
+ catalog_properties:
+ type: hadoop
+ warehouse: "{TEMP_DIR}"
+ options:
+ project: "apache-beam-testing"
+ temp_location: "{TEMP_DIR}"
+
+ - name: read
+ pipeline:
+ type: chain
+ transforms:
+ - type: ReadFromIceberg
+ config:
+ table: db.labels
+ catalog_name: hadoop_catalog
+ catalog_properties:
+ type: hadoop
+ warehouse: "{TEMP_DIR}"
+ - type: AssertEqual
+ config:
+ elements:
+ - {label: "11a", rank: 0}
+ - {label: "37a", rank: 1}
+ - {label: "389a", rank: 2}
+ options:
+ project: "apache-beam-testing"
+ temp_location: "{TEMP_DIR}"
\ No newline at end of file