This is an automated email from the ASF dual-hosted git repository.
dweeks pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 0420cde26 API: Add generic FileIO JSON serialization (#5178)
0420cde26 is described below
commit 0420cde265bee4a0a8e1e0aaad8297242848b3cd
Author: Ryan Blue <[email protected]>
AuthorDate: Sun Jul 3 11:59:38 2022 -0700
API: Add generic FileIO JSON serialization (#5178)
* API: Add generic FileIO JSON serialization.
* Fix checkstyle.
* Close StringWriter and JsonGenerator.
---
.../main/java/org/apache/iceberg/io/FileIO.java | 9 +++
.../java/org/apache/iceberg/aws/s3/S3FileIO.java | 15 ++--
.../org/apache/iceberg/aws/s3/TestS3FileIO.java | 19 +++++
.../java/org/apache/iceberg/SortOrderParser.java | 36 +---------
.../org/apache/iceberg/hadoop/HadoopFileIO.java | 7 ++
.../java/org/apache/iceberg/io/FileIOParser.java | 84 ++++++++++++++++++++++
.../org/apache/iceberg/io/ResolvingFileIO.java | 5 ++
.../java/org/apache/iceberg/util/JsonUtil.java | 50 +++++++++++++
.../java/org/apache/iceberg/gcp/gcs/GCSFileIO.java | 13 +++-
.../apache/iceberg/TestFileIOSerialization.java | 19 +++++
10 files changed, 217 insertions(+), 40 deletions(-)
diff --git a/api/src/main/java/org/apache/iceberg/io/FileIO.java
b/api/src/main/java/org/apache/iceberg/io/FileIO.java
index 6bb84ba69..187b53fa2 100644
--- a/api/src/main/java/org/apache/iceberg/io/FileIO.java
+++ b/api/src/main/java/org/apache/iceberg/io/FileIO.java
@@ -61,6 +61,15 @@ public interface FileIO extends Serializable, Closeable {
deleteFile(file.location());
}
+ /**
+ * @return the property map used to configure this FileIO
+ * @throws UnsupportedOperationException if this FileIO does not expose its
configuration properties
+ */
+ default Map<String, String> properties() {
+ throw new UnsupportedOperationException(String.format(
+ "%s does not expose configuration properties",
this.getClass().toString()));
+ }
+
/**
* Initialize File IO from catalog properties.
* @param properties catalog properties
diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java
b/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java
index 3cd557c37..f09eb948c 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java
@@ -79,6 +79,7 @@ public class S3FileIO implements FileIO,
SupportsBulkOperations, SupportsPrefixO
private String credential = null;
private SerializableSupplier<S3Client> s3;
private AwsProperties awsProperties;
+ private Map<String, String> properties = null;
private transient volatile S3Client client;
private MetricsContext metrics = MetricsContext.nullMetrics();
private final AtomicBoolean isResourceClosed = new AtomicBoolean(false);
@@ -144,6 +145,11 @@ public class S3FileIO implements FileIO,
SupportsBulkOperations, SupportsPrefixO
client().deleteObject(deleteRequest);
}
+ @Override
+ public Map<String, String> properties() {
+ return properties;
+ }
+
/**
* Deletes the given paths in a batched manner.
* <p>
@@ -302,12 +308,13 @@ public class S3FileIO implements FileIO,
SupportsBulkOperations, SupportsPrefixO
}
@Override
- public void initialize(Map<String, String> properties) {
- this.awsProperties = new AwsProperties(properties);
+ public void initialize(Map<String, String> props) {
+ this.awsProperties = new AwsProperties(props);
+ this.properties = props;
// Do not override s3 client if it was provided
if (s3 == null) {
- AwsClientFactory clientFactory = AwsClientFactories.from(properties);
+ AwsClientFactory clientFactory = AwsClientFactories.from(props);
if (clientFactory instanceof CredentialSupplier) {
this.credential = ((CredentialSupplier) clientFactory).getCredential();
}
@@ -322,7 +329,7 @@ public class S3FileIO implements FileIO,
SupportsBulkOperations, SupportsPrefixO
.hiddenImpl(DEFAULT_METRICS_IMPL, String.class)
.buildChecked();
MetricsContext context = ctor.newInstance("s3");
- context.initialize(properties);
+ context.initialize(props);
this.metrics = context;
} catch (NoClassDefFoundError | NoSuchMethodException | ClassCastException
e) {
LOG.warn("Unable to load metrics class: '{}', falling back to null
metrics", DEFAULT_METRICS_IMPL, e);
diff --git a/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIO.java
b/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIO.java
index a34300aa6..972df9da8 100644
--- a/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIO.java
+++ b/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIO.java
@@ -28,8 +28,11 @@ import java.util.Map;
import java.util.Random;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.SerializationUtils;
+import org.apache.hadoop.conf.Configurable;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.io.BulkDeletionFailureException;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.FileIOParser;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
@@ -225,6 +228,22 @@ public class TestS3FileIO {
});
}
+ @Test
+ public void testFileIOJsonSerialization() {
+ Object conf;
+ if (s3FileIO instanceof Configurable) {
+ conf = ((Configurable) s3FileIO).getConf();
+ } else {
+ conf = null;
+ }
+
+ String json = FileIOParser.toJson(s3FileIO);
+ try (FileIO deserialized = FileIOParser.fromJson(json, conf)) {
+ Assert.assertTrue(deserialized instanceof S3FileIO);
+ Assert.assertEquals(s3FileIO.properties(), deserialized.properties());
+ }
+ }
+
private void createRandomObjects(String prefix, int count) {
S3URI s3URI = new S3URI(prefix);
diff --git a/core/src/main/java/org/apache/iceberg/SortOrderParser.java
b/core/src/main/java/org/apache/iceberg/SortOrderParser.java
index 002c859d8..7b242e551 100644
--- a/core/src/main/java/org/apache/iceberg/SortOrderParser.java
+++ b/core/src/main/java/org/apache/iceberg/SortOrderParser.java
@@ -22,8 +22,6 @@ package org.apache.iceberg;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.JsonNode;
import java.io.IOException;
-import java.io.StringWriter;
-import java.io.UncheckedIOException;
import java.util.Iterator;
import java.util.Locale;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -56,19 +54,7 @@ public class SortOrderParser {
}
public static String toJson(SortOrder sortOrder, boolean pretty) {
- try {
- StringWriter writer = new StringWriter();
- JsonGenerator generator = JsonUtil.factory().createGenerator(writer);
- if (pretty) {
- generator.useDefaultPrettyPrinter();
- }
- toJson(sortOrder, generator);
- generator.flush();
- return writer.toString();
-
- } catch (IOException e) {
- throw new UncheckedIOException(e);
- }
+ return JsonUtil.generate(gen -> toJson(sortOrder, gen), pretty);
}
private static String toJson(SortDirection direction) {
@@ -105,19 +91,7 @@ public class SortOrderParser {
}
public static String toJson(UnboundSortOrder sortOrder, boolean pretty) {
- try {
- StringWriter writer = new StringWriter();
- JsonGenerator generator = JsonUtil.factory().createGenerator(writer);
- if (pretty) {
- generator.useDefaultPrettyPrinter();
- }
- toJson(sortOrder, generator);
- generator.flush();
- return writer.toString();
-
- } catch (IOException e) {
- throw new UncheckedIOException(e);
- }
+ return JsonUtil.generate(gen -> toJson(sortOrder, gen), pretty);
}
private static void toJsonFields(UnboundSortOrder sortOrder, JsonGenerator
generator) throws IOException {
@@ -142,11 +116,7 @@ public class SortOrderParser {
}
public static UnboundSortOrder fromJson(String json) {
- try {
- return fromJson(JsonUtil.mapper().readValue(json, JsonNode.class));
- } catch (IOException e) {
- throw new UncheckedIOException(e);
- }
+ return JsonUtil.parse(json, SortOrderParser::fromJson);
}
public static UnboundSortOrder fromJson(JsonNode json) {
diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java
b/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java
index f3c1db18e..fd207eb2f 100644
--- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java
+++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java
@@ -22,6 +22,7 @@ package org.apache.iceberg.hadoop;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Iterator;
+import java.util.Map;
import java.util.function.Function;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -33,6 +34,7 @@ import org.apache.iceberg.io.FileInfo;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.io.SupportsPrefixOperations;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Streams;
import org.apache.iceberg.util.SerializableSupplier;
@@ -81,6 +83,11 @@ public class HadoopFileIO implements FileIO,
HadoopConfigurable, SupportsPrefixO
}
}
+ @Override
+ public Map<String, String> properties() {
+ return ImmutableMap.of();
+ }
+
@Override
public void setConf(Configuration conf) {
this.hadoopConf = new SerializableConfiguration(conf)::get;
diff --git a/core/src/main/java/org/apache/iceberg/io/FileIOParser.java
b/core/src/main/java/org/apache/iceberg/io/FileIOParser.java
new file mode 100644
index 000000000..3f3c5aac0
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/io/FileIOParser.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.util.JsonUtil;
+
+public class FileIOParser {
+ private FileIOParser() {
+ }
+
+ private static final String FILE_IO_IMPL = "io-impl";
+ private static final String PROPERTIES = "properties";
+
+ public static String toJson(FileIO io) {
+ return toJson(io, false);
+ }
+
+ public static String toJson(FileIO io, boolean pretty) {
+ return JsonUtil.generate(gen -> toJson(io, gen), pretty);
+ }
+
+ private static void toJson(FileIO io, JsonGenerator generator) throws
IOException {
+ String impl = io.getClass().getName();
+ Map<String, String> properties;
+ try {
+ properties = io.properties();
+ } catch (UnsupportedOperationException e) {
+      throw new IllegalArgumentException(String.format(
+          "Cannot serialize FileIO: %s does not expose configuration properties", impl), e);
+ }
+
+    Preconditions.checkArgument(properties != null,
+        "Cannot serialize FileIO: %s has invalid configuration properties (null)", impl);
+
+ generator.writeStartObject();
+
+ generator.writeStringField(FILE_IO_IMPL, impl);
+ generator.writeObjectFieldStart(PROPERTIES);
+ for (Map.Entry<String, String> pair : properties.entrySet()) {
+ generator.writeStringField(pair.getKey(), pair.getValue());
+ }
+ generator.writeEndObject();
+
+ generator.writeEndObject();
+ }
+
+ public static FileIO fromJson(String json) {
+ return fromJson(json, null);
+ }
+
+ public static FileIO fromJson(String json, Object conf) {
+ return JsonUtil.parse(json, node -> fromJson(node, conf));
+ }
+
+ private static FileIO fromJson(JsonNode json, Object conf) {
+ Preconditions.checkArgument(json.isObject(), "Cannot parse FileIO from
non-object: %s", json);
+ String impl = JsonUtil.getString(FILE_IO_IMPL, json);
+ Map<String, String> properties = JsonUtil.getStringMap(PROPERTIES, json);
+ return CatalogUtil.loadFileIO(impl, properties, conf);
+ }
+}
diff --git a/core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java
b/core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java
index 03cc9a2de..3b6d9725f 100644
--- a/core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java
+++ b/core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java
@@ -73,6 +73,11 @@ public class ResolvingFileIO implements FileIO,
HadoopConfigurable {
io(location).deleteFile(location);
}
+ @Override
+ public Map<String, String> properties() {
+ return properties;
+ }
+
@Override
public void initialize(Map<String, String> newProperties) {
close(); // close and discard any existing FileIO instances
diff --git a/core/src/main/java/org/apache/iceberg/util/JsonUtil.java
b/core/src/main/java/org/apache/iceberg/util/JsonUtil.java
index ff4a185d8..3af718738 100644
--- a/core/src/main/java/org/apache/iceberg/util/JsonUtil.java
+++ b/core/src/main/java/org/apache/iceberg/util/JsonUtil.java
@@ -25,6 +25,8 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import java.io.IOException;
+import java.io.StringWriter;
+import java.io.UncheckedIOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -50,6 +52,54 @@ public class JsonUtil {
return MAPPER;
}
+ @FunctionalInterface
+ public interface ToJson {
+ void generate(JsonGenerator gen) throws IOException;
+ }
+
+ /**
+ * Helper for writing JSON with a JsonGenerator.
+ *
+ * @param toJson a function to produce JSON using a JsonGenerator
+ * @param pretty whether to pretty-print JSON for readability
+ * @return a JSON string produced from the generator
+ */
+ public static String generate(ToJson toJson, boolean pretty) {
+ try (StringWriter writer = new StringWriter();
+ JsonGenerator generator = JsonUtil.factory().createGenerator(writer))
{
+ if (pretty) {
+ generator.useDefaultPrettyPrinter();
+ }
+ toJson.generate(generator);
+ generator.flush();
+ return writer.toString();
+
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ @FunctionalInterface
+ public interface FromJson<T> {
+ T parse(JsonNode node);
+ }
+
+ /**
+ * Helper for parsing JSON from a String.
+ *
+ * @param json a JSON string
+ * @param parser a function that converts a JsonNode to a Java object
+ * @param <T> type of objects created by the parser
+ * @return the parsed Java object
+ */
+ public static <T> T parse(String json, FromJson<T> parser) {
+ try {
+ return parser.parse(JsonUtil.mapper().readValue(json, JsonNode.class));
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
public static int getInt(String property, JsonNode node) {
Preconditions.checkArgument(node.has(property), "Cannot parse missing int
%s", property);
JsonNode pNode = node.get(property);
diff --git a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSFileIO.java
b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSFileIO.java
index 36fba6295..24e37e23d 100644
--- a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSFileIO.java
+++ b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSFileIO.java
@@ -52,6 +52,7 @@ public class GCSFileIO implements FileIO {
private transient volatile Storage storage;
private MetricsContext metrics = MetricsContext.nullMetrics();
private final AtomicBoolean isResourceClosed = new AtomicBoolean(false);
+ private Map<String, String> properties = null;
/**
* No-arg constructor to load the FileIO dynamically.
@@ -94,6 +95,11 @@ public class GCSFileIO implements FileIO {
}
}
+ @Override
+ public Map<String, String> properties() {
+ return properties;
+ }
+
private Storage client() {
if (storage == null) {
synchronized (this) {
@@ -106,8 +112,9 @@ public class GCSFileIO implements FileIO {
}
@Override
- public void initialize(Map<String, String> properties) {
- this.gcpProperties = new GCPProperties(properties);
+ public void initialize(Map<String, String> props) {
+ this.properties = props;
+ this.gcpProperties = new GCPProperties(props);
this.storageSupplier = () -> {
StorageOptions.Builder builder = StorageOptions.newBuilder();
@@ -121,7 +128,7 @@ public class GCSFileIO implements FileIO {
DynConstructors.Ctor<MetricsContext> ctor =
DynConstructors.builder(MetricsContext.class).hiddenImpl(DEFAULT_METRICS_IMPL,
String.class).buildChecked();
MetricsContext context = ctor.newInstance("gcs");
- context.initialize(properties);
+ context.initialize(props);
this.metrics = context;
} catch (NoClassDefFoundError | NoSuchMethodException |
ClassCastException e) {
LOG.warn("Unable to load metrics class: '{}', falling back to null
metrics", DEFAULT_METRICS_IMPL, e);
diff --git
a/spark/v3.2/spark/src/test/java/org/apache/iceberg/TestFileIOSerialization.java
b/spark/v3.2/spark/src/test/java/org/apache/iceberg/TestFileIOSerialization.java
index 7c14485ff..8e38b902c 100644
---
a/spark/v3.2/spark/src/test/java/org/apache/iceberg/TestFileIOSerialization.java
+++
b/spark/v3.2/spark/src/test/java/org/apache/iceberg/TestFileIOSerialization.java
@@ -22,10 +22,12 @@ package org.apache.iceberg;
import java.io.File;
import java.io.IOException;
import java.util.Map;
+import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.hadoop.HadoopFileIO;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.FileIOParser;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Types;
@@ -110,4 +112,21 @@ public class TestFileIOSerialization {
conf.forEach(entry -> map.put(entry.getKey(), entry.getValue()));
return map;
}
+
+ @Test
+ public void testFileIOJsonSerialization() {
+ FileIO io = table.io();
+ Object conf;
+ if (io instanceof Configurable) {
+ conf = ((Configurable) io).getConf();
+ } else {
+ conf = null;
+ }
+
+ String json = FileIOParser.toJson(io);
+ try (FileIO deserialized = FileIOParser.fromJson(json, conf)) {
+ Assert.assertTrue(deserialized instanceof HadoopFileIO);
+ Assert.assertEquals(io.properties(), deserialized.properties());
+ }
+ }
}