This is an automated email from the ASF dual-hosted git repository.
huaxingao pushed a commit to branch 1.10.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/1.10.x by this push:
new 1bd3ce8665 Core: Fix for respecting custom location providers in
SerializableTable #12564 (#14280) (#14532)
1bd3ce8665 is described below
commit 1bd3ce8665678dbf5fe2e71819373e15a79014c6
Author: Huaxin Gao <[email protected]>
AuthorDate: Fri Nov 7 15:30:57 2025 -0800
Core: Fix for respecting custom location providers in SerializableTable
#12564 (#14280) (#14532)
(cherry picked from commit e667f64f5bddbacb1a641ac8ea67fc21a76e434d)
Co-authored-by: przemekd <[email protected]>
---
.../java/org/apache/iceberg/SerializableTable.java | 20 +++----
core/src/main/java/org/apache/iceberg/Try.java | 68 ++++++++++++++++++++++
.../iceberg/hadoop/TestTableSerialization.java | 50 ++++++++++++++++
.../org/apache/iceberg/TestTableSerialization.java | 43 ++++++++++++++
.../org/apache/iceberg/TestTableSerialization.java | 43 ++++++++++++++
.../org/apache/iceberg/TestTableSerialization.java | 43 ++++++++++++++
6 files changed, 254 insertions(+), 13 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/SerializableTable.java
b/core/src/main/java/org/apache/iceberg/SerializableTable.java
index 57e2e1b730..dce7697319 100644
--- a/core/src/main/java/org/apache/iceberg/SerializableTable.java
+++ b/core/src/main/java/org/apache/iceberg/SerializableTable.java
@@ -41,10 +41,10 @@ import org.apache.iceberg.util.SerializableSupplier;
* table metadata, it directly persists the current schema, spec, sort order,
table properties to
* avoid reading the metadata file from other nodes for frequently needed
metadata.
*
- * <p>The implementation assumes the passed instances of {@link FileIO},
{@link EncryptionManager}
- * are serializable. If you are serializing the table using a custom
serialization framework like
- * Kryo, those instances of {@link FileIO}, {@link EncryptionManager} must be
supported by that
- * particular serialization framework.
+ * <p>The implementation assumes the passed instances of {@link FileIO}, {@link EncryptionManager},
+ * and {@link LocationProvider} are serializable. If you are serializing the table using a custom
+ * serialization framework like Kryo, those instances of {@link FileIO}, {@link EncryptionManager},
+ * and {@link LocationProvider} must be supported by that particular serialization framework.
*
* <p><em>Note:</em> loading the complete metadata from a large number of
nodes can overwhelm the
* storage.
@@ -65,8 +65,8 @@ public class SerializableTable implements Table,
HasTableOperations, Serializabl
private final Map<String, SnapshotRef> refs;
private final UUID uuid;
private final int formatVersion;
+ private final Try<LocationProvider> locationProviderTry;
- private transient volatile LocationProvider lazyLocationProvider = null;
private transient volatile Table lazyTable = null;
private transient volatile Schema lazySchema = null;
private transient volatile Map<Integer, PartitionSpec> lazySpecs = null;
@@ -85,6 +85,7 @@ public class SerializableTable implements Table,
HasTableOperations, Serializabl
this.sortOrderAsJson = SortOrderParser.toJson(table.sortOrder());
this.io = fileIO(table);
this.encryption = table.encryption();
+ this.locationProviderTry = Try.of(table::locationProvider);
this.refs = SerializableMap.copyOf(table.refs());
this.uuid = table.uuid();
this.formatVersion = formatVersion(table);
@@ -265,14 +266,7 @@ public class SerializableTable implements Table,
HasTableOperations, Serializabl
@Override
public LocationProvider locationProvider() {
- if (lazyLocationProvider == null) {
- synchronized (this) {
- if (lazyLocationProvider == null) {
- this.lazyLocationProvider = LocationProviders.locationsFor(location,
properties);
- }
- }
- }
- return lazyLocationProvider;
+ // The provider (or the failure) was captured once via Try.of(table::locationProvider);
+ // getOrThrow() returns it, or rethrows the original exception if that call threw.
+ return this.locationProviderTry.getOrThrow();
}
@Override
diff --git a/core/src/main/java/org/apache/iceberg/Try.java
b/core/src/main/java/org/apache/iceberg/Try.java
new file mode 100644
index 0000000000..d47378b85a
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/Try.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.io.Serializable;
+import org.apache.iceberg.util.SerializableSupplier;
+
+/**
+ * Container for the result of an operation that might throw an exception.
+ *
+ * <p>The outcome is captured eagerly by {@link #of(SerializableSupplier)}. {@link #getOrThrow()}
+ * later either returns the captured value or rethrows the exact {@link Throwable} instance that
+ * was caught. Because the class is {@link Serializable} and the throwable is a regular field, a
+ * captured failure is serialized along with the instance and is only rethrown when {@link
+ * #getOrThrow()} is called.
+ *
+ * @param <T> the type of the result
+ */
+class Try<T> implements Serializable {
+  private static final long serialVersionUID = 1L;
+
+  private final T value;
+  private final Throwable throwable;
+
+  private Try(T value, Throwable throwable) {
+    this.value = value;
+    this.throwable = throwable;
+  }
+
+  /**
+   * Executes the given operation and returns a Try object containing either the result or the
+   * exception.
+   *
+   * <p>A {@code null} supplier is rejected immediately instead of being captured: it is a caller
+   * bug, and deferring the resulting exception would surface it far away from its cause.
+   *
+   * @param supplier the operation to execute
+   * @param <T> the type of the result
+   * @return a Try object containing either the result or the exception
+   * @throws NullPointerException if {@code supplier} is null
+   */
+  static <T> Try<T> of(SerializableSupplier<T> supplier) {
+    if (supplier == null) {
+      throw new NullPointerException("Invalid supplier: null");
+    }
+
+    try {
+      return new Try<>(supplier.get(), null);
+    } catch (Throwable t) {
+      // Catches Throwable, so errors as well as checked and unchecked exceptions are captured
+      // here and replayed by getOrThrow().
+      return new Try<>(null, t);
+    }
+  }
+
+  /** Returns the value if present or throws the original exception if the operation failed. */
+  T getOrThrow() {
+    if (throwable != null) {
+      sneakyThrow(throwable);
+    }
+    return value;
+  }
+
+  /**
+   * Rethrows {@code throwable} unchanged. The unchecked cast to {@code E} is erased, which lets a
+   * checked exception propagate without being declared by {@link #getOrThrow()}.
+   */
+  @SuppressWarnings("unchecked")
+  private static <E extends Throwable> void sneakyThrow(Throwable throwable) throws E {
+    throw (E) throwable;
+  }
+}
diff --git
a/core/src/test/java/org/apache/iceberg/hadoop/TestTableSerialization.java
b/core/src/test/java/org/apache/iceberg/hadoop/TestTableSerialization.java
index 797f5797ef..5103e2e9be 100644
--- a/core/src/test/java/org/apache/iceberg/hadoop/TestTableSerialization.java
+++ b/core/src/test/java/org/apache/iceberg/hadoop/TestTableSerialization.java
@@ -20,6 +20,10 @@ package org.apache.iceberg.hadoop;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@@ -199,6 +203,52 @@ public class TestTableSerialization extends
HadoopTableTestBase {
((HasTableOperations)
table).operations().current().metadataFileLocation() + "#" + type);
}
+  @Test
+  public void testLocationProviderExceptionIsDeferred() {
+    String message = "location provider failure";
+    RuntimeException failure = new RuntimeException(message);
+    Table failingTable = spy(table);
+    when(failingTable.locationProvider()).thenThrow(failure);
+
+    // Building the copy must not fail even though the source's locationProvider() throws
+    Table copy = SerializableTable.copyOf(failingTable);
+    assertThat(copy).isNotNull();
+
+    // The very same exception instance surfaces only when the copy is asked for its provider
+    assertThatThrownBy(() -> copy.locationProvider()).isSameAs(failure);
+
+    // The source table is consulted exactly once, while the copy is being built
+    verify(failingTable, times(1)).locationProvider();
+  }
+
+  @Test
+  public void testLocationProviderExceptionJavaSerialization()
+      throws IOException, ClassNotFoundException {
+    String message = "location provider failure";
+    Table failingTable = spy(table);
+    when(failingTable.locationProvider()).thenThrow(new RuntimeException(message));
+
+    Table roundTripped = TestHelpers.roundTripSerialize(SerializableTable.copyOf(failingTable));
+
+    // After Java serialization the captured failure is rethrown with its type and message intact
+    assertThatThrownBy(roundTripped::locationProvider)
+        .isInstanceOf(RuntimeException.class)
+        .hasMessage(message);
+  }
+
+  @Test
+  public void testLocationProviderExceptionKryoSerialization() throws IOException {
+    String message = "location provider failure";
+    Table failingTable = spy(table);
+    when(failingTable.locationProvider()).thenThrow(new RuntimeException(message));
+
+    Table roundTripped =
+        TestHelpers.KryoHelpers.roundTripSerialize(SerializableTable.copyOf(failingTable));
+
+    // After a Kryo round trip the captured failure is rethrown with its type and message intact
+    assertThatThrownBy(roundTripped::locationProvider)
+        .isInstanceOf(RuntimeException.class)
+        .hasMessage(message);
+  }
+
private static Set<CharSequence> getFiles(Table table) throws IOException {
Set<CharSequence> files = Sets.newHashSet();
if (table instanceof PositionDeletesTable
diff --git
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/TestTableSerialization.java
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/TestTableSerialization.java
index fd6dfd07b5..6e510299c6 100644
---
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/TestTableSerialization.java
+++
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/TestTableSerialization.java
@@ -21,6 +21,7 @@ package org.apache.iceberg;
import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.iceberg.types.Types.NestedField.required;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
@@ -158,6 +159,48 @@ public class TestTableSerialization {
txnTable, KryoHelpers.roundTripSerialize(serializableTxnTable));
}
+  @TestTemplate
+  public void testLocationProviderExceptionIsDeferred() {
+    String message = "location provider failure";
+    RuntimeException failure = new RuntimeException(message);
+    Table failingTable = spy(table);
+    when(failingTable.locationProvider()).thenThrow(failure);
+
+    // Copying succeeds; the same exception instance is rethrown only on access
+    Table copy = SerializableTableWithSize.copyOf(failingTable);
+    assertThat(copy).isNotNull();
+    assertThatThrownBy(() -> copy.locationProvider()).isSameAs(failure);
+
+    // The source table is consulted exactly once, while the copy is being built
+    verify(failingTable, times(1)).locationProvider();
+  }
+
+  @TestTemplate
+  public void testLocationProviderExceptionJavaSerialization()
+      throws IOException, ClassNotFoundException {
+    String message = "location provider failure";
+    Table failingTable = spy(table);
+    when(failingTable.locationProvider()).thenThrow(new RuntimeException(message));
+
+    Table roundTripped =
+        TestHelpers.roundTripSerialize(SerializableTableWithSize.copyOf(failingTable));
+
+    assertThatThrownBy(roundTripped::locationProvider)
+        .isInstanceOf(RuntimeException.class)
+        .hasMessage(message);
+  }
+
+  @TestTemplate
+  public void testLocationProviderExceptionKryoSerialization() throws IOException {
+    String message = "location provider failure";
+    Table failingTable = spy(table);
+    when(failingTable.locationProvider()).thenThrow(new RuntimeException(message));
+
+    Table roundTripped =
+        KryoHelpers.roundTripSerialize(SerializableTableWithSize.copyOf(failingTable));
+
+    assertThatThrownBy(roundTripped::locationProvider)
+        .isInstanceOf(RuntimeException.class)
+        .hasMessage(message);
+  }
+
private List<Table> tables() {
List<Table> tables = Lists.newArrayList();
diff --git
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestTableSerialization.java
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestTableSerialization.java
index fd6dfd07b5..6e510299c6 100644
---
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestTableSerialization.java
+++
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestTableSerialization.java
@@ -21,6 +21,7 @@ package org.apache.iceberg;
import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.iceberg.types.Types.NestedField.required;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
@@ -158,6 +159,48 @@ public class TestTableSerialization {
txnTable, KryoHelpers.roundTripSerialize(serializableTxnTable));
}
+  @TestTemplate
+  public void testLocationProviderExceptionIsDeferred() {
+    String message = "location provider failure";
+    RuntimeException failure = new RuntimeException(message);
+    Table failingTable = spy(table);
+    when(failingTable.locationProvider()).thenThrow(failure);
+
+    // Copying succeeds; the same exception instance is rethrown only on access
+    Table copy = SerializableTableWithSize.copyOf(failingTable);
+    assertThat(copy).isNotNull();
+    assertThatThrownBy(() -> copy.locationProvider()).isSameAs(failure);
+
+    // The source table is consulted exactly once, while the copy is being built
+    verify(failingTable, times(1)).locationProvider();
+  }
+
+  @TestTemplate
+  public void testLocationProviderExceptionJavaSerialization()
+      throws IOException, ClassNotFoundException {
+    String message = "location provider failure";
+    Table failingTable = spy(table);
+    when(failingTable.locationProvider()).thenThrow(new RuntimeException(message));
+
+    Table roundTripped =
+        TestHelpers.roundTripSerialize(SerializableTableWithSize.copyOf(failingTable));
+
+    assertThatThrownBy(roundTripped::locationProvider)
+        .isInstanceOf(RuntimeException.class)
+        .hasMessage(message);
+  }
+
+  @TestTemplate
+  public void testLocationProviderExceptionKryoSerialization() throws IOException {
+    String message = "location provider failure";
+    Table failingTable = spy(table);
+    when(failingTable.locationProvider()).thenThrow(new RuntimeException(message));
+
+    Table roundTripped =
+        KryoHelpers.roundTripSerialize(SerializableTableWithSize.copyOf(failingTable));
+
+    assertThatThrownBy(roundTripped::locationProvider)
+        .isInstanceOf(RuntimeException.class)
+        .hasMessage(message);
+  }
+
private List<Table> tables() {
List<Table> tables = Lists.newArrayList();
diff --git
a/spark/v4.0/spark/src/test/java/org/apache/iceberg/TestTableSerialization.java
b/spark/v4.0/spark/src/test/java/org/apache/iceberg/TestTableSerialization.java
index fd6dfd07b5..6e510299c6 100644
---
a/spark/v4.0/spark/src/test/java/org/apache/iceberg/TestTableSerialization.java
+++
b/spark/v4.0/spark/src/test/java/org/apache/iceberg/TestTableSerialization.java
@@ -21,6 +21,7 @@ package org.apache.iceberg;
import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.iceberg.types.Types.NestedField.required;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
@@ -158,6 +159,48 @@ public class TestTableSerialization {
txnTable, KryoHelpers.roundTripSerialize(serializableTxnTable));
}
+  @TestTemplate
+  public void testLocationProviderExceptionIsDeferred() {
+    String message = "location provider failure";
+    RuntimeException failure = new RuntimeException(message);
+    Table failingTable = spy(table);
+    when(failingTable.locationProvider()).thenThrow(failure);
+
+    // Copying succeeds; the same exception instance is rethrown only on access
+    Table copy = SerializableTableWithSize.copyOf(failingTable);
+    assertThat(copy).isNotNull();
+    assertThatThrownBy(() -> copy.locationProvider()).isSameAs(failure);
+
+    // The source table is consulted exactly once, while the copy is being built
+    verify(failingTable, times(1)).locationProvider();
+  }
+
+  @TestTemplate
+  public void testLocationProviderExceptionJavaSerialization()
+      throws IOException, ClassNotFoundException {
+    String message = "location provider failure";
+    Table failingTable = spy(table);
+    when(failingTable.locationProvider()).thenThrow(new RuntimeException(message));
+
+    Table roundTripped =
+        TestHelpers.roundTripSerialize(SerializableTableWithSize.copyOf(failingTable));
+
+    assertThatThrownBy(roundTripped::locationProvider)
+        .isInstanceOf(RuntimeException.class)
+        .hasMessage(message);
+  }
+
+  @TestTemplate
+  public void testLocationProviderExceptionKryoSerialization() throws IOException {
+    String message = "location provider failure";
+    Table failingTable = spy(table);
+    when(failingTable.locationProvider()).thenThrow(new RuntimeException(message));
+
+    Table roundTripped =
+        KryoHelpers.roundTripSerialize(SerializableTableWithSize.copyOf(failingTable));
+
+    assertThatThrownBy(roundTripped::locationProvider)
+        .isInstanceOf(RuntimeException.class)
+        .hasMessage(message);
+  }
+
private List<Table> tables() {
List<Table> tables = Lists.newArrayList();