This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new f17879c Avro: Add row position reader (#1222)
f17879c is described below
commit f17879c426c2e3c5fa40f17dfe58633bc866f1c9
Author: Ryan Blue <[email protected]>
AuthorDate: Thu Jul 23 14:18:50 2020 -0700
Avro: Add row position reader (#1222)
---
.../main/java/org/apache/iceberg/avro/AvroIO.java | 68 ++++++++
.../java/org/apache/iceberg/avro/AvroIterable.java | 6 +
.../apache/iceberg/avro/BuildAvroProjection.java | 6 +-
.../apache/iceberg/avro/ProjectionDatumReader.java | 10 +-
.../apache/iceberg/avro/SupportsRowPosition.java | 29 ++++
.../java/org/apache/iceberg/avro/ValueReaders.java | 42 ++++-
.../org/apache/iceberg/data/avro/DataReader.java | 11 +-
.../org/apache/iceberg/avro/TestAvroFileSplit.java | 187 +++++++++++++++++++++
8 files changed, 355 insertions(+), 4 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/avro/AvroIO.java
b/core/src/main/java/org/apache/iceberg/avro/AvroIO.java
index 523ee6d..c569d93 100644
--- a/core/src/main/java/org/apache/iceberg/avro/AvroIO.java
+++ b/core/src/main/java/org/apache/iceberg/avro/AvroIO.java
@@ -19,15 +19,29 @@
package org.apache.iceberg.avro;
+import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.function.Supplier;
+import org.apache.avro.InvalidAvroMagicException;
import org.apache.avro.file.SeekableInput;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.DecoderFactory;
import org.apache.iceberg.common.DynClasses;
import org.apache.iceberg.common.DynConstructors;
+import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.io.DelegatingInputStream;
import org.apache.iceberg.io.SeekableInputStream;
class AvroIO {
+ private static final byte[] AVRO_MAGIC = new byte[] { 'O', 'b', 'j', 1 };
+  private static final ValueReader<byte[]> MAGIC_READER = ValueReaders.fixed(AVRO_MAGIC.length);
+  private static final ValueReader<Map<String, String>> META_READER = ValueReaders.map(
+      ValueReaders.strings(), ValueReaders.strings());
+  private static final ValueReader<byte[]> SYNC_READER = ValueReaders.fixed(16);
+
private AvroIO() {
}
@@ -131,4 +145,58 @@ class AvroIO {
return stream.markSupported();
}
}
+
+  static long findStartingRowPos(Supplier<SeekableInputStream> open, long start) {
+    long totalRows = 0;
+    try (SeekableInputStream in = open.get()) {
+      // use a direct decoder that will not buffer so the position of the input stream is accurate
+      BinaryDecoder decoder = DecoderFactory.get().directBinaryDecoder(in, null);
+
+ // an Avro file's layout looks like this:
+ // header|block|block|...
+ // the header contains:
+ // magic|string-map|sync
+ // each block consists of:
+ // row-count|compressed-size-in-bytes|block-bytes|sync
+
+ // it is necessary to read the header here because this is the only way
to get the expected file sync bytes
+ byte[] magic = MAGIC_READER.read(decoder, null);
+ if (!Arrays.equals(AVRO_MAGIC, magic)) {
+ throw new InvalidAvroMagicException("Not an Avro file");
+ }
+
+ META_READER.read(decoder, null); // ignore the file metadata, it isn't
needed
+ byte[] fileSync = SYNC_READER.read(decoder, null);
+
+ // the while loop reads row counts and seeks past the block bytes until
the next sync pos is >= start, which
+ // indicates that the next sync is the start of the split.
+ byte[] blockSync = new byte[16];
+ long nextSyncPos = in.getPos();
+
+ while (nextSyncPos < start) {
+ if (nextSyncPos != in.getPos()) {
+ in.seek(nextSyncPos);
+ SYNC_READER.read(decoder, blockSync);
+
+ if (!Arrays.equals(fileSync, blockSync)) {
+ throw new RuntimeIOException("Invalid sync at %s", nextSyncPos);
+ }
+ }
+
+ long rowCount = decoder.readLong();
+ long compressedBlockSize = decoder.readLong();
+
+ totalRows += rowCount;
+ nextSyncPos = in.getPos() + compressedBlockSize;
+ }
+
+ return totalRows;
+
+    } catch (EOFException e) {
+      return totalRows;
+
+    } catch (IOException e) {
+      throw new RuntimeIOException(e, "Failed to read stream while finding starting row position");
+    }
+ }
}
diff --git a/core/src/main/java/org/apache/iceberg/avro/AvroIterable.java
b/core/src/main/java/org/apache/iceberg/avro/AvroIterable.java
index b69a950..fd63379 100644
--- a/core/src/main/java/org/apache/iceberg/avro/AvroIterable.java
+++ b/core/src/main/java/org/apache/iceberg/avro/AvroIterable.java
@@ -77,7 +77,13 @@ public class AvroIterable<D> extends CloseableGroup
implements CloseableIterable
FileReader<D> fileReader = initMetadata(newFileReader());
if (start != null) {
+ if (reader instanceof SupportsRowPosition) {
+ ((SupportsRowPosition) reader).setRowPositionSupplier(
+ () -> AvroIO.findStartingRowPos(file::newStream, start));
+ }
fileReader = new AvroRangeIterator<>(fileReader, start, end);
+ } else if (reader instanceof SupportsRowPosition) {
+ ((SupportsRowPosition) reader).setRowPositionSupplier(() -> 0L);
}
addCloseable(fileReader);
diff --git
a/core/src/main/java/org/apache/iceberg/avro/BuildAvroProjection.java
b/core/src/main/java/org/apache/iceberg/avro/BuildAvroProjection.java
index a1e0d05..df9e972 100644
--- a/core/src/main/java/org/apache/iceberg/avro/BuildAvroProjection.java
+++ b/core/src/main/java/org/apache/iceberg/avro/BuildAvroProjection.java
@@ -24,6 +24,7 @@ import java.util.Map;
import java.util.function.Supplier;
import org.apache.avro.JsonProperties;
import org.apache.avro.Schema;
+import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
@@ -48,6 +49,7 @@ class BuildAvroProjection extends
AvroCustomOrderSchemaVisitor<Schema, Schema.Fi
}
@Override
+ @SuppressWarnings("checkstyle:CyclomaticComplexity")
public Schema record(Schema record, List<String> names,
Iterable<Schema.Field> schemaIterable) {
Preconditions.checkArgument(
current.isNestedType() && current.asNestedType().isStructType(),
@@ -93,7 +95,9 @@ class BuildAvroProjection extends
AvroCustomOrderSchemaVisitor<Schema, Schema.Fi
updatedFields.add(avroField);
} else {
- Preconditions.checkArgument(field.isOptional(), "Missing required
field: %s", field.name());
+ Preconditions.checkArgument(
+ field.isOptional() || field.fieldId() ==
MetadataColumns.ROW_POSITION.fieldId(),
+ "Missing required field: %s", field.name());
// Create a field that will be defaulted to null. We assign a unique
suffix to the field
// to make sure that even if records in the file have the field it is
not projected.
Schema.Field newField = new Schema.Field(
diff --git
a/core/src/main/java/org/apache/iceberg/avro/ProjectionDatumReader.java
b/core/src/main/java/org/apache/iceberg/avro/ProjectionDatumReader.java
index 5f627bb..1ee77cb 100644
--- a/core/src/main/java/org/apache/iceberg/avro/ProjectionDatumReader.java
+++ b/core/src/main/java/org/apache/iceberg/avro/ProjectionDatumReader.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
+import java.util.function.Supplier;
import org.apache.avro.Schema;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.Decoder;
@@ -30,7 +31,7 @@ import org.apache.iceberg.mapping.MappingUtil;
import org.apache.iceberg.mapping.NameMapping;
import org.apache.iceberg.types.TypeUtil;
-public class ProjectionDatumReader<D> implements DatumReader<D> {
+public class ProjectionDatumReader<D> implements DatumReader<D>, SupportsRowPosition {
private final Function<Schema, DatumReader<?>> getReader;
private final org.apache.iceberg.Schema expectedSchema;
private final Map<String, String> renames;
@@ -50,6 +51,13 @@ public class ProjectionDatumReader<D> implements
DatumReader<D> {
}
@Override
+ public void setRowPositionSupplier(Supplier<Long> posSupplier) {
+ if (wrapped instanceof SupportsRowPosition) {
+ ((SupportsRowPosition) wrapped).setRowPositionSupplier(posSupplier);
+ }
+ }
+
+ @Override
public void setSchema(Schema newFileSchema) {
this.fileSchema = newFileSchema;
if (nameMapping == null && !AvroSchemaUtil.hasIds(fileSchema)) {
diff --git
a/core/src/main/java/org/apache/iceberg/avro/SupportsRowPosition.java
b/core/src/main/java/org/apache/iceberg/avro/SupportsRowPosition.java
new file mode 100644
index 0000000..1113cd4
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/avro/SupportsRowPosition.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.avro;
+
+import java.util.function.Supplier;
+
+/**
+ * Interface for readers that accept a callback to determine the starting row
position of an Avro split.
+ */
+public interface SupportsRowPosition {
+ void setRowPositionSupplier(Supplier<Long> posSupplier);
+}
diff --git a/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java
b/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java
index 81c5949..a51580f 100644
--- a/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java
+++ b/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java
@@ -31,12 +31,14 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
+import java.util.function.Supplier;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.IndexedRecord;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.ResolvingDecoder;
import org.apache.avro.util.Utf8;
+import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.common.DynConstructors;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
@@ -560,10 +562,11 @@ public class ValueReaders {
}
}
- public abstract static class StructReader<S> implements ValueReader<S> {
+  public abstract static class StructReader<S> implements ValueReader<S>, SupportsRowPosition {
private final ValueReader<?>[] readers;
private final int[] positions;
private final Object[] constants;
+ private int posField = -1;
protected StructReader(List<ValueReader<?>> readers) {
this.readers = readers.toArray(new ValueReader[0]);
@@ -582,6 +585,9 @@ public class ValueReaders {
if (idToConstant.containsKey(field.fieldId())) {
positionList.add(pos);
constantList.add(idToConstant.get(field.fieldId()));
+ } else if (field.fieldId() == MetadataColumns.ROW_POSITION.fieldId()) {
+ // track where the _pos field is located for setRowPositionSupplier
+ this.posField = pos;
}
}
@@ -589,6 +595,26 @@ public class ValueReaders {
this.constants = constantList.toArray();
}
+    @Override
+    public void setRowPositionSupplier(Supplier<Long> posSupplier) {
+      // posField is initialized to -1 and field position 0 is valid, so the guard is >= 0
+      if (posField >= 0) {
+        long startingPos = posSupplier.get();
+        this.readers[posField] = new PositionReader(startingPos);
+        for (ValueReader<?> reader : readers) {
+          if (reader instanceof SupportsRowPosition) {
+            ((SupportsRowPosition) reader).setRowPositionSupplier(() -> startingPos);
+          }
+        }
+
+      } else {
+        for (ValueReader<?> reader : readers) {
+          if (reader instanceof SupportsRowPosition) {
+            ((SupportsRowPosition) reader).setRowPositionSupplier(posSupplier);
+          }
+        }
+      }
+    }
+
protected abstract S reuseOrCreate(Object reuse);
protected abstract Object get(S struct, int pos);
@@ -687,4 +713,18 @@ public class ValueReaders {
struct.put(pos, value);
}
}
+
+ static class PositionReader implements ValueReader<Long> {
+ private long currentPosition;
+
+ PositionReader(long rowPosition) {
+ this.currentPosition = rowPosition - 1;
+ }
+
+ @Override
+ public Long read(Decoder ignored, Object reuse) throws IOException {
+ currentPosition += 1;
+ return currentPosition;
+ }
+ }
}
diff --git a/core/src/main/java/org/apache/iceberg/data/avro/DataReader.java
b/core/src/main/java/org/apache/iceberg/data/avro/DataReader.java
index c13bd25..613b4b8 100644
--- a/core/src/main/java/org/apache/iceberg/data/avro/DataReader.java
+++ b/core/src/main/java/org/apache/iceberg/data/avro/DataReader.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.function.Supplier;
import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
@@ -32,6 +33,7 @@ import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.ResolvingDecoder;
import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.avro.AvroSchemaWithTypeVisitor;
+import org.apache.iceberg.avro.SupportsRowPosition;
import org.apache.iceberg.avro.ValueReader;
import org.apache.iceberg.avro.ValueReaders;
import org.apache.iceberg.exceptions.RuntimeIOException;
@@ -40,7 +42,7 @@ import
org.apache.iceberg.relocated.com.google.common.collect.MapMaker;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
-public class DataReader<T> implements DatumReader<T> {
+public class DataReader<T> implements DatumReader<T>, SupportsRowPosition {
private static final ThreadLocal<Map<Schema, Map<Schema, ResolvingDecoder>>>
DECODER_CACHES =
ThreadLocal.withInitial(() -> new MapMaker().weakKeys().makeMap());
@@ -78,6 +80,13 @@ public class DataReader<T> implements DatumReader<T> {
return value;
}
+ @Override
+ public void setRowPositionSupplier(Supplier<Long> posSupplier) {
+ if (reader instanceof SupportsRowPosition) {
+ ((SupportsRowPosition) reader).setRowPositionSupplier(posSupplier);
+ }
+ }
+
private ResolvingDecoder resolve(Decoder decoder) throws IOException {
Map<Schema, Map<Schema, ResolvingDecoder>> cache = DECODER_CACHES.get();
Map<Schema, ResolvingDecoder> fileSchemaToResolver = cache
diff --git a/core/src/test/java/org/apache/iceberg/avro/TestAvroFileSplit.java
b/core/src/test/java/org/apache/iceberg/avro/TestAvroFileSplit.java
new file mode 100644
index 0000000..135fe9f
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/avro/TestAvroFileSplit.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.avro;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.UUID;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.avro.DataReader;
+import org.apache.iceberg.data.avro.DataWriter;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.types.Types.NestedField;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestAvroFileSplit {
+ private static final Schema SCHEMA = new Schema(
+ NestedField.required(1, "id", Types.LongType.get()),
+ NestedField.required(2, "data", Types.StringType.get()));
+
+ private static final int NUM_RECORDS = 100_000;
+
+ @Rule
+ public TemporaryFolder temp = new TemporaryFolder();
+
+ public List<Record> expected = null;
+ public InputFile file = null;
+
+ @Before
+ public void writeDataFile() throws IOException {
+ this.expected = Lists.newArrayList();
+
+ OutputFile out = Files.localOutput(temp.newFile());
+
+ try (FileAppender<Object> writer = Avro.write(out)
+ .set(TableProperties.AVRO_COMPRESSION, "uncompressed")
+ .createWriterFunc(DataWriter::create)
+ .schema(SCHEMA)
+ .overwrite()
+ .build()) {
+
+ Record record = GenericRecord.create(SCHEMA);
+ for (long i = 0; i < NUM_RECORDS; i += 1) {
+ Record next = record.copy(ImmutableMap.of(
+ "id", i,
+ "data", UUID.randomUUID().toString()));
+ expected.add(next);
+ writer.add(next);
+ }
+ }
+
+ this.file = out.toInputFile();
+ }
+
+ @Test
+ public void testSplitDataSkipping() throws IOException {
+ long end = file.getLength();
+ long splitLocation = end / 2;
+
+ List<Record> firstHalf = readAvro(file, SCHEMA, 0, splitLocation);
+ Assert.assertNotEquals("First split should not be empty", 0,
firstHalf.size());
+
+ List<Record> secondHalf = readAvro(file, SCHEMA, splitLocation + 1, end -
splitLocation - 1);
+ Assert.assertNotEquals("Second split should not be empty", 0,
secondHalf.size());
+
+ Assert.assertEquals("Total records should match expected",
+ expected.size(), firstHalf.size() + secondHalf.size());
+
+ for (int i = 0; i < firstHalf.size(); i += 1) {
+ Assert.assertEquals(expected.get(i), firstHalf.get(i));
+ }
+
+ for (int i = 0; i < secondHalf.size(); i += 1) {
+ Assert.assertEquals(expected.get(firstHalf.size() + i),
secondHalf.get(i));
+ }
+ }
+
+ @Test
+ public void testPosField() throws IOException {
+ Schema projection = new Schema(
+ SCHEMA.columns().get(0),
+ MetadataColumns.ROW_POSITION,
+ SCHEMA.columns().get(1));
+
+ List<Record> records = readAvro(file, projection, 0, file.getLength());
+
+ for (int i = 0; i < expected.size(); i += 1) {
+ Assert.assertEquals("Field _pos should match",
+ (long) i,
records.get(i).getField(MetadataColumns.ROW_POSITION.name()));
+ Assert.assertEquals("Field id should match",
+ expected.get(i).getField("id"), records.get(i).getField("id"));
+ Assert.assertEquals("Field data should match",
+ expected.get(i).getField("data"), records.get(i).getField("data"));
+ }
+ }
+
+ @Test
+ public void testPosFieldWithSplits() throws IOException {
+ Schema projection = new Schema(
+ SCHEMA.columns().get(0),
+ MetadataColumns.ROW_POSITION,
+ SCHEMA.columns().get(1));
+
+ long end = file.getLength();
+ long splitLocation = end / 2;
+
+ List<Record> secondHalf = readAvro(file, projection, splitLocation + 1,
end - splitLocation - 1);
+ Assert.assertNotEquals("Second split should not be empty", 0,
secondHalf.size());
+
+ List<Record> firstHalf = readAvro(file, projection, 0, splitLocation);
+ Assert.assertNotEquals("First split should not be empty", 0,
firstHalf.size());
+
+ Assert.assertEquals("Total records should match expected",
+ expected.size(), firstHalf.size() + secondHalf.size());
+
+ for (int i = 0; i < firstHalf.size(); i += 1) {
+ Assert.assertEquals("Field _pos should match",
+ (long) i,
firstHalf.get(i).getField(MetadataColumns.ROW_POSITION.name()));
+ Assert.assertEquals("Field id should match",
+ expected.get(i).getField("id"), firstHalf.get(i).getField("id"));
+ Assert.assertEquals("Field data should match",
+ expected.get(i).getField("data"), firstHalf.get(i).getField("data"));
+ }
+
+ for (int i = 0; i < secondHalf.size(); i += 1) {
+ Assert.assertEquals("Field _pos should match",
+ (long) (firstHalf.size() + i),
secondHalf.get(i).getField(MetadataColumns.ROW_POSITION.name()));
+ Assert.assertEquals("Field id should match",
+ expected.get(firstHalf.size() + i).getField("id"),
secondHalf.get(i).getField("id"));
+ Assert.assertEquals("Field data should match",
+ expected.get(firstHalf.size() + i).getField("data"),
secondHalf.get(i).getField("data"));
+ }
+ }
+
+ @Test
+ public void testPosWithEOFSplit() throws IOException {
+ Schema projection = new Schema(
+ SCHEMA.columns().get(0),
+ MetadataColumns.ROW_POSITION,
+ SCHEMA.columns().get(1));
+
+ long end = file.getLength();
+
+ List<Record> records = readAvro(file, projection, end - 10, 10);
+ Assert.assertEquals("Should not read any records", 0, records.size());
+ }
+
+ public List<Record> readAvro(InputFile in, Schema projection, long start,
long length) throws IOException {
+ try (AvroIterable<Record> reader = Avro.read(in)
+ .createReaderFunc(DataReader::create)
+ .split(start, length)
+ .project(projection)
+ .build()) {
+ return Lists.newArrayList(reader);
+ }
+ }
+}