Uses FileIO.read() in TextIO and AvroIO

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6d4a7851
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6d4a7851
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6d4a7851

Branch: refs/heads/master
Commit: 6d4a78517708db3bd89cfeff5a7e62fb6b948e1d
Parents: 910d02f
Author: Eugene Kirpichov <kirpic...@google.com>
Authored: Thu Aug 31 16:28:07 2017 -0700
Committer: Eugene Kirpichov <ekirpic...@gmail.com>
Committed: Sun Sep 3 16:32:25 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/AvroIO.java     | 14 ++--
 .../java/org/apache/beam/sdk/io/FileIO.java     | 82 ++++++++++----------
 .../beam/sdk/io/ReadAllViaFileBasedSource.java  | 59 +++++++-------
 .../apache/beam/sdk/io/ReadableFileCoder.java   | 25 ++----
 .../java/org/apache/beam/sdk/io/TextIO.java     | 34 +++-----
 .../java/org/apache/beam/sdk/io/FileIOTest.java | 12 +--
 6 files changed, 96 insertions(+), 130 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/6d4a7851/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index c4711e8..108054f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.io;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.beam.sdk.io.FileIO.ReadMatches.DirectoryTreatment;
 
 import com.google.auto.value.AutoValue;
 import com.google.common.annotations.VisibleForTesting;
@@ -597,13 +598,13 @@ public class AvroIO {
       checkNotNull(getSchema(), "schema");
       return input
           .apply(FileIO.matchAll().withConfiguration(getMatchConfiguration()))
+          
.apply(FileIO.readMatches().withDirectoryTreatment(DirectoryTreatment.PROHIBIT))
           .apply(
               "Read all via FileBasedSource",
               new ReadAllViaFileBasedSource<>(
-                  SerializableFunctions.<String, Boolean>constant(true) /* 
isSplittable */,
                   getDesiredBundleSizeBytes(),
-                  new CreateSourceFn<>(getRecordClass(), 
getSchema().toString())))
-          .setCoder(AvroCoder.of(getRecordClass(), getSchema()));
+                  new CreateSourceFn<>(getRecordClass(), 
getSchema().toString()),
+                  AvroCoder.of(getRecordClass(), getSchema())));
     }
 
     @Override
@@ -804,13 +805,10 @@ public class AvroIO {
               new CreateParseSourceFn<>(parseFn, coder);
       return input
           .apply(FileIO.matchAll().withConfiguration(getMatchConfiguration()))
+          
.apply(FileIO.readMatches().withDirectoryTreatment(DirectoryTreatment.PROHIBIT))
           .apply(
               "Parse all via FileBasedSource",
-              new ReadAllViaFileBasedSource<>(
-                  SerializableFunctions.<String, Boolean>constant(true) /* 
isSplittable */,
-                  getDesiredBundleSizeBytes(),
-                  createSource))
-          .setCoder(coder);
+              new ReadAllViaFileBasedSource<>(getDesiredBundleSizeBytes(), 
createSource, coder));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/6d4a7851/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java
index fcae0f7..c909c3c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java
@@ -32,7 +32,6 @@ import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
 import org.apache.beam.sdk.io.fs.MatchResult;
-import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -104,39 +103,18 @@ public class FileIO {
   }
 
   /** A utility class for accessing a potentially compressed file. */
