This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 01393a0 Core: Add HadoopConfigurable interface to serialize custom FileIO (#2678)
01393a0 is described below
commit 01393a06c284175edab75de34f48b2bfbd606081
Author: Jack Ye <[email protected]>
AuthorDate: Tue Jun 22 17:25:25 2021 -0700
Core: Add HadoopConfigurable interface to serialize custom FileIO (#2678)
---
.../java/org/apache/iceberg/SerializableTable.java | 12 +++---
.../apache/iceberg/hadoop/HadoopConfigurable.java | 44 ++++++++++++++++++++++
.../org/apache/iceberg/hadoop/HadoopFileIO.java | 9 ++++-
.../org/apache/iceberg/util/SerializationUtil.java | 26 +++++++++++++
.../apache/iceberg/mr/mapreduce/IcebergSplit.java | 10 +----
.../java/org/apache/iceberg/spark/SparkUtil.java | 11 +++---
6 files changed, 88 insertions(+), 24 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/SerializableTable.java b/core/src/main/java/org/apache/iceberg/SerializableTable.java
index 5b649f6..8bfd76a 100644
--- a/core/src/main/java/org/apache/iceberg/SerializableTable.java
+++ b/core/src/main/java/org/apache/iceberg/SerializableTable.java
@@ -24,7 +24,7 @@ import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.encryption.EncryptionManager;
-import org.apache.iceberg.hadoop.HadoopFileIO;
+import org.apache.iceberg.hadoop.HadoopConfigurable;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.LocationProvider;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
@@ -102,13 +102,11 @@ public class SerializableTable implements Table, Serializable {
}
private FileIO fileIO(Table table) {
- if (table.io() instanceof HadoopFileIO) {
- HadoopFileIO hadoopFileIO = (HadoopFileIO) table.io();
- SerializableConfiguration serializedConf = new SerializableConfiguration(hadoopFileIO.getConf());
- return new HadoopFileIO(serializedConf::get);
- } else {
- return table.io();
+ if (table.io() instanceof HadoopConfigurable) {
+ ((HadoopConfigurable) table.io()).serializeConfWith(conf -> new SerializableConfiguration(conf)::get);
}
+
+ return table.io();
}
private Table lazyTable() {
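A note on the new code path above: new SerializableConfiguration(conf)::get is a method reference bound to a freshly constructed SerializableConfiguration. Because the receiver is serializable and SerializableSupplier extends java.io.Serializable, the resulting supplier survives Java serialization. A minimal sketch of the idea, with illustrative variable names:

    Configuration conf = new Configuration();
    // Bound method reference: the receiver (a SerializableConfiguration) is
    // serializable, so this supplier can be written by ObjectOutputStream.
    SerializableSupplier<Configuration> supplier = new SerializableConfiguration(conf)::get;
    // After deserialization, supplier.get() rebuilds the Configuration from
    // the wrapper's serialized bytes.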
diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopConfigurable.java b/core/src/main/java/org/apache/iceberg/hadoop/HadoopConfigurable.java
new file mode 100644
index 0000000..9e79c3b
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopConfigurable.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.hadoop;
+
+import java.util.function.Function;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.util.SerializableSupplier;
+
+/**
+ * An interface that extends the Hadoop {@link Configurable} interface to offer better serialization support for
+ * customizable Iceberg objects such as {@link org.apache.iceberg.io.FileIO}.
+ * <p>
+ * If an object is serialized and needs to use Hadoop configuration, it is recommended for the object to implement
+ * this interface so that a serializable supplier of configuration can be provided instead of an actual Hadoop
+ * configuration which is not serializable.
+ */
+public interface HadoopConfigurable extends Configurable {
+
+ /**
+ * Take a function that serializes Hadoop configuration into a supplier. An implementation is supposed to pass in
+ * its current Hadoop configuration into this function, and the result can be safely serialized for future use.
+ * @param confSerializer A function that takes Hadoop configuration and returns a serializable supplier of it.
+ */
+ void serializeConfWith(Function<Configuration, SerializableSupplier<Configuration>> confSerializer);
+
+}
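A hedged sketch of how a custom FileIO might opt into this interface; CustomFileIO below is hypothetical and not part of this patch:

    import java.util.function.Function;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.iceberg.hadoop.HadoopConfigurable;
    import org.apache.iceberg.io.FileIO;
    import org.apache.iceberg.io.InputFile;
    import org.apache.iceberg.io.OutputFile;
    import org.apache.iceberg.util.SerializableSupplier;

    public class CustomFileIO implements FileIO, HadoopConfigurable {
      private SerializableSupplier<Configuration> conf;

      @Override
      public void setConf(Configuration newConf) {
        // plain lambda capture of a live Configuration; this would fail to
        // serialize because Configuration is not java.io.Serializable
        this.conf = () -> newConf;
      }

      @Override
      public Configuration getConf() {
        return conf.get();
      }

      @Override
      public void serializeConfWith(Function<Configuration, SerializableSupplier<Configuration>> confSerializer) {
        // swap the live Configuration for a supplier that serializes safely
        this.conf = confSerializer.apply(getConf());
      }

      @Override
      public InputFile newInputFile(String path) {
        throw new UnsupportedOperationException("omitted in this sketch");
      }

      @Override
      public OutputFile newOutputFile(String path) {
        throw new UnsupportedOperationException("omitted in this sketch");
      }

      @Override
      public void deleteFile(String path) {
        throw new UnsupportedOperationException("omitted in this sketch");
      }
    }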
diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java b/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java
index 34d66bf..1c53240 100644
--- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java
+++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java
@@ -20,7 +20,7 @@
package org.apache.iceberg.hadoop;
import java.io.IOException;
-import org.apache.hadoop.conf.Configurable;
+import java.util.function.Function;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -30,7 +30,7 @@ import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.util.SerializableSupplier;
-public class HadoopFileIO implements FileIO, Configurable {
+public class HadoopFileIO implements FileIO, HadoopConfigurable {
private SerializableSupplier<Configuration> hadoopConf;
@@ -84,4 +84,9 @@ public class HadoopFileIO implements FileIO, Configurable {
public Configuration getConf() {
return hadoopConf.get();
}
+
+ @Override
+ public void serializeConfWith(Function<Configuration, SerializableSupplier<Configuration>> confSerializer) {
+ this.hadoopConf = confSerializer.apply(getConf());
+ }
}
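To illustrate why this hook matters for HadoopFileIO: Hadoop's Configuration is not java.io.Serializable, so a FileIO holding one directly cannot pass through ObjectOutputStream. A minimal sketch of the round trip the new method enables (exception handling omitted):

    import java.io.ByteArrayOutputStream;
    import java.io.ObjectOutputStream;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.iceberg.hadoop.HadoopFileIO;
    import org.apache.iceberg.hadoop.SerializableConfiguration;

    HadoopFileIO io = new HadoopFileIO(new Configuration());
    // Replace the live Configuration with a serializable supplier of it.
    io.serializeConfWith(conf -> new SerializableConfiguration(conf)::get);

    try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
         ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(io);  // succeeds: no raw Configuration is held anymore
    }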
diff --git a/core/src/main/java/org/apache/iceberg/util/SerializationUtil.java b/core/src/main/java/org/apache/iceberg/util/SerializationUtil.java
index 52b14d9..f7ccd7a 100644
--- a/core/src/main/java/org/apache/iceberg/util/SerializationUtil.java
+++ b/core/src/main/java/org/apache/iceberg/util/SerializationUtil.java
@@ -27,13 +27,39 @@ import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
+import java.util.function.Function;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.hadoop.HadoopConfigurable;
+import org.apache.iceberg.hadoop.SerializableConfiguration;
public class SerializationUtil {
private SerializationUtil() {
}
+ /**
+ * Serialize an object to bytes. If the object implements {@link HadoopConfigurable}, its Hadoop configuration will
+ * be serialized into a {@link SerializableConfiguration}.
+ * @param obj object to serialize
+ * @return serialized bytes
+ */
public static byte[] serializeToBytes(Object obj) {
+ return serializeToBytes(obj, conf -> new SerializableConfiguration(conf)::get);
+ }
+
+ /**
+ * Serialize an object to bytes. If the object implements {@link HadoopConfigurable}, the confSerializer will be used
+ * to serialize Hadoop configuration used by the object.
+ * @param obj object to serialize
+ * @param confSerializer serializer for the Hadoop configuration
+ * @return serialized bytes
+ */
+ public static byte[] serializeToBytes(Object obj,
+ Function<Configuration, SerializableSupplier<Configuration>> confSerializer) {
+ if (obj instanceof HadoopConfigurable) {
+ ((HadoopConfigurable) obj).serializeConfWith(confSerializer);
+ }
+
try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(baos)) {
oos.writeObject(obj);
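A usage sketch of the two overloads; the table variable and the Spark serializer in the second call are illustrative, with the real Spark integration shown in SparkUtil below:

    // Default: Hadoop conf is captured via Iceberg's own SerializableConfiguration.
    byte[] data = SerializationUtil.serializeToBytes(table.io());

    // Custom: callers can plug in another conf serializer, e.g. Spark's
    // Kryo-friendly wrapper, whose accessor is value() rather than get().
    byte[] sparkSafe = SerializationUtil.serializeToBytes(
        table.io(),
        conf -> new org.apache.spark.util.SerializableConfiguration(conf)::value);

    // Round trip back to a usable FileIO.
    FileIO restored = SerializationUtil.deserializeFromBytes(data);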
diff --git a/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergSplit.java b/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergSplit.java
index 43b78d6..632224e 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergSplit.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergSplit.java
@@ -27,8 +27,6 @@ import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.encryption.EncryptionManager;
-import org.apache.iceberg.hadoop.HadoopFileIO;
-import org.apache.iceberg.hadoop.SerializableConfiguration;
import org.apache.iceberg.hadoop.Util;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.mr.InputFormatConfig;
@@ -92,13 +90,7 @@ public class IcebergSplit extends InputSplit implements org.apache.hadoop.mapred
out.writeInt(data.length);
out.write(data);
- byte[] ioData;
- if (io instanceof HadoopFileIO) {
- SerializableConfiguration serializableConf = new SerializableConfiguration(((HadoopFileIO) io).conf());
- ioData = SerializationUtil.serializeToBytes(new HadoopFileIO(serializableConf::get));
- } else {
- ioData = SerializationUtil.serializeToBytes(io);
- }
+ byte[] ioData = SerializationUtil.serializeToBytes(io);
out.writeInt(ioData.length);
out.write(ioData);
diff --git a/spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java b/spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java
index 3537d94..53ca293 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java
@@ -26,7 +26,7 @@ import java.util.stream.Collectors;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
-import org.apache.iceberg.hadoop.HadoopFileIO;
+import org.apache.iceberg.hadoop.HadoopConfigurable;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.transforms.Transform;
@@ -39,13 +39,12 @@ public class SparkUtil {
}
public static FileIO serializableFileIO(Table table) {
- if (table.io() instanceof HadoopFileIO) {
+ if (table.io() instanceof HadoopConfigurable) {
// we need to use Spark's SerializableConfiguration to avoid issues with Kryo serialization
- SerializableConfiguration conf = new SerializableConfiguration(((HadoopFileIO) table.io()).conf());
- return new HadoopFileIO(conf::value);
- } else {
- return table.io();
+ ((HadoopConfigurable) table.io()).serializeConfWith(conf -> new SerializableConfiguration(conf)::value);
}
+
+ return table.io();
}
/**
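For context, a sketch of the intended use in a Spark job; the table variable and the sample path are assumptions, not from this patch:

    import java.util.Arrays;
    import org.apache.iceberg.io.FileIO;
    import org.apache.iceberg.spark.SparkUtil;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.sql.SparkSession;

    SparkSession spark = SparkSession.builder().getOrCreate();
    JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());

    // The FileIO now holds its Hadoop conf behind Spark's SerializableConfiguration,
    // so shipping it to executors works with Kryo as well as Java serialization.
    FileIO io = SparkUtil.serializableFileIO(table);

    jsc.parallelize(Arrays.asList("s3://bucket/path/old-file.parquet"))
       .foreach(path -> io.deleteFile(path));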