This is an automated email from the ASF dual-hosted git repository.

chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 108225d3797 Add Iceberg CDC support to YAML (#36641)
108225d3797 is described below

commit 108225d379772cce95cd0935e83377b5d9e6dffd
Author: Tarun Annapareddy <[email protected]>
AuthorDate: Wed Nov 19 12:57:25 2025 -0800

    Add Iceberg CDC support to YAML (#36641)
    
    * Add Iceberg CDC support to YAML and Blueprints
    
    * Fix Lint
    
    * Add Filters to integration test
    
    * Fix Mock Tests
    
    * Remove Iceberg Blueprints from Beam Repo
    
    * Remove mock tests
    
    * Adding timestamps
    
    * Add Streaming test
---
 .../yaml/extended_tests/databases/iceberg.yaml     | 54 +++++++++++++++++++++-
 sdks/python/apache_beam/yaml/standard_io.yaml      | 28 +++++++++++
 2 files changed, 81 insertions(+), 1 deletion(-)

diff --git a/sdks/python/apache_beam/yaml/extended_tests/databases/iceberg.yaml b/sdks/python/apache_beam/yaml/extended_tests/databases/iceberg.yaml
index d72688774da..d7449233aab 100644
--- a/sdks/python/apache_beam/yaml/extended_tests/databases/iceberg.yaml
+++ b/sdks/python/apache_beam/yaml/extended_tests/databases/iceberg.yaml
@@ -60,4 +60,56 @@ pipelines:
               - {label: "389a", rank: 2}
     options:
       project: "apache-beam-testing"
-      temp_location: "{TEMP_DIR}"
\ No newline at end of file
+      temp_location: "{TEMP_DIR}"
+
+  - name: read_cdc_batch
+    pipeline:
+      type: chain
+      transforms:
+        - type: ReadFromIcebergCDC
+          config:
+            table: db.labels
+            catalog_name: hadoop_catalog
+            catalog_properties:
+              type: hadoop
+              warehouse: "{TEMP_DIR}"
+            from_timestamp: 1762819200000
+            to_timestamp:  2078352000000
+            filter: '"label" = ''11a'' or "rank" = 1'
+            keep:
+              - label
+              - rank
+        - type: AssertEqual
+          config:
+            elements:
+              - {label: "11a", rank: 0}
+              - {label: "37a", rank: 1}
+    options:
+      project: "apache-beam-testing"
+      temp_location: "{TEMP_DIR}"
+
+  - name: read_cdc_streaming
+    pipeline:
+      type: chain
+      transforms:
+        - type: ReadFromIcebergCDC
+          config:
+            table: db.labels
+            catalog_name: hadoop_catalog
+            catalog_properties:
+              type: hadoop
+              warehouse: "{TEMP_DIR}"
+            streaming: True
+            to_timestamp: 2078352000000
+            filter: '"label" = ''11a'' or "rank" = 1'
+            keep:
+              - label
+              - rank
+        - type: AssertEqual
+          config:
+            elements:
+              - {label: "11a", rank: 0}
+              - {label: "37a", rank: 1}
+    options:
+      project: "apache-beam-testing"
+      temp_location: "{TEMP_DIR}"
diff --git a/sdks/python/apache_beam/yaml/standard_io.yaml b/sdks/python/apache_beam/yaml/standard_io.yaml
index 66f0c124b4c..458d3d63e43 100644
--- a/sdks/python/apache_beam/yaml/standard_io.yaml
+++ b/sdks/python/apache_beam/yaml/standard_io.yaml
@@ -403,3 +403,31 @@
         'WriteToBigTable': 'beam:schematransform:org.apache.beam:bigtable_write:v1'
       config:
         gradle_target: 'sdks:java:io:google-cloud-platform:expansion-service:shadowJar'
+
+#IcebergCDC
+- type: renaming
+  transforms:
+    'ReadFromIcebergCDC': 'ReadFromIcebergCDC'
+  config:
+    mappings:
+      'ReadFromIcebergCDC':
+        table: 'table'
+        catalog_name: 'catalog_name'
+        catalog_properties: 'catalog_properties'
+        config_properties: 'config_properties'
+        drop: 'drop'
+        filter: 'filter'
+        from_snapshot: 'from_snapshot'
+        from_timestamp: 'from_timestamp'
+        keep: 'keep'
+        poll_interval_seconds: 'poll_interval_seconds'
+        starting_strategy: 'starting_strategy'
+        streaming: 'streaming'
+        to_snapshot: 'to_snapshot'
+        to_timestamp: 'to_timestamp'
+    underlying_provider:
+      type: beamJar
+      transforms:
+        'ReadFromIcebergCDC': 'beam:schematransform:org.apache.beam:iceberg_cdc_read:v1'
+      config:
+        gradle_target: 'sdks:java:io:expansion-service:shadowJar'

Reply via email to