This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new cd283d8 Avro: Extract ResolvingDecoder caching into DecoderResolver (#1234)
cd283d8 is described below
commit cd283d8b9baa874caa0b4c08981b189e80fd7378
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Jul 27 04:32:43 2020 +0800
Avro: Extract ResolvingDecoder caching into DecoderResolver (#1234)
---
.../org/apache/iceberg/avro/GenericAvroReader.java | 39 +-----------
.../org/apache/iceberg/data/avro/DataReader.java | 37 +----------
.../apache/iceberg/data/avro/DecoderResolver.java | 73 ++++++++++++++++++++++
.../apache/iceberg/spark/data/SparkAvroReader.java | 38 +----------
4 files changed, 78 insertions(+), 109 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/avro/GenericAvroReader.java b/core/src/main/java/org/apache/iceberg/avro/GenericAvroReader.java
index 05e0508..c8a81ca 100644
--- a/core/src/main/java/org/apache/iceberg/avro/GenericAvroReader.java
+++ b/core/src/main/java/org/apache/iceberg/avro/GenericAvroReader.java
@@ -20,26 +20,18 @@
package org.apache.iceberg.avro;
import java.io.IOException;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.Decoder;
-import org.apache.avro.io.DecoderFactory;
-import org.apache.avro.io.ResolvingDecoder;
import org.apache.iceberg.common.DynClasses;
-import org.apache.iceberg.exceptions.RuntimeIOException;
-import org.apache.iceberg.relocated.com.google.common.collect.MapMaker;
+import org.apache.iceberg.data.avro.DecoderResolver;
class GenericAvroReader<T> implements DatumReader<T> {
- private static final ThreadLocal<Map<Schema, Map<Schema, ResolvingDecoder>>>
DECODER_CACHES =
- ThreadLocal.withInitial(() -> new MapMaker().weakKeys().makeMap());
-
private final Schema readSchema;
private ClassLoader loader = Thread.currentThread().getContextClassLoader();
private Schema fileSchema = null;
@@ -66,34 +58,7 @@ class GenericAvroReader<T> implements DatumReader<T> {
@Override
public T read(T reuse, Decoder decoder) throws IOException {
- ResolvingDecoder resolver = resolve(decoder);
- T value = reader.read(resolver, reuse);
- resolver.drain();
- return value;
- }
-
- private ResolvingDecoder resolve(Decoder decoder) throws IOException {
- Map<Schema, Map<Schema, ResolvingDecoder>> cache = DECODER_CACHES.get();
- Map<Schema, ResolvingDecoder> fileSchemaToResolver = cache
- .computeIfAbsent(readSchema, k -> new HashMap<>());
-
- ResolvingDecoder resolver = fileSchemaToResolver.get(fileSchema);
- if (resolver == null) {
- resolver = newResolver();
- fileSchemaToResolver.put(fileSchema, resolver);
- }
-
- resolver.configure(decoder);
-
- return resolver;
- }
-
- private ResolvingDecoder newResolver() {
- try {
- return DecoderFactory.get().resolvingDecoder(fileSchema, readSchema,
null);
- } catch (IOException e) {
- throw new RuntimeIOException(e);
- }
+ return DecoderResolver.resolveAndRead(decoder, readSchema, fileSchema,
reader, reuse);
}
private static class ReadBuilder extends AvroSchemaVisitor<ValueReader<?>> {
diff --git a/core/src/main/java/org/apache/iceberg/data/avro/DataReader.java b/core/src/main/java/org/apache/iceberg/data/avro/DataReader.java
index d75acf9..7c8cabc 100644
--- a/core/src/main/java/org/apache/iceberg/data/avro/DataReader.java
+++ b/core/src/main/java/org/apache/iceberg/data/avro/DataReader.java
@@ -20,7 +20,6 @@
package org.apache.iceberg.data.avro;
import java.io.IOException;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
@@ -29,24 +28,17 @@ import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.Decoder;
-import org.apache.avro.io.DecoderFactory;
-import org.apache.avro.io.ResolvingDecoder;
import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.avro.AvroSchemaWithTypeVisitor;
import org.apache.iceberg.avro.SupportsRowPosition;
import org.apache.iceberg.avro.ValueReader;
import org.apache.iceberg.avro.ValueReaders;
-import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
-import org.apache.iceberg.relocated.com.google.common.collect.MapMaker;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
public class DataReader<T> implements DatumReader<T>, SupportsRowPosition {
- private static final ThreadLocal<Map<Schema, Map<Schema, ResolvingDecoder>>>
DECODER_CACHES =
- ThreadLocal.withInitial(() -> new MapMaker().weakKeys().makeMap());
-
public static <D> DataReader<D> create(org.apache.iceberg.Schema
expectedSchema, Schema readSchema) {
return create(expectedSchema, readSchema, ImmutableMap.of());
}
@@ -74,10 +66,7 @@ public class DataReader<T> implements DatumReader<T>, SupportsRowPosition {
@Override
public T read(T reuse, Decoder decoder) throws IOException {
- ResolvingDecoder resolver = resolve(decoder);
- T value = reader.read(resolver, reuse);
- resolver.drain();
- return value;
+ return DecoderResolver.resolveAndRead(decoder, readSchema, fileSchema,
reader, reuse);
}
@Override
@@ -87,30 +76,6 @@ public class DataReader<T> implements DatumReader<T>, SupportsRowPosition {
}
}
- private ResolvingDecoder resolve(Decoder decoder) throws IOException {
- Map<Schema, Map<Schema, ResolvingDecoder>> cache = DECODER_CACHES.get();
- Map<Schema, ResolvingDecoder> fileSchemaToResolver = cache
- .computeIfAbsent(readSchema, k -> new HashMap<>());
-
- ResolvingDecoder resolver = fileSchemaToResolver.get(fileSchema);
- if (resolver == null) {
- resolver = newResolver();
- fileSchemaToResolver.put(fileSchema, resolver);
- }
-
- resolver.configure(decoder);
-
- return resolver;
- }
-
- private ResolvingDecoder newResolver() {
- try {
- return DecoderFactory.get().resolvingDecoder(fileSchema, readSchema,
null);
- } catch (IOException e) {
- throw new RuntimeIOException(e);
- }
- }
-
protected ValueReader<?> createStructReader(Types.StructType struct,
List<ValueReader<?>> fields,
Map<Integer, ?> idToConstant) {
return GenericReaders.struct(struct, fields, idToConstant);
diff --git a/core/src/main/java/org/apache/iceberg/data/avro/DecoderResolver.java b/core/src/main/java/org/apache/iceberg/data/avro/DecoderResolver.java
new file mode 100644
index 0000000..8c89f3f
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/data/avro/DecoderResolver.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.data.avro;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.avro.Schema;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.ResolvingDecoder;
+import org.apache.iceberg.avro.ValueReader;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.relocated.com.google.common.collect.MapMaker;
+
+/**
+ * Resolver to resolve {@link Decoder} to a {@link ResolvingDecoder}. This class uses a {@link ThreadLocal} for caching
+ * {@link ResolvingDecoder}.
+ */
+public class DecoderResolver {
+
+  private static final ThreadLocal<Map<Schema, Map<Schema, ResolvingDecoder>>> DECODER_CACHES =
+      ThreadLocal.withInitial(() -> new MapMaker().weakKeys().makeMap());
+
+  private DecoderResolver() {}
+
+  public static <T> T resolveAndRead(
+      Decoder decoder, Schema readSchema, Schema fileSchema, ValueReader<T> reader, T reuse) throws IOException {
+    ResolvingDecoder resolver = DecoderResolver.resolve(decoder, readSchema, fileSchema);
+    T value = reader.read(resolver, reuse);
+    resolver.drain();
+    return value;
+  }
+
+  private static ResolvingDecoder resolve(Decoder decoder, Schema readSchema, Schema fileSchema) throws IOException {
+    Map<Schema, Map<Schema, ResolvingDecoder>> cache = DECODER_CACHES.get();
+    Map<Schema, ResolvingDecoder> fileSchemaToResolver = cache
+        .computeIfAbsent(readSchema, k -> new HashMap<>());
+
+    ResolvingDecoder resolver = fileSchemaToResolver.computeIfAbsent(
+        fileSchema,
+        schema -> newResolver(readSchema, schema));
+
+    resolver.configure(decoder);
+
+    return resolver;
+  }
+
+  private static ResolvingDecoder newResolver(Schema readSchema, Schema fileSchema) {
+    try {
+      return DecoderFactory.get().resolvingDecoder(fileSchema, readSchema, null);
+    } catch (IOException e) {
+      throw new RuntimeIOException(e);
+    }
+  }
+}
diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroReader.java b/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroReader.java
index baecc25..46c594e 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroReader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroReader.java
@@ -20,7 +20,6 @@
package org.apache.iceberg.spark.data;
import java.io.IOException;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.avro.LogicalType;
@@ -28,14 +27,11 @@ import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.Decoder;
-import org.apache.avro.io.DecoderFactory;
-import org.apache.avro.io.ResolvingDecoder;
import org.apache.iceberg.avro.AvroSchemaWithTypeVisitor;
import org.apache.iceberg.avro.ValueReader;
import org.apache.iceberg.avro.ValueReaders;
-import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.data.avro.DecoderResolver;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
-import org.apache.iceberg.relocated.com.google.common.collect.MapMaker;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.catalyst.InternalRow;
@@ -43,9 +39,6 @@ import org.apache.spark.sql.catalyst.InternalRow;
public class SparkAvroReader implements DatumReader<InternalRow> {
- private static final ThreadLocal<Map<Schema, Map<Schema, ResolvingDecoder>>>
DECODER_CACHES =
- ThreadLocal.withInitial(() -> new MapMaker().weakKeys().makeMap());
-
private final Schema readSchema;
private final ValueReader<InternalRow> reader;
private Schema fileSchema = null;
@@ -68,34 +61,7 @@ public class SparkAvroReader implements DatumReader<InternalRow> {
@Override
public InternalRow read(InternalRow reuse, Decoder decoder) throws
IOException {
- ResolvingDecoder resolver = resolve(decoder);
- InternalRow row = reader.read(resolver, reuse);
- resolver.drain();
- return row;
- }
-
- private ResolvingDecoder resolve(Decoder decoder) throws IOException {
- Map<Schema, Map<Schema, ResolvingDecoder>> cache = DECODER_CACHES.get();
- Map<Schema, ResolvingDecoder> fileSchemaToResolver = cache
- .computeIfAbsent(readSchema, k -> new HashMap<>());
-
- ResolvingDecoder resolver = fileSchemaToResolver.get(fileSchema);
- if (resolver == null) {
- resolver = newResolver();
- fileSchemaToResolver.put(fileSchema, resolver);
- }
-
- resolver.configure(decoder);
-
- return resolver;
- }
-
- private ResolvingDecoder newResolver() {
- try {
- return DecoderFactory.get().resolvingDecoder(fileSchema, readSchema,
null);
- } catch (IOException e) {
- throw new RuntimeIOException(e);
- }
+ return DecoderResolver.resolveAndRead(decoder, readSchema, fileSchema,
reader, reuse);
}
private static class ReadBuilder extends
AvroSchemaWithTypeVisitor<ValueReader<?>> {