This is an automated email from the ASF dual-hosted git repository.

etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new ba403009d2 Core: Move Hadoop conf serialization into SerializableConfiguration (#15583)
ba403009d2 is described below

commit ba403009d2636e1f422a865ebae083e77aa443ad
Author: Eduard Tudenhoefner <[email protected]>
AuthorDate: Fri Mar 13 15:40:14 2026 +0100

    Core: Move Hadoop conf serialization into SerializableConfiguration (#15583)
---
 .palantir/revapi.yml                               |  4 ++
 .../java/org/apache/iceberg/SerializableTable.java | 40 +---------------
 .../org/apache/iceberg/hadoop/HadoopFileIO.java    | 15 ++++--
 .../iceberg/hadoop/SerializableConfiguration.java  | 37 ++++++++-------
 .../org/apache/iceberg/io/ResolvingFileIO.java     |  2 +-
 .../org/apache/iceberg/util/SerializationUtil.java |  2 +-
 .../hadoop/TestSerializableConfiguration.java      | 55 ++++++++++++++++++++++
 7 files changed, 93 insertions(+), 62 deletions(-)

diff --git a/.palantir/revapi.yml b/.palantir/revapi.yml
index 4ba3d8250d..dc5951256c 100644
--- a/.palantir/revapi.yml
+++ b/.palantir/revapi.yml
@@ -1370,6 +1370,10 @@ acceptedBreaks:
         new: "class org.apache.iceberg.encryption.EncryptingFileIO"
         justification: "New method for Manifest List reading"
     org.apache.iceberg:iceberg-core:
+      - code: "java.class.defaultSerializationChanged"
+        old: "class org.apache.iceberg.hadoop.SerializableConfiguration"
+        new: "class org.apache.iceberg.hadoop.SerializableConfiguration"
+        justification: "Serialization across versions is not guaranteed"
       - code: "java.class.noLongerInheritsFromClass"
         old: "class org.apache.iceberg.rest.auth.OAuth2Manager"
         new: "class org.apache.iceberg.rest.auth.OAuth2Manager"
diff --git a/core/src/main/java/org/apache/iceberg/SerializableTable.java b/core/src/main/java/org/apache/iceberg/SerializableTable.java
index dce7697319..7e5746ec91 100644
--- a/core/src/main/java/org/apache/iceberg/SerializableTable.java
+++ b/core/src/main/java/org/apache/iceberg/SerializableTable.java
@@ -22,14 +22,11 @@ import java.io.Serializable;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.encryption.EncryptionManager;
-import org.apache.iceberg.hadoop.HadoopConfigurable;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.LocationProvider;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.util.SerializableMap;
-import org.apache.iceberg.util.SerializableSupplier;
 
 /**
  * A read-only serializable table that can be sent to other nodes in a cluster.
@@ -83,7 +80,7 @@ public class SerializableTable implements Table, HasTableOperations, Serializabl
     Map<Integer, PartitionSpec> specs = table.specs();
     specs.forEach((specId, spec) -> specAsJsonMap.put(specId, PartitionSpecParser.toJson(spec)));
     this.sortOrderAsJson = SortOrderParser.toJson(table.sortOrder());
-    this.io = fileIO(table);
+    this.io = table.io();
     this.encryption = table.encryption();
     this.locationProviderTry = Try.of(table::locationProvider);
     this.refs = SerializableMap.copyOf(table.refs());
@@ -124,14 +121,6 @@ public class SerializableTable implements Table, HasTableOperations, Serializabl
     }
   }
 
-  private FileIO fileIO(Table table) {
-    if (table.io() instanceof HadoopConfigurable) {
-      ((HadoopConfigurable) table.io()).serializeConfWith(SerializableConfSupplier::new);
-    }
-
-    return table.io();
-  }
-
   private Table lazyTable() {
     if (lazyTable == null) {
       synchronized (this) {
@@ -453,31 +442,4 @@ public class SerializableTable implements Table, HasTableOperations, Serializabl
       return type;
     }
   }
-
-  // captures the current state of a Hadoop configuration in a serializable manner
-  private static class SerializableConfSupplier implements SerializableSupplier<Configuration> {
-
-    private final Map<String, String> confAsMap;
-    private transient volatile Configuration conf = null;
-
-    SerializableConfSupplier(Configuration conf) {
-      this.confAsMap = Maps.newHashMapWithExpectedSize(conf.size());
-      conf.forEach(entry -> confAsMap.put(entry.getKey(), entry.getValue()));
-    }
-
-    @Override
-    public Configuration get() {
-      if (conf == null) {
-        synchronized (this) {
-          if (conf == null) {
-            Configuration newConf = new Configuration(false);
-            confAsMap.forEach(newConf::set);
-            this.conf = newConf;
-          }
-        }
-      }
-
-      return conf;
-    }
-  }
 }
diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java b/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java
index a4ac5e2ff6..877290f48e 100644
--- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java
+++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java
@@ -66,9 +66,14 @@ public class HadoopFileIO implements HadoopConfigurable, DelegateFileIO {
   public HadoopFileIO() {}
 
   public HadoopFileIO(Configuration hadoopConf) {
-    this(new SerializableConfiguration(hadoopConf)::get);
+    this(new SerializableConfiguration(hadoopConf));
   }
 
+  /**
+   * @deprecated since 1.11.0, will be removed in 1.12.0; use {@link
+   *     HadoopFileIO#HadoopFileIO(Configuration)} instead.
+   */
+  @Deprecated
   public HadoopFileIO(SerializableSupplier<Configuration> hadoopConf) {
     this.hadoopConf = hadoopConf;
   }
@@ -115,7 +120,7 @@ public class HadoopFileIO implements HadoopConfigurable, DelegateFileIO {
 
   @Override
   public void setConf(Configuration conf) {
-    this.hadoopConf = new SerializableConfiguration(conf)::get;
+    this.hadoopConf = new SerializableConfiguration(conf);
   }
 
   @Override
@@ -125,7 +130,7 @@ public class HadoopFileIO implements HadoopConfigurable, DelegateFileIO {
     if (hadoopConf == null) {
       synchronized (this) {
         if (hadoopConf == null) {
-          this.hadoopConf = new SerializableConfiguration(new Configuration())::get;
+          this.hadoopConf = new SerializableConfiguration(new Configuration());
         }
       }
     }
@@ -133,6 +138,10 @@ public class HadoopFileIO implements HadoopConfigurable, DelegateFileIO {
     return hadoopConf.get();
   }
 
+  /**
+   * @deprecated since 1.11.0, will be removed in 1.12.0.
+   */
+  @Deprecated
   @Override
   public void serializeConfWith(
       Function<Configuration, SerializableSupplier<Configuration>> confSerializer) {
diff --git a/core/src/main/java/org/apache/iceberg/hadoop/SerializableConfiguration.java b/core/src/main/java/org/apache/iceberg/hadoop/SerializableConfiguration.java
index 3e9f17455f..8c660bc29d 100644
--- a/core/src/main/java/org/apache/iceberg/hadoop/SerializableConfiguration.java
+++ b/core/src/main/java/org/apache/iceberg/hadoop/SerializableConfiguration.java
@@ -18,33 +18,34 @@
  */
 package org.apache.iceberg.hadoop;
 
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
 import java.io.Serializable;
+import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.SerializableSupplier;
 
 /** Wraps a {@link Configuration} object in a {@link Serializable} layer. */
-public class SerializableConfiguration implements Serializable {
-
-  private transient Configuration hadoopConf;
+public class SerializableConfiguration implements SerializableSupplier<Configuration> {
+  private final Map<String, String> confAsMap;
+  private transient volatile Configuration hadoopConf = null;
 
   public SerializableConfiguration(Configuration hadoopConf) {
-    this.hadoopConf = hadoopConf;
-  }
-
-  private void writeObject(ObjectOutputStream out) throws IOException {
-    out.defaultWriteObject();
-    hadoopConf.write(out);
-  }
-
-  private void readObject(ObjectInputStream in) throws ClassNotFoundException, IOException {
-    in.defaultReadObject();
-    hadoopConf = new Configuration(false);
-    hadoopConf.readFields(in);
+    this.confAsMap = Maps.newHashMapWithExpectedSize(hadoopConf.size());
+    hadoopConf.forEach(entry -> confAsMap.put(entry.getKey(), entry.getValue()));
   }
 
+  @Override
   public Configuration get() {
+    if (hadoopConf == null) {
+      synchronized (this) {
+        if (hadoopConf == null) {
+          Configuration newConf = new Configuration(false);
+          confAsMap.forEach(newConf::set);
+          this.hadoopConf = newConf;
+        }
+      }
+    }
+
     return hadoopConf;
   }
 }
diff --git a/core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java b/core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java
index a39b3413d5..c27c609d94 100644
--- a/core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java
+++ b/core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java
@@ -154,7 +154,7 @@ public class ResolvingFileIO
 
   @Override
   public void setConf(Configuration conf) {
-    this.hadoopConf = new SerializableConfiguration(conf)::get;
+    this.hadoopConf = new SerializableConfiguration(conf);
   }
 
   @Override
diff --git a/core/src/main/java/org/apache/iceberg/util/SerializationUtil.java b/core/src/main/java/org/apache/iceberg/util/SerializationUtil.java
index 216f55eae3..c65251abcf 100644
--- a/core/src/main/java/org/apache/iceberg/util/SerializationUtil.java
+++ b/core/src/main/java/org/apache/iceberg/util/SerializationUtil.java
@@ -43,7 +43,7 @@ public class SerializationUtil {
    * @return serialized bytes
    */
   public static byte[] serializeToBytes(Object obj) {
-    return serializeToBytes(obj, conf -> new SerializableConfiguration(conf)::get);
+    return serializeToBytes(obj, SerializableConfiguration::new);
   }
 
   /**
diff --git a/core/src/test/java/org/apache/iceberg/hadoop/TestSerializableConfiguration.java b/core/src/test/java/org/apache/iceberg/hadoop/TestSerializableConfiguration.java
new file mode 100644
index 0000000000..8912b0135f
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/hadoop/TestSerializableConfiguration.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.hadoop;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.TestHelpers;
+import org.junit.jupiter.api.Test;
+
+public class TestSerializableConfiguration {
+
+  @Test
+  public void kryoSerialization() throws IOException {
+    Configuration configuration = new Configuration();
+    configuration.set("prefix.key1", "value1");
+    configuration.set("prefix.key2", "value2");
+    SerializableConfiguration conf = new SerializableConfiguration(configuration);
+    SerializableConfiguration serialized = TestHelpers.KryoHelpers.roundTripSerialize(conf);
+
+    assertThat(serialized.get().getPropsWithPrefix("prefix"))
+        .isEqualTo(conf.get().getPropsWithPrefix("prefix"))
+        .isEqualTo(configuration.getPropsWithPrefix("prefix"));
+  }
+
+  @Test
+  public void javaSerialization() throws IOException, ClassNotFoundException {
+    Configuration configuration = new Configuration();
+    configuration.set("prefix.key1", "value1");
+    configuration.set("prefix.key2", "value2");
+    SerializableConfiguration conf = new SerializableConfiguration(configuration);
+    SerializableConfiguration serialized = TestHelpers.roundTripSerialize(conf);
+
+    assertThat(serialized.get().getPropsWithPrefix("prefix"))
+        .isEqualTo(conf.get().getPropsWithPrefix("prefix"))
+        .isEqualTo(configuration.getPropsWithPrefix("prefix"));
+  }
+}

Reply via email to