This is an automated email from the ASF dual-hosted git repository.
dweeks pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 38cc881366 BigQuery: Eliminate redundant table load by using ETag for
conflict detection (#14940)
38cc881366 is described below
commit 38cc88136684a57b61be4ae0d2c1886eff742a28
Author: Joy Haldar <[email protected]>
AuthorDate: Fri Jan 16 04:22:53 2026 +0530
BigQuery: Eliminate redundant table load by using ETag for conflict
detection (#14940)
* BigQuery: Reuse table from doRefresh() in updateTable() to reduce API
calls
---
.../gcp/bigquery/BigQueryTableOperations.java | 43 ++++++++++------------
.../gcp/bigquery/FakeBigQueryMetastoreClient.java | 13 ++-----
.../gcp/bigquery/TestBigQueryTableOperations.java | 21 +++++------
3 files changed, 32 insertions(+), 45 deletions(-)
diff --git a/bigquery/src/main/java/org/apache/iceberg/gcp/bigquery/BigQueryTableOperations.java b/bigquery/src/main/java/org/apache/iceberg/gcp/bigquery/BigQueryTableOperations.java
index e5f0a44957..37728aa157 100644
--- a/bigquery/src/main/java/org/apache/iceberg/gcp/bigquery/BigQueryTableOperations.java
+++ b/bigquery/src/main/java/org/apache/iceberg/gcp/bigquery/BigQueryTableOperations.java
@@ -33,6 +33,7 @@ import
org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -48,6 +49,9 @@ final class BigQueryTableOperations extends
BaseMetastoreTableOperations {
private final FileIO fileIO;
private final TableReference tableReference;
+ /** Table loaded in doRefresh() for reuse in updateTable() to avoid redundant API call. */
+ private volatile Table metastoreTable;
+
BigQueryTableOperations(
BigQueryMetastoreClient client, FileIO fileIO, TableReference
tableReference) {
this.client = client;
@@ -60,9 +64,11 @@ final class BigQueryTableOperations extends
BaseMetastoreTableOperations {
public void doRefresh() {
// Must default to null.
String metadataLocation = null;
+ this.metastoreTable = null;
try {
+ this.metastoreTable = client.load(tableReference);
metadataLocation =
- loadMetadataLocationOrThrow(client.load(tableReference).getExternalCatalogTableOptions());
+ loadMetadataLocationOrThrow(metastoreTable.getExternalCatalogTableOptions());
} catch (NoSuchTableException e) {
if (currentMetadataLocation() != null) {
// Re-throws the exception because the table must exist in this case.
@@ -86,7 +92,7 @@ final class BigQueryTableOperations extends
BaseMetastoreTableOperations {
if (base == null) {
createTable(newMetadataLocation, metadata);
} else {
- updateTable(base.metadataFileLocation(), newMetadataLocation,
metadata);
+ updateTable(newMetadataLocation, metadata);
}
commitStatus = BaseMetastoreOperations.CommitStatus.SUCCESS;
} catch (CommitFailedException | CommitStateUnknownException e) {
@@ -149,35 +155,24 @@ final class BigQueryTableOperations extends
BaseMetastoreTableOperations {
}
/** Update table properties with concurrent update detection using etag. */
- private void updateTable(
- String oldMetadataLocation, String newMetadataLocation, TableMetadata
metadata) {
- Table table = client.load(tableReference);
- if (table.getEtag().isEmpty()) {
+ private void updateTable(String newMetadataLocation, TableMetadata metadata)
{
+ Preconditions.checkState(
+ metastoreTable != null,
+ "Table %s must be loaded during refresh before commit",
+ tableName());
+
+ if (metastoreTable.getEtag().isEmpty()) {
throw new ValidationException(
"Etag of legacy table %s is empty, manually update the table via the BigQuery API or"
+ " recreate and retry",
tableName());
}
- ExternalCatalogTableOptions options =
table.getExternalCatalogTableOptions();
- addConnectionIfProvided(table, metadata.properties());
-
- // If `metadataLocationFromMetastore` is different from metadata location of base, it means
- // someone has updated metadata location in metastore, which is a conflict update.
- String metadataLocationFromMetastore =
- options.getParameters().getOrDefault(METADATA_LOCATION_PROP, "");
- if (!metadataLocationFromMetastore.isEmpty()
- && !metadataLocationFromMetastore.equals(oldMetadataLocation)) {
- throw new CommitFailedException(
- "Cannot commit base metadata location '%s' is not same as the current table metadata location '%s' for"
- + " %s.%s",
- oldMetadataLocation,
- metadataLocationFromMetastore,
- tableReference.getDatasetId(),
- tableReference.getTableId());
- }
+ ExternalCatalogTableOptions options =
metastoreTable.getExternalCatalogTableOptions();
+ addConnectionIfProvided(metastoreTable, metadata.properties());
options.setParameters(buildTableParameters(newMetadataLocation, metadata));
- client.update(tableReference, table);
+ client.update(tableReference, metastoreTable);
+ this.metastoreTable = null;
}
// To make the table queryable from Hive, the user would likely be setting
the HIVE_ENGINE_ENABLED
diff --git a/bigquery/src/test/java/org/apache/iceberg/gcp/bigquery/FakeBigQueryMetastoreClient.java b/bigquery/src/test/java/org/apache/iceberg/gcp/bigquery/FakeBigQueryMetastoreClient.java
index 0c6df15091..3619f7908c 100644
--- a/bigquery/src/test/java/org/apache/iceberg/gcp/bigquery/FakeBigQueryMetastoreClient.java
+++ b/bigquery/src/test/java/org/apache/iceberg/gcp/bigquery/FakeBigQueryMetastoreClient.java
@@ -180,18 +180,11 @@ public class FakeBigQueryMetastoreClient implements
BigQueryMetastoreClient {
String incomingEtag = table.getEtag();
String requiredEtag = existingTable.getEtag();
- // The real patch() uses an If-Match header which is passed separately,
- // NOT on the incoming table object.
- // The BigQueryTableOperations does NOT set the ETag on the Table object
- // it passes to the client update() method.
- // For a fake, we assume the ETag check needs to be simulated based on
- // state, BUT the real client.update() expects the ETAG as a separate parameter
- // (or implicitly via setIfMatch header, which this Fake doesn't see).
- // To make the fake usable, we'll assume that if an ETag *is* present
- // on the incoming table object, it must match.
+ // Simulate ETag-based optimistic locking. If the incoming table has an ETag,
+ // it must match the current ETag in the store.
if (incomingEtag != null && !incomingEtag.equals(requiredEtag)) {
throw new CommitFailedException(
- "Etag mismatch for table: %s. Required: %s, Found: %s",
+ "Cannot commit: Etag mismatch for table: %s. Required: %s, Found: %s",
tableReference, requiredEtag, incomingEtag);
}
diff --git a/bigquery/src/test/java/org/apache/iceberg/gcp/bigquery/TestBigQueryTableOperations.java b/bigquery/src/test/java/org/apache/iceberg/gcp/bigquery/TestBigQueryTableOperations.java
index 4666ec61f4..c5095aedb0 100644
--- a/bigquery/src/test/java/org/apache/iceberg/gcp/bigquery/TestBigQueryTableOperations.java
+++ b/bigquery/src/test/java/org/apache/iceberg/gcp/bigquery/TestBigQueryTableOperations.java
@@ -185,26 +185,25 @@ public class TestBigQueryTableOperations {
}
@Test
- public void failWhenMetadataLocationDiff() throws Exception {
+ public void failWhenConcurrentModificationDetected() throws Exception {
Table tableWithEtag = createTestTable().setEtag("etag");
- Table tableWithNewMetadata =
- new Table()
- .setEtag("etag")
- .setExternalCatalogTableOptions(
- new ExternalCatalogTableOptions()
- .setParameters(ImmutableMap.of(METADATA_LOCATION_PROP,
"a/new/location")));
reset(client);
- // Two invocations, for loadTable and commit.
- when(client.load(TABLE_REFERENCE)).thenReturn(tableWithEtag,
tableWithNewMetadata);
+ when(client.load(TABLE_REFERENCE)).thenReturn(tableWithEtag);
org.apache.iceberg.Table loadedTable = catalog.loadTable(IDENTIFIER);
- when(client.update(any(), any())).thenReturn(tableWithEtag);
+ // Simulate concurrent modification detected via ETag mismatch
+ when(client.update(any(), any()))
+ .thenThrow(new CommitFailedException("Cannot commit: Etag mismatch"));
+
assertThatThrownBy(
() -> loadedTable.updateSchema().addColumn("n",
Types.IntegerType.get()).commit())
.isInstanceOf(CommitFailedException.class)
- .hasMessageContaining("is not same as the current table metadata location");
+ .hasMessageContaining("Cannot commit");
+
+ // Verify table is loaded only once
+ verify(client, times(1)).load(TABLE_REFERENCE);
}
@Test