-  public final static class ReadableFile {
-    private final ResourceId resourceId;
-    private final long sizeBytes;
-    private final boolean isSeekable;
+  public static final class ReadableFile {
+    private final MatchResult.Metadata metadata;
     private final Compression compression;
 
-    ReadableFile(
-        ResourceId resourceId,
-        long sizeBytes,
-        boolean isSeekable,
-        Compression compression) {
-      this.resourceId = resourceId;
-      this.sizeBytes = sizeBytes;
-      this.isSeekable = isSeekable;
+    ReadableFile(MatchResult.Metadata metadata, Compression compression) {
+      this.metadata = metadata;
       this.compression = compression;
     }
 
-    /** Returns the {@link ResourceId} of the file. */
-    public ResourceId getResourceId() {
-      return resourceId;
-    }
-
-    /** Returns the size of the file in bytes (before decompression). */
-    public long getSizeBytes() {
-      return sizeBytes;
-    }
-
-    /**
-     * Returns whether or not the channel returned by {@link #open} can be 
efficiently seeked.
-     * If true, then {@link #open} will return a {@link SeekableByteChannel}.
-     */
-    public boolean isSeekable() {
-      return isSeekable;
+    /** Returns the {@link MatchResult.Metadata} of the file. */
+    public MatchResult.Metadata getMetadata() {
+      return metadata;
     }
 
     /** Returns the method with which this file will be decompressed in {@link 
#open}. */
@@ -149,15 +127,18 @@ public class FileIO {
      * decompressing it using {@link #getCompression}.
      */
     public ReadableByteChannel open() throws IOException {
-      return compression.readDecompressed(FileSystems.open(resourceId));
+      return 
compression.readDecompressed(FileSystems.open(metadata.resourceId()));
     }
 
     /**
      * Returns a {@link SeekableByteChannel} equivalent to {@link #open}, but 
fails if this file is
-     * not {@link #isSeekable() seekable}.
+     * not {@link MatchResult.Metadata#isReadSeekEfficient seekable}.
      */
     public SeekableByteChannel openSeekable() throws IOException {
-      checkState(isSeekable(), "The file %s is not seekable", resourceId);
+      checkState(
+          getMetadata().isReadSeekEfficient(),
+          "The file %s is not seekable",
+          metadata.resourceId());
       return ((SeekableByteChannel) open());
     }
 
@@ -173,8 +154,8 @@ public class FileIO {
   }
 
   /**
-   * Describes configuration for matching filepatterns, such as {@link 
EmptyMatchTreatment}
-   * and continuous watching for matching files.
+   * Describes configuration for matching filepatterns, such as {@link 
EmptyMatchTreatment} and
+   * continuous watching for matching files.
    */
   @AutoValue
   public abstract static class MatchConfiguration implements HasDisplayData, 
Serializable {
@@ -186,16 +167,23 @@ public class FileIO {
     }
 
     abstract EmptyMatchTreatment getEmptyMatchTreatment();
-    @Nullable abstract Duration getWatchInterval();
-    @Nullable abstract TerminationCondition<String, ?> 
getWatchTerminationCondition();
+
+    @Nullable
+    abstract Duration getWatchInterval();
+
+    @Nullable
+    abstract TerminationCondition<String, ?> getWatchTerminationCondition();
 
     abstract Builder toBuilder();
 
     @AutoValue.Builder
     abstract static class Builder {
       abstract Builder setEmptyMatchTreatment(EmptyMatchTreatment treatment);
+
       abstract Builder setWatchInterval(Duration watchInterval);
+
       abstract Builder 
setWatchTerminationCondition(TerminationCondition<String, ?> condition);
+
       abstract MatchConfiguration build();
     }
 
@@ -228,14 +216,19 @@ public class FileIO {
   /** Implementation of {@link #match}. */
   @AutoValue
   public abstract static class Match extends PTransform<PBegin, 
PCollection<MatchResult.Metadata>> {
-    @Nullable abstract ValueProvider<String> getFilepattern();
+    @Nullable
+    abstract ValueProvider<String> getFilepattern();
+
     abstract MatchConfiguration getConfiguration();
+
     abstract Builder toBuilder();
 
     @AutoValue.Builder
     abstract static class Builder {
       abstract Builder setFilepattern(ValueProvider<String> filepattern);
+
       abstract Builder setConfiguration(MatchConfiguration configuration);
+
       abstract Match build();
     }
 
@@ -283,11 +276,13 @@ public class FileIO {
   public abstract static class MatchAll
       extends PTransform<PCollection<String>, 
PCollection<MatchResult.Metadata>> {
     abstract MatchConfiguration getConfiguration();
+
     abstract Builder toBuilder();
 
     @AutoValue.Builder
     abstract static class Builder {
       abstract Builder setConfiguration(MatchConfiguration configuration);
+
       abstract MatchAll build();
     }
 
@@ -363,6 +358,7 @@ public class FileIO {
     }
 
     abstract Compression getCompression();
+
     abstract DirectoryTreatment getDirectoryTreatment();
 
     abstract Builder toBuilder();
@@ -370,6 +366,7 @@ public class FileIO {
     @AutoValue.Builder
     abstract static class Builder {
       abstract Builder setCompression(Compression compression);
+
       abstract Builder setDirectoryTreatment(DirectoryTreatment 
directoryTreatment);
 
       abstract ReadMatches build();
@@ -412,7 +409,7 @@ public class FileIO {
       public void process(ProcessContext c) {
         MatchResult.Metadata metadata = c.element();
         if (metadata.resourceId().isDirectory()) {
-          switch(spec.getDirectoryTreatment()) {
+          switch (spec.getDirectoryTreatment()) {
             case SKIP:
               return;
 
@@ -432,9 +429,12 @@ public class FileIO {
                 : spec.getCompression();
         c.output(
             new ReadableFile(
-                metadata.resourceId(),
-                metadata.sizeBytes(),
-                metadata.isReadSeekEfficient() && compression == 
Compression.UNCOMPRESSED,
+                MatchResult.Metadata.builder()
+                    .setResourceId(metadata.resourceId())
+                    .setSizeBytes(metadata.sizeBytes())
+                    .setIsReadSeekEfficient(
+                        metadata.isReadSeekEfficient() && compression == 
Compression.UNCOMPRESSED)
+                    .build(),
                 compression));
       }
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/6d4a7851/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSource.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSource.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSource.java
index 0cd7105..03b9b55 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSource.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSource.java
@@ -17,10 +17,10 @@
  */
 package org.apache.beam.sdk.io;
 
-import static com.google.common.base.Preconditions.checkArgument;
-
 import java.io.IOException;
 import java.util.concurrent.ThreadLocalRandom;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.FileIO.ReadableFile;
 import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
 import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.io.range.OffsetRange;
@@ -38,31 +38,30 @@ import org.apache.beam.sdk.values.PCollection;
  * splitting files into offset ranges and for creating a {@link 
FileBasedSource} for a file. The
  * input {@link PCollection} must not contain {@link ResourceId#isDirectory 
directories}.
  *
- * <p>To obtain the collection of {@link Metadata} from a filepattern, use 
{@link
- * FileIO#match} or {@link FileIO#matchAll}.
+ * <p>To obtain the collection of {@link Metadata} from a filepattern, use 
{@link FileIO#match} or
+ * {@link FileIO#matchAll}.
  */
-class ReadAllViaFileBasedSource<T> extends PTransform<PCollection<Metadata>, 
PCollection<T>> {
-  private final SerializableFunction<String, Boolean> isSplittable;
+class ReadAllViaFileBasedSource<T> extends 
PTransform<PCollection<ReadableFile>, PCollection<T>> {
   private final long desiredBundleSizeBytes;
   private final SerializableFunction<String, FileBasedSource<T>> createSource;
+  private final Coder<T> coder;
 
   public ReadAllViaFileBasedSource(
-      SerializableFunction<String, Boolean> isSplittable,
       long desiredBundleSizeBytes,
-      SerializableFunction<String, FileBasedSource<T>> createSource) {
-    this.isSplittable = isSplittable;
+      SerializableFunction<String, FileBasedSource<T>> createSource,
+      Coder<T> coder) {
     this.desiredBundleSizeBytes = desiredBundleSizeBytes;
     this.createSource = createSource;
+    this.coder = coder;
   }
 
   @Override
-  public PCollection<T> expand(PCollection<Metadata> input) {
+  public PCollection<T> expand(PCollection<ReadableFile> input) {
     return input
-        .apply(
-            "Split into ranges",
-            ParDo.of(new SplitIntoRangesFn(isSplittable, 
desiredBundleSizeBytes)))
-        .apply("Reshuffle", new ReshuffleWithUniqueKey<KV<Metadata, 
OffsetRange>>())
-        .apply("Read ranges", ParDo.of(new ReadFileRangesFn<T>(createSource)));
+        .apply("Split into ranges", ParDo.of(new 
SplitIntoRangesFn(desiredBundleSizeBytes)))
+        .apply("Reshuffle", new ReshuffleWithUniqueKey<KV<ReadableFile, 
OffsetRange>>())
+        .apply("Read ranges", ParDo.of(new ReadFileRangesFn<T>(createSource)))
+        .setCoder(coder);
   }
 
   private static class ReshuffleWithUniqueKey<T>
@@ -90,36 +89,28 @@ class ReadAllViaFileBasedSource<T> extends 
PTransform<PCollection<Metadata>, PCo
     }
   }
 
-  private static class SplitIntoRangesFn extends DoFn<Metadata, KV<Metadata, 
OffsetRange>> {
-    private final SerializableFunction<String, Boolean> isSplittable;
+  private static class SplitIntoRangesFn extends DoFn<ReadableFile, 
KV<ReadableFile, OffsetRange>> {
     private final long desiredBundleSizeBytes;
 
-    private SplitIntoRangesFn(
-        SerializableFunction<String, Boolean> isSplittable, long 
desiredBundleSizeBytes) {
-      this.isSplittable = isSplittable;
+    private SplitIntoRangesFn(long desiredBundleSizeBytes) {
       this.desiredBundleSizeBytes = desiredBundleSizeBytes;
     }
 
     @ProcessElement
     public void process(ProcessContext c) {
-      Metadata metadata = c.element();
-      checkArgument(
-          !metadata.resourceId().isDirectory(),
-          "Resource %s is a directory",
-          metadata.resourceId());
-      if (!metadata.isReadSeekEfficient()
-          || !isSplittable.apply(metadata.resourceId().toString())) {
-        c.output(KV.of(metadata, new OffsetRange(0, metadata.sizeBytes())));
+      Metadata metadata = c.element().getMetadata();
+      if (!metadata.isReadSeekEfficient()) {
+        c.output(KV.of(c.element(), new OffsetRange(0, metadata.sizeBytes())));
         return;
       }
       for (OffsetRange range :
           new OffsetRange(0, 
metadata.sizeBytes()).split(desiredBundleSizeBytes, 0)) {
-        c.output(KV.of(metadata, range));
+        c.output(KV.of(c.element(), range));
       }
     }
   }
 
-  private static class ReadFileRangesFn<T> extends DoFn<KV<Metadata, 
OffsetRange>, T> {
+  private static class ReadFileRangesFn<T> extends DoFn<KV<ReadableFile, 
OffsetRange>, T> {
     private final SerializableFunction<String, FileBasedSource<T>> 
createSource;
 
     private ReadFileRangesFn(SerializableFunction<String, FileBasedSource<T>> 
createSource) {
@@ -128,12 +119,14 @@ class ReadAllViaFileBasedSource<T> extends 
PTransform<PCollection<Metadata>, PCo
 
     @ProcessElement
     public void process(ProcessContext c) throws IOException {
-      Metadata metadata = c.element().getKey();
+      ReadableFile file = c.element().getKey();
       OffsetRange range = c.element().getValue();
-      FileBasedSource<T> source = createSource.apply(metadata.toString());
+      FileBasedSource<T> source =
+          
CompressedSource.from(createSource.apply(file.getMetadata().resourceId().toString()))
+              .withCompression(file.getCompression());
       try (BoundedSource.BoundedReader<T> reader =
           source
-              .createForSubrangeOfFile(metadata, range.getFrom(), 
range.getTo())
+              .createForSubrangeOfFile(file.getMetadata(), range.getFrom(), 
range.getTo())
               .createReader(c.getPipelineOptions())) {
         for (boolean more = reader.start(); more; more = reader.advance()) {
           c.output(reader.getCurrent());

http://git-wip-us.apache.org/repos/asf/beam/blob/6d4a7851/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadableFileCoder.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadableFileCoder.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadableFileCoder.java
index 4ef069c..51bb83e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadableFileCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadableFileCoder.java
@@ -21,22 +21,15 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import org.apache.beam.sdk.coders.AtomicCoder;
-import org.apache.beam.sdk.coders.BooleanCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
-import org.apache.beam.sdk.coders.VarLongCoder;
-import org.apache.beam.sdk.io.fs.ResourceId;
-import org.apache.beam.sdk.io.fs.ResourceIdCoder;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.io.fs.MetadataCoder;
 
 /** A {@link Coder} for {@link FileIO.ReadableFile}. */
 public class ReadableFileCoder extends AtomicCoder<FileIO.ReadableFile> {
   private static final ReadableFileCoder INSTANCE = new ReadableFileCoder();
 
-  private static final BooleanCoder IS_SEEKABLE_CODER = BooleanCoder.of();
-  private static final VarIntCoder COMPRESSION_CODER = VarIntCoder.of();
-  private static final ResourceIdCoder RESOURCE_ID_CODER = 
ResourceIdCoder.of();
-  private static final VarLongCoder SIZE_CODER = VarLongCoder.of();
-
   /** Returns the instance of {@link ReadableFileCoder}. */
   public static ReadableFileCoder of() {
     return INSTANCE;
@@ -44,18 +37,14 @@ public class ReadableFileCoder extends 
AtomicCoder<FileIO.ReadableFile> {
 
   @Override
   public void encode(FileIO.ReadableFile value, OutputStream os) throws 
IOException {
-    RESOURCE_ID_CODER.encode(value.getResourceId(), os);
-    SIZE_CODER.encode(value.getSizeBytes(), os);
-    IS_SEEKABLE_CODER.encode(value.isSeekable(), os);
-    COMPRESSION_CODER.encode(value.getCompression().ordinal(), os);
+    MetadataCoder.of().encode(value.getMetadata(), os);
+    VarIntCoder.of().encode(value.getCompression().ordinal(), os);
   }
 
   @Override
   public FileIO.ReadableFile decode(InputStream is) throws IOException {
-    ResourceId resourceId = RESOURCE_ID_CODER.decode(is);
-    long sizeBytes = SIZE_CODER.decode(is);
-    boolean isSeekable = IS_SEEKABLE_CODER.decode(is);
-    Compression compression = 
Compression.values()[COMPRESSION_CODER.decode(is)];
-    return new FileIO.ReadableFile(resourceId, sizeBytes, isSeekable, 
compression);
+    MatchResult.Metadata metadata = MetadataCoder.of().decode(is);
+    Compression compression = 
Compression.values()[VarIntCoder.of().decode(is)];
+    return new FileIO.ReadableFile(metadata, compression);
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/6d4a7851/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index 57bfaa9..a17928e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.io;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
+import static org.apache.beam.sdk.io.FileIO.ReadMatches.*;
 
 import com.google.auto.value.AutoValue;
 import com.google.common.annotations.VisibleForTesting;
@@ -483,12 +484,15 @@ public class TextIO {
       return input
           .apply(FileIO.matchAll().withConfiguration(getMatchConfiguration()))
           .apply(
+              FileIO.readMatches()
+                  .withCompression(getCompression())
+                  .withDirectoryTreatment(DirectoryTreatment.PROHIBIT))
+          .apply(
               "Read all via FileBasedSource",
               new ReadAllViaFileBasedSource<>(
-                  new IsSplittableFn(getCompression()),
                   getDesiredBundleSizeBytes(),
-                  new CreateTextSourceFn(getCompression(), getDelimiter())))
-          .setCoder(StringUtf8Coder.of());
+                  new CreateTextSourceFn(getDelimiter()),
+                  StringUtf8Coder.of()));
     }
 
     @Override
@@ -507,34 +511,16 @@ public class TextIO {
 
     private static class CreateTextSourceFn
         implements SerializableFunction<String, FileBasedSource<String>> {
-      private final Compression compression;
       private byte[] delimiter;
 
-      private CreateTextSourceFn(
-          Compression compression, byte[] delimiter) {
-        this.compression = compression;
+      private CreateTextSourceFn(byte[] delimiter) {
         this.delimiter = delimiter;
       }
 
       @Override
       public FileBasedSource<String> apply(String input) {
-        return CompressedSource.from(
-                new TextSource(
-                    StaticValueProvider.of(input), 
EmptyMatchTreatment.DISALLOW, delimiter))
-            .withCompression(compression);
-      }
-    }
-
-    private static class IsSplittableFn implements 
SerializableFunction<String, Boolean> {
-      private final Compression compression;
-
-      private IsSplittableFn(Compression compression) {
-        this.compression = compression;
-      }
-
-      @Override
-      public Boolean apply(String filename) {
-        return !compression.isCompressed(filename);
+        return new TextSource(
+            StaticValueProvider.of(input), EmptyMatchTreatment.DISALLOW, 
delimiter);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/6d4a7851/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileIOTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileIOTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileIOTest.java
index 341d86a..7065bff 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileIOTest.java
@@ -257,10 +257,10 @@ public class FileIOTest implements Serializable {
               new SerializableFunction<FileIO.ReadableFile, Void>() {
                 @Override
                 public Void apply(FileIO.ReadableFile input) {
-                  assertEquals(path, input.getResourceId().toString());
-                  assertEquals("Hello world".length(), input.getSizeBytes());
+                  assertEquals(path, 
input.getMetadata().resourceId().toString());
+                  assertEquals("Hello world".length(), 
input.getMetadata().sizeBytes());
                   assertEquals(Compression.UNCOMPRESSED, 
input.getCompression());
-                  assertTrue(input.isSeekable());
+                  assertTrue(input.getMetadata().isReadSeekEfficient());
                   try {
                     assertEquals("Hello world", input.readFullyAsUTF8String());
                   } catch (IOException e) {
@@ -286,10 +286,10 @@ public class FileIOTest implements Serializable {
               new SerializableFunction<FileIO.ReadableFile, Void>() {
                 @Override
                 public Void apply(FileIO.ReadableFile input) {
-                  assertEquals(pathGZ, input.getResourceId().toString());
-                  assertFalse(input.getSizeBytes() == "Hello world".length());
+                  assertEquals(pathGZ, 
input.getMetadata().resourceId().toString());
+                  assertFalse(input.getMetadata().sizeBytes() == "Hello 
world".length());
                   assertEquals(Compression.GZIP, input.getCompression());
-                  assertFalse(input.isSeekable());
+                  assertFalse(input.getMetadata().isReadSeekEfficient());
                   try {
                     assertEquals("Hello world", input.readFullyAsUTF8String());
                   } catch (IOException e) {

Reply via email to