This is an automated email from the ASF dual-hosted git repository.

ahmedabu98 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 2375fed9735 Refresh Iceberg partition specs periodically (#38408)
2375fed9735 is described below

commit 2375fed973598a2fd666706099bbdc1f0a3be28f
Author: Atharv <[email protected]>
AuthorDate: Wed May 20 23:36:26 2026 +0530

    Refresh Iceberg partition specs periodically (#38408)
---
 .../iceberg/AssignDestinationsAndPartitions.java   | 47 ++++++++++++++++++++--
 1 file changed, 44 insertions(+), 3 deletions(-)

diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AssignDestinationsAndPartitions.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AssignDestinationsAndPartitions.java
index 475786d3a4f..e5d70d85d87 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AssignDestinationsAndPartitions.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AssignDestinationsAndPartitions.java
@@ -39,6 +39,7 @@ import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.exceptions.NoSuchTableException;
 import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
 import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
 import org.joda.time.Instant;
 
 /**
@@ -51,8 +52,10 @@ class AssignDestinationsAndPartitions
 
   private final DynamicDestinations dynamicDestinations;
   private final IcebergCatalogConfig catalogConfig;
+
   static final String DESTINATION = "destination";
   static final String PARTITION = "partition";
+
   static final org.apache.beam.sdk.schemas.Schema OUTPUT_SCHEMA =
       org.apache.beam.sdk.schemas.Schema.builder()
           .addStringField(DESTINATION)
@@ -75,8 +78,13 @@ class AssignDestinationsAndPartitions
   }
 
   static class AssignDoFn extends DoFn<Row, KV<Row, Row>> {
+
+    private static final Duration REFRESH_INTERVAL = 
Duration.standardMinutes(5);
+
     private transient @MonotonicNonNull Map<String, PartitionKey> 
partitionKeys;
     private transient @MonotonicNonNull Map<String, BeamRowWrapper> wrappers;
+    private transient @MonotonicNonNull Map<String, Instant> lastRefreshTimes;
+
     private final DynamicDestinations dynamicDestinations;
     private final IcebergCatalogConfig catalogConfig;
 
@@ -89,6 +97,7 @@ class AssignDestinationsAndPartitions
     public void setup() {
       this.wrappers = new HashMap<>();
       this.partitionKeys = new HashMap<>();
+      this.lastRefreshTimes = new HashMap<>();
     }
 
     @ProcessElement
@@ -98,42 +107,74 @@ class AssignDestinationsAndPartitions
         PaneInfo paneInfo,
         @Timestamp Instant timestamp,
         OutputReceiver<KV<Row, Row>> out) {
+
       String tableIdentifier =
           dynamicDestinations.getTableStringIdentifier(
               ValueInSingleWindow.of(element, timestamp, window, paneInfo));
+
       Row data = dynamicDestinations.getData(element);
 
       @Nullable PartitionKey partitionKey = 
checkStateNotNull(partitionKeys).get(tableIdentifier);
+
       @Nullable BeamRowWrapper wrapper = 
checkStateNotNull(wrappers).get(tableIdentifier);
-      if (partitionKey == null || wrapper == null) {
+
+      @Nullable Instant lastRefresh = 
checkStateNotNull(lastRefreshTimes).get(tableIdentifier);
+
+      Instant now = Instant.now();
+
+      boolean shouldRefresh =
+          partitionKey == null
+              || wrapper == null
+              || lastRefresh == null
+              || now.isAfter(lastRefresh.plus(REFRESH_INTERVAL));
+
+      if (shouldRefresh) {
+
         PartitionSpec spec = PartitionSpec.unpartitioned();
+
         Schema schema = 
IcebergUtils.beamSchemaToIcebergSchema(data.getSchema());
+
         @Nullable
         IcebergTableCreateConfig createConfig =
             
dynamicDestinations.instantiateDestination(tableIdentifier).getTableCreateConfig();
+
         if (createConfig != null && createConfig.getPartitionFields() != null) 
{
+
           spec =
               
PartitionUtils.toPartitionSpec(createConfig.getPartitionFields(), 
data.getSchema());
+
         } else {
+
           try {
             // see if table already exists with a spec
-            // TODO(https://github.com/apache/beam/issues/38337): improve this 
by periodically
-            // refreshing the table to fetch updated specs
             spec = 
catalogConfig.catalog().loadTable(TableIdentifier.parse(tableIdentifier)).spec();
+
           } catch (NoSuchTableException ignored) {
             // no partition to apply
           }
         }
+
         partitionKey = new PartitionKey(spec, schema);
+
         wrapper = new BeamRowWrapper(data.getSchema(), schema.asStruct());
+
         checkStateNotNull(partitionKeys).put(tableIdentifier, partitionKey);
+
         checkStateNotNull(wrappers).put(tableIdentifier, wrapper);
+
+        checkStateNotNull(lastRefreshTimes).put(tableIdentifier, now);
       }
+
+      partitionKey = checkStateNotNull(partitionKey);
+      wrapper = checkStateNotNull(wrapper);
+
       partitionKey.partition(wrapper.wrap(data));
+
       String partitionPath = partitionKey.toPath();
 
       Row destAndPartition =
           Row.withSchema(OUTPUT_SCHEMA).addValues(tableIdentifier, 
partitionPath).build();
+
       out.output(KV.of(destAndPartition, data));
     }
   }

Reply via email to