This is an automated email from the ASF dual-hosted git repository.

dweeks pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 0420cde26 API: Add generic FileIO JSON serialization (#5178)
0420cde26 is described below

commit 0420cde265bee4a0a8e1e0aaad8297242848b3cd
Author: Ryan Blue <[email protected]>
AuthorDate: Sun Jul 3 11:59:38 2022 -0700

    API: Add generic FileIO JSON serialization (#5178)
    
    * API: Add generic FileIO JSON serialization.
    
    * Fix checkstyle.
    
    * Close StringWriter and JsonGenerator.
---
 .../main/java/org/apache/iceberg/io/FileIO.java    |  9 +++
 .../java/org/apache/iceberg/aws/s3/S3FileIO.java   | 15 ++--
 .../org/apache/iceberg/aws/s3/TestS3FileIO.java    | 19 +++++
 .../java/org/apache/iceberg/SortOrderParser.java   | 36 +---------
 .../org/apache/iceberg/hadoop/HadoopFileIO.java    |  7 ++
 .../java/org/apache/iceberg/io/FileIOParser.java   | 84 ++++++++++++++++++++++
 .../org/apache/iceberg/io/ResolvingFileIO.java     |  5 ++
 .../java/org/apache/iceberg/util/JsonUtil.java     | 50 +++++++++++++
 .../java/org/apache/iceberg/gcp/gcs/GCSFileIO.java | 13 +++-
 .../apache/iceberg/TestFileIOSerialization.java    | 19 +++++
 10 files changed, 217 insertions(+), 40 deletions(-)

diff --git a/api/src/main/java/org/apache/iceberg/io/FileIO.java b/api/src/main/java/org/apache/iceberg/io/FileIO.java
index 6bb84ba69..187b53fa2 100644
--- a/api/src/main/java/org/apache/iceberg/io/FileIO.java
+++ b/api/src/main/java/org/apache/iceberg/io/FileIO.java
@@ -61,6 +61,15 @@ public interface FileIO extends Serializable, Closeable {
     deleteFile(file.location());
   }
 
+  /**
+   * @return the property map used to configure this FileIO
+   * @throws UnsupportedOperationException if this FileIO does not expose its configuration properties
+   */
+  default Map<String, String> properties() {
+    throw new UnsupportedOperationException(String.format(
+        "%s does not expose configuration properties", this.getClass().toString()));
+  }
+
   /**
    * Initialize File IO from catalog properties.
    * @param properties catalog properties
diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java b/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java
index 3cd557c37..f09eb948c 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java
@@ -79,6 +79,7 @@ public class S3FileIO implements FileIO, SupportsBulkOperations, SupportsPrefixO
   private String credential = null;
   private SerializableSupplier<S3Client> s3;
   private AwsProperties awsProperties;
+  private Map<String, String> properties = null;
   private transient volatile S3Client client;
   private MetricsContext metrics = MetricsContext.nullMetrics();
   private final AtomicBoolean isResourceClosed = new AtomicBoolean(false);
@@ -144,6 +145,11 @@ public class S3FileIO implements FileIO, SupportsBulkOperations, SupportsPrefixO
     client().deleteObject(deleteRequest);
   }
 
+  @Override
+  public Map<String, String> properties() {
+    return properties;
+  }
+
   /**
    * Deletes the given paths in a batched manner.
    * <p>
@@ -302,12 +308,13 @@ public class S3FileIO implements FileIO, SupportsBulkOperations, SupportsPrefixO
   }
 
   @Override
-  public void initialize(Map<String, String> properties) {
-    this.awsProperties = new AwsProperties(properties);
+  public void initialize(Map<String, String> props) {
+    this.awsProperties = new AwsProperties(props);
+    this.properties = props;
 
     // Do not override s3 client if it was provided
     if (s3 == null) {
-      AwsClientFactory clientFactory = AwsClientFactories.from(properties);
+      AwsClientFactory clientFactory = AwsClientFactories.from(props);
       if (clientFactory instanceof CredentialSupplier) {
         this.credential = ((CredentialSupplier) clientFactory).getCredential();
       }
@@ -322,7 +329,7 @@ public class S3FileIO implements FileIO, SupportsBulkOperations, SupportsPrefixO
               .hiddenImpl(DEFAULT_METRICS_IMPL, String.class)
               .buildChecked();
       MetricsContext context = ctor.newInstance("s3");
-      context.initialize(properties);
+      context.initialize(props);
       this.metrics = context;
     } catch (NoClassDefFoundError | NoSuchMethodException | ClassCastException e) {
       LOG.warn("Unable to load metrics class: '{}', falling back to null metrics", DEFAULT_METRICS_IMPL, e);
diff --git a/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIO.java b/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIO.java
index a34300aa6..972df9da8 100644
--- a/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIO.java
+++ b/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIO.java
@@ -28,8 +28,11 @@ import java.util.Map;
 import java.util.Random;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.SerializationUtils;
+import org.apache.hadoop.conf.Configurable;
 import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.io.BulkDeletionFailureException;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.FileIOParser;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.io.OutputFile;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
@@ -225,6 +228,22 @@ public class TestS3FileIO {
     });
   }
 
+  @Test
+  public void testFileIOJsonSerialization() {
+    Object conf;
+    if (s3FileIO instanceof Configurable) {
+      conf = ((Configurable) s3FileIO).getConf();
+    } else {
+      conf = null;
+    }
+
+    String json = FileIOParser.toJson(s3FileIO);
+    try (FileIO deserialized = FileIOParser.fromJson(json, conf)) {
+      Assert.assertTrue(deserialized instanceof S3FileIO);
+      Assert.assertEquals(s3FileIO.properties(), deserialized.properties());
+    }
+  }
+
   private void createRandomObjects(String prefix, int count) {
     S3URI s3URI = new S3URI(prefix);
 
diff --git a/core/src/main/java/org/apache/iceberg/SortOrderParser.java b/core/src/main/java/org/apache/iceberg/SortOrderParser.java
index 002c859d8..7b242e551 100644
--- a/core/src/main/java/org/apache/iceberg/SortOrderParser.java
+++ b/core/src/main/java/org/apache/iceberg/SortOrderParser.java
@@ -22,8 +22,6 @@ package org.apache.iceberg;
 import com.fasterxml.jackson.core.JsonGenerator;
 import com.fasterxml.jackson.databind.JsonNode;
 import java.io.IOException;
-import java.io.StringWriter;
-import java.io.UncheckedIOException;
 import java.util.Iterator;
 import java.util.Locale;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -56,19 +54,7 @@ public class SortOrderParser {
   }
 
   public static String toJson(SortOrder sortOrder, boolean pretty) {
-    try {
-      StringWriter writer = new StringWriter();
-      JsonGenerator generator = JsonUtil.factory().createGenerator(writer);
-      if (pretty) {
-        generator.useDefaultPrettyPrinter();
-      }
-      toJson(sortOrder, generator);
-      generator.flush();
-      return writer.toString();
-
-    } catch (IOException e) {
-      throw new UncheckedIOException(e);
-    }
+    return JsonUtil.generate(gen -> toJson(sortOrder, gen), pretty);
   }
 
   private static String toJson(SortDirection direction) {
@@ -105,19 +91,7 @@ public class SortOrderParser {
   }
 
   public static String toJson(UnboundSortOrder sortOrder, boolean pretty) {
-    try {
-      StringWriter writer = new StringWriter();
-      JsonGenerator generator = JsonUtil.factory().createGenerator(writer);
-      if (pretty) {
-        generator.useDefaultPrettyPrinter();
-      }
-      toJson(sortOrder, generator);
-      generator.flush();
-      return writer.toString();
-
-    } catch (IOException e) {
-      throw new UncheckedIOException(e);
-    }
+    return JsonUtil.generate(gen -> toJson(sortOrder, gen), pretty);
   }
 
   private static void toJsonFields(UnboundSortOrder sortOrder, JsonGenerator generator) throws IOException {
@@ -142,11 +116,7 @@ public class SortOrderParser {
   }
 
   public static UnboundSortOrder fromJson(String json) {
-    try {
-      return fromJson(JsonUtil.mapper().readValue(json, JsonNode.class));
-    } catch (IOException e) {
-      throw new UncheckedIOException(e);
-    }
+    return JsonUtil.parse(json, SortOrderParser::fromJson);
   }
 
   public static UnboundSortOrder fromJson(JsonNode json) {
diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java b/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java
index f3c1db18e..fd207eb2f 100644
--- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java
+++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java
@@ -22,6 +22,7 @@ package org.apache.iceberg.hadoop;
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.util.Iterator;
+import java.util.Map;
 import java.util.function.Function;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -33,6 +34,7 @@ import org.apache.iceberg.io.FileInfo;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.io.OutputFile;
 import org.apache.iceberg.io.SupportsPrefixOperations;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Streams;
 import org.apache.iceberg.util.SerializableSupplier;
 
@@ -81,6 +83,11 @@ public class HadoopFileIO implements FileIO, HadoopConfigurable, SupportsPrefixO
     }
   }
 
+  @Override
+  public Map<String, String> properties() {
+    return ImmutableMap.of();
+  }
+
   @Override
   public void setConf(Configuration conf) {
     this.hadoopConf = new SerializableConfiguration(conf)::get;
diff --git a/core/src/main/java/org/apache/iceberg/io/FileIOParser.java b/core/src/main/java/org/apache/iceberg/io/FileIOParser.java
new file mode 100644
index 000000000..3f3c5aac0
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/io/FileIOParser.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.util.JsonUtil;
+
+public class FileIOParser {
+  private FileIOParser() {
+  }
+
+  private static final String FILE_IO_IMPL = "io-impl";
+  private static final String PROPERTIES = "properties";
+
+  public static String toJson(FileIO io) {
+    return toJson(io, false);
+  }
+
+  public static String toJson(FileIO io, boolean pretty) {
+    return JsonUtil.generate(gen -> toJson(io, gen), pretty);
+  }
+
+  private static void toJson(FileIO io, JsonGenerator generator) throws IOException {
+    String impl = io.getClass().getName();
+    Map<String, String> properties;
+    try {
+      properties = io.properties();
+    } catch (UnsupportedOperationException e) {
+      throw new IllegalArgumentException(String.format(
+          "Cannot serialize FileIO: %s does not expose configuration properties", impl));
+    }
+
+    Preconditions.checkArgument(properties != null,
+        "Cannot serialize FileIO: invalid configuration properties (null)", impl);
+
+    generator.writeStartObject();
+
+    generator.writeStringField(FILE_IO_IMPL, impl);
+    generator.writeObjectFieldStart(PROPERTIES);
+    for (Map.Entry<String, String> pair : properties.entrySet()) {
+      generator.writeStringField(pair.getKey(), pair.getValue());
+    }
+    generator.writeEndObject();
+
+    generator.writeEndObject();
+  }
+
+  public static FileIO fromJson(String json) {
+    return fromJson(json, null);
+  }
+
+  public static FileIO fromJson(String json, Object conf) {
+    return JsonUtil.parse(json, node -> fromJson(node, conf));
+  }
+
+  private static FileIO fromJson(JsonNode json, Object conf) {
+    Preconditions.checkArgument(json.isObject(), "Cannot parse FileIO from non-object: %s", json);
+    String impl = JsonUtil.getString(FILE_IO_IMPL, json);
+    Map<String, String> properties = JsonUtil.getStringMap(PROPERTIES, json);
+    return CatalogUtil.loadFileIO(impl, properties, conf);
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java b/core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java
index 03cc9a2de..3b6d9725f 100644
--- a/core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java
+++ b/core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java
@@ -73,6 +73,11 @@ public class ResolvingFileIO implements FileIO, HadoopConfigurable {
     io(location).deleteFile(location);
   }
 
+  @Override
+  public Map<String, String> properties() {
+    return properties;
+  }
+
   @Override
   public void initialize(Map<String, String> newProperties) {
     close(); // close and discard any existing FileIO instances
diff --git a/core/src/main/java/org/apache/iceberg/util/JsonUtil.java b/core/src/main/java/org/apache/iceberg/util/JsonUtil.java
index ff4a185d8..3af718738 100644
--- a/core/src/main/java/org/apache/iceberg/util/JsonUtil.java
+++ b/core/src/main/java/org/apache/iceberg/util/JsonUtil.java
@@ -25,6 +25,8 @@ import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ArrayNode;
 import java.io.IOException;
+import java.io.StringWriter;
+import java.io.UncheckedIOException;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -50,6 +52,54 @@ public class JsonUtil {
     return MAPPER;
   }
 
+  @FunctionalInterface
+  public interface ToJson {
+    void generate(JsonGenerator gen) throws IOException;
+  }
+
+  /**
+   * Helper for writing JSON with a JsonGenerator.
+   *
+   * @param toJson a function to produce JSON using a JsonGenerator
+   * @param pretty whether to pretty-print JSON for readability
+   * @return a JSON string produced from the generator
+   */
+  public static String generate(ToJson toJson, boolean pretty) {
+    try (StringWriter writer = new StringWriter();
+         JsonGenerator generator = JsonUtil.factory().createGenerator(writer)) {
+      if (pretty) {
+        generator.useDefaultPrettyPrinter();
+      }
+      toJson.generate(generator);
+      generator.flush();
+      return writer.toString();
+
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+  }
+
+  @FunctionalInterface
+  public interface FromJson<T> {
+    T parse(JsonNode node);
+  }
+
+  /**
+   * Helper for parsing JSON from a String.
+   *
+   * @param json a JSON string
+   * @param parser a function that converts a JsonNode to a Java object
+   * @param <T> type of objects created by the parser
+   * @return the parsed Java object
+   */
+  public static <T> T parse(String json, FromJson<T> parser) {
+    try {
+      return parser.parse(JsonUtil.mapper().readValue(json, JsonNode.class));
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+  }
+
   public static int getInt(String property, JsonNode node) {
     Preconditions.checkArgument(node.has(property), "Cannot parse missing int %s", property);
     JsonNode pNode = node.get(property);
diff --git a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSFileIO.java b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSFileIO.java
index 36fba6295..24e37e23d 100644
--- a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSFileIO.java
+++ b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSFileIO.java
@@ -52,6 +52,7 @@ public class GCSFileIO implements FileIO {
   private transient volatile Storage storage;
   private MetricsContext metrics = MetricsContext.nullMetrics();
   private final AtomicBoolean isResourceClosed = new AtomicBoolean(false);
+  private Map<String, String> properties = null;
 
   /**
    * No-arg constructor to load the FileIO dynamically.
@@ -94,6 +95,11 @@ public class GCSFileIO implements FileIO {
     }
   }
 
+  @Override
+  public Map<String, String> properties() {
+    return properties;
+  }
+
   private Storage client() {
     if (storage == null) {
       synchronized (this) {
@@ -106,8 +112,9 @@ public class GCSFileIO implements FileIO {
   }
 
   @Override
-  public void initialize(Map<String, String> properties) {
-    this.gcpProperties = new GCPProperties(properties);
+  public void initialize(Map<String, String> props) {
+    this.properties = props;
+    this.gcpProperties = new GCPProperties(props);
 
     this.storageSupplier = () -> {
       StorageOptions.Builder builder = StorageOptions.newBuilder();
@@ -121,7 +128,7 @@ public class GCSFileIO implements FileIO {
         DynConstructors.Ctor<MetricsContext> ctor =
             DynConstructors.builder(MetricsContext.class).hiddenImpl(DEFAULT_METRICS_IMPL, String.class).buildChecked();
         MetricsContext context = ctor.newInstance("gcs");
-        context.initialize(properties);
+        context.initialize(props);
         this.metrics = context;
       } catch (NoClassDefFoundError | NoSuchMethodException | ClassCastException e) {
         LOG.warn("Unable to load metrics class: '{}', falling back to null metrics", DEFAULT_METRICS_IMPL, e);
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/TestFileIOSerialization.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/TestFileIOSerialization.java
index 7c14485ff..8e38b902c 100644
--- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/TestFileIOSerialization.java
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/TestFileIOSerialization.java
@@ -22,10 +22,12 @@ package org.apache.iceberg;
 import java.io.File;
 import java.io.IOException;
 import java.util.Map;
+import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.hadoop.HadoopFileIO;
 import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.FileIOParser;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Types;
@@ -110,4 +112,21 @@ public class TestFileIOSerialization {
     conf.forEach(entry -> map.put(entry.getKey(), entry.getValue()));
     return map;
   }
+
+  @Test
+  public void testFileIOJsonSerialization() {
+    FileIO io = table.io();
+    Object conf;
+    if (io instanceof Configurable) {
+      conf = ((Configurable) io).getConf();
+    } else {
+      conf = null;
+    }
+
+    String json = FileIOParser.toJson(io);
+    try (FileIO deserialized = FileIOParser.fromJson(json, conf)) {
+      Assert.assertTrue(deserialized instanceof HadoopFileIO);
+      Assert.assertEquals(io.properties(), deserialized.properties());
+    }
+  }
 }

Reply via email to