This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 01393a0  Core: Add HadoopConfigurable interface to serialize custom FileIO (#2678)
01393a0 is described below

commit 01393a06c284175edab75de34f48b2bfbd606081
Author: Jack Ye <[email protected]>
AuthorDate: Tue Jun 22 17:25:25 2021 -0700

    Core: Add HadoopConfigurable interface to serialize custom FileIO (#2678)
---
 .../java/org/apache/iceberg/SerializableTable.java | 12 +++---
 .../apache/iceberg/hadoop/HadoopConfigurable.java  | 44 ++++++++++++++++++++++
 .../org/apache/iceberg/hadoop/HadoopFileIO.java    |  9 ++++-
 .../org/apache/iceberg/util/SerializationUtil.java | 26 +++++++++++++
 .../apache/iceberg/mr/mapreduce/IcebergSplit.java  | 10 +----
 .../java/org/apache/iceberg/spark/SparkUtil.java   | 11 +++---
 6 files changed, 88 insertions(+), 24 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/SerializableTable.java b/core/src/main/java/org/apache/iceberg/SerializableTable.java
index 5b649f6..8bfd76a 100644
--- a/core/src/main/java/org/apache/iceberg/SerializableTable.java
+++ b/core/src/main/java/org/apache/iceberg/SerializableTable.java
@@ -24,7 +24,7 @@ import java.util.List;
 import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.encryption.EncryptionManager;
-import org.apache.iceberg.hadoop.HadoopFileIO;
+import org.apache.iceberg.hadoop.HadoopConfigurable;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.LocationProvider;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
@@ -102,13 +102,11 @@ public class SerializableTable implements Table, Serializable {
   }
 
   private FileIO fileIO(Table table) {
-    if (table.io() instanceof HadoopFileIO) {
-      HadoopFileIO hadoopFileIO = (HadoopFileIO) table.io();
-      SerializableConfiguration serializedConf = new SerializableConfiguration(hadoopFileIO.getConf());
-      return new HadoopFileIO(serializedConf::get);
-    } else {
-      return table.io();
+    if (table.io() instanceof HadoopConfigurable) {
+      ((HadoopConfigurable) table.io()).serializeConfWith(conf -> new SerializableConfiguration(conf)::get);
     }
+
+    return table.io();
   }
 
   private Table lazyTable() {
diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopConfigurable.java b/core/src/main/java/org/apache/iceberg/hadoop/HadoopConfigurable.java
new file mode 100644
index 0000000..9e79c3b
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopConfigurable.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.hadoop;
+
+import java.util.function.Function;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.util.SerializableSupplier;
+
+/**
+ * An interface that extends the Hadoop {@link Configurable} interface to offer better serialization support for
+ * customizable Iceberg objects such as {@link org.apache.iceberg.io.FileIO}.
+ * <p>
+ * If an object is serialized and needs to use Hadoop configuration, it is recommended for the object to implement
+ * this interface so that a serializable supplier of configuration can be provided instead of an actual Hadoop
+ * configuration which is not serializable.
+ */
+public interface HadoopConfigurable extends Configurable {
+
+  /**
+   * Take a function that serializes Hadoop configuration into a supplier. An implementation is supposed to pass in
+   * its current Hadoop configuration into this function, and the result can be safely serialized for future use.
+   * @param confSerializer A function that takes Hadoop configuration and returns a serializable supplier of it.
+   */
+  void serializeConfWith(Function<Configuration, SerializableSupplier<Configuration>> confSerializer);
+
+}
diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java b/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java
index 34d66bf..1c53240 100644
--- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java
+++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java
@@ -20,7 +20,7 @@
 package org.apache.iceberg.hadoop;
 
 import java.io.IOException;
-import org.apache.hadoop.conf.Configurable;
+import java.util.function.Function;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -30,7 +30,7 @@ import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.io.OutputFile;
 import org.apache.iceberg.util.SerializableSupplier;
 
-public class HadoopFileIO implements FileIO, Configurable {
+public class HadoopFileIO implements FileIO, HadoopConfigurable {
 
   private SerializableSupplier<Configuration> hadoopConf;
 
@@ -84,4 +84,9 @@ public class HadoopFileIO implements FileIO, Configurable {
   public Configuration getConf() {
     return hadoopConf.get();
   }
+
+  @Override
+  public void serializeConfWith(Function<Configuration, SerializableSupplier<Configuration>> confSerializer) {
+    this.hadoopConf = confSerializer.apply(getConf());
+  }
 }
diff --git a/core/src/main/java/org/apache/iceberg/util/SerializationUtil.java b/core/src/main/java/org/apache/iceberg/util/SerializationUtil.java
index 52b14d9..f7ccd7a 100644
--- a/core/src/main/java/org/apache/iceberg/util/SerializationUtil.java
+++ b/core/src/main/java/org/apache/iceberg/util/SerializationUtil.java
@@ -27,13 +27,39 @@ import java.io.ObjectOutputStream;
 import java.io.UncheckedIOException;
 import java.nio.charset.StandardCharsets;
 import java.util.Base64;
+import java.util.function.Function;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.hadoop.HadoopConfigurable;
+import org.apache.iceberg.hadoop.SerializableConfiguration;
 
 public class SerializationUtil {
 
   private SerializationUtil() {
   }
 
+  /**
+   * Serialize an object to bytes. If the object implements {@link HadoopConfigurable}, its Hadoop configuration will
+   * be serialized into a {@link SerializableConfiguration}.
+   * @param obj object to serialize
+   * @return serialized bytes
+   */
   public static byte[] serializeToBytes(Object obj) {
+    return serializeToBytes(obj, conf -> new SerializableConfiguration(conf)::get);
+  }
+
+  /**
+   * Serialize an object to bytes. If the object implements {@link HadoopConfigurable}, the confSerializer will be used
+   * to serialize Hadoop configuration used by the object.
+   * @param obj object to serialize
+   * @param confSerializer serializer for the Hadoop configuration
+   * @return serialized bytes
+   */
+  public static byte[] serializeToBytes(Object obj,
+                                        Function<Configuration, SerializableSupplier<Configuration>> confSerializer) {
+    if (obj instanceof HadoopConfigurable) {
+      ((HadoopConfigurable) obj).serializeConfWith(confSerializer);
+    }
+
     try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
          ObjectOutputStream oos = new ObjectOutputStream(baos)) {
       oos.writeObject(obj);
diff --git a/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergSplit.java b/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergSplit.java
index 43b78d6..632224e 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergSplit.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergSplit.java
@@ -27,8 +27,6 @@ import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.iceberg.CombinedScanTask;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.encryption.EncryptionManager;
-import org.apache.iceberg.hadoop.HadoopFileIO;
-import org.apache.iceberg.hadoop.SerializableConfiguration;
 import org.apache.iceberg.hadoop.Util;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.mr.InputFormatConfig;
@@ -92,13 +90,7 @@ public class IcebergSplit extends InputSplit implements org.apache.hadoop.mapred
     out.writeInt(data.length);
     out.write(data);
 
-    byte[] ioData;
-    if (io instanceof HadoopFileIO) {
-      SerializableConfiguration serializableConf = new SerializableConfiguration(((HadoopFileIO) io).conf());
-      ioData = SerializationUtil.serializeToBytes(new HadoopFileIO(serializableConf::get));
-    } else {
-      ioData = SerializationUtil.serializeToBytes(io);
-    }
+    byte[] ioData = SerializationUtil.serializeToBytes(io);
     out.writeInt(ioData.length);
     out.write(ioData);
 
diff --git a/spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java b/spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java
index 3537d94..53ca293 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java
@@ -26,7 +26,7 @@ import java.util.stream.Collectors;
 import org.apache.iceberg.PartitionField;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Table;
-import org.apache.iceberg.hadoop.HadoopFileIO;
+import org.apache.iceberg.hadoop.HadoopConfigurable;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.transforms.Transform;
@@ -39,13 +39,12 @@ public class SparkUtil {
   }
 
   public static FileIO serializableFileIO(Table table) {
-    if (table.io() instanceof HadoopFileIO) {
+    if (table.io() instanceof HadoopConfigurable) {
       // we need to use Spark's SerializableConfiguration to avoid issues with Kryo serialization
-      SerializableConfiguration conf = new SerializableConfiguration(((HadoopFileIO) table.io()).conf());
-      return new HadoopFileIO(conf::value);
-    } else {
-      return table.io();
+      ((HadoopConfigurable) table.io()).serializeConfWith(conf -> new SerializableConfiguration(conf)::value);
     }
+
+    return table.io();
   }
 
   /**

Reply via email to