Repository: beam
Updated Branches:
  refs/heads/master 7e63d2cf6 -> 1d9160fa3


Gets rid of opening Avro files in createForSubrangeOfFile codepath


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d4026da1
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d4026da1
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d4026da1

Branch: refs/heads/master
Commit: d4026da1ad1fa0864052b85a66c4af5975327e9f
Parents: c52a908
Author: Eugene Kirpichov <kirpic...@google.com>
Authored: Tue Jul 18 14:09:03 2017 -0700
Committer: Eugene Kirpichov <kirpic...@google.com>
Committed: Thu Jul 20 16:59:11 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/AvroSource.java | 176 +++++++------------
 .../org/apache/beam/sdk/io/AvroSourceTest.java  |  11 +-
 2 files changed, 63 insertions(+), 124 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/d4026da1/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
index 0634774..30af344 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
@@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.MoreObjects;
 import java.io.ByteArrayInputStream;
 import java.io.EOFException;
 import java.io.IOException;
@@ -135,45 +136,33 @@ public class AvroSource<T> extends BlockBasedSource<T> {
   @Nullable
   private final String readerSchemaString;
 
-  // The JSON schema that was used to write the source Avro file (may differ 
from the schema we will
-  // use to read from it).
-  private final String writerSchemaString;
-
-  // The following metadata fields are not user-configurable. They are 
extracted from the object
-  // container file header upon subsource creation.
-
-  // The codec used to encode the blocks in the Avro file. String value drawn 
from those in
-  // 
https://avro.apache.org/docs/1.7.7/api/java/org/apache/avro/file/CodecFactory.html
-  private final String codec;
-
-  // The object container file's 16-byte sync marker.
-  private final byte[] syncMarker;
-
   /**
    * Reads from the given file name or pattern ("glob"). The returned source 
can be further
    * configured by calling {@link #withSchema} to return a type other than 
{@link GenericRecord}.
    */
   public static AvroSource<GenericRecord> from(String fileNameOrPattern) {
-    return new AvroSource<>(
-        fileNameOrPattern, DEFAULT_MIN_BUNDLE_SIZE, null, GenericRecord.class, 
null, null);
+    return new AvroSource<>(fileNameOrPattern, DEFAULT_MIN_BUNDLE_SIZE, null, GenericRecord.class);
   }
 
   /** Reads files containing records that conform to the given schema. */
   public AvroSource<GenericRecord> withSchema(String schema) {
     return new AvroSource<>(
-        getFileOrPatternSpec(), getMinBundleSize(), schema, 
GenericRecord.class, codec, syncMarker);
+        getFileOrPatternSpec(), getMinBundleSize(), schema, 
GenericRecord.class);
   }
 
   /** Like {@link #withSchema(String)}. */
   public AvroSource<GenericRecord> withSchema(Schema schema) {
-    return new AvroSource<>(getFileOrPatternSpec(), getMinBundleSize(), 
schema.toString(),
-        GenericRecord.class, codec, syncMarker);
+    return new AvroSource<>(
+        getFileOrPatternSpec(), getMinBundleSize(), schema.toString(), 
GenericRecord.class);
   }
 
   /** Reads files containing records of the given class. */
   public <X> AvroSource<X> withSchema(Class<X> clazz) {
-    return new AvroSource<>(getFileOrPatternSpec(), getMinBundleSize(),
-        ReflectData.get().getSchema(clazz).toString(), clazz, codec, 
syncMarker);
+    return new AvroSource<>(
+        getFileOrPatternSpec(),
+        getMinBundleSize(),
+        ReflectData.get().getSchema(clazz).toString(),
+        clazz);
   }
 
   /**
@@ -181,24 +170,15 @@ public class AvroSource<T> extends BlockBasedSource<T> {
    * minBundleSize} and its use.
    */
   public AvroSource<T> withMinBundleSize(long minBundleSize) {
-    return new AvroSource<>(
-        getFileOrPatternSpec(), minBundleSize, readerSchemaString, type, 
codec, syncMarker);
+    return new AvroSource<>(getFileOrPatternSpec(), minBundleSize, 
readerSchemaString, type);
   }
 
   /** Constructor for FILEPATTERN mode. */
   private AvroSource(
-      String fileNameOrPattern,
-      long minBundleSize,
-      String readerSchemaString,
-      Class<T> type,
-      String codec,
-      byte[] syncMarker) {
+      String fileNameOrPattern, long minBundleSize, String readerSchemaString, 
Class<T> type) {
     super(fileNameOrPattern, minBundleSize);
     this.readerSchemaString = internSchemaString(readerSchemaString);
-    this.codec = codec;
-    this.syncMarker = syncMarker;
     this.type = type;
-    this.writerSchemaString = null;
   }
 
   /** Constructor for SINGLE_FILE_OR_SUBRANGE mode. */
