This is an automated email from the ASF dual-hosted git repository.
ahmedabu98 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 2375fed9735 Refresh Iceberg partition specs periodically (#38408)
2375fed9735 is described below
commit 2375fed973598a2fd666706099bbdc1f0a3be28f
Author: Atharv <[email protected]>
AuthorDate: Wed May 20 23:36:26 2026 +0530
Refresh Iceberg partition specs periodically (#38408)
---
.../iceberg/AssignDestinationsAndPartitions.java | 47 ++++++++++++++++++++--
1 file changed, 44 insertions(+), 3 deletions(-)
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AssignDestinationsAndPartitions.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AssignDestinationsAndPartitions.java
index 475786d3a4f..e5d70d85d87 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AssignDestinationsAndPartitions.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AssignDestinationsAndPartitions.java
@@ -39,6 +39,7 @@ import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
import org.joda.time.Instant;
/**
@@ -51,8 +52,10 @@ class AssignDestinationsAndPartitions
private final DynamicDestinations dynamicDestinations;
private final IcebergCatalogConfig catalogConfig;
+
static final String DESTINATION = "destination";
static final String PARTITION = "partition";
+
static final org.apache.beam.sdk.schemas.Schema OUTPUT_SCHEMA =
org.apache.beam.sdk.schemas.Schema.builder()
.addStringField(DESTINATION)
@@ -75,8 +78,13 @@ class AssignDestinationsAndPartitions
}
static class AssignDoFn extends DoFn<Row, KV<Row, Row>> {
+
+ private static final Duration REFRESH_INTERVAL = Duration.standardMinutes(5);
+
private transient @MonotonicNonNull Map<String, PartitionKey> partitionKeys;
private transient @MonotonicNonNull Map<String, BeamRowWrapper> wrappers;
+ private transient @MonotonicNonNull Map<String, Instant> lastRefreshTimes;
+
private final DynamicDestinations dynamicDestinations;
private final IcebergCatalogConfig catalogConfig;
@@ -89,6 +97,7 @@ class AssignDestinationsAndPartitions
public void setup() {
this.wrappers = new HashMap<>();
this.partitionKeys = new HashMap<>();
+ this.lastRefreshTimes = new HashMap<>();
}
@ProcessElement
@@ -98,42 +107,74 @@ class AssignDestinationsAndPartitions
PaneInfo paneInfo,
@Timestamp Instant timestamp,
OutputReceiver<KV<Row, Row>> out) {
+
String tableIdentifier =
dynamicDestinations.getTableStringIdentifier(
ValueInSingleWindow.of(element, timestamp, window, paneInfo));
+
Row data = dynamicDestinations.getData(element);
@Nullable PartitionKey partitionKey =
checkStateNotNull(partitionKeys).get(tableIdentifier);
+
@Nullable BeamRowWrapper wrapper =
checkStateNotNull(wrappers).get(tableIdentifier);
- if (partitionKey == null || wrapper == null) {
+
+ @Nullable Instant lastRefresh = checkStateNotNull(lastRefreshTimes).get(tableIdentifier);
+
+ Instant now = Instant.now();
+
+ boolean shouldRefresh =
+ partitionKey == null
+ || wrapper == null
+ || lastRefresh == null
+ || now.isAfter(lastRefresh.plus(REFRESH_INTERVAL));
+
+ if (shouldRefresh) {
+
PartitionSpec spec = PartitionSpec.unpartitioned();
+
Schema schema =
IcebergUtils.beamSchemaToIcebergSchema(data.getSchema());
+
@Nullable
IcebergTableCreateConfig createConfig =
dynamicDestinations.instantiateDestination(tableIdentifier).getTableCreateConfig();
+
if (createConfig != null && createConfig.getPartitionFields() != null) {
+
spec =
PartitionUtils.toPartitionSpec(createConfig.getPartitionFields(), data.getSchema());
+
} else {
+
try {
// see if table already exists with a spec
- // TODO(https://github.com/apache/beam/issues/38337): improve this by periodically
- // refreshing the table to fetch updated specs
spec =
catalogConfig.catalog().loadTable(TableIdentifier.parse(tableIdentifier)).spec();
+
} catch (NoSuchTableException ignored) {
// no partition to apply
}
}
+
partitionKey = new PartitionKey(spec, schema);
+
wrapper = new BeamRowWrapper(data.getSchema(), schema.asStruct());
+
checkStateNotNull(partitionKeys).put(tableIdentifier, partitionKey);
+
checkStateNotNull(wrappers).put(tableIdentifier, wrapper);
+
+ checkStateNotNull(lastRefreshTimes).put(tableIdentifier, now);
}
+
+ partitionKey = checkStateNotNull(partitionKey);
+ wrapper = checkStateNotNull(wrapper);
+
partitionKey.partition(wrapper.wrap(data));
+
String partitionPath = partitionKey.toPath();
Row destAndPartition =
Row.withSchema(OUTPUT_SCHEMA).addValues(tableIdentifier, partitionPath).build();
+
out.output(KV.of(destAndPartition, data));
}
}