This is an automated email from the ASF dual-hosted git repository.
chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 108225d3797 Add Iceberg CDC support to YAML (#36641)
108225d3797 is described below
commit 108225d379772cce95cd0935e83377b5d9e6dffd
Author: Tarun Annapareddy <[email protected]>
AuthorDate: Wed Nov 19 12:57:25 2025 -0800
Add Iceberg CDC support to YAML (#36641)
* Add Iceberg CDC support to YAML and Blueprints
* Fix Lint
* Add Filters to integration test
* Fix Mock Tests
* Remove Iceberg Blueprints from Beam Repo
* Remove mock tests
* Adding timestamps
* Add Streaming test
---
.../yaml/extended_tests/databases/iceberg.yaml | 54 +++++++++++++++++++++-
sdks/python/apache_beam/yaml/standard_io.yaml | 28 +++++++++++
2 files changed, 81 insertions(+), 1 deletion(-)
diff --git a/sdks/python/apache_beam/yaml/extended_tests/databases/iceberg.yaml b/sdks/python/apache_beam/yaml/extended_tests/databases/iceberg.yaml
index d72688774da..d7449233aab 100644
--- a/sdks/python/apache_beam/yaml/extended_tests/databases/iceberg.yaml
+++ b/sdks/python/apache_beam/yaml/extended_tests/databases/iceberg.yaml
@@ -60,4 +60,56 @@ pipelines:
- {label: "389a", rank: 2}
options:
project: "apache-beam-testing"
- temp_location: "{TEMP_DIR}"
\ No newline at end of file
+ temp_location: "{TEMP_DIR}"
+
+ - name: read_cdc_batch
+ pipeline:
+ type: chain
+ transforms:
+ - type: ReadFromIcebergCDC
+ config:
+ table: db.labels
+ catalog_name: hadoop_catalog
+ catalog_properties:
+ type: hadoop
+ warehouse: "{TEMP_DIR}"
+ from_timestamp: 1762819200000
+ to_timestamp: 2078352000000
+ filter: '"label" = ''11a'' or "rank" = 1'
+ keep:
+ - label
+ - rank
+ - type: AssertEqual
+ config:
+ elements:
+ - {label: "11a", rank: 0}
+ - {label: "37a", rank: 1}
+ options:
+ project: "apache-beam-testing"
+ temp_location: "{TEMP_DIR}"
+
+ - name: read_cdc_streaming
+ pipeline:
+ type: chain
+ transforms:
+ - type: ReadFromIcebergCDC
+ config:
+ table: db.labels
+ catalog_name: hadoop_catalog
+ catalog_properties:
+ type: hadoop
+ warehouse: "{TEMP_DIR}"
+ streaming: True
+ to_timestamp: 2078352000000
+ filter: '"label" = ''11a'' or "rank" = 1'
+ keep:
+ - label
+ - rank
+ - type: AssertEqual
+ config:
+ elements:
+ - {label: "11a", rank: 0}
+ - {label: "37a", rank: 1}
+ options:
+ project: "apache-beam-testing"
+ temp_location: "{TEMP_DIR}"
diff --git a/sdks/python/apache_beam/yaml/standard_io.yaml b/sdks/python/apache_beam/yaml/standard_io.yaml
index 66f0c124b4c..458d3d63e43 100644
--- a/sdks/python/apache_beam/yaml/standard_io.yaml
+++ b/sdks/python/apache_beam/yaml/standard_io.yaml
@@ -403,3 +403,31 @@
'WriteToBigTable': 'beam:schematransform:org.apache.beam:bigtable_write:v1'
config:
gradle_target: 'sdks:java:io:google-cloud-platform:expansion-service:shadowJar'
+
+#IcebergCDC
+- type: renaming
+ transforms:
+ 'ReadFromIcebergCDC': 'ReadFromIcebergCDC'
+ config:
+ mappings:
+ 'ReadFromIcebergCDC':
+ table: 'table'
+ catalog_name: 'catalog_name'
+ catalog_properties: 'catalog_properties'
+ config_properties: 'config_properties'
+ drop: 'drop'
+ filter: 'filter'
+ from_snapshot: 'from_snapshot'
+ from_timestamp: 'from_timestamp'
+ keep: 'keep'
+ poll_interval_seconds: 'poll_interval_seconds'
+ starting_strategy: 'starting_strategy'
+ streaming: 'streaming'
+ to_snapshot: 'to_snapshot'
+ to_timestamp: 'to_timestamp'
+ underlying_provider:
+ type: beamJar
+ transforms:
+ 'ReadFromIcebergCDC': 'beam:schematransform:org.apache.beam:iceberg_cdc_read:v1'
+ config:
+ gradle_target: 'sdks:java:io:expansion-service:shadowJar'