@@ -208,16 +188,10 @@ public class AvroSource<T> extends BlockBasedSource<T> {
       long startOffset,
       long endOffset,
       String readerSchemaString,
-      Class<T> type,
-      String codec,
-      byte[] syncMarker,
-      String writerSchemaString) {
+      Class<T> type) {
     super(metadata, minBundleSize, startOffset, endOffset);
     this.readerSchemaString = internSchemaString(readerSchemaString);
-    this.codec = codec;
-    this.syncMarker = syncMarker;
     this.type = type;
-    this.writerSchemaString = internSchemaString(writerSchemaString);
   }
 
   @Override
@@ -229,39 +203,7 @@ public class AvroSource<T> extends BlockBasedSource<T> {
 
   @Override
   public BlockBasedSource<T> createForSubrangeOfFile(Metadata fileMetadata, 
long start, long end) {
-    byte[] syncMarker = this.syncMarker;
-    String codec = this.codec;
-    String writerSchemaString = this.writerSchemaString;
-    // codec and syncMarker are initially null when the source is created, as 
they differ
-    // across input files and must be read from the file. Here, when we are 
creating a source
-    // for a subrange of a file, we can initialize these values. When the 
resulting AvroSource
-    // is further split, they do not need to be read again.
-    if (codec == null || syncMarker == null || writerSchemaString == null) {
-      AvroMetadata metadata;
-      try {
-        metadata = readMetadataFromFile(fileMetadata.resourceId());
-      } catch (IOException e) {
-        throw new RuntimeException("Error reading metadata from file " + 
fileMetadata, e);
-      }
-      codec = metadata.getCodec();
-      syncMarker = metadata.getSyncMarker();
-      writerSchemaString = metadata.getSchemaString();
-    }
-    // Note that if the writerSchemaString is equivalent to the 
readerSchemaString, "intern"ing
-    // the string will occur within the constructor and return the same 
reference as the
-    // readerSchemaString. This allows for Java to have an efficient 
serialization since it
-    // will only encode the schema once while just storing pointers to the 
encoded version
-    // within this source.
-    return new AvroSource<>(
-        fileMetadata,
-        getMinBundleSize(),
-        start,
-        end,
-        readerSchemaString,
-        type,
-        codec,
-        syncMarker,
-        writerSchemaString);
+    return new AvroSource<>(fileMetadata, getMinBundleSize(), start, end, readerSchemaString, type);
   }
 
   @Override
@@ -280,27 +222,17 @@ public class AvroSource<T> extends BlockBasedSource<T> {
     return readerSchemaString;
   }
 
-  private byte[] getSyncMarker() {
-    return syncMarker;
-  }
-
-  private String getCodec() {
-    return codec;
-  }
-
-  /**
-   * Avro file metadata.
-   */
+  /** Avro file metadata. */
   @VisibleForTesting
   static class AvroMetadata {
-    private byte[] syncMarker;
-    private String codec;
-    private String schemaString;
+    private final byte[] syncMarker;
+    private final String codec;
+    private final String schemaString;
 
     AvroMetadata(byte[] syncMarker, String codec, String schemaString) {
       this.syncMarker = checkNotNull(syncMarker, "syncMarker");
       this.codec = checkNotNull(codec, "codec");
-      this.schemaString = checkNotNull(schemaString, "schemaString");
+      this.schemaString = internSchemaString(checkNotNull(schemaString, 
"schemaString"));
     }
 
     /**
@@ -391,18 +323,6 @@ public class AvroSource<T> extends BlockBasedSource<T> {
     return new AvroMetadata(syncMarker, codec, schemaString);
   }
 
-  private DatumReader<T> createDatumReader() {
-    checkNotNull(writerSchemaString, "No writer schema has been initialized 
for source %s", this);
-    Schema writerSchema = internOrParseSchemaString(writerSchemaString);
-    Schema readerSchema =
-        (readerSchemaString == null) ? writerSchema : 
internOrParseSchemaString(readerSchemaString);
-    if (type == GenericRecord.class) {
-      return new GenericDatumReader<>(writerSchema, readerSchema);
-    } else {
-      return new ReflectDatumReader<>(writerSchema, readerSchema);
-    }
-  }
-
   // A logical reference cache used to store schemas and schema strings to 
allow us to
   // "intern" values and reduce the number of copies of equivalent objects.
   private static final Map<String, Schema> schemaLogicalReferenceCache = new 
WeakHashMap<>();
@@ -443,18 +363,10 @@ public class AvroSource<T> extends BlockBasedSource<T> {
             getStartOffset(),
             getEndOffset(),
             readerSchemaString,
-            type,
-            codec,
-            syncMarker,
-            writerSchemaString);
+            type);
       case FILEPATTERN:
         return new AvroSource<>(
-            getFileOrPatternSpec(),
-            getMinBundleSize(),
-            readerSchemaString,
-            type,
-            codec,
-            syncMarker);
+            getFileOrPatternSpec(), getMinBundleSize(), readerSchemaString, 
type);
         default:
           throw new InvalidObjectException(
               String.format("Unknown mode %s for AvroSource %s", getMode(), 
this));
@@ -518,11 +430,25 @@ public class AvroSource<T> extends BlockBasedSource<T> {
       }
     }
 
-    AvroBlock(byte[] data, long numRecords, AvroSource<T> source) throws 
IOException {
+    AvroBlock(
+        byte[] data,
+        long numRecords,
+        Class<? extends T> type,
+        String readerSchemaString,
+        String writerSchemaString,
+        String codec)
+        throws IOException {
       this.numRecords = numRecords;
-      this.reader = source.createDatumReader();
-      this.decoder =
-          DecoderFactory.get().binaryDecoder(decodeAsInputStream(data, 
source.getCodec()), null);
+      checkNotNull(writerSchemaString, "writerSchemaString");
+      Schema writerSchema = internOrParseSchemaString(writerSchemaString);
+      Schema readerSchema =
+          internOrParseSchemaString(
+              MoreObjects.firstNonNull(readerSchemaString, 
writerSchemaString));
+      this.reader =
+          (type == GenericRecord.class)
+              ? new GenericDatumReader<T>(writerSchema, readerSchema)
+              : new ReflectDatumReader<T>(writerSchema, readerSchema);
+      this.decoder = DecoderFactory.get().binaryDecoder(decodeAsInputStream(data, codec), null);
     }
 
     @Override
@@ -558,6 +484,8 @@ public class AvroSource<T> extends BlockBasedSource<T> {
    */
   @Experimental(Experimental.Kind.SOURCE_SINK)
   public static class AvroReader<T> extends BlockBasedReader<T> {
+    private AvroMetadata metadata;
+
     // The current block.
     private AvroBlock<T> currentBlock;
 
@@ -631,10 +559,17 @@ public class AvroSource<T> extends BlockBasedSource<T> {
           "Only able to read %s/%s bytes in the block before EOF reached.",
           bytesRead,
           blockSize);
-      currentBlock = new AvroBlock<>(data, numRecords, getCurrentSource());
+      currentBlock =
+          new AvroBlock<>(
+              data,
+              numRecords,
+              getCurrentSource().type,
+              getCurrentSource().readerSchemaString,
+              metadata.getSchemaString(),
+              metadata.getCodec());
 
       // Read the end of this block, which MUST be a sync marker for 
correctness.
-      byte[] syncMarker = getCurrentSource().getSyncMarker();
+      byte[] syncMarker = metadata.getSyncMarker();
       byte[] readSyncMarker = new byte[syncMarker.length];
       long syncMarkerOffset = startOfNextBlock + headerSize + blockSize;
       bytesRead = IOUtils.readFully(stream, readSyncMarker);
@@ -705,7 +640,7 @@ public class AvroSource<T> extends BlockBasedSource<T> {
     private PushbackInputStream createStream(ReadableByteChannel channel) {
       return new PushbackInputStream(
           Channels.newInputStream(channel),
-          getCurrentSource().getSyncMarker().length);
+          metadata.getSyncMarker().length);
     }
 
     // Postcondition: the stream is positioned at the beginning of the first 
block after the start
@@ -713,8 +648,15 @@ public class AvroSource<T> extends BlockBasedSource<T> {
     // currentBlockSizeBytes will be set to 0 indicating that the previous 
block was empty.
     @Override
     protected void startReading(ReadableByteChannel channel) throws 
IOException {
+      try {
+        metadata = readMetadataFromFile(getCurrentSource().getSingleFileMetadata().resourceId());
+      } catch (IOException e) {
+        throw new RuntimeException(
+            "Error reading metadata from file " + getCurrentSource().getSingleFileMetadata(), e);
+      }
+
       long startOffset = getCurrentSource().getStartOffset();
-      byte[] syncMarker = getCurrentSource().getSyncMarker();
+      byte[] syncMarker = metadata.getSyncMarker();
       long syncMarkerLength = syncMarker.length;
 
       if (startOffset != 0) {

http://git-wip-us.apache.org/repos/asf/beam/blob/d4026da1/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java
index 0fc2b3e..bf2ac95 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java
@@ -21,7 +21,6 @@ import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisp
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotSame;
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
@@ -439,12 +438,10 @@ public class AvroSourceTest {
     String filename = generateTestFile("tmp.avro", birds, 
SyncBehavior.SYNC_DEFAULT, 0,
         AvroCoder.of(Bird.class), DataFileConstants.NULL_CODEC);
     Metadata fileMetadata = FileSystems.matchSingleFileSpec(filename);
-    String schemaA = 
AvroSource.readMetadataFromFile(fileMetadata.resourceId()).getSchemaString();
-    String schemaB = 
AvroSource.readMetadataFromFile(fileMetadata.resourceId()).getSchemaString();
-    assertNotSame(schemaA, schemaB);
-
-    AvroSource<GenericRecord> sourceA = 
AvroSource.from(filename).withSchema(schemaA);
-    AvroSource<GenericRecord> sourceB = 
AvroSource.from(filename).withSchema(schemaB);
+    String schema = AvroSource.readMetadataFromFile(fileMetadata.resourceId()).getSchemaString();
+    // Add "" to the schema to make sure it is not interned.
+    AvroSource<GenericRecord> sourceA = AvroSource.from(filename).withSchema("" + schema);
+    AvroSource<GenericRecord> sourceB = AvroSource.from(filename).withSchema("" + schema);
     assertSame(sourceA.getReaderSchemaString(), 
sourceB.getReaderSchemaString());
 
     // Ensure that deserialization still goes through interning

Reply via email to