Repository: beam Updated Branches: refs/heads/master 7e63d2cf6 -> 1d9160fa3
Gets rid of opening Avro files in createForSubrangeOfFile codepath Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d4026da1 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d4026da1 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d4026da1 Branch: refs/heads/master Commit: d4026da1ad1fa0864052b85a66c4af5975327e9f Parents: c52a908 Author: Eugene Kirpichov <kirpic...@google.com> Authored: Tue Jul 18 14:09:03 2017 -0700 Committer: Eugene Kirpichov <kirpic...@google.com> Committed: Thu Jul 20 16:59:11 2017 -0700 ---------------------------------------------------------------------- .../java/org/apache/beam/sdk/io/AvroSource.java | 176 +++++++------------ .../org/apache/beam/sdk/io/AvroSourceTest.java | 11 +- 2 files changed, 63 insertions(+), 124 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/d4026da1/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java index 0634774..30af344 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java @@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.MoreObjects; import java.io.ByteArrayInputStream; import java.io.EOFException; import java.io.IOException; @@ -135,45 +136,33 @@ public class AvroSource<T> extends BlockBasedSource<T> { @Nullable private final String readerSchemaString; - // The JSON schema that was used to write the source Avro file (may differ from the schema we will - // use to read from it). - private final String writerSchemaString; - - // The following metadata fields are not user-configurable. They are extracted from the object - // container file header upon subsource creation. - - // The codec used to encode the blocks in the Avro file. String value drawn from those in - // https://avro.apache.org/docs/1.7.7/api/java/org/apache/avro/file/CodecFactory.html - private final String codec; - - // The object container file's 16-byte sync marker. - private final byte[] syncMarker; - /** * Reads from the given file name or pattern ("glob"). The returned source can be further * configured by calling {@link #withSchema} to return a type other than {@link GenericRecord}. */ public static AvroSource<GenericRecord> from(String fileNameOrPattern) { - return new AvroSource<>( - fileNameOrPattern, DEFAULT_MIN_BUNDLE_SIZE, null, GenericRecord.class, null, null); + return new AvroSource<>(fileNameOrPattern, DEFAULT_MIN_BUNDLE_SIZE, null, GenericRecord.class); } /** Reads files containing records that conform to the given schema. */ public AvroSource<GenericRecord> withSchema(String schema) { return new AvroSource<>( - getFileOrPatternSpec(), getMinBundleSize(), schema, GenericRecord.class, codec, syncMarker); + getFileOrPatternSpec(), getMinBundleSize(), schema, GenericRecord.class); } /** Like {@link #withSchema(String)}. */ public AvroSource<GenericRecord> withSchema(Schema schema) { - return new AvroSource<>(getFileOrPatternSpec(), getMinBundleSize(), schema.toString(), - GenericRecord.class, codec, syncMarker); + return new AvroSource<>( + getFileOrPatternSpec(), getMinBundleSize(), schema.toString(), GenericRecord.class); } /** Reads files containing records of the given class. */ public <X> AvroSource<X> withSchema(Class<X> clazz) { - return new AvroSource<>(getFileOrPatternSpec(), getMinBundleSize(), - ReflectData.get().getSchema(clazz).toString(), clazz, codec, syncMarker); + return new AvroSource<>( + getFileOrPatternSpec(), + getMinBundleSize(), + ReflectData.get().getSchema(clazz).toString(), + clazz); } /** @@ -181,24 +170,15 @@ public class AvroSource<T> extends BlockBasedSource<T> { * minBundleSize} and its use. */ public AvroSource<T> withMinBundleSize(long minBundleSize) { - return new AvroSource<>( - getFileOrPatternSpec(), minBundleSize, readerSchemaString, type, codec, syncMarker); + return new AvroSource<>(getFileOrPatternSpec(), minBundleSize, readerSchemaString, type); } /** Constructor for FILEPATTERN mode. */ private AvroSource( - String fileNameOrPattern, - long minBundleSize, - String readerSchemaString, - Class<T> type, - String codec, - byte[] syncMarker) { + String fileNameOrPattern, long minBundleSize, String readerSchemaString, Class<T> type) { super(fileNameOrPattern, minBundleSize); this.readerSchemaString = internSchemaString(readerSchemaString); - this.codec = codec; - this.syncMarker = syncMarker; this.type = type; - this.writerSchemaString = null; } /** Constructor for SINGLE_FILE_OR_SUBRANGE mode. */ @@ -208,16 +188,10 @@ public class AvroSource<T> extends BlockBasedSource<T> { long startOffset, long endOffset, String readerSchemaString, - Class<T> type, - String codec, - byte[] syncMarker, - String writerSchemaString) { + Class<T> type) { super(metadata, minBundleSize, startOffset, endOffset); this.readerSchemaString = internSchemaString(readerSchemaString); - this.codec = codec; - this.syncMarker = syncMarker; this.type = type; - this.writerSchemaString = internSchemaString(writerSchemaString); } @Override @@ -229,39 +203,7 @@ public class AvroSource<T> extends BlockBasedSource<T> { @Override public BlockBasedSource<T> createForSubrangeOfFile(Metadata fileMetadata, long start, long end) { - byte[] syncMarker = this.syncMarker; - String codec = this.codec; - String writerSchemaString = this.writerSchemaString; - // codec and syncMarker are initially null when the source is created, as they differ - // across input files and must be read from the file. Here, when we are creating a source - // for a subrange of a file, we can initialize these values. When the resulting AvroSource - // is further split, they do not need to be read again. - if (codec == null || syncMarker == null || writerSchemaString == null) { - AvroMetadata metadata; - try { - metadata = readMetadataFromFile(fileMetadata.resourceId()); - } catch (IOException e) { - throw new RuntimeException("Error reading metadata from file " + fileMetadata, e); - } - codec = metadata.getCodec(); - syncMarker = metadata.getSyncMarker(); - writerSchemaString = metadata.getSchemaString(); - } - // Note that if the writerSchemaString is equivalent to the readerSchemaString, "intern"ing - // the string will occur within the constructor and return the same reference as the - // readerSchemaString. This allows for Java to have an efficient serialization since it - // will only encode the schema once while just storing pointers to the encoded version - // within this source. - return new AvroSource<>( - fileMetadata, - getMinBundleSize(), - start, - end, - readerSchemaString, - type, - codec, - syncMarker, - writerSchemaString); + return new AvroSource<>(fileMetadata, getMinBundleSize(), start, end, readerSchemaString, type); } @Override @@ -280,27 +222,17 @@ public class AvroSource<T> extends BlockBasedSource<T> { return readerSchemaString; } - private byte[] getSyncMarker() { - return syncMarker; - } - - private String getCodec() { - return codec; - } - - /** - * Avro file metadata. - */ + /** Avro file metadata. */ @VisibleForTesting static class AvroMetadata { - private byte[] syncMarker; - private String codec; - private String schemaString; + private final byte[] syncMarker; + private final String codec; + private final String schemaString; AvroMetadata(byte[] syncMarker, String codec, String schemaString) { this.syncMarker = checkNotNull(syncMarker, "syncMarker"); this.codec = checkNotNull(codec, "codec"); - this.schemaString = checkNotNull(schemaString, "schemaString"); + this.schemaString = internSchemaString(checkNotNull(schemaString, "schemaString")); } /** @@ -391,18 +323,6 @@ public class AvroSource<T> extends BlockBasedSource<T> { return new AvroMetadata(syncMarker, codec, schemaString); } - private DatumReader<T> createDatumReader() { - checkNotNull(writerSchemaString, "No writer schema has been initialized for source %s", this); - Schema writerSchema = internOrParseSchemaString(writerSchemaString); - Schema readerSchema = - (readerSchemaString == null) ? writerSchema : internOrParseSchemaString(readerSchemaString); - if (type == GenericRecord.class) { - return new GenericDatumReader<>(writerSchema, readerSchema); - } else { - return new ReflectDatumReader<>(writerSchema, readerSchema); - } - } - // A logical reference cache used to store schemas and schema strings to allow us to // "intern" values and reduce the number of copies of equivalent objects. private static final Map<String, Schema> schemaLogicalReferenceCache = new WeakHashMap<>(); @@ -443,18 +363,10 @@ public class AvroSource<T> extends BlockBasedSource<T> { getStartOffset(), getEndOffset(), readerSchemaString, - type, - codec, - syncMarker, - writerSchemaString); + type); case FILEPATTERN: return new AvroSource<>( - getFileOrPatternSpec(), - getMinBundleSize(), - readerSchemaString, - type, - codec, - syncMarker); + getFileOrPatternSpec(), getMinBundleSize(), readerSchemaString, type); default: throw new InvalidObjectException( String.format("Unknown mode %s for AvroSource %s", getMode(), this)); @@ -518,11 +430,25 @@ public class AvroSource<T> extends BlockBasedSource<T> { } } - AvroBlock(byte[] data, long numRecords, AvroSource<T> source) throws IOException { + AvroBlock( + byte[] data, + long numRecords, + Class<? extends T> type, + String readerSchemaString, + String writerSchemaString, + String codec) + throws IOException { this.numRecords = numRecords; - this.reader = source.createDatumReader(); - this.decoder = - DecoderFactory.get().binaryDecoder(decodeAsInputStream(data, source.getCodec()), null); + checkNotNull(writerSchemaString, "writerSchemaString"); + Schema writerSchema = internOrParseSchemaString(writerSchemaString); + Schema readerSchema = + internOrParseSchemaString( + MoreObjects.firstNonNull(readerSchemaString, writerSchemaString)); + this.reader = + (type == GenericRecord.class) + ? new GenericDatumReader<T>(writerSchema, readerSchema) + : new ReflectDatumReader<T>(writerSchema, readerSchema); + this.decoder = DecoderFactory.get().binaryDecoder(decodeAsInputStream(data, codec), null); } @Override @@ -558,6 +484,8 @@ public class AvroSource<T> extends BlockBasedSource<T> { */ @Experimental(Experimental.Kind.SOURCE_SINK) public static class AvroReader<T> extends BlockBasedReader<T> { + private AvroMetadata metadata; + // The current block. private AvroBlock<T> currentBlock; @@ -631,10 +559,17 @@ public class AvroSource<T> extends BlockBasedSource<T> { "Only able to read %s/%s bytes in the block before EOF reached.", bytesRead, blockSize); - currentBlock = new AvroBlock<>(data, numRecords, getCurrentSource()); + currentBlock = + new AvroBlock<>( + data, + numRecords, + getCurrentSource().type, + getCurrentSource().readerSchemaString, + metadata.getSchemaString(), + metadata.getCodec()); // Read the end of this block, which MUST be a sync marker for correctness. - byte[] syncMarker = getCurrentSource().getSyncMarker(); + byte[] syncMarker = metadata.getSyncMarker(); byte[] readSyncMarker = new byte[syncMarker.length]; long syncMarkerOffset = startOfNextBlock + headerSize + blockSize; bytesRead = IOUtils.readFully(stream, readSyncMarker); @@ -705,7 +640,7 @@ public class AvroSource<T> extends BlockBasedSource<T> { private PushbackInputStream createStream(ReadableByteChannel channel) { return new PushbackInputStream( Channels.newInputStream(channel), - getCurrentSource().getSyncMarker().length); + metadata.getSyncMarker().length); } // Postcondition: the stream is positioned at the beginning of the first block after the start @@ -713,8 +648,15 @@ public class AvroSource<T> extends BlockBasedSource<T> { // currentBlockSizeBytes will be set to 0 indicating that the previous block was empty. @Override protected void startReading(ReadableByteChannel channel) throws IOException { + try { + metadata = readMetadataFromFile(getCurrentSource().getSingleFileMetadata().resourceId()); + } catch (IOException e) { + throw new RuntimeException( + "Error reading metadata from file " + getCurrentSource().getSingleFileMetadata(), e); + } + long startOffset = getCurrentSource().getStartOffset(); - byte[] syncMarker = getCurrentSource().getSyncMarker(); + byte[] syncMarker = metadata.getSyncMarker(); long syncMarkerLength = syncMarker.length; if (startOffset != 0) { http://git-wip-us.apache.org/repos/asf/beam/blob/d4026da1/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java index 0fc2b3e..bf2ac95 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java @@ -21,7 +21,6 @@ import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisp import static org.hamcrest.Matchers.containsInAnyOrder; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotSame; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; @@ -439,12 +438,10 @@ public class AvroSourceTest { String filename = generateTestFile("tmp.avro", birds, SyncBehavior.SYNC_DEFAULT, 0, AvroCoder.of(Bird.class), DataFileConstants.NULL_CODEC); Metadata fileMetadata = FileSystems.matchSingleFileSpec(filename); - String schemaA = AvroSource.readMetadataFromFile(fileMetadata.resourceId()).getSchemaString(); - String schemaB = AvroSource.readMetadataFromFile(fileMetadata.resourceId()).getSchemaString(); - assertNotSame(schemaA, schemaB); - - AvroSource<GenericRecord> sourceA = AvroSource.from(filename).withSchema(schemaA); - AvroSource<GenericRecord> sourceB = AvroSource.from(filename).withSchema(schemaB); + String schema = AvroSource.readMetadataFromFile(fileMetadata.resourceId()).getSchemaString(); + // Add "" to the schema to make sure it is not interned. + AvroSource<GenericRecord> sourceA = AvroSource.from(filename).withSchema("" + schema); + AvroSource<GenericRecord> sourceB = AvroSource.from(filename).withSchema("" + schema); assertSame(sourceA.getReaderSchemaString(), sourceB.getReaderSchemaString()); // Ensure that deserialization still goes through